mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
integrate: download_all_layers
this time around with graceful cancellation.
This commit is contained in:
@@ -10,12 +10,10 @@ mod walreceiver;
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use fail::fail_point;
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::models::{
|
||||
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
|
||||
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
TimelineState,
|
||||
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo,
|
||||
LayerResidenceStatus, TimelineState,
|
||||
};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use serde_with::serde_as;
|
||||
@@ -4013,155 +4011,12 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Download a layer file from remote storage and insert it into the layer map.
|
||||
///
|
||||
/// It's safe to call this function for the same layer concurrently. In that case:
|
||||
/// - If the layer has already been downloaded, `Ok(...)` is returned.
|
||||
/// - If the layer is currently being downloaded, we wait until that download succeeds or fails.
|
||||
/// - If it succeeded, we return `Ok(...)`.
|
||||
/// - If it failed, we or another concurrent caller will initiate a new download attempt.
|
||||
///
|
||||
/// Download errors are classified and retried if appropriate by the underlying RemoteTimelineClient function.
|
||||
/// It has an internal limit for the maximum number of retries and prints appropriate log messages.
|
||||
/// If we exceed the limit, it returns an error, and this function passes it through.
|
||||
/// The caller _could_ retry further by themselves by calling this function again, but _should not_ do it.
|
||||
/// The reason is that they cannot distinguish permanent errors from temporary ones, whereas
|
||||
/// the underlying RemoteTimelineClient can.
|
||||
///
|
||||
/// There is no internal timeout or slowness detection.
|
||||
/// If the caller has a deadline or needs a timeout, they can simply stop polling:
|
||||
/// we're **cancellation-safe** because the download happens in a separate task_mgr task.
|
||||
/// So, the current download attempt will run to completion even if we stop polling.
|
||||
#[instrument(skip_all, fields(layer=%remote_layer))]
|
||||
pub async fn download_remote_layer(
|
||||
&self,
|
||||
remote_layer: Arc<RemoteLayer>,
|
||||
) -> anyhow::Result<()> {
|
||||
span::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
|
||||
let permit = match Arc::clone(&remote_layer.ongoing_download)
|
||||
.acquire_owned()
|
||||
.await
|
||||
{
|
||||
Ok(permit) => permit,
|
||||
Err(_closed) => {
|
||||
if remote_layer.download_replacement_failure.load(Relaxed) {
|
||||
// this path will be hit often, in case there are upper retries. however
|
||||
// hitting this error will prevent a busy loop between get_reconstruct_data and
|
||||
// download, so an error is preferred.
|
||||
//
|
||||
// TODO: we really should poison the timeline, but panicking is not yet
|
||||
// supported. Related: https://github.com/neondatabase/neon/issues/3621
|
||||
anyhow::bail!("an earlier download succeeded but LayerMap::replace failed")
|
||||
} else {
|
||||
info!("download of layer has already finished");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let (sender, receiver) = tokio::sync::oneshot::channel();
|
||||
// Spawn a task so that download does not outlive timeline when we detach tenant / delete timeline.
|
||||
let self_clone = self.myself.upgrade().expect("timeline is gone");
|
||||
task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
TaskKind::RemoteDownloadTask,
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
&format!("download layer {}", remote_layer),
|
||||
false,
|
||||
async move {
|
||||
let remote_client = self_clone.remote_client.as_ref().unwrap();
|
||||
|
||||
// Does retries + exponential back-off internally.
|
||||
// When this fails, don't layer further retry attempts here.
|
||||
let result = remote_client
|
||||
.download_layer_file(&remote_layer.filename(), &remote_layer.layer_metadata)
|
||||
.await;
|
||||
|
||||
if let Ok(size) = &result {
|
||||
info!("layer file download finished");
|
||||
|
||||
// XXX the temp file is still around in Err() case
|
||||
// and consumes space until we clean up upon pageserver restart.
|
||||
self_clone.metrics.resident_physical_size_gauge.add(*size);
|
||||
|
||||
// Download complete. Replace the RemoteLayer with the corresponding
|
||||
// Delta- or ImageLayer in the layer map.
|
||||
let mut guard = self_clone.layers.write().await;
|
||||
let new_layer =
|
||||
remote_layer.create_downloaded_layer(&guard, self_clone.conf, *size);
|
||||
{
|
||||
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
|
||||
let failure = match guard.replace_and_verify(l, new_layer) {
|
||||
Ok(()) => false,
|
||||
Err(e) => {
|
||||
// this is a precondition failure, the layer filename derived
|
||||
// attributes didn't match up, which doesn't seem likely.
|
||||
error!("replacing downloaded layer into layermap failed: {e:#?}");
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
if failure {
|
||||
// mark the remote layer permanently failed; the timeline is most
|
||||
// likely unusable after this. sadly we cannot just poison the layermap
|
||||
// lock with panic, because that would create an issue with shutdown.
|
||||
//
|
||||
// this does not change the retry semantics on failed downloads.
|
||||
//
|
||||
// use of Relaxed is valid because closing of the semaphore gives
|
||||
// happens-before and wakes up any waiters; we write this value before
|
||||
// and any waiters (or would be waiters) will load it after closing
|
||||
// semaphore.
|
||||
//
|
||||
// See: https://github.com/neondatabase/neon/issues/3533
|
||||
remote_layer
|
||||
.download_replacement_failure
|
||||
.store(true, Relaxed);
|
||||
}
|
||||
}
|
||||
drop_wlock(guard);
|
||||
|
||||
info!("on-demand download successful");
|
||||
|
||||
// Now that we've inserted the download into the layer map,
|
||||
// close the semaphore. This will make other waiters for
|
||||
// this download return Ok(()).
|
||||
assert!(!remote_layer.ongoing_download.is_closed());
|
||||
remote_layer.ongoing_download.close();
|
||||
} else {
|
||||
// Keep semaphore open. We'll drop the permit at the end of the function.
|
||||
error!(
|
||||
"layer file download failed: {:?}",
|
||||
result.as_ref().unwrap_err()
|
||||
);
|
||||
}
|
||||
|
||||
// Don't treat it as an error if the task that triggered the download
|
||||
// is no longer interested in the result.
|
||||
sender.send(result.map(|_sz| ())).ok();
|
||||
|
||||
// In case we failed and there are other waiters, this will make one
|
||||
// of them retry the download in a new task.
|
||||
// XXX: This resets the exponential backoff because it's a new call to
|
||||
// download_layer_file.
|
||||
drop(permit);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
receiver.await.context("download task cancelled")?
|
||||
}
|
||||
|
||||
pub async fn spawn_download_all_remote_layers(
|
||||
self: Arc<Self>,
|
||||
request: DownloadRemoteLayersTaskSpawnRequest,
|
||||
) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
|
||||
use pageserver_api::models::DownloadRemoteLayersTaskState;
|
||||
|
||||
let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
|
||||
if let Some(st) = &*status_guard {
|
||||
match &st.state {
|
||||
@@ -4220,21 +4075,17 @@ impl Timeline {
|
||||
self: &Arc<Self>,
|
||||
request: DownloadRemoteLayersTaskSpawnRequest,
|
||||
) {
|
||||
let mut downloads = Vec::new();
|
||||
{
|
||||
use pageserver_api::models::DownloadRemoteLayersTaskState;
|
||||
|
||||
let remaining = {
|
||||
let guard = self.layers.read().await;
|
||||
let layers = guard.layer_map();
|
||||
layers
|
||||
guard
|
||||
.layer_map()
|
||||
.iter_historic_layers()
|
||||
.map(|l| guard.get_from_desc(&l))
|
||||
.filter_map(|l| l.downcast_remote_layer())
|
||||
.map(|l| self.download_remote_layer(l))
|
||||
.for_each(|dl| downloads.push(dl))
|
||||
}
|
||||
let total_layer_count = downloads.len();
|
||||
// limit download concurrency as specified in request
|
||||
let downloads = futures::stream::iter(downloads);
|
||||
let mut downloads = downloads.buffer_unordered(request.max_concurrent_downloads.get());
|
||||
.map(|desc| guard.get_from_desc(&desc))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
let total_layer_count = remaining.len();
|
||||
|
||||
macro_rules! lock_status {
|
||||
($st:ident) => {
|
||||
@@ -4257,29 +4108,58 @@ impl Timeline {
|
||||
lock_status!(st);
|
||||
st.total_layer_count = total_layer_count as u64;
|
||||
}
|
||||
|
||||
let mut remaining = remaining.into_iter();
|
||||
let mut have_remaining = true;
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
|
||||
let limit = request.max_concurrent_downloads;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
dl = downloads.next() => {
|
||||
lock_status!(st);
|
||||
match dl {
|
||||
None => break,
|
||||
Some(Ok(())) => {
|
||||
st.successful_download_count += 1;
|
||||
},
|
||||
Some(Err(e)) => {
|
||||
error!(error = %e, "layer download failed");
|
||||
st.failed_download_count += 1;
|
||||
}
|
||||
while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
|
||||
let Some(next) = remaining.next() else {
|
||||
have_remaining = false;
|
||||
break;
|
||||
};
|
||||
|
||||
let span = tracing::info_span!("download", layer = %next);
|
||||
|
||||
js.spawn(
|
||||
async move {
|
||||
let res = next.get_or_download(None).await;
|
||||
(next, res)
|
||||
}
|
||||
}
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
// Kind of pointless to watch for shutdowns here,
|
||||
// as download_remote_layer spawns other task_mgr tasks internally.
|
||||
lock_status!(st);
|
||||
st.state = DownloadRemoteLayersTaskState::ShutDown;
|
||||
.instrument(span),
|
||||
);
|
||||
}
|
||||
|
||||
while let Some(res) = js.join_next().await {
|
||||
match res {
|
||||
Ok((_, Ok(_))) => {
|
||||
lock_status!(st);
|
||||
st.successful_download_count += 1;
|
||||
}
|
||||
Ok((layer, Err(e))) => {
|
||||
tracing::error!(%layer, "download failed: {e:#}");
|
||||
lock_status!(st);
|
||||
st.failed_download_count += 1;
|
||||
}
|
||||
Err(je) if je.is_cancelled() => unreachable!("not used here"),
|
||||
Err(je) if je.is_panic() => {
|
||||
lock_status!(st);
|
||||
st.failed_download_count += 1;
|
||||
}
|
||||
Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
lock_status!(st);
|
||||
st.state = DownloadRemoteLayersTaskState::Completed;
|
||||
|
||||
Reference in New Issue
Block a user