eviction: integration

- evictiontask: remove unused imports
- eviction_task and dube: cleanup
- timeline: pub(crate) eviction
- timeline: adjust to "layere: adjust eviction"
- test: remove layer_eviction_aba_fails because it can no longer happen
- test: fix up evicts later test with ability to await for eviction
- eviction_task: more unused imports
- eviction: clippy
- eviction_task: more clippy
- fixup eviction: docs
- eviction: hold only Arc<Layer> after checking downloadedness
- refactor earlier eviction: use drop_eviction_guard instead
- dube: evict in spawned tasks
- timeline, eviction: evict in spawned
- eviction: add more errors
- evict_layers: remove witness
- eviction: post-witness forgotten panic
- eviction: remove blog references
This commit is contained in:
Joonas Koivunen
2023-08-18 20:27:35 +03:00
parent c46f72d411
commit 2e686ed6ea
4 changed files with 226 additions and 321 deletions

View File

@@ -60,7 +60,12 @@ use utils::serde_percent::Percent;
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{self, storage_layer::PersistentLayer, timeline::EvictionError, Timeline},
tenant::{
self,
storage_layer::{AsLayerDesc, LayerE},
timeline::EvictionError,
Timeline,
},
};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -330,9 +335,10 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
let mut batched: HashMap<_, Vec<_>> = HashMap::new();
let mut warned = None;
let mut usage_planned = usage_pre;
let mut max_batch_size = 0;
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
if !usage_planned.has_pressure() {
debug!(
@@ -349,10 +355,15 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);
batched
.entry(TimelineKey(candidate.timeline))
.or_default()
.push(candidate.layer);
let batch = batched.entry(TimelineKey(candidate.timeline)).or_default();
if batch.len() < u32::MAX as usize {
// semaphore will later be used to limit eviction concurrency, and we can express at
// most u32 number of permits. unlikely we would have u32::MAX layers to be evicted,
// but fail gracefully.
batch.push(candidate.layer);
max_batch_size = max_batch_size.max(batch.len());
}
}
let usage_planned = match warned {
@@ -369,64 +380,101 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// phase2: evict victims batched by timeline
// After the loop, `usage_assumed` is the post-eviction usage,
// according to internal accounting.
let mut usage_assumed = usage_pre;
let mut evictions_failed = LayerCount::default();
let mut js = tokio::task::JoinSet::new();
// ratelimit to 1k files or any higher max batch size
let limit = Arc::new(tokio::sync::Semaphore::new(1000.max(max_batch_size)));
for (timeline, batch) in batched {
let tenant_id = timeline.tenant_id;
let timeline_id = timeline.timeline_id;
let batch_size = batch.len();
let batch_size =
u32::try_from(batch.len()).expect("batch size limited to u32::MAX during partitioning");
// I dislike naming of `available_permits` but it means current total amount of permits
// because permits can be added
assert!(batch_size as usize <= limit.available_permits());
debug!(%timeline_id, "evicting batch for timeline");
async {
let results = timeline.evict_layers(storage, &batch, cancel.clone()).await;
let evict = {
let limit = limit.clone();
let cancel = cancel.clone();
async move {
let mut evicted_bytes = 0;
let mut evictions_failed = LayerCount::default();
match results {
Err(e) => {
warn!("failed to evict batch: {:#}", e);
}
Ok(results) => {
assert_eq!(results.len(), batch.len());
for (result, layer) in results.into_iter().zip(batch.iter()) {
let file_size = layer.layer_desc().file_size;
match result {
Some(Ok(())) => {
usage_assumed.add_available_bytes(file_size);
}
Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
unreachable!("get_local_layers_for_disk_usage_eviction finds only local layers")
}
Some(Err(EvictionError::FileNotFound)) => {
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
Some(Err(
e @ EvictionError::LayerNotFound(_)
| e @ EvictionError::StatFailed(_),
)) => {
let e = utils::error::report_compact_sources(&e);
warn!(%layer, "failed to evict layer: {e}");
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
None => {
assert!(cancel.is_cancelled());
return;
let Ok(_permit) = limit.acquire_many_owned(batch_size).await else {
// assume semaphore closing means cancelled
return (evicted_bytes, evictions_failed);
};
let results = timeline.evict_layers(&batch, &cancel).await;
match results {
Ok(results) => {
assert_eq!(results.len(), batch.len());
for (result, layer) in results.into_iter().zip(batch.iter()) {
let file_size = layer.layer_desc().file_size;
match result {
Some(Ok(())) => {
evicted_bytes += file_size;
}
Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
None => {
assert!(cancel.is_cancelled());
}
}
}
}
Err(e) => {
warn!("failed to evict batch: {:#}", e);
}
}
(evicted_bytes, evictions_failed)
}
}
.instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size))
.await;
.instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size));
if cancel.is_cancelled() {
js.spawn(evict);
// spwaning multiple thousands of these is essentially blocking, so give already spawned a
// chance of making progress
tokio::task::yield_now().await;
}
let join_all = async move {
// After the evictions, `usage_assumed` is the post-eviction usage,
// according to internal accounting.
let mut usage_assumed = usage_pre;
let mut evictions_failed = LayerCount::default();
while let Some(res) = js.join_next().await {
match res {
Ok((evicted_bytes, failed)) => {
usage_assumed.add_available_bytes(evicted_bytes);
evictions_failed.file_sizes += failed.file_sizes;
evictions_failed.count += failed.count;
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => { /* already logged */ }
Err(je) => tracing::error!("unknown JoinError: {je:?}"),
}
}
(usage_assumed, evictions_failed)
};
let (usage_assumed, evictions_failed) = tokio::select! {
tuple = join_all => { tuple },
_ = cancel.cancelled() => {
// close the semaphore to stop any pending acquires
limit.close();
return Ok(IterationOutcome::Cancelled);
}
}
};
Ok(IterationOutcome::Finished(IterationOutcomeFinished {
before: usage_pre,
@@ -441,7 +489,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
layer: Arc<dyn PersistentLayer>,
layer: Arc<LayerE>,
last_activity_ts: SystemTime,
}

View File

@@ -410,6 +410,12 @@ pub(crate) struct LayerE {
/// for it.
version: AtomicUsize,
have_remote_client: bool,
/// Allow subscribing to when the layer actually gets evicted.
///
/// This might never come unless eviction called periodically.
#[cfg(test)]
evicted: tokio::sync::Notify,
}
impl std::fmt::Display for LayerE {
@@ -418,6 +424,12 @@ impl std::fmt::Display for LayerE {
}
}
impl std::fmt::Debug for LayerE {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}
impl AsLayerDesc for LayerE {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.desc
@@ -491,6 +503,8 @@ impl LayerE {
wanted_evicted: AtomicBool::new(false),
inner: Default::default(),
version: AtomicUsize::new(0),
#[cfg(test)]
evicted: tokio::sync::Notify::default(),
}
}
@@ -522,6 +536,8 @@ impl LayerE {
wanted_evicted: AtomicBool::new(false),
inner: tokio::sync::Mutex::new(Some(ResidentOrWantedEvicted::Resident(inner))),
version: AtomicUsize::new(0),
#[cfg(test)]
evicted: tokio::sync::Notify::default(),
}
});
@@ -577,6 +593,15 @@ impl LayerE {
}
}
#[cfg(test)]
pub(crate) fn wait_evicted(&self) -> impl std::future::Future<Output = ()> + '_ {
// for this to be actually useful, we must be first able to check some status, otherwise
// we could wait here for next eviction.
//
// states => (resident wanted_evicted evicted|wanted_evicted evicted resident)* wanted_garbage_collected? dropped
self.evicted.notified()
}
/// Delete the layer file when the `self` gets dropped, also schedule a remote index upload
/// then perhaps.
pub(crate) fn garbage_collect(&self) {
@@ -957,7 +982,12 @@ impl LayerE {
}
});
match capture_mtime_and_delete.await {
let res = capture_mtime_and_delete.await;
#[cfg(test)]
this.evicted.notify_waiters();
match res {
Ok(Ok(local_layer_mtime)) => {
let duration =
std::time::SystemTime::now().duration_since(local_layer_mtime);
@@ -1004,6 +1034,7 @@ impl LayerE {
}
}
#[derive(Debug, PartialEq)]
pub(crate) enum NeedsDownload {
NotFound,
NotFile,
@@ -1060,8 +1091,8 @@ impl std::fmt::Debug for ResidentLayer {
}
impl ResidentLayer {
pub(crate) fn local_path(&self) -> &std::path::Path {
&self.owner.path
pub(crate) fn drop_eviction_guard(self) -> Arc<LayerE> {
self.into()
}
}

View File

@@ -1043,14 +1043,13 @@ impl Timeline {
let Some(layer) = self.find_layer(layer_file_name).await else {
return Ok(None);
};
let Some(remote_layer) = layer.downcast_remote_layer() else {
return Ok(Some(false));
};
if self.remote_client.is_none() {
return Ok(Some(false));
}
self.download_remote_layer(remote_layer).await?;
layer.guard_against_eviction(true).await?;
Ok(Some(true))
}
@@ -1060,6 +1059,11 @@ impl Timeline {
let Some(local_layer) = self.find_layer(layer_file_name).await else {
return Ok(None);
};
let Ok(local_layer) = local_layer.guard_against_eviction(false).await else { return Ok(Some(false)); };
let local_layer: Arc<LayerE> = local_layer.into();
let remote_client = self
.remote_client
.as_ref()
@@ -1067,7 +1071,7 @@ impl Timeline {
let cancel = CancellationToken::new();
let results = self
.evict_layer_batch(remote_client, &[local_layer], cancel)
.evict_layer_batch(remote_client, &[local_layer], &cancel)
.await?;
assert_eq!(results.len(), 1);
let result: Option<Result<(), EvictionError>> = results.into_iter().next().unwrap();
@@ -1079,31 +1083,22 @@ impl Timeline {
}
/// Evict a batch of layers.
///
/// GenericRemoteStorage reference is required as a (witness)[witness_article] for "remote storage is configured."
///
/// [witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html
pub(crate) async fn evict_layers(
&self,
_: &GenericRemoteStorage,
layers_to_evict: &[Arc<dyn PersistentLayer>],
cancel: CancellationToken,
layers_to_evict: &[Arc<LayerE>],
cancel: &CancellationToken,
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
let remote_client = self.remote_client.clone().expect(
"GenericRemoteStorage is configured, so timeline must have RemoteTimelineClient",
);
let remote_client = self
.remote_client
.as_ref()
.context("timeline must have RemoteTimelineClient")?;
self.evict_layer_batch(&remote_client, layers_to_evict, cancel)
self.evict_layer_batch(remote_client, layers_to_evict, &cancel)
.await
}
/// Evict multiple layers at once, continuing through errors.
///
/// Try to evict the given `layers_to_evict` by
///
/// 1. Replacing the given layer object in the layer map with a corresponding [`RemoteLayer`] object.
/// 2. Deleting the now unreferenced layer file from disk.
///
/// The `remote_client` should be this timeline's `self.remote_client`.
/// We make the caller provide it so that they are responsible for handling the case
/// where someone wants to evict the layer but no remote storage is configured.
@@ -1112,17 +1107,15 @@ impl Timeline {
/// If `Err()` is returned, no eviction was attempted.
/// Each position of `Ok(results)` corresponds to the layer in `layers_to_evict`.
/// Meaning of each `result[i]`:
/// - `Some(Err(...))` if layer replacement failed for an unexpected reason
/// - `Some(Ok(true))` if everything went well.
/// - `Some(Ok(false))` if there was an expected reason why the layer could not be replaced, e.g.:
/// - evictee was not yet downloaded
/// - `Some(Err(...))` if layer replacement failed for some reason
/// - replacement failed for an expectable reason (e.g., layer removed by GC before we grabbed all locks)
/// - `Some(Ok(()))` if everything went well.
/// - `None` if no eviction attempt was made for the layer because `cancel.is_cancelled() == true`.
async fn evict_layer_batch(
&self,
remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Arc<dyn PersistentLayer>],
cancel: CancellationToken,
layers_to_evict: &[Arc<LayerE>],
cancel: &CancellationToken,
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
// ensure that the layers have finished uploading
// (don't hold the layer_removal_cs while we do it, we're not removing anything yet)
@@ -1132,7 +1125,7 @@ impl Timeline {
.context("wait for layer upload ops to complete")?;
// now lock out layer removal (compaction, gc, timeline deletion)
let layer_removal_guard = self.layer_removal_cs.lock().await;
let _layer_removal_guard = self.layer_removal_cs.lock().await;
{
// to avoid racing with detach and delete_timeline
@@ -1144,145 +1137,49 @@ impl Timeline {
}
// start the batch update
let mut guard = self.layers.write().await;
let mut results = Vec::with_capacity(layers_to_evict.len());
for l in layers_to_evict.iter() {
let res = if cancel.is_cancelled() {
None
} else {
Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut guard))
};
results.push(res);
for _ in 0..layers_to_evict.len() {
results.push(None);
}
// commit the updates & release locks
drop_wlock(guard);
drop(layer_removal_guard);
let mut js = tokio::task::JoinSet::new();
for (i, l) in layers_to_evict.into_iter().enumerate() {
js.spawn({
let l = l.to_owned();
async move { (i, l.evict_and_wait().await) }
});
}
let join = async {
while let Some(next) = js.join_next().await {
match next {
Ok((i, res)) => results[i] = Some(res),
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => { /* already logged */ }
Err(je) => tracing::error!("unknown JoinError: {je:?}"),
}
}
};
tokio::select! {
_ = cancel.cancelled() => {},
_ = join => {}
}
assert_eq!(results.len(), layers_to_evict.len());
Ok(results)
}
fn evict_layer_batch_impl(
&self,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
local_layer: &Arc<dyn PersistentLayer>,
layer_mgr: &mut LayerManager,
) -> Result<(), EvictionError> {
if local_layer.is_remote_layer() {
return Err(EvictionError::CannotEvictRemoteLayer);
}
let layer_file_size = local_layer.layer_desc().file_size;
let local_layer_mtime = local_layer
.local_path()
.expect("local layer should have a local path")
.metadata()
// when the eviction fails because we have already deleted the layer in compaction for
// example, a NotFound error bubbles up from here.
.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
EvictionError::FileNotFound
} else {
EvictionError::StatFailed(e)
}
})?
.modified()
.map_err(EvictionError::StatFailed)?;
let local_layer_residence_duration =
match SystemTime::now().duration_since(local_layer_mtime) {
Err(e) => {
warn!(layer = %local_layer, "layer mtime is in the future: {}", e);
None
}
Ok(delta) => Some(delta),
};
let layer_metadata = LayerFileMetadata::new(layer_file_size);
let new_remote_layer = Arc::new(match local_layer.filename() {
LayerFileName::Image(image_name) => RemoteLayer::new_img(
self.tenant_id,
self.timeline_id,
&image_name,
&layer_metadata,
local_layer
.access_stats()
.clone_for_residence_change(LayerResidenceStatus::Evicted),
),
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
&delta_name,
&layer_metadata,
local_layer
.access_stats()
.clone_for_residence_change(LayerResidenceStatus::Evicted),
),
});
assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc());
layer_mgr
.replace_and_verify(local_layer.clone(), new_remote_layer)
.map_err(EvictionError::LayerNotFound)?;
if let Err(e) = local_layer.delete_resident_layer_file() {
// this should never happen, because of layer_removal_cs usage and above stat
// access for mtime
error!("failed to remove layer file on evict after replacement: {e:#?}");
}
// Always decrement the physical size gauge, even if we failed to delete the file.
// Rationale: we already replaced the layer with a remote layer in the layer map,
// and any subsequent download_remote_layer will
// 1. overwrite the file on disk and
// 2. add the downloaded size to the resident size gauge.
//
// If there is no re-download, and we restart the pageserver, then load_layer_map
// will treat the file as a local layer again, count it towards resident size,
// and it'll be like the layer removal never happened.
// The bump in resident size is perhaps unexpected but overall a robust behavior.
self.metrics
.resident_physical_size_gauge
.sub(layer_file_size);
self.metrics.evictions.inc();
if let Some(delta) = local_layer_residence_duration {
self.metrics
.evictions_with_low_residence_duration
.read()
.unwrap()
.observe(delta);
info!(layer=%local_layer, residence_millis=delta.as_millis(), "evicted layer after known residence period");
} else {
info!(layer=%local_layer, "evicted layer after unknown residence period");
}
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum EvictionError {
#[error("cannot evict a remote layer")]
CannotEvictRemoteLayer,
/// Most likely the to-be evicted layer has been deleted by compaction or gc which use the same
/// locks, so they got to execute before the eviction.
#[error("file backing the layer has been removed already")]
FileNotFound,
#[error("stat failed")]
StatFailed(#[source] std::io::Error),
/// In practice, this can be a number of things, but lets assume it means only this.
///
/// This case includes situations such as the Layer was evicted and redownloaded in between,
/// because the file existed before an replacement attempt was made but now the Layers are
/// different objects in memory.
#[error("layer was no longer part of LayerMap")]
LayerNotFound(#[source] anyhow::Error),
#[error("layer was already evicted")]
NotFound,
/// Evictions must always lose to downloads in races, and this time it happened.
#[error("layer was downloaded instead")]
Downloaded,
}
/// Number of times we will compute partition within a checkpoint distance.
@@ -2073,7 +1970,7 @@ impl Timeline {
}
}
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<LayerE>> {
let guard = self.layers.read().await;
for historic_layer in guard.layer_map().iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
@@ -4465,15 +4362,15 @@ impl Timeline {
}
}
pub struct DiskUsageEvictionInfo {
pub(crate) struct DiskUsageEvictionInfo {
/// Timeline's largest layer (remote or resident)
pub max_layer_size: Option<u64>,
/// Timeline's resident layers
pub resident_layers: Vec<LocalLayerInfoForDiskUsageEviction>,
}
pub struct LocalLayerInfoForDiskUsageEviction {
pub layer: Arc<dyn PersistentLayer>,
pub(crate) struct LocalLayerInfoForDiskUsageEviction {
pub layer: Arc<LayerE>,
pub last_activity_ts: SystemTime,
}
@@ -4483,8 +4380,14 @@ impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
// having to allocate a string to this is bad, but it will rarely be formatted
let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
struct DisplayIsDebug<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
f.debug_struct("LocalLayerInfoForDiskUsageEviction")
.field("layer", &self.layer)
.field("layer", &DisplayIsDebug(&self.layer))
.field("last_activity", &ts)
.finish()
}
@@ -4511,9 +4414,7 @@ impl Timeline {
let l = guard.get_from_desc(&l);
if l.is_remote_layer() {
continue;
}
let Ok(l) = l.guard_against_eviction(false).await else { continue; };
let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
@@ -4523,7 +4424,8 @@ impl Timeline {
});
resident_layers.push(LocalLayerInfoForDiskUsageEviction {
layer: l,
// we explicitly don't want to keep this layer downloaded
layer: l.drop_eviction_guard(),
last_activity_ts,
});
}
@@ -4675,9 +4577,7 @@ mod tests {
use utils::{id::TimelineId, lsn::Lsn};
use crate::tenant::{harness::TenantHarness, storage_layer::PersistentLayer};
use super::{EvictionError, Timeline};
use crate::tenant::{harness::TenantHarness, storage_layer::LayerE, Timeline};
#[tokio::test]
async fn two_layer_eviction_attempts_at_the_same_time() {
@@ -4711,22 +4611,25 @@ mod tests {
.expect("just configured this");
let layer = find_some_layer(&timeline).await;
let layer = layer.guard_against_eviction(false).await.unwrap();
let layer = layer.drop_eviction_guard();
let cancel = tokio_util::sync::CancellationToken::new();
let batch = [layer];
let first = {
let cancel = cancel.clone();
let cancel = cancel.child_token();
async {
let cancel = cancel;
timeline
.evict_layer_batch(&rc, &batch, cancel)
.evict_layer_batch(&rc, &batch, &cancel)
.await
.unwrap()
}
};
let second = async {
timeline
.evict_layer_batch(&rc, &batch, cancel)
.evict_layer_batch(&rc, &batch, &cancel)
.await
.unwrap()
};
@@ -4735,88 +4638,20 @@ mod tests {
let (first, second) = (only_one(first), only_one(second));
assert_eq!(batch[0].needs_download_blocking().unwrap(), None);
let layer: Arc<LayerE> = batch[0].clone();
let mut evicted = std::pin::pin!(layer.wait_evicted());
assert_eq!(futures::future::poll_immediate(&mut evicted).await, None);
// both seemingly succeed, but only one will actually evict
match (first, second) {
(Ok(()), Err(EvictionError::FileNotFound))
| (Err(EvictionError::FileNotFound), Ok(())) => {
// one of the evictions gets to do it,
// other one gets FileNotFound. all is good.
}
(Ok(()), Ok(())) => {}
other => unreachable!("unexpected {:?}", other),
}
}
#[tokio::test]
async fn layer_eviction_aba_fails() {
let harness = TenantHarness::create("layer_eviction_aba_fails").unwrap();
let remote_storage = {
// this is never used for anything, because of how the create_test_timeline works, but
// it is with us in spirit and a Some.
use remote_storage::{GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind};
let path = harness.conf.workdir.join("localfs");
std::fs::create_dir_all(&path).unwrap();
let config = RemoteStorageConfig {
max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(),
max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(),
storage: RemoteStorageKind::LocalFs(path),
};
GenericRemoteStorage::from_config(&config).unwrap()
};
let ctx = any_context();
let tenant = harness.try_load(&ctx, Some(remote_storage)).await.unwrap();
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
.unwrap();
let _e = tracing::info_span!("foobar", tenant_id = %tenant.tenant_id, timeline_id = %timeline.timeline_id).entered();
let rc = timeline.remote_client.clone().unwrap();
// TenantHarness allows uploads to happen given GenericRemoteStorage is configured
let layer = find_some_layer(&timeline).await;
let cancel = tokio_util::sync::CancellationToken::new();
let batch = [layer];
let first = {
let cancel = cancel.clone();
async {
timeline
.evict_layer_batch(&rc, &batch, cancel)
.await
.unwrap()
}
};
// lets imagine this is stuck somehow, still referencing the original `Arc<dyn PersistentLayer>`
let second = {
let cancel = cancel.clone();
async {
timeline
.evict_layer_batch(&rc, &batch, cancel)
.await
.unwrap()
}
};
// while it's stuck, we evict and end up redownloading it
only_one(first.await).expect("eviction succeeded");
let layer = find_some_layer(&timeline).await;
let layer = layer.downcast_remote_layer().unwrap();
timeline.download_remote_layer(layer).await.unwrap();
let res = only_one(second.await);
assert!(
matches!(res, Err(EvictionError::LayerNotFound(_))),
"{res:?}"
);
// no more specific asserting, outside of preconds this is the only valid replacement
// failure
// after eviction has been requested, we will eventually evict
evicted.await;
}
fn any_context() -> crate::context::RequestContext {
@@ -4833,7 +4668,7 @@ mod tests {
.expect("no cancellation")
}
async fn find_some_layer(timeline: &Timeline) -> Arc<dyn PersistentLayer> {
async fn find_some_layer(timeline: &Timeline) -> Arc<LayerE> {
let layers = timeline.layers.read().await;
let desc = layers
.layer_map()

View File

@@ -29,7 +29,6 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::PersistentLayer,
timeline::EvictionError,
LogicalSizeCalculationCause, Tenant,
},
@@ -194,15 +193,16 @@ impl Timeline {
// NB: all the checks can be invalidated as soon as we release the layer map lock.
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let candidates: Vec<_> = {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
let hist_layer = guard.get_from_desc(&hist_layer);
if hist_layer.is_remote_layer() {
continue;
}
// funny: this is the best way to get local layers is to lock them into
// memory for the duration of eviction
let Ok(guard) = hist_layer.guard_against_eviction(false).await else { continue; };
let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
@@ -233,7 +233,7 @@ impl Timeline {
}
};
if no_activity_for > p.threshold {
candidates.push(hist_layer)
candidates.push(guard.drop_eviction_guard())
}
}
candidates
@@ -252,7 +252,7 @@ impl Timeline {
};
let results = match self
.evict_layer_batch(remote_client, &candidates[..], cancel.clone())
.evict_layer_batch(remote_client, &candidates, cancel)
.await
{
Err(pre_err) => {
@@ -263,7 +263,8 @@ impl Timeline {
Ok(results) => results,
};
assert_eq!(results.len(), candidates.len());
for (l, result) in candidates.iter().zip(results) {
drop(candidates);
for result in results {
match result {
None => {
stats.skipped_for_shutdown += 1;
@@ -271,20 +272,10 @@ impl Timeline {
Some(Ok(())) => {
stats.evicted += 1;
}
Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
stats.not_evictable += 1;
}
Some(Err(EvictionError::FileNotFound)) => {
Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
// compaction/gc removed the file while we were waiting on layer_removal_cs
stats.not_evictable += 1;
}
Some(Err(
e @ EvictionError::LayerNotFound(_) | e @ EvictionError::StatFailed(_),
)) => {
let e = utils::error::report_compact_sources(&e);
warn!(layer = %l, "failed to evict layer: {e}");
stats.not_evictable += 1;
}
}
}
if stats.candidates == stats.not_evictable {