mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-24 00:40:40 +00:00
fix: Allow overriding sequence when writing flat SSTs (#7764)
Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -398,7 +398,9 @@ impl AccessLayer {
|
||||
.await?
|
||||
}
|
||||
Either::Right(flat_source) => {
|
||||
writer.write_all_flat(flat_source, write_opts).await?
|
||||
writer
|
||||
.write_all_flat(flat_source, request.max_sequence, write_opts)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
6
src/mito2/src/cache/write_cache.rs
vendored
6
src/mito2/src/cache/write_cache.rs
vendored
@@ -250,7 +250,11 @@ impl WriteCache {
|
||||
.write_all(source, write_request.max_sequence, write_opts)
|
||||
.await?
|
||||
}
|
||||
either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
|
||||
either::Right(flat_source) => {
|
||||
writer
|
||||
.write_all_flat(flat_source, write_request.max_sequence, write_opts)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
// Upload sst file to remote object store.
|
||||
|
||||
@@ -1182,7 +1182,7 @@ mod tests {
|
||||
.await;
|
||||
|
||||
writer
|
||||
.write_all_flat(flat_source, write_opts)
|
||||
.write_all_flat(flat_source, None, write_opts)
|
||||
.await
|
||||
.unwrap()
|
||||
.remove(0)
|
||||
@@ -1293,7 +1293,7 @@ mod tests {
|
||||
.await;
|
||||
|
||||
let info = writer
|
||||
.write_all_flat(flat_source, &write_opts)
|
||||
.write_all_flat(flat_source, None, &write_opts)
|
||||
.await
|
||||
.unwrap()
|
||||
.remove(0);
|
||||
|
||||
@@ -321,9 +321,12 @@ where
|
||||
pub async fn write_all_flat(
|
||||
&mut self,
|
||||
source: FlatSource,
|
||||
override_sequence: Option<SequenceNumber>,
|
||||
opts: &WriteOptions,
|
||||
) -> Result<SstInfoArray> {
|
||||
let res = self.write_all_flat_without_cleaning(source, opts).await;
|
||||
let res = self
|
||||
.write_all_flat_without_cleaning(source, override_sequence, opts)
|
||||
.await;
|
||||
if res.is_err() {
|
||||
// Clean tmp files explicitly on failure.
|
||||
let file_id = self.current_file;
|
||||
@@ -337,6 +340,7 @@ where
|
||||
async fn write_all_flat_without_cleaning(
|
||||
&mut self,
|
||||
mut source: FlatSource,
|
||||
override_sequence: Option<SequenceNumber>,
|
||||
opts: &WriteOptions,
|
||||
) -> Result<SstInfoArray> {
|
||||
let mut results = smallvec![];
|
||||
@@ -344,7 +348,7 @@ where
|
||||
self.metadata.clone(),
|
||||
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
|
||||
)
|
||||
.with_override_sequence(None);
|
||||
.with_override_sequence(override_sequence);
|
||||
let mut stats = SourceStats::default();
|
||||
|
||||
while let Some(record_batch) = self
|
||||
|
||||
Reference in New Issue
Block a user