diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index f01cd1cf8c..76906cfaf7 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -42,7 +42,6 @@ // reading these fields. We use the Debug impl for semi-structured logging, though. use std::{ - collections::HashMap, sync::Arc, time::{Duration, SystemTime}, }; @@ -125,7 +124,7 @@ pub fn launch_disk_usage_global_eviction_task( async fn disk_usage_eviction_task( state: &State, task_config: &DiskUsageEvictionTaskConfig, - _storage: &GenericRemoteStorage, + storage: &GenericRemoteStorage, tenants_dir: &Utf8Path, cancel: CancellationToken, ) { @@ -149,8 +148,14 @@ async fn disk_usage_eviction_task( let start = Instant::now(); async { - let res = - disk_usage_eviction_task_iteration(state, task_config, tenants_dir, &cancel).await; + let res = disk_usage_eviction_task_iteration( + state, + task_config, + storage, + tenants_dir, + &cancel, + ) + .await; match res { Ok(()) => {} @@ -181,12 +186,13 @@ pub trait Usage: Clone + Copy + std::fmt::Debug { async fn disk_usage_eviction_task_iteration( state: &State, task_config: &DiskUsageEvictionTaskConfig, + storage: &GenericRemoteStorage, tenants_dir: &Utf8Path, cancel: &CancellationToken, ) -> anyhow::Result<()> { let usage_pre = filesystem_level_usage::get(tenants_dir, task_config) .context("get filesystem-level disk usage before evictions")?; - let res = disk_usage_eviction_task_iteration_impl(state, usage_pre, cancel).await; + let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await; match res { Ok(outcome) => { debug!(?outcome, "disk_usage_eviction_iteration finished"); @@ -268,8 +274,9 @@ struct LayerCount { count: usize, } -pub async fn disk_usage_eviction_task_iteration_impl( +pub(crate) async fn disk_usage_eviction_task_iteration_impl( state: &State, + _storage: &GenericRemoteStorage, usage_pre: U, cancel: &CancellationToken, ) -> anyhow::Result> { @@ -321,16 +328,16 @@ pub async fn disk_usage_eviction_task_iteration_impl( // Walk through the list of candidates, until we have accumulated enough layers to get // us back under the pressure threshold. 'usage_planned' is updated so that it tracks // how much disk space would be used after evicting all the layers up to the current - // point in the list. The layers are collected in 'batched', grouped per timeline. + // point in the list. // // If we get far enough in the list that we start to evict layers that are below // the tenant's min-resident-size threshold, print a warning, and memorize the disk // usage at that point, in 'usage_planned_min_resident_size_respecting'. - let mut batched: HashMap<_, Vec<_>> = HashMap::new(); let mut warned = None; let mut usage_planned = usage_pre; - let mut max_batch_size = 0; - for (i, (partition, candidate)) in candidates.into_iter().enumerate() { + let mut evicted_amount = 0; + + for (i, (partition, candidate)) in candidates.iter().enumerate() { if !usage_planned.has_pressure() { debug!( no_candidates_evicted = i, @@ -339,25 +346,13 @@ pub async fn disk_usage_eviction_task_iteration_impl( break; } - if partition == MinResidentSizePartition::Below && warned.is_none() { + if partition == &MinResidentSizePartition::Below && warned.is_none() { warn!(?usage_pre, ?usage_planned, candidate_no=i, "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"); warned = Some(usage_planned); } usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size); - - // FIXME: batching makes no sense anymore because of no layermap locking, should just spawn - // tasks to evict all seen layers until we have evicted enough - - let batch = batched.entry(TimelineKey(candidate.timeline)).or_default(); - - // semaphore will later be used to limit eviction concurrency, and we can express at - // most u32 number of permits. unlikely we would have u32::MAX layers to be evicted, - // but fail gracefully by not making batches larger. - if batch.len() < u32::MAX as usize { - batch.push(candidate.layer); - max_batch_size = max_batch_size.max(batch.len()); - } + evicted_amount += 1; } let usage_planned = match warned { @@ -372,100 +367,79 @@ pub async fn disk_usage_eviction_task_iteration_impl( }; debug!(?usage_planned, "usage planned"); - // phase2: evict victims batched by timeline + // phase2: evict layers let mut js = tokio::task::JoinSet::new(); + let limit = 1000; - // ratelimit to 1k files or any higher max batch size - let limit = Arc::new(tokio::sync::Semaphore::new(1000.max(max_batch_size))); + let mut evicted = candidates.into_iter().take(evicted_amount).fuse(); + let mut consumed_all = false; - for (timeline, batch) in batched { - let tenant_shard_id = timeline.tenant_shard_id; - let timeline_id = timeline.timeline_id; - let batch_size = - u32::try_from(batch.len()).expect("batch size limited to u32::MAX during partitioning"); + // After the evictions, `usage_assumed` is the post-eviction usage, + // according to internal accounting. + let mut usage_assumed = usage_pre; + let mut evictions_failed = LayerCount::default(); - // I dislike naming of `available_permits` but it means current total amount of permits - // because permits can be added - assert!(batch_size as usize <= limit.available_permits()); + let evict_layers = async move { + loop { + let next = if js.len() >= limit || consumed_all { + js.join_next().await + } else if !js.is_empty() { + // opportunistically consume ready result, one per each new evicted + futures::future::FutureExt::now_or_never(js.join_next()).and_then(|x| x) + } else { + None + }; - debug!(%timeline_id, "evicting batch for timeline"); - - let evict = { - let limit = limit.clone(); - let cancel = cancel.clone(); - async move { - let mut evicted_bytes = 0; - let mut evictions_failed = LayerCount::default(); - - let Ok(_permit) = limit.acquire_many_owned(batch_size).await else { - // semaphore closing means cancelled - return (evicted_bytes, evictions_failed); - }; - - let results = timeline.evict_layers(&batch).await; - - match results { - Ok(results) => { - assert_eq!(results.len(), batch.len()); - for (result, layer) in results.into_iter().zip(batch.iter()) { - let file_size = layer.layer_desc().file_size; - match result { - Some(Ok(())) => { - evicted_bytes += file_size; - } - Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => { - evictions_failed.file_sizes += file_size; - evictions_failed.count += 1; - } - None => { - assert!(cancel.is_cancelled()); - } - } - } + if let Some(next) = next { + match next { + Ok(Ok(file_size)) => { + usage_assumed.add_available_bytes(file_size); } - Err(e) => { - warn!("failed to evict batch: {:#}", e); + Ok(Err((file_size, EvictionError::NotFound | EvictionError::Downloaded))) => { + evictions_failed.file_sizes += file_size; + evictions_failed.count += 1; } + Err(je) if je.is_cancelled() => unreachable!("not used"), + Err(je) if je.is_panic() => { /* already logged */ } + Err(je) => tracing::error!("unknown JoinError: {je:?}"), } - (evicted_bytes, evictions_failed) } - } - .instrument(tracing::info_span!("evict_batch", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, batch_size)); - js.spawn(evict); - - // spwaning multiple thousands of these is essentially blocking, so give already spawned a - // chance of making progress - tokio::task::yield_now().await; - } - - let join_all = async move { - // After the evictions, `usage_assumed` is the post-eviction usage, - // according to internal accounting. - let mut usage_assumed = usage_pre; - let mut evictions_failed = LayerCount::default(); - - while let Some(res) = js.join_next().await { - match res { - Ok((evicted_bytes, failed)) => { - usage_assumed.add_available_bytes(evicted_bytes); - evictions_failed.file_sizes += failed.file_sizes; - evictions_failed.count += failed.count; - } - Err(je) if je.is_cancelled() => unreachable!("not used"), - Err(je) if je.is_panic() => { /* already logged */ } - Err(je) => tracing::error!("unknown JoinError: {je:?}"), + if consumed_all && js.is_empty() { + break; } + + // calling again when consumed_all is fine as evicted is fused. + let Some((_partition, candidate)) = evicted.next() else { + consumed_all = true; + continue; + }; + + js.spawn(async move { + let rtc = candidate.timeline.remote_client.as_ref().expect( + "holding the witness, all timelines must have a remote timeline client", + ); + let file_size = candidate.layer.layer_desc().file_size; + candidate + .layer + .evict_and_wait(rtc) + .await + .map(|()| file_size) + .map_err(|e| (file_size, e)) + }); + + tokio::task::yield_now().await; } + (usage_assumed, evictions_failed) }; let (usage_assumed, evictions_failed) = tokio::select! { - tuple = join_all => { tuple }, + tuple = evict_layers => { tuple }, _ = cancel.cancelled() => { - // close the semaphore to stop any pending acquires - limit.close(); + // dropping joinset will abort all pending evict_and_waits and that is fine, our + // requests will still stand return Ok(IterationOutcome::Cancelled); } }; diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 9faacaef89..da7e8218a2 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1588,7 +1588,7 @@ async fn always_panic_handler( async fn disk_usage_eviction_run( mut r: Request, - _cancel: CancellationToken, + cancel: CancellationToken, ) -> Result, ApiError> { check_permission(&r, None)?; @@ -1625,48 +1625,26 @@ async fn disk_usage_eviction_run( freed_bytes: 0, }; - let (tx, rx) = tokio::sync::oneshot::channel(); - let state = get_state(&r); - if state.remote_storage.as_ref().is_none() { + let Some(storage) = state.remote_storage.as_ref() else { return Err(ApiError::InternalServerError(anyhow::anyhow!( "remote storage not configured, cannot run eviction iteration" ))); - } + }; let state = state.disk_usage_eviction_state.clone(); - let cancel = CancellationToken::new(); - let child_cancel = cancel.clone(); - let _g = cancel.drop_guard(); + let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl( + &state, storage, usage, &cancel, + ) + .await; - crate::task_mgr::spawn( - crate::task_mgr::BACKGROUND_RUNTIME.handle(), - TaskKind::DiskUsageEviction, - None, - None, - "ondemand disk usage eviction", - false, - async move { - let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl( - &state, - usage, - &child_cancel, - ) - .await; + info!(?res, "disk_usage_eviction_task_iteration_impl finished"); - info!(?res, "disk_usage_eviction_task_iteration_impl finished"); + let res = res.map_err(ApiError::InternalServerError)?; - let _ = tx.send(res); - Ok(()) - } - .in_current_span(), - ); - - let response = rx.await.unwrap().map_err(ApiError::InternalServerError)?; - - json_response(StatusCode::OK, response) + json_response(StatusCode::OK, res) } async fn handler_404(_: Request) -> Result, ApiError> { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a2a31f395e..d0241eafd5 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1099,8 +1099,9 @@ impl Timeline { Ok(Some(true)) } - /// Like [`evict_layer_batch`](Self::evict_layer_batch), but for just one layer. - /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. + /// Evict just one layer. + /// + /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`. pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { let _gate = self .gate @@ -1111,109 +1112,17 @@ impl Timeline { return Ok(None); }; - let Some(local_layer) = local_layer.keep_resident().await? else { - return Ok(Some(false)); - }; - - let local_layer: Layer = local_layer.into(); - - let remote_client = self + let rtc = self .remote_client .as_ref() .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?; - let results = self - .evict_layer_batch(remote_client, &[local_layer]) - .await?; - assert_eq!(results.len(), 1); - let result: Option> = results.into_iter().next().unwrap(); - match result { - None => anyhow::bail!("task_mgr shutdown requested"), - Some(Ok(())) => Ok(Some(true)), - Some(Err(e)) => Err(anyhow::Error::new(e)), + match local_layer.evict_and_wait(rtc).await { + Ok(()) => Ok(Some(true)), + Err(EvictionError::NotFound) => Ok(Some(false)), + Err(EvictionError::Downloaded) => Ok(Some(false)), } } - - /// Evict a batch of layers. - pub(crate) async fn evict_layers( - &self, - layers_to_evict: &[Layer], - ) -> anyhow::Result>>> { - let _gate = self - .gate - .enter() - .map_err(|_| anyhow::anyhow!("Shutting down"))?; - - let remote_client = self - .remote_client - .as_ref() - .context("timeline must have RemoteTimelineClient")?; - - self.evict_layer_batch(remote_client, layers_to_evict).await - } - - /// Evict multiple layers at once, continuing through errors. - /// - /// The `remote_client` should be this timeline's `self.remote_client`. - /// We make the caller provide it so that they are responsible for handling the case - /// where someone wants to evict the layer but no remote storage is configured. - /// - /// Returns either `Err()` or `Ok(results)` where `results.len() == layers_to_evict.len()`. - /// If `Err()` is returned, no eviction was attempted. - /// Each position of `Ok(results)` corresponds to the layer in `layers_to_evict`. - /// Meaning of each `result[i]`: - /// - `Some(Err(...))` if layer replacement failed for some reason - /// - replacement failed for an expectable reason (e.g., layer removed by GC before we grabbed all locks) - /// - `Some(Ok(()))` if everything went well. - /// - `None` if no eviction attempt was made for the layer because `cancel.is_cancelled() == true`. - async fn evict_layer_batch( - &self, - remote_client: &Arc, - layers_to_evict: &[Layer], - ) -> anyhow::Result>>> { - { - // to avoid racing with detach and delete_timeline - let state = self.current_state(); - anyhow::ensure!( - state == TimelineState::Active, - "timeline is not active but {state:?}" - ); - } - - let mut results = Vec::with_capacity(layers_to_evict.len()); - for _ in 0..layers_to_evict.len() { - results.push(None); - } - - let mut js = tokio::task::JoinSet::new(); - - for (i, l) in layers_to_evict.iter().enumerate() { - js.spawn({ - let l = l.to_owned(); - let remote_client = remote_client.clone(); - async move { (i, l.evict_and_wait(&remote_client).await) } - }); - } - - let join = async { - while let Some(next) = js.join_next().await { - match next { - Ok((i, res)) => results[i] = Some(res), - Err(je) if je.is_cancelled() => unreachable!("not used"), - Err(je) if je.is_panic() => { /* already logged */ } - Err(je) => tracing::error!("unknown JoinError: {je:?}"), - } - } - }; - - tokio::select! { - _ = self.cancel.cancelled() => {}, - _ = join => {} - } - - assert_eq!(results.len(), layers_to_evict.len()); - Ok(results) - } } /// Number of times we will compute partition within a checkpoint distance. @@ -4586,7 +4495,7 @@ mod tests { .await .unwrap(); - let rc = timeline + let rtc = timeline .remote_client .clone() .expect("just configured this"); @@ -4599,16 +4508,12 @@ mod tests { .expect("should had been resident") .drop_eviction_guard(); - let batch = [layer]; - - let first = async { timeline.evict_layer_batch(&rc, &batch).await.unwrap() }; - let second = async { timeline.evict_layer_batch(&rc, &batch).await.unwrap() }; + let first = async { layer.evict_and_wait(&rtc).await }; + let second = async { layer.evict_and_wait(&rtc).await }; let (first, second) = tokio::join!(first, second); - let (first, second) = (only_one(first), only_one(second)); - - let res = batch[0].keep_resident().await; + let res = layer.keep_resident().await; assert!(matches!(res, Ok(None)), "{res:?}"); match (first, second) { @@ -4629,14 +4534,6 @@ mod tests { RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error) } - fn only_one(mut input: Vec>) -> T { - assert_eq!(1, input.len()); - input - .pop() - .expect("length just checked") - .expect("no cancellation") - } - async fn find_some_layer(timeline: &Timeline) -> Layer { let layers = timeline.layers.read().await; let desc = layers diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 020c5a9e9f..782e8f9e39 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -212,11 +212,21 @@ impl Timeline { // Gather layers for eviction. // NB: all the checks can be invalidated as soon as we release the layer map lock. // We don't want to hold the layer map lock during eviction. + // So, we just need to deal with this. - let candidates: Vec<_> = { + + let remote_client = match self.remote_client.as_ref() { + Some(c) => c, + None => { + error!("no remote storage configured, cannot evict layers"); + return ControlFlow::Continue(()); + } + }; + + let mut js = tokio::task::JoinSet::new(); + { let guard = self.layers.read().await; let layers = guard.layer_map(); - let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { let hist_layer = guard.get_from_desc(&hist_layer); @@ -262,54 +272,49 @@ impl Timeline { continue; } }; + let layer = guard.drop_eviction_guard(); if no_activity_for > p.threshold { - candidates.push(guard.drop_eviction_guard()) + let remote_client = remote_client.clone(); + // this could cause a lot of allocations in some cases + js.spawn(async move { layer.evict_and_wait(&remote_client).await }); + stats.candidates += 1; } } - candidates - }; - stats.candidates = candidates.len(); - - let remote_client = match self.remote_client.as_ref() { - None => { - error!( - num_candidates = candidates.len(), - "no remote storage configured, cannot evict layers" - ); - return ControlFlow::Continue(()); - } - Some(c) => c, }; - let results = match self.evict_layer_batch(remote_client, &candidates).await { - Err(pre_err) => { - stats.errors += candidates.len(); - error!("could not do any evictions: {pre_err:#}"); - return ControlFlow::Continue(()); + let join_all = async move { + while let Some(next) = js.join_next().await { + match next { + Ok(Ok(())) => stats.evicted += 1, + Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => { + stats.not_evictable += 1; + } + Err(je) if je.is_cancelled() => unreachable!("not used"), + Err(je) if je.is_panic() => { + /* already logged */ + stats.errors += 1; + } + Err(je) => tracing::error!("unknown JoinError: {je:?}"), + } } - Ok(results) => results, + stats }; - assert_eq!(results.len(), candidates.len()); - for result in results { - match result { - None => { - stats.skipped_for_shutdown += 1; - } - Some(Ok(())) => { - stats.evicted += 1; - } - Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => { - stats.not_evictable += 1; + + tokio::select! { + stats = join_all => { + if stats.candidates == stats.not_evictable { + debug!(stats=?stats, "eviction iteration complete"); + } else if stats.errors > 0 || stats.not_evictable > 0 { + warn!(stats=?stats, "eviction iteration complete"); + } else { + info!(stats=?stats, "eviction iteration complete"); } } + _ = cancel.cancelled() => { + // just drop the joinset to "abort" + } } - if stats.candidates == stats.not_evictable { - debug!(stats=?stats, "eviction iteration complete"); - } else if stats.errors > 0 || stats.not_evictable > 0 { - warn!(stats=?stats, "eviction iteration complete"); - } else { - info!(stats=?stats, "eviction iteration complete"); - } + ControlFlow::Continue(()) }