feat: tune constants

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Ruihang Xia
2026-03-21 14:33:42 +08:00
parent 2af3951944
commit 930f70b052
7 changed files with 159 additions and 35 deletions

View File

@@ -23,7 +23,9 @@ use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::file::metadata::{
PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_datasource::PartitionedFile;
@@ -94,35 +96,40 @@ impl DefaultParquetFileReaderFactory {
}
impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
// TODO(weny): Supports [`metadata_size_hint`].
// The upstream has an implementation that supports [`metadata_size_hint`],
// however, it is coupled with Box<dyn ObjectStore>.
fn create_reader(
&self,
_partition_index: usize,
partitioned_file: PartitionedFile,
_metadata_size_hint: Option<usize>,
metadata_size_hint: Option<usize>,
_metrics: &ExecutionPlanMetricsSet,
) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
let path = partitioned_file.path().to_string();
let object_store = self.object_store.clone();
Ok(Box::new(LazyParquetFileReader::new(object_store, path)))
Ok(Box::new(LazyParquetFileReader::new(
object_store,
path,
metadata_size_hint,
)))
}
}
pub struct LazyParquetFileReader {
object_store: ObjectStore,
reader: Option<Compat<FuturesAsyncReader>>,
file_size: Option<u64>,
metadata_size_hint: Option<usize>,
path: String,
}
impl LazyParquetFileReader {
pub fn new(object_store: ObjectStore, path: String) -> Self {
pub fn new(object_store: ObjectStore, path: String, metadata_size_hint: Option<usize>) -> Self {
LazyParquetFileReader {
object_store,
path,
reader: None,
file_size: None,
metadata_size_hint,
}
}
@@ -130,6 +137,7 @@ impl LazyParquetFileReader {
async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
if self.reader.is_none() {
let meta = self.object_store.stat(&self.path).await?;
self.file_size = Some(meta.content_length());
let reader = self
.object_store
.reader(&self.path)
@@ -166,8 +174,19 @@ impl AsyncFileReader for LazyParquetFileReader {
self.maybe_initialize()
.await
.map_err(|e| ParquetError::External(Box::new(e)))?;
// Safety: Must be initialized.
self.reader.as_mut().unwrap().get_metadata(options).await
let metadata_opts = options.map(|o| o.metadata_options().clone());
let metadata_reader = ParquetMetaDataReader::new()
.with_metadata_options(metadata_opts)
.with_page_index_policy(PageIndexPolicy::from(
options.is_some_and(|o| o.page_index()),
))
.with_prefetch_hint(self.metadata_size_hint);
let metadata = metadata_reader
.load_and_finish(self.reader.as_mut().unwrap(), self.file_size.unwrap())
.await?;
Ok(Arc::new(metadata))
})
}
}
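With this change the caller's hint is forwarded to ParquetMetaDataReader::with_prefetch_hint, so the footer and metadata can usually be fetched with a single suffix read instead of two round trips. A minimal sketch of a call site under that assumption; `factory`, `file`, and `metrics` are stand-ins, and the 64 KiB hint is an example value, not something this commit sets:

fn open_with_hint(
    factory: &DefaultParquetFileReaderFactory,
    file: PartitionedFile,
    metrics: &ExecutionPlanMetricsSet,
) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
    // Hint that the trailing 64 KiB covers footer + metadata; the reader issues
    // extra reads if the hint turns out to be too small.
    factory.create_reader(0, file, Some(64 * 1024), metrics)
}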

View File

@@ -288,6 +288,17 @@ pub(crate) struct FileCache {
pub(crate) type FileCacheRef = Arc<FileCache>;
impl FileCache {
/// Splits the configured total capacity between parquet and puffin caches
/// without exceeding the requested overall budget.
fn split_cache_capacities(total_capacity: u64, index_percent: u8) -> (u64, u64) {
let desired_puffin_capacity = total_capacity * u64::from(index_percent) / 100;
let min_cache_capacity = MIN_CACHE_CAPACITY.min(total_capacity / 2);
let puffin_capacity =
desired_puffin_capacity.clamp(min_cache_capacity, total_capacity - min_cache_capacity);
let parquet_capacity = total_capacity - puffin_capacity;
(parquet_capacity, puffin_capacity)
}
/// Creates a new file cache.
pub(crate) fn new(
local_store: ObjectStore,
@@ -302,14 +313,8 @@ impl FileCache {
.unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
let total_capacity = capacity.as_bytes();
// Convert percent to ratio and calculate capacity for each cache
let index_ratio = index_percent as f64 / 100.0;
let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
let parquet_capacity = total_capacity - puffin_capacity;
// Ensure both capacities are at least 512MB
let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);
let (parquet_capacity, puffin_capacity) =
Self::split_cache_capacities(total_capacity, index_percent);
info!(
"Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
@@ -1064,6 +1069,28 @@ mod tests {
assert_eq!(data, bytes[3].as_ref());
}
#[test]
fn test_file_cache_capacity_respects_total_budget() {
let total_capacity = ReadableSize::mb(256).as_bytes();
let (parquet_capacity, puffin_capacity) =
FileCache::split_cache_capacities(total_capacity, 20);
assert_eq!(total_capacity, parquet_capacity + puffin_capacity);
assert_eq!(ReadableSize::mb(128).as_bytes(), parquet_capacity);
assert_eq!(ReadableSize::mb(128).as_bytes(), puffin_capacity);
}
#[test]
fn test_file_cache_capacity_keeps_split_when_total_allows_it() {
let total_capacity = ReadableSize::gb(5).as_bytes();
let (parquet_capacity, puffin_capacity) =
FileCache::split_cache_capacities(total_capacity, 20);
assert_eq!(total_capacity, parquet_capacity + puffin_capacity);
assert_eq!(ReadableSize::gb(4).as_bytes(), parquet_capacity);
assert_eq!(ReadableSize::gb(1).as_bytes(), puffin_capacity);
}
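For context on the clamp: with the old floor-at-MIN_CACHE_CAPACITY logic, small budgets could be silently inflated. A worked comparison for the 256 MB case above, assuming MIN_CACHE_CAPACITY is the 512 MB floor referenced in the removed comment:

// Illustrative numbers only, not part of the commit.
// old: puffin = max(51 MB, 512 MB) = 512 MB, parquet = max(205 MB, 512 MB) = 512 MB,
//      i.e. 1 GB reserved against a 256 MB budget.
// new: min_cache_capacity = min(512 MB, 128 MB) = 128 MB, so puffin clamps to 128 MB
//      and parquet takes the remaining 128 MB; the sum never exceeds total_capacity.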
#[test]
fn test_cache_file_path() {
let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();

View File

@@ -137,7 +137,7 @@ struct CollectedParts {
/// All parts in a bulk memtable.
#[derive(Default)]
struct BulkParts {
/// Unordered small parts (< 1024 rows).
/// Unordered small parts.
unordered_part: UnorderedPart,
/// All parts (raw and encoded).
parts: Vec<BulkPartWrapper>,

View File

@@ -50,6 +50,7 @@ use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource
use crate::metrics::{
PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, PARTITION_TREE_READ_STAGE_ELAPSED,
};
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
const PK_INDEX_COLUMN_NAME: &str = "__pk_index";
@@ -821,7 +822,11 @@ impl DataPart {
/// Reads frozen data part and yields [DataBatch]es.
pub fn read(&self) -> Result<DataPartReader> {
match self {
DataPart::Parquet(data_bytes) => DataPartReader::new(data_bytes.data.clone(), None),
// Keep encoded memtable scans aligned with mito/DataFusion batch sizing instead of
// parquet-rs's implicit 1024-row default.
DataPart::Parquet(data_bytes) => {
DataPartReader::new(data_bytes.data.clone(), Some(DEFAULT_READ_BATCH_SIZE))
}
}
}
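The None previously passed here meant parquet-rs chose its own batch size. A sketch of the underlying mechanism, assuming DataPartReader ultimately builds a parquet-rs ParquetRecordBatchReader; the helper below is illustrative, not the actual DataPartReader code:

use bytes::Bytes;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use parquet::errors::Result as ParquetResult;

use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;

// parquet-rs falls back to 1024-row batches unless with_batch_size is set explicitly.
fn read_encoded_part(data: Bytes) -> ParquetResult<ParquetRecordBatchReader> {
    ParquetRecordBatchReaderBuilder::try_new(data)?
        .with_batch_size(DEFAULT_READ_BATCH_SIZE) // 8 * 1024 after this commit
        .build()
}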

View File

@@ -39,9 +39,17 @@ pub mod writer;
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
///
/// This is a runtime-only scan granularity, so we align it with DataFusion's
/// default execution batch size to reduce rebatching and concatenation in the
/// query pipeline.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8 * 1024;
/// Default row group size for parquet files.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
///
/// Keep the existing persisted/on-disk default stable. It intentionally stays
/// decoupled from [`DEFAULT_READ_BATCH_SIZE`] so we can tune runtime scan
/// batching without changing the row group layout of newly written SSTs.
pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024;
/// Parquet write options.
#[derive(Debug, Clone)]
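The arithmetic behind keeping the two constants apart: the old coupling happened to equal 100 * 1024 = 102,400 rows, while re-deriving it from the new 8,192-row read batch would have produced 819,200-row row groups. A compile-time check one could add to state that invariant (illustrative, not part of the commit):

// On-disk default unchanged; the runtime batch size no longer feeds into it.
const _: () = assert!(DEFAULT_ROW_GROUP_SIZE == 100 * 1024);
const _: () = assert!(DEFAULT_ROW_GROUP_SIZE != 100 * DEFAULT_READ_BATCH_SIZE);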

View File

@@ -49,9 +49,6 @@ use snafu::ResultExt;
use crate::error::DeserializeSnafu;
use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
/// Maximum number of rows per output batch
const ABSENT_BATCH_SIZE: usize = 8192;
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Absent {
start: Millisecond,
@@ -390,11 +387,13 @@ impl ExecutionPlan for AbsentExec {
context: Arc<TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
let batch_size = context.session_config().batch_size();
let input = self.input.execute(partition, context)?;
Ok(Box::pin(AbsentStream {
end: self.end,
step: self.step,
batch_size,
time_index_column_index: self
.input
.schema()
@@ -441,6 +440,7 @@ impl DisplayAs for AbsentExec {
pub struct AbsentStream {
end: Millisecond,
step: Millisecond,
batch_size: usize,
time_index_column_index: usize,
output_schema: SchemaRef,
fake_labels: Vec<(String, String)>,
@@ -474,7 +474,7 @@ impl Stream for AbsentStream {
self.metric.elapsed_compute().add_elapsed(timer);
// If we have enough data for a batch, output it
if self.output_timestamps.len() >= ABSENT_BATCH_SIZE {
if self.output_timestamps.len() >= self.batch_size {
let timer = std::time::Instant::now();
let result = self.flush_output_batch();
self.metric.elapsed_compute().add_elapsed(timer);
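With ABSENT_BATCH_SIZE removed, the flush threshold now follows the session's configured batch size, the same knob the RangeSelect test later in this commit tunes. A minimal sketch of setting it, assuming a context is built by hand (the 4096 value is illustrative):

use datafusion::execution::config::SessionConfig;
use datafusion::prelude::SessionContext;

fn context_with_batch_size() -> SessionContext {
    // AbsentStream now flushes once output_timestamps reaches this many rows.
    SessionContext::new_with_config(SessionConfig::new().with_batch_size(4096))
}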

View File

@@ -836,6 +836,7 @@ impl ExecutionPlan for RangeSelectExec {
context: Arc<TaskContext>,
) -> DfResult<DfSendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
let batch_size = context.session_config().batch_size();
let input = self.input.execute(partition, context)?;
let schema = input.schema();
let time_index = schema
@@ -852,6 +853,7 @@ impl ExecutionPlan for RangeSelectExec {
.collect(),
)?;
Ok(Box::pin(RangeSelectStream {
batch_size,
schema: self.schema.clone(),
range_exec: self.range_exec.clone(),
input,
@@ -868,6 +870,8 @@ impl ExecutionPlan for RangeSelectExec {
metric: baseline_metric,
schema_project: self.schema_project.clone(),
schema_before_project: self.schema_before_project.clone(),
output_batch: None,
output_batch_offset: 0,
}))
}
@@ -881,6 +885,7 @@ impl ExecutionPlan for RangeSelectExec {
}
struct RangeSelectStream {
batch_size: usize,
/// the schema of output column
schema: SchemaRef,
range_exec: Vec<RangeFnExec>,
@@ -907,6 +912,8 @@ struct RangeSelectStream {
metric: BaselineMetrics,
schema_project: Option<Vec<usize>>,
schema_before_project: SchemaRef,
output_batch: Option<RecordBatch>,
output_batch_offset: usize,
}
#[derive(Debug)]
@@ -1149,6 +1156,35 @@ impl RangeSelectStream {
};
Ok(project_output)
}
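/// Lazily materializes the full output on the first call, then hands it out in slices
/// of at most `batch_size` rows, clearing the buffer once every row has been emitted.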
fn next_output_batch(&mut self) -> DfResult<Option<RecordBatch>> {
if self.output_batch.is_none() {
self.output_batch = Some(self.generate_output()?);
self.output_batch_offset = 0;
}
let num_rows = self.output_batch.as_ref().unwrap().num_rows();
if num_rows == 0 {
self.output_batch_offset = 0;
return Ok(self.output_batch.take());
}
if self.output_batch_offset == 0 && num_rows <= self.batch_size {
return Ok(self.output_batch.take());
}
let offset = self.output_batch_offset;
let len = (num_rows - offset).min(self.batch_size);
let batch = self.output_batch.as_ref().unwrap().slice(offset, len);
self.output_batch_offset += len;
if self.output_batch_offset >= num_rows {
self.output_batch = None;
self.output_batch_offset = 0;
}
Ok(Some(batch))
}
}
enum ExecutionState {
@@ -1191,13 +1227,16 @@ impl Stream for RangeSelectStream {
}
}
ExecutionState::ProducingOutput => {
let result = self.generate_output();
let result = self.next_output_batch();
return match result {
// made output
Ok(batch) => {
self.exec_state = ExecutionState::Done;
Ok(Some(batch)) => {
if self.output_batch.is_none() {
self.exec_state = ExecutionState::Done;
}
Poll::Ready(Some(Ok(batch)))
}
Ok(None) => Poll::Ready(None),
// error making output
Err(error) => Poll::Ready(Some(Err(error))),
};
@@ -1313,15 +1352,15 @@ mod test {
))
}
async fn do_range_select_test(
async fn collect_range_select_test(
range1: Millisecond,
range2: Millisecond,
align: Millisecond,
fill: Option<Fill>,
is_float: bool,
is_gap: bool,
expected: String,
) {
batch_size: usize,
) -> Vec<RecordBatch> {
let data_type = if is_float {
DataType::Float64
} else {
@@ -1412,11 +1451,25 @@ mod test {
.into(),
range_select_exec,
);
let session_context = SessionContext::default();
let session_context = SessionContext::new_with_config(
datafusion::execution::config::SessionConfig::new().with_batch_size(batch_size),
);
datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx())
.await
.unwrap()
}
async fn do_range_select_test(
range1: Millisecond,
range2: Millisecond,
align: Millisecond,
fill: Option<Fill>,
is_float: bool,
is_gap: bool,
expected: String,
) {
let result =
datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx())
.await
.unwrap();
collect_range_select_test(range1, range2, align, fill, is_float, is_gap, 8192).await;
let result_literal = arrow::util::pretty::pretty_format_batches(&result)
.unwrap()
@@ -1700,6 +1753,18 @@ mod test {
.await;
}
#[tokio::test]
async fn range_select_respects_session_batch_size() {
let result =
collect_range_select_test(10_000, 5_000, 5_000, Some(Fill::Null), true, false, 3).await;
let row_counts = result
.iter()
.map(|batch| batch.num_rows())
.collect::<Vec<_>>();
assert_eq!(vec![3, 3, 3, 3], row_counts);
}
#[test]
fn fill_test() {
assert!(Fill::try_from_str("", &DataType::UInt8).unwrap().is_none());