diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 316347f229..92c8a3bc36 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -398,7 +398,9 @@ impl AccessLayer { .await? } Either::Right(flat_source) => { - writer.write_all_flat(flat_source, write_opts).await? + writer + .write_all_flat(flat_source, request.max_sequence, write_opts) + .await? } } }; diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index ddd9a6c8ca..e811d7f2f5 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -250,7 +250,11 @@ impl WriteCache { .write_all(source, write_request.max_sequence, write_opts) .await? } - either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?, + either::Right(flat_source) => { + writer + .write_all_flat(flat_source, write_request.max_sequence, write_opts) + .await? + } }; // Upload sst file to remote object store. diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 4d09d84c73..b7ac340073 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -1182,7 +1182,7 @@ mod tests { .await; writer - .write_all_flat(flat_source, write_opts) + .write_all_flat(flat_source, None, write_opts) .await .unwrap() .remove(0) @@ -1293,7 +1293,7 @@ mod tests { .await; let info = writer - .write_all_flat(flat_source, &write_opts) + .write_all_flat(flat_source, None, &write_opts) .await .unwrap() .remove(0); diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 568b54415c..b207f11ef8 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -321,9 +321,12 @@ where pub async fn write_all_flat( &mut self, source: FlatSource, + override_sequence: Option, opts: &WriteOptions, ) -> Result { - let res = self.write_all_flat_without_cleaning(source, opts).await; + let res = self + .write_all_flat_without_cleaning(source, override_sequence, opts) + .await; if res.is_err() { // Clean tmp files explicitly on failure. let file_id = self.current_file; @@ -337,6 +340,7 @@ where async fn write_all_flat_without_cleaning( &mut self, mut source: FlatSource, + override_sequence: Option, opts: &WriteOptions, ) -> Result { let mut results = smallvec![]; @@ -344,7 +348,7 @@ where self.metadata.clone(), &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding), ) - .with_override_sequence(None); + .with_override_sequence(override_sequence); let mut stats = SourceStats::default(); while let Some(record_batch) = self