From e3e57579a12daa12e53cd72dc455c92d0132946b Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Tue, 15 Aug 2023 20:23:31 +0300
Subject: [PATCH] integrate: download_all_layers

this time around with graceful cancellation.
---
 pageserver/src/tenant/timeline.rs | 240 ++++++++----------------------
 1 file changed, 60 insertions(+), 180 deletions(-)

diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 623e8db2ce..5278fe108c 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -10,12 +10,10 @@ mod walreceiver;
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use bytes::Bytes;
 use fail::fail_point;
-use futures::StreamExt;
 use itertools::Itertools;
 use pageserver_api::models::{
-    DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
-    DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
-    TimelineState,
+    DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo,
+    LayerResidenceStatus, TimelineState,
 };
 use remote_storage::GenericRemoteStorage;
 use serde_with::serde_as;
@@ -4013,155 +4011,12 @@ impl Timeline {
         }
     }
 
-    /// Download a layer file from remote storage and insert it into the layer map.
-    ///
-    /// It's safe to call this function for the same layer concurrently. In that case:
-    /// - If the layer has already been downloaded, `Ok(...)` is returned.
-    /// - If the layer is currently being downloaded, we wait until that download succeeded / failed.
-    ///   - If it succeeded, we return `Ok(...)`.
-    ///   - If it failed, we or another concurrent caller will initiate a new download attempt.
-    ///
-    /// Download errors are classified and retried if appropriate by the underlying RemoteTimelineClient function.
-    /// It has an internal limit for the maximum number of retries and prints appropriate log messages.
-    /// If we exceed the limit, it returns an error, and this function passes it through.
-    /// The caller _could_ retry further by themselves by calling this function again, but _should not_ do it.
-    /// The reason is that they cannot distinguish permanent errors from temporary ones, whereas
-    /// the underlying RemoteTimelineClient can.
-    ///
-    /// There is no internal timeout or slowness detection.
-    /// If the caller has a deadline or needs a timeout, they can simply stop polling:
-    /// we're **cancellation-safe** because the download happens in a separate task_mgr task.
-    /// So, the current download attempt will run to completion even if we stop polling.
-    #[instrument(skip_all, fields(layer=%remote_layer))]
-    pub async fn download_remote_layer(
-        &self,
-        remote_layer: Arc<RemoteLayer>,
-    ) -> anyhow::Result<()> {
-        span::debug_assert_current_span_has_tenant_and_timeline_id();
-
-        use std::sync::atomic::Ordering::Relaxed;
-
-        let permit = match Arc::clone(&remote_layer.ongoing_download)
-            .acquire_owned()
-            .await
-        {
-            Ok(permit) => permit,
-            Err(_closed) => {
-                if remote_layer.download_replacement_failure.load(Relaxed) {
-                    // this path will be hit often, in case there are upper retries. however
-                    // hitting this error will prevent a busy loop between get_reconstruct_data and
-                    // download, so an error is preferred.
-                    //
-                    // TODO: we really should poison the timeline, but panicking is not yet supported.
-                    // Related: https://github.com/neondatabase/neon/issues/3621
-                    anyhow::bail!("an earlier download succeeded but LayerMap::replace failed")
-                } else {
-                    info!("download of layer has already finished");
-                    return Ok(());
-                }
-            }
-        };
-
-        let (sender, receiver) = tokio::sync::oneshot::channel();
-        // Spawn a task so that download does not outlive timeline when we detach tenant / delete timeline.
-        let self_clone = self.myself.upgrade().expect("timeline is gone");
-        task_mgr::spawn(
-            &tokio::runtime::Handle::current(),
-            TaskKind::RemoteDownloadTask,
-            Some(self.tenant_id),
-            Some(self.timeline_id),
-            &format!("download layer {}", remote_layer),
-            false,
-            async move {
-                let remote_client = self_clone.remote_client.as_ref().unwrap();
-
-                // Does retries + exponential back-off internally.
-                // When this fails, don't layer further retry attempts here.
-                let result = remote_client
-                    .download_layer_file(&remote_layer.filename(), &remote_layer.layer_metadata)
-                    .await;
-
-                if let Ok(size) = &result {
-                    info!("layer file download finished");
-
-                    // XXX the temp file is still around in Err() case
-                    // and consumes space until we clean up upon pageserver restart.
-                    self_clone.metrics.resident_physical_size_gauge.add(*size);
-
-                    // Download complete. Replace the RemoteLayer with the corresponding
-                    // Delta- or ImageLayer in the layer map.
-                    let mut guard = self_clone.layers.write().await;
-                    let new_layer =
-                        remote_layer.create_downloaded_layer(&guard, self_clone.conf, *size);
-                    {
-                        let l: Arc<dyn PersistentLayer> = remote_layer.clone();
-                        let failure = match guard.replace_and_verify(l, new_layer) {
-                            Ok(()) => false,
-                            Err(e) => {
-                                // this is a precondition failure, the layer filename derived
-                                // attributes didn't match up, which doesn't seem likely.
-                                error!("replacing downloaded layer into layermap failed: {e:#?}");
-                                true
-                            }
-                        };
-
-                        if failure {
-                            // mark the remote layer permanently failed; the timeline is most
-                            // likely unusable after this. sadly we cannot just poison the layermap
-                            // lock with panic, because that would create an issue with shutdown.
-                            //
-                            // this does not change the retry semantics on failed downloads.
-                            //
-                            // use of Relaxed is valid because closing of the semaphore gives
-                            // happens-before and wakes up any waiters; we write this value before
-                            // and any waiters (or would be waiters) will load it after closing
-                            // semaphore.
-                            //
-                            // See: https://github.com/neondatabase/neon/issues/3533
-                            remote_layer
-                                .download_replacement_failure
-                                .store(true, Relaxed);
-                        }
-                    }
-                    drop_wlock(guard);
-
-                    info!("on-demand download successful");
-
-                    // Now that we've inserted the download into the layer map,
-                    // close the semaphore. This will make other waiters for
-                    // this download return Ok(()).
-                    assert!(!remote_layer.ongoing_download.is_closed());
-                    remote_layer.ongoing_download.close();
-                } else {
-                    // Keep semaphore open. We'll drop the permit at the end of the function.
-                    error!(
-                        "layer file download failed: {:?}",
-                        result.as_ref().unwrap_err()
-                    );
-                }
-
-                // Don't treat it as an error if the task that triggered the download
-                // is no longer interested in the result.
-                sender.send(result.map(|_sz| ())).ok();
-
-                // In case we failed and there are other waiters, this will make one
-                // of them retry the download in a new task.
-                // XXX: This resets the exponential backoff because it's a new call to
-                // download_layer file.
-                drop(permit);
-
-                Ok(())
-            }
-            .in_current_span(),
-        );
-
-        receiver.await.context("download task cancelled")?
-    }
-
     pub async fn spawn_download_all_remote_layers(
         self: Arc<Self>,
         request: DownloadRemoteLayersTaskSpawnRequest,
     ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
+        use pageserver_api::models::DownloadRemoteLayersTaskState;
+
         let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
         if let Some(st) = &*status_guard {
             match &st.state {
@@ -4220,21 +4075,17 @@
         self: &Arc<Self>,
         request: DownloadRemoteLayersTaskSpawnRequest,
     ) {
-        let mut downloads = Vec::new();
-        {
+        use pageserver_api::models::DownloadRemoteLayersTaskState;
+
+        let remaining = {
             let guard = self.layers.read().await;
-            let layers = guard.layer_map();
-            layers
+            guard
+                .layer_map()
                 .iter_historic_layers()
-                .map(|l| guard.get_from_desc(&l))
-                .filter_map(|l| l.downcast_remote_layer())
-                .map(|l| self.download_remote_layer(l))
-                .for_each(|dl| downloads.push(dl))
-        }
-        let total_layer_count = downloads.len();
-        // limit download concurrency as specified in request
-        let downloads = futures::stream::iter(downloads);
-        let mut downloads = downloads.buffer_unordered(request.max_concurrent_downloads.get());
+                .map(|desc| guard.get_from_desc(&desc))
+                .collect::<Vec<_>>()
+        };
+        let total_layer_count = remaining.len();
 
         macro_rules! lock_status {
             ($st:ident) => {
@@ -4257,29 +4108,58 @@
             lock_status!(st);
             st.total_layer_count = total_layer_count as u64;
         }
+
+        let mut remaining = remaining.into_iter();
+        let mut have_remaining = true;
+        let mut js = tokio::task::JoinSet::new();
+
+        let cancel = task_mgr::shutdown_token();
+
+        let limit = request.max_concurrent_downloads;
+
         loop {
-            tokio::select! {
-                dl = downloads.next() => {
-                    lock_status!(st);
-                    match dl {
-                        None => break,
-                        Some(Ok(())) => {
-                            st.successful_download_count += 1;
-                        },
-                        Some(Err(e)) => {
-                            error!(error = %e, "layer download failed");
-                            st.failed_download_count += 1;
-                        }
+            while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
+                let Some(next) = remaining.next() else {
+                    have_remaining = false;
+                    break;
+                };
+
+                let span = tracing::info_span!("download", layer = %next);
+
+                js.spawn(
+                    async move {
+                        let res = next.get_or_download(None).await;
+                        (next, res)
                     }
-                }
-                _ = task_mgr::shutdown_watcher() => {
-                    // Kind of pointless to watch for shutdowns here,
-                    // as download_remote_layer spawns other task_mgr tasks internally.
-                    lock_status!(st);
-                    st.state = DownloadRemoteLayersTaskState::ShutDown;
+                    .instrument(span),
+                );
+            }
+
+            while let Some(res) = js.join_next().await {
+                match res {
+                    Ok((_, Ok(_))) => {
+                        lock_status!(st);
+                        st.successful_download_count += 1;
+                    }
+                    Ok((layer, Err(e))) => {
+                        tracing::error!(%layer, "download failed: {e:#}");
+                        lock_status!(st);
+                        st.failed_download_count += 1;
+                    }
+                    Err(je) if je.is_cancelled() => unreachable!("not used here"),
+                    Err(je) if je.is_panic() => {
+                        lock_status!(st);
+                        st.failed_download_count += 1;
+                    }
+                    Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
                 }
             }
+
+            if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
+                break;
+            }
         }
+
         {
             lock_status!(st);
             st.state = DownloadRemoteLayersTaskState::Completed;
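
For contrast with the new approach, here is a minimal standalone sketch of the
dedup scheme used by the removed download_remote_layer: a single-permit
Semaphore serializes download attempts, and closing it on success signals
"already downloaded" to every later caller. This is illustrative only;
attempt_download is a hypothetical stand-in for the real
RemoteTimelineClient::download_layer_file call, and the sketch assumes the
tokio and anyhow crates.

use std::sync::Arc;
use tokio::sync::Semaphore;

async fn attempt_download() -> anyhow::Result<()> {
    // stand-in for the real remote-storage download
    Ok(())
}

async fn download_once(ongoing: Arc<Semaphore>) -> anyhow::Result<()> {
    // Only one caller holds the permit; the rest wait here. Err(_closed)
    // means a previous attempt already succeeded.
    let permit = match Arc::clone(&ongoing).acquire_owned().await {
        Ok(permit) => permit,
        Err(_closed) => return Ok(()),
    };

    let result = attempt_download().await;

    if result.is_ok() {
        // Close the semaphore so all current and future waiters observe
        // Err(_closed) above and return Ok without re-downloading.
        ongoing.close();
    }

    // On failure the permit is dropped instead, releasing the semaphore so
    // exactly one waiter wakes up and starts a fresh attempt.
    drop(permit);
    result
}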
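
The new loop replaces that per-layer machinery with a JoinSet. A
self-contained sketch of its shape, assuming tokio and tokio-util as
dependencies: fill the set up to a concurrency limit, drain completions,
and stop refilling once cancelled, so in-flight work finishes before the
function returns. fake_download and the u32 work items are illustrative
only; they stand in for the per-layer download call in the patch.

use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;

async fn fake_download(id: u32) -> Result<(), String> {
    tokio::time::sleep(std::time::Duration::from_millis(u64::from(id))).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    let limit = 3usize;
    let cancel = CancellationToken::new(); // the patch uses task_mgr::shutdown_token()

    let mut remaining = 0u32..10;
    let mut have_remaining = true;
    let mut js = JoinSet::new();
    let (mut ok, mut failed) = (0u64, 0u64);

    loop {
        // Refill up to the limit; cancellation stops new spawns but does
        // not abort tasks that are already running.
        while js.len() < limit && have_remaining && !cancel.is_cancelled() {
            let Some(next) = remaining.next() else {
                have_remaining = false;
                break;
            };
            js.spawn(async move { (next, fake_download(next).await) });
        }

        // Drain the current batch, counting outcomes; a panicked task is
        // recorded as a failure rather than propagated.
        while let Some(res) = js.join_next().await {
            match res {
                Ok((_, Ok(()))) => ok += 1,
                Ok((id, Err(e))) => {
                    eprintln!("download {id} failed: {e}");
                    failed += 1;
                }
                Err(je) if je.is_panic() => failed += 1,
                Err(je) => eprintln!("unexpected join error: {je:?}"),
            }
        }

        // Done when nothing is in flight and there is nothing left to
        // spawn (or we were asked to shut down).
        if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
            break;
        }
    }

    println!("{ok} ok, {failed} failed");
}

Note that, as in the patch, the drain loop empties the JoinSet before the
next refill, so the limit bounds the size of each batch rather than a
steady-state concurrency level; the graceful part is that cancellation only
suppresses new spawns and lets running downloads complete.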