Merge branch 'main' into cicd/debug-regress-tests-on-arm

This commit is contained in:
Andrey Taranik
2024-08-14 18:33:19 +03:00
2 changed files with 19 additions and 1 deletions

View File

@@ -4540,7 +4540,12 @@ impl Timeline {
new_images: &[ResidentLayer],
layers_to_remove: &[Layer],
) -> Result<(), CompactionError> {
let mut guard = self.layers.write().await;
let mut guard = tokio::select! {
guard = self.layers.write() => guard,
_ = self.cancel.cancelled() => {
return Err(CompactionError::ShuttingDown);
}
};
let mut duplicated_layers = HashSet::new();

View File

@@ -1048,11 +1048,22 @@ impl Timeline {
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
let mut next_hole = 0; // index of next hole in holes vector
let mut keys = 0;
while let Some((key, lsn, value)) = all_values_iter
.next(ctx)
.await
.map_err(CompactionError::Other)?
{
keys += 1;
if keys % 32_768 == 0 && self.cancel.is_cancelled() {
// avoid hitting the cancellation token on every key. in benches, we end up
// shuffling an order of million keys per layer, this means we'll check it
// around tens of times per layer.
return Err(CompactionError::ShuttingDown);
}
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
@@ -1157,6 +1168,8 @@ impl Timeline {
.await
.map_err(CompactionError::Other)?,
);
keys = 0;
}
writer