From d7cadf6e6d321bcd302aea9df4bc904c7f03f042 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 29 Mar 2023 17:16:53 +0800 Subject: [PATCH] fix: nyc-taxi bench tools and limit max parallel compaction task number (#1275) * limit max parallel compaction subtask Signed-off-by: Ruihang Xia * correct type map Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- benchmarks/src/bin/nyc-taxi.rs | 9 +++++---- src/storage/src/compaction/task.rs | 19 ++++++++++++++----- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index adf8fb6fb4..1de5bf562b 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -126,6 +126,7 @@ fn convert_record_batch(record_batch: RecordBatch) -> (Vec, u32) { for (array, field) in record_batch.columns().iter().zip(fields.iter()) { let (values, datatype) = build_values(array); + let column = Column { column_name: field.name().to_owned(), values: Some(values), @@ -182,10 +183,10 @@ fn build_values(column: &ArrayRef) -> (Values, ColumnDataType) { let values = array.values(); ( Values { - i64_values: values.to_vec(), + ts_microsecond_values: values.to_vec(), ..Default::default() }, - ColumnDataType::Int64, + ColumnDataType::TimestampMicrosecond, ) } DataType::Utf8 => { @@ -252,13 +253,13 @@ fn create_table_expr() -> CreateTableExpr { }, ColumnDef { name: "tpep_pickup_datetime".to_string(), - datatype: ColumnDataType::Int64 as i32, + datatype: ColumnDataType::TimestampMicrosecond as i32, is_nullable: true, default_constraint: vec![], }, ColumnDef { name: "tpep_dropoff_datetime".to_string(), - datatype: ColumnDataType::Int64 as i32, + datatype: ColumnDataType::TimestampMicrosecond as i32, is_nullable: true, default_constraint: vec![], }, diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index d5a95c6691..b9be241a25 100644 --- a/src/storage/src/compaction/task.rs +++ 
b/src/storage/src/compaction/task.rs @@ -30,6 +30,8 @@ use crate::sst::{ }; use crate::wal::Wal; +const MAX_PARALLEL_COMPACTION: usize = 8; + #[async_trait::async_trait] pub trait CompactionTask: Debug + Send + Sync + 'static { async fn run(self) -> Result<()>; @@ -80,11 +82,18 @@ impl CompactionTaskImpl { }); } - let outputs = futures::future::try_join_all(futs) - .await? - .into_iter() - .flatten() - .collect(); + let mut outputs = HashSet::with_capacity(futs.len()); + while !futs.is_empty() { + let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION); + for _ in 0..MAX_PARALLEL_COMPACTION { + if let Some(task) = futs.pop() { + task_chunk.push(task); + } + } + let metas = futures::future::try_join_all(task_chunk).await?; + outputs.extend(metas.into_iter().flatten()); + } + let inputs = compacted_inputs.into_iter().collect(); Ok((outputs, inputs)) }