refactor: remove eviction batching (#6060)

We no longer have `layer_removal_cs` since #5108, we no longer need
batching.
This commit is contained in:
Joonas Koivunen
2023-12-13 18:05:33 +02:00
committed by GitHub
parent 2d22661061
commit a919b863d1
4 changed files with 141 additions and 287 deletions

View File

@@ -42,7 +42,6 @@
// reading these fields. We use the Debug impl for semi-structured logging, though.
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, SystemTime},
};
@@ -125,7 +124,7 @@ pub fn launch_disk_usage_global_eviction_task(
async fn disk_usage_eviction_task(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
_storage: &GenericRemoteStorage,
storage: &GenericRemoteStorage,
tenants_dir: &Utf8Path,
cancel: CancellationToken,
) {
@@ -149,8 +148,14 @@ async fn disk_usage_eviction_task(
let start = Instant::now();
async {
let res =
disk_usage_eviction_task_iteration(state, task_config, tenants_dir, &cancel).await;
let res = disk_usage_eviction_task_iteration(
state,
task_config,
storage,
tenants_dir,
&cancel,
)
.await;
match res {
Ok(()) => {}
@@ -181,12 +186,13 @@ pub trait Usage: Clone + Copy + std::fmt::Debug {
async fn disk_usage_eviction_task_iteration(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: &GenericRemoteStorage,
tenants_dir: &Utf8Path,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
.context("get filesystem-level disk usage before evictions")?;
let res = disk_usage_eviction_task_iteration_impl(state, usage_pre, cancel).await;
let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await;
match res {
Ok(outcome) => {
debug!(?outcome, "disk_usage_eviction_iteration finished");
@@ -268,8 +274,9 @@ struct LayerCount {
count: usize,
}
pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
state: &State,
_storage: &GenericRemoteStorage,
usage_pre: U,
cancel: &CancellationToken,
) -> anyhow::Result<IterationOutcome<U>> {
@@ -321,16 +328,16 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// Walk through the list of candidates, until we have accumulated enough layers to get
// us back under the pressure threshold. 'usage_planned' is updated so that it tracks
// how much disk space would be used after evicting all the layers up to the current
// point in the list. The layers are collected in 'batched', grouped per timeline.
// point in the list.
//
// If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
let mut batched: HashMap<_, Vec<_>> = HashMap::new();
let mut warned = None;
let mut usage_planned = usage_pre;
let mut max_batch_size = 0;
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
let mut evicted_amount = 0;
for (i, (partition, candidate)) in candidates.iter().enumerate() {
if !usage_planned.has_pressure() {
debug!(
no_candidates_evicted = i,
@@ -339,25 +346,13 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
break;
}
if partition == MinResidentSizePartition::Below && warned.is_none() {
if partition == &MinResidentSizePartition::Below && warned.is_none() {
warn!(?usage_pre, ?usage_planned, candidate_no=i, "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy");
warned = Some(usage_planned);
}
usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);
// FIXME: batching makes no sense anymore because of no layermap locking, should just spawn
// tasks to evict all seen layers until we have evicted enough
let batch = batched.entry(TimelineKey(candidate.timeline)).or_default();
// semaphore will later be used to limit eviction concurrency, and we can express at
// most u32 number of permits. unlikely we would have u32::MAX layers to be evicted,
// but fail gracefully by not making batches larger.
if batch.len() < u32::MAX as usize {
batch.push(candidate.layer);
max_batch_size = max_batch_size.max(batch.len());
}
evicted_amount += 1;
}
let usage_planned = match warned {
@@ -372,100 +367,79 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
};
debug!(?usage_planned, "usage planned");
// phase2: evict victims batched by timeline
// phase2: evict layers
let mut js = tokio::task::JoinSet::new();
let limit = 1000;
// ratelimit to 1k files or any higher max batch size
let limit = Arc::new(tokio::sync::Semaphore::new(1000.max(max_batch_size)));
let mut evicted = candidates.into_iter().take(evicted_amount).fuse();
let mut consumed_all = false;
for (timeline, batch) in batched {
let tenant_shard_id = timeline.tenant_shard_id;
let timeline_id = timeline.timeline_id;
let batch_size =
u32::try_from(batch.len()).expect("batch size limited to u32::MAX during partitioning");
// After the evictions, `usage_assumed` is the post-eviction usage,
// according to internal accounting.
let mut usage_assumed = usage_pre;
let mut evictions_failed = LayerCount::default();
// I dislike naming of `available_permits` but it means current total amount of permits
// because permits can be added
assert!(batch_size as usize <= limit.available_permits());
let evict_layers = async move {
loop {
let next = if js.len() >= limit || consumed_all {
js.join_next().await
} else if !js.is_empty() {
// opportunistically consume ready result, one per each new evicted
futures::future::FutureExt::now_or_never(js.join_next()).and_then(|x| x)
} else {
None
};
debug!(%timeline_id, "evicting batch for timeline");
let evict = {
let limit = limit.clone();
let cancel = cancel.clone();
async move {
let mut evicted_bytes = 0;
let mut evictions_failed = LayerCount::default();
let Ok(_permit) = limit.acquire_many_owned(batch_size).await else {
// semaphore closing means cancelled
return (evicted_bytes, evictions_failed);
};
let results = timeline.evict_layers(&batch).await;
match results {
Ok(results) => {
assert_eq!(results.len(), batch.len());
for (result, layer) in results.into_iter().zip(batch.iter()) {
let file_size = layer.layer_desc().file_size;
match result {
Some(Ok(())) => {
evicted_bytes += file_size;
}
Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
None => {
assert!(cancel.is_cancelled());
}
}
}
if let Some(next) = next {
match next {
Ok(Ok(file_size)) => {
usage_assumed.add_available_bytes(file_size);
}
Err(e) => {
warn!("failed to evict batch: {:#}", e);
Ok(Err((file_size, EvictionError::NotFound | EvictionError::Downloaded))) => {
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => { /* already logged */ }
Err(je) => tracing::error!("unknown JoinError: {je:?}"),
}
(evicted_bytes, evictions_failed)
}
}
.instrument(tracing::info_span!("evict_batch", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, batch_size));
js.spawn(evict);
// spwaning multiple thousands of these is essentially blocking, so give already spawned a
// chance of making progress
tokio::task::yield_now().await;
}
let join_all = async move {
// After the evictions, `usage_assumed` is the post-eviction usage,
// according to internal accounting.
let mut usage_assumed = usage_pre;
let mut evictions_failed = LayerCount::default();
while let Some(res) = js.join_next().await {
match res {
Ok((evicted_bytes, failed)) => {
usage_assumed.add_available_bytes(evicted_bytes);
evictions_failed.file_sizes += failed.file_sizes;
evictions_failed.count += failed.count;
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => { /* already logged */ }
Err(je) => tracing::error!("unknown JoinError: {je:?}"),
if consumed_all && js.is_empty() {
break;
}
// calling again when consumed_all is fine as evicted is fused.
let Some((_partition, candidate)) = evicted.next() else {
consumed_all = true;
continue;
};
js.spawn(async move {
let rtc = candidate.timeline.remote_client.as_ref().expect(
"holding the witness, all timelines must have a remote timeline client",
);
let file_size = candidate.layer.layer_desc().file_size;
candidate
.layer
.evict_and_wait(rtc)
.await
.map(|()| file_size)
.map_err(|e| (file_size, e))
});
tokio::task::yield_now().await;
}
(usage_assumed, evictions_failed)
};
let (usage_assumed, evictions_failed) = tokio::select! {
tuple = join_all => { tuple },
tuple = evict_layers => { tuple },
_ = cancel.cancelled() => {
// close the semaphore to stop any pending acquires
limit.close();
// dropping joinset will abort all pending evict_and_waits and that is fine, our
// requests will still stand
return Ok(IterationOutcome::Cancelled);
}
};

View File

@@ -1588,7 +1588,7 @@ async fn always_panic_handler(
async fn disk_usage_eviction_run(
mut r: Request<Body>,
_cancel: CancellationToken,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
@@ -1625,48 +1625,26 @@ async fn disk_usage_eviction_run(
freed_bytes: 0,
};
let (tx, rx) = tokio::sync::oneshot::channel();
let state = get_state(&r);
if state.remote_storage.as_ref().is_none() {
let Some(storage) = state.remote_storage.as_ref() else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"remote storage not configured, cannot run eviction iteration"
)));
}
};
let state = state.disk_usage_eviction_state.clone();
let cancel = CancellationToken::new();
let child_cancel = cancel.clone();
let _g = cancel.drop_guard();
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&state, storage, usage, &cancel,
)
.await;
crate::task_mgr::spawn(
crate::task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::DiskUsageEviction,
None,
None,
"ondemand disk usage eviction",
false,
async move {
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&state,
usage,
&child_cancel,
)
.await;
info!(?res, "disk_usage_eviction_task_iteration_impl finished");
info!(?res, "disk_usage_eviction_task_iteration_impl finished");
let res = res.map_err(ApiError::InternalServerError)?;
let _ = tx.send(res);
Ok(())
}
.in_current_span(),
);
let response = rx.await.unwrap().map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, response)
json_response(StatusCode::OK, res)
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {

View File

@@ -1099,8 +1099,9 @@ impl Timeline {
Ok(Some(true))
}
/// Like [`evict_layer_batch`](Self::evict_layer_batch), but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
/// Evict just one layer.
///
/// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let _gate = self
.gate
@@ -1111,109 +1112,17 @@ impl Timeline {
return Ok(None);
};
let Some(local_layer) = local_layer.keep_resident().await? else {
return Ok(Some(false));
};
let local_layer: Layer = local_layer.into();
let remote_client = self
let rtc = self
.remote_client
.as_ref()
.ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
let results = self
.evict_layer_batch(remote_client, &[local_layer])
.await?;
assert_eq!(results.len(), 1);
let result: Option<Result<(), EvictionError>> = results.into_iter().next().unwrap();
match result {
None => anyhow::bail!("task_mgr shutdown requested"),
Some(Ok(())) => Ok(Some(true)),
Some(Err(e)) => Err(anyhow::Error::new(e)),
match local_layer.evict_and_wait(rtc).await {
Ok(()) => Ok(Some(true)),
Err(EvictionError::NotFound) => Ok(Some(false)),
Err(EvictionError::Downloaded) => Ok(Some(false)),
}
}
/// Evict a batch of layers.
pub(crate) async fn evict_layers(
&self,
layers_to_evict: &[Layer],
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
let _gate = self
.gate
.enter()
.map_err(|_| anyhow::anyhow!("Shutting down"))?;
let remote_client = self
.remote_client
.as_ref()
.context("timeline must have RemoteTimelineClient")?;
self.evict_layer_batch(remote_client, layers_to_evict).await
}
/// Evict multiple layers at once, continuing through errors.
///
/// The `remote_client` should be this timeline's `self.remote_client`.
/// We make the caller provide it so that they are responsible for handling the case
/// where someone wants to evict the layer but no remote storage is configured.
///
/// Returns either `Err()` or `Ok(results)` where `results.len() == layers_to_evict.len()`.
/// If `Err()` is returned, no eviction was attempted.
/// Each position of `Ok(results)` corresponds to the layer in `layers_to_evict`.
/// Meaning of each `result[i]`:
/// - `Some(Err(...))` if layer replacement failed for some reason
/// - replacement failed for an expectable reason (e.g., layer removed by GC before we grabbed all locks)
/// - `Some(Ok(()))` if everything went well.
/// - `None` if no eviction attempt was made for the layer because `cancel.is_cancelled() == true`.
async fn evict_layer_batch(
&self,
remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Layer],
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
{
// to avoid racing with detach and delete_timeline
let state = self.current_state();
anyhow::ensure!(
state == TimelineState::Active,
"timeline is not active but {state:?}"
);
}
let mut results = Vec::with_capacity(layers_to_evict.len());
for _ in 0..layers_to_evict.len() {
results.push(None);
}
let mut js = tokio::task::JoinSet::new();
for (i, l) in layers_to_evict.iter().enumerate() {
js.spawn({
let l = l.to_owned();
let remote_client = remote_client.clone();
async move { (i, l.evict_and_wait(&remote_client).await) }
});
}
let join = async {
while let Some(next) = js.join_next().await {
match next {
Ok((i, res)) => results[i] = Some(res),
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => { /* already logged */ }
Err(je) => tracing::error!("unknown JoinError: {je:?}"),
}
}
};
tokio::select! {
_ = self.cancel.cancelled() => {},
_ = join => {}
}
assert_eq!(results.len(), layers_to_evict.len());
Ok(results)
}
}
/// Number of times we will compute partition within a checkpoint distance.
@@ -4586,7 +4495,7 @@ mod tests {
.await
.unwrap();
let rc = timeline
let rtc = timeline
.remote_client
.clone()
.expect("just configured this");
@@ -4599,16 +4508,12 @@ mod tests {
.expect("should had been resident")
.drop_eviction_guard();
let batch = [layer];
let first = async { timeline.evict_layer_batch(&rc, &batch).await.unwrap() };
let second = async { timeline.evict_layer_batch(&rc, &batch).await.unwrap() };
let first = async { layer.evict_and_wait(&rtc).await };
let second = async { layer.evict_and_wait(&rtc).await };
let (first, second) = tokio::join!(first, second);
let (first, second) = (only_one(first), only_one(second));
let res = batch[0].keep_resident().await;
let res = layer.keep_resident().await;
assert!(matches!(res, Ok(None)), "{res:?}");
match (first, second) {
@@ -4629,14 +4534,6 @@ mod tests {
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
}
fn only_one<T>(mut input: Vec<Option<T>>) -> T {
assert_eq!(1, input.len());
input
.pop()
.expect("length just checked")
.expect("no cancellation")
}
async fn find_some_layer(timeline: &Timeline) -> Layer {
let layers = timeline.layers.read().await;
let desc = layers

View File

@@ -212,11 +212,21 @@ impl Timeline {
// Gather layers for eviction.
// NB: all the checks can be invalidated as soon as we release the layer map lock.
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<_> = {
let remote_client = match self.remote_client.as_ref() {
Some(c) => c,
None => {
error!("no remote storage configured, cannot evict layers");
return ControlFlow::Continue(());
}
};
let mut js = tokio::task::JoinSet::new();
{
let guard = self.layers.read().await;
let layers = guard.layer_map();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
let hist_layer = guard.get_from_desc(&hist_layer);
@@ -262,54 +272,49 @@ impl Timeline {
continue;
}
};
let layer = guard.drop_eviction_guard();
if no_activity_for > p.threshold {
candidates.push(guard.drop_eviction_guard())
let remote_client = remote_client.clone();
// this could cause a lot of allocations in some cases
js.spawn(async move { layer.evict_and_wait(&remote_client).await });
stats.candidates += 1;
}
}
candidates
};
stats.candidates = candidates.len();
let remote_client = match self.remote_client.as_ref() {
None => {
error!(
num_candidates = candidates.len(),
"no remote storage configured, cannot evict layers"
);
return ControlFlow::Continue(());
}
Some(c) => c,
};
let results = match self.evict_layer_batch(remote_client, &candidates).await {
Err(pre_err) => {
stats.errors += candidates.len();
error!("could not do any evictions: {pre_err:#}");
return ControlFlow::Continue(());
let join_all = async move {
while let Some(next) = js.join_next().await {
match next {
Ok(Ok(())) => stats.evicted += 1,
Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
stats.not_evictable += 1;
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => {
/* already logged */
stats.errors += 1;
}
Err(je) => tracing::error!("unknown JoinError: {je:?}"),
}
}
Ok(results) => results,
stats
};
assert_eq!(results.len(), candidates.len());
for result in results {
match result {
None => {
stats.skipped_for_shutdown += 1;
}
Some(Ok(())) => {
stats.evicted += 1;
}
Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
stats.not_evictable += 1;
tokio::select! {
stats = join_all => {
if stats.candidates == stats.not_evictable {
debug!(stats=?stats, "eviction iteration complete");
} else if stats.errors > 0 || stats.not_evictable > 0 {
warn!(stats=?stats, "eviction iteration complete");
} else {
info!(stats=?stats, "eviction iteration complete");
}
}
_ = cancel.cancelled() => {
// just drop the joinset to "abort"
}
}
if stats.candidates == stats.not_evictable {
debug!(stats=?stats, "eviction iteration complete");
} else if stats.errors > 0 || stats.not_evictable > 0 {
warn!(stats=?stats, "eviction iteration complete");
} else {
info!(stats=?stats, "eviction iteration complete");
}
ControlFlow::Continue(())
}