fix: nyc-taxi bench tools and limit max parallel compaction task number (#1275)

* limit max parallel compaction subtask

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* correct type map

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-03-29 17:16:53 +08:00
committed by GitHub
parent d7a1435517
commit d7cadf6e6d
2 changed files with 19 additions and 9 deletions

View File

@@ -126,6 +126,7 @@ fn convert_record_batch(record_batch: RecordBatch) -> (Vec<Column>, u32) {
for (array, field) in record_batch.columns().iter().zip(fields.iter()) {
let (values, datatype) = build_values(array);
let column = Column {
column_name: field.name().to_owned(),
values: Some(values),
@@ -182,10 +183,10 @@ fn build_values(column: &ArrayRef) -> (Values, ColumnDataType) {
let values = array.values();
(
Values {
i64_values: values.to_vec(),
ts_microsecond_values: values.to_vec(),
..Default::default()
},
ColumnDataType::Int64,
ColumnDataType::TimestampMicrosecond,
)
}
DataType::Utf8 => {
@@ -252,13 +253,13 @@ fn create_table_expr() -> CreateTableExpr {
},
ColumnDef {
name: "tpep_pickup_datetime".to_string(),
datatype: ColumnDataType::Int64 as i32,
datatype: ColumnDataType::TimestampMicrosecond as i32,
is_nullable: true,
default_constraint: vec![],
},
ColumnDef {
name: "tpep_dropoff_datetime".to_string(),
datatype: ColumnDataType::Int64 as i32,
datatype: ColumnDataType::TimestampMicrosecond as i32,
is_nullable: true,
default_constraint: vec![],
},

View File

@@ -30,6 +30,8 @@ use crate::sst::{
};
use crate::wal::Wal;
/// Upper bound on how many compaction sub-task futures are grouped into one
/// chunk and awaited together; remaining futures wait for later chunks.
const MAX_PARALLEL_COMPACTION: usize = 8;
#[async_trait::async_trait]
pub trait CompactionTask: Debug + Send + Sync + 'static {
async fn run(self) -> Result<()>;
@@ -80,11 +82,18 @@ impl<S: LogStore> CompactionTaskImpl<S> {
});
}
let outputs = futures::future::try_join_all(futs)
.await?
.into_iter()
.flatten()
.collect();
let mut outputs = HashSet::with_capacity(futs.len());
while !futs.is_empty() {
let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION);
for _ in 0..MAX_PARALLEL_COMPACTION {
if let Some(task) = futs.pop() {
task_chunk.push(task);
}
}
let metas = futures::future::try_join_all(task_chunk).await?;
outputs.extend(metas.into_iter().flatten());
}
let inputs = compacted_inputs.into_iter().collect();
Ok((outputs, inputs))
}