diff --git a/src/sst-convert/Cargo.toml b/src/sst-convert/Cargo.toml index 7a6db63d28..771c3ae27b 100644 --- a/src/sst-convert/Cargo.toml +++ b/src/sst-convert/Cargo.toml @@ -14,9 +14,9 @@ common-macro.workspace = true common-meta.workspace = true common-recordbatch.workspace = true datatypes.workspace = true +futures.workspace = true futures-util.workspace = true meta-client.workspace = true -futures.workspace = true metric-engine.workspace = true mito2.workspace = true object-store.workspace = true diff --git a/src/sst-convert/src/reader/parquet.rs b/src/sst-convert/src/reader/parquet.rs index 8c847bc30d..748c85d810 100644 --- a/src/sst-convert/src/reader/parquet.rs +++ b/src/sst-convert/src/reader/parquet.rs @@ -14,7 +14,7 @@ //! Parquet file format support. -use std::collections::{HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::pin::Pin; use std::sync::Arc; @@ -212,10 +212,12 @@ pub fn extract_to_batches( builder.push_row(&row).map_err(BoxedError::new)?; } - let mut batches = Vec::new(); + // sort batches by primary key + let mut batches = BTreeMap::new(); for (pk, builder) in pk_to_batchs { - batches.push(builder.finish(pk).map_err(BoxedError::new)?); + batches.insert(pk.clone(), builder.finish(pk).map_err(BoxedError::new)?); } + let batches = batches.into_values().collect(); Ok(batches) }