mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 02:20:42 +00:00
refactor: allow for eviction of layers in a batch
The auto-eviction PR (#3552) operates in two phaes: 1. find candidate layers 2. evict them. For (2), a batch API like the one added in this commit is useful. Note that this PR requires #3558 to be merged first. Otherwise, the tests won't pass.
This commit is contained in:
committed by
Christian Schwarz
parent
a6dffb6ef9
commit
7ed93fff06
@@ -867,30 +867,98 @@ impl Timeline {
|
||||
Ok(Some(true))
|
||||
}
|
||||
|
||||
/// Evicts one layer as in replaces a downloaded layer with a remote layer
|
||||
///
|
||||
/// Returns:
|
||||
/// - `Ok(Some(true))` when the layer was replaced
|
||||
/// - `Ok(Some(false))` when the layer was found, but no changes were made
|
||||
/// - evictee was not yet downloaded
|
||||
/// - layermap replacement failed
|
||||
/// - `Ok(None)` when the layer is not found
|
||||
/// Like [`evict_layer_batch`], but for just one layer.
|
||||
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
|
||||
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
use super::layer_map::Replacement;
|
||||
|
||||
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
if local_layer.is_remote_layer() {
|
||||
return Ok(Some(false));
|
||||
}
|
||||
|
||||
// ensure the current layer is uploaded for sure
|
||||
self.remote_client
|
||||
let remote_client = self
|
||||
.remote_client
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?
|
||||
.ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let results = self
|
||||
.evict_layer_batch(remote_client, &[local_layer], cancel)
|
||||
.await?;
|
||||
assert_eq!(results.len(), 1);
|
||||
let result: Option<anyhow::Result<bool>> = results.into_iter().next().unwrap();
|
||||
match result {
|
||||
None => anyhow::bail!("task_mgr shutdown requested"),
|
||||
Some(Ok(b)) => Ok(Some(b)),
|
||||
Some(Err(e)) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to evict the given `layers_to_evict` by
|
||||
/// 1. Replacing the given layer object in the layer map with a corresponding [`RemoteLayer`] object.
|
||||
/// 2. Deleting the now unreferenced layer file from disk.
|
||||
///
|
||||
/// The `remote_client` should be this timeline's `self.remote_client`.
|
||||
/// We make the caller provide it so that they are responsible for handling the case
|
||||
/// where someone wants to evict the layer but no remote storage is configured.
|
||||
///
|
||||
/// Returns either `Err()` or `Ok(results)` where `results.len() == layers_to_evict.len()`.
|
||||
/// If `Err()` is returned, no eviction was attempted.
|
||||
/// Each position of `Ok(results)` corresponds to the layer in `layers_to_evict`.
|
||||
/// Meaning of each `result[i]`:
|
||||
/// - `Some(Err(...))` if layer replacement failed for an unexpected reason
|
||||
/// - `Some(Ok(true))` if everything went well.
|
||||
/// - `Some(Ok(false))` if there was an expected reason why the layer could not be replaced, e.g.:
|
||||
/// - evictee was not yet downloaded
|
||||
/// - replacement failed for an expectable reason (e.g., layer removed by GC before we grabbed all locks)
|
||||
/// - `None` if no eviction attempt was made for the layer because `cancel.is_cancelled() == true`.
|
||||
async fn evict_layer_batch(
|
||||
&self,
|
||||
remote_client: &Arc<RemoteTimelineClient>,
|
||||
layers_to_evict: &[Arc<dyn PersistentLayer>],
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
|
||||
// ensure that the layers have finished uploading
|
||||
// (don't hold the layer_removal_cs while we do it, we're not removing anything yet)
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
|
||||
// now lock out layer removal (compaction, gc, timeline deletion)
|
||||
let layer_removal_guard = self.layer_removal_cs.lock().await;
|
||||
|
||||
// start the batch update
|
||||
let mut layer_map = self.layers.write().unwrap();
|
||||
let mut batch_updates = layer_map.batch_update();
|
||||
|
||||
let mut results = Vec::with_capacity(layers_to_evict.len());
|
||||
|
||||
for l in layers_to_evict.iter() {
|
||||
let res = if cancel.is_cancelled() {
|
||||
None
|
||||
} else {
|
||||
Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut batch_updates))
|
||||
};
|
||||
results.push(res);
|
||||
}
|
||||
|
||||
// commit the updates & release locks
|
||||
batch_updates.flush();
|
||||
drop(layer_map);
|
||||
drop(layer_removal_guard);
|
||||
|
||||
assert_eq!(results.len(), layers_to_evict.len());
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
fn evict_layer_batch_impl(
|
||||
&self,
|
||||
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
||||
local_layer: &Arc<dyn PersistentLayer>,
|
||||
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
|
||||
) -> anyhow::Result<bool> {
|
||||
use super::layer_map::Replacement;
|
||||
|
||||
if local_layer.is_remote_layer() {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let layer_metadata = LayerFileMetadata::new(
|
||||
local_layer
|
||||
.file_size()
|
||||
@@ -917,11 +985,7 @@ impl Timeline {
|
||||
),
|
||||
});
|
||||
|
||||
let gc_lock = self.layer_removal_cs.lock().await;
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut updates = layers.batch_update();
|
||||
|
||||
let replaced = match updates.replace_historic(&local_layer, new_remote_layer)? {
|
||||
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
|
||||
Replacement::Replaced { .. } => {
|
||||
let layer_size = local_layer.file_size();
|
||||
|
||||
@@ -944,7 +1008,7 @@ impl Timeline {
|
||||
}
|
||||
Replacement::Unexpected(other) => {
|
||||
error!(
|
||||
local_layer.ptr=?Arc::as_ptr(&local_layer),
|
||||
local_layer.ptr=?Arc::as_ptr(local_layer),
|
||||
other.ptr=?Arc::as_ptr(&other),
|
||||
?other,
|
||||
"failed to replace");
|
||||
@@ -952,11 +1016,7 @@ impl Timeline {
|
||||
}
|
||||
};
|
||||
|
||||
updates.flush();
|
||||
drop(layers);
|
||||
drop(gc_lock);
|
||||
|
||||
Ok(Some(replaced))
|
||||
Ok(replaced)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user