refactor check_checkpoint_distance to prepare for async Timeline::layers (#4476)

This is preliminary work for/from #4220 (async
`Layer::get_value_reconstruct_data`).

There, we want to switch `Timeline::layers` to be a
`tokio::sync::RwLock`.

That will require the `TimelineWriter` to become async.

That will require `freeze_inmem_layer` to become async.

So, inside check_checkpoint_distance, we will have
`freeze_inmem_layer().await`.

But current rustc isn't smart enough to understand that we
`drop(layers)` earlier, and hence, will complain about the `!Send`
`layers` being held across the `freeze_inmem_layer().await`-point.

This patch puts the guard into a scope, so rustc will shut up in the
next patch where we make the transition for `TimelineWriter`.

obsoletes https://github.com/neondatabase/neon/pull/4474
This commit is contained in:
Christian Schwarz
2023-06-12 17:10:52 +01:00
committed by Christian Schwarz
parent 2011cc05cd
commit 939593d0d3

View File

@@ -907,35 +907,37 @@ impl Timeline {
/// safekeepers to regard pageserver as caught up and suspend activity.
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
let distance = last_lsn.widening_sub(last_freeze_at);
// Checkpointing the open layer can be triggered by layer size or LSN range.
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
// we want to stay below that with a big margin. The LSN distance determines how
// much WAL the safekeepers need to store.
if distance >= self.get_checkpoint_distance().into()
|| open_layer_size > self.get_checkpoint_distance()
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
{
info!(
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
distance,
open_layer_size,
last_freeze_ts.elapsed()
);
let open_layer_size = {
let layers = self.layers.read().unwrap();
let Some(open_layer) = layers.open_layer.as_ref() else {
return Ok(());
};
open_layer.size()?
};
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
let distance = last_lsn.widening_sub(last_freeze_at);
// Checkpointing the open layer can be triggered by layer size or LSN range.
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
// we want to stay below that with a big margin. The LSN distance determines how
// much WAL the safekeepers need to store.
if distance >= self.get_checkpoint_distance().into()
|| open_layer_size > self.get_checkpoint_distance()
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
{
info!(
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
distance,
open_layer_size,
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
// Wake up the layer flusher
self.flush_frozen_layers();
}
// Wake up the layer flusher
self.flush_frozen_layers();
}
Ok(())
}