mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-05 05:00:37 +00:00
pageserver: retry forever & cancellation token on index download
This commit is contained in:
@@ -1216,11 +1216,12 @@ impl Tenant {
|
||||
timeline_id,
|
||||
self.generation,
|
||||
);
|
||||
let cancel_clone = cancel.clone();
|
||||
part_downloads.spawn(
|
||||
async move {
|
||||
debug!("starting index part download");
|
||||
|
||||
let index_part = client.download_index_file().await;
|
||||
let index_part = client.download_index_file(cancel_clone).await;
|
||||
|
||||
debug!("finished index part download");
|
||||
|
||||
|
||||
@@ -446,7 +446,10 @@ impl RemoteTimelineClient {
|
||||
//
|
||||
|
||||
/// Download index file
|
||||
pub async fn download_index_file(&self) -> Result<MaybeDeletedIndexPart, DownloadError> {
|
||||
pub async fn download_index_file(
|
||||
&self,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<MaybeDeletedIndexPart, DownloadError> {
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
&RemoteOpFileKind::Index,
|
||||
&RemoteOpKind::Download,
|
||||
@@ -460,6 +463,7 @@ impl RemoteTimelineClient {
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
self.generation,
|
||||
cancel,
|
||||
)
|
||||
.measure_remote_op(
|
||||
self.tenant_id,
|
||||
@@ -1633,7 +1637,11 @@ mod tests {
|
||||
let client = timeline.remote_client.as_ref().unwrap();
|
||||
|
||||
// Download back the index.json, and check that the list of files is correct
|
||||
let initial_index_part = match client.download_index_file().await.unwrap() {
|
||||
let initial_index_part = match client
|
||||
.download_index_file(CancellationToken::new())
|
||||
.await
|
||||
.unwrap()
|
||||
{
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
|
||||
MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
|
||||
};
|
||||
@@ -1725,7 +1733,11 @@ mod tests {
|
||||
}
|
||||
|
||||
// Download back the index.json, and check that the list of files is correct
|
||||
let index_part = match client.download_index_file().await.unwrap() {
|
||||
let index_part = match client
|
||||
.download_index_file(CancellationToken::new())
|
||||
.await
|
||||
.unwrap()
|
||||
{
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
|
||||
MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
|
||||
};
|
||||
@@ -1916,7 +1928,7 @@ mod tests {
|
||||
let client = test_state.build_client(get_generation);
|
||||
|
||||
let download_r = client
|
||||
.download_index_file()
|
||||
.download_index_file(CancellationToken::new())
|
||||
.await
|
||||
.expect("download should always succeed");
|
||||
assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
|
||||
|
||||
@@ -223,10 +223,11 @@ async fn do_download_index_part(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
index_generation: Generation,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
|
||||
|
||||
let index_part_bytes = download_retry(
|
||||
let index_part_bytes = download_retry_forever(
|
||||
|| async {
|
||||
let mut index_part_download = storage.download(&remote_path).await?;
|
||||
|
||||
@@ -241,6 +242,7 @@ async fn do_download_index_part(
|
||||
Ok(index_part_bytes)
|
||||
},
|
||||
&format!("download {remote_path:?}"),
|
||||
cancel,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -262,19 +264,28 @@ pub(super) async fn download_index_part(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
my_generation: Generation,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
if my_generation.is_none() {
|
||||
// Operating without generations: just fetch the generation-less path
|
||||
return do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
|
||||
return do_download_index_part(storage, tenant_id, timeline_id, my_generation, cancel)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Stale case: If we were intentionally attached in a stale generation, there may already be a remote
|
||||
// index in our generation.
|
||||
//
|
||||
// This is an optimization to avoid doing the listing for the general case below.
|
||||
let res = do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
|
||||
let res = do_download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
my_generation,
|
||||
cancel.clone(),
|
||||
)
|
||||
.await;
|
||||
match res {
|
||||
Ok(index_part) => {
|
||||
tracing::debug!(
|
||||
@@ -294,8 +305,14 @@ pub(super) async fn download_index_part(
|
||||
// we want to find the most recent index from a previous generation.
|
||||
//
|
||||
// This is an optimization to avoid doing the listing for the general case below.
|
||||
let res =
|
||||
do_download_index_part(storage, tenant_id, timeline_id, my_generation.previous()).await;
|
||||
let res = do_download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
my_generation.previous(),
|
||||
cancel.clone(),
|
||||
)
|
||||
.await;
|
||||
match res {
|
||||
Ok(index_part) => {
|
||||
tracing::debug!("Found index_part from previous generation");
|
||||
@@ -339,13 +356,14 @@ pub(super) async fn download_index_part(
|
||||
match max_previous_generation {
|
||||
Some(g) => {
|
||||
tracing::debug!("Found index_part in generation {g:?}");
|
||||
do_download_index_part(storage, tenant_id, timeline_id, g).await
|
||||
do_download_index_part(storage, tenant_id, timeline_id, g, cancel).await
|
||||
}
|
||||
None => {
|
||||
// Migration from legacy pre-generation state: we have a generation but no prior
|
||||
// attached pageservers did. Try to load from a no-generation path.
|
||||
tracing::info!("No index_part.json* found");
|
||||
do_download_index_part(storage, tenant_id, timeline_id, Generation::none()).await
|
||||
do_download_index_part(storage, tenant_id, timeline_id, Generation::none(), cancel)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user