From 0bafb2a6c703f152bdd3a6ff194720d27eff0b4b Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 23 Dec 2022 15:39:59 +0200 Subject: [PATCH] Do more on-demand downloads where needed (#3194) The PR aims to fix two missing redownloads in a flacky test_remote_storage_upload_queue_retries[local_fs] ([example](https://neon-github-public-dev.s3.amazonaws.com/reports/pr-3190/release/3759194738/index.html#categories/80f1dcdd7c08252126be7e9f44fe84e6/8a70800f7ab13620/)) 1. missing redownload during walreceiver work ``` 2022-12-22T16:09:51.509891Z ERROR wal_connection_manager{tenant=fb62b97553e40f949de8bdeab7f93563 timeline=4f153bf6a58fd63832f6ee175638d049}: wal receiver task finished with an error: walreceiver connection handling failure Caused by: Layer needs downloading Stack backtrace: 0: pageserver::tenant::timeline::PageReconstructResult::no_ondemand_download at /__w/neon/neon/pageserver/src/tenant/timeline.rs:467:59 1: pageserver::walingest::WalIngest::new at /__w/neon/neon/pageserver/src/walingest.rs:61:32 2: pageserver::walreceiver::walreceiver_connection::handle_walreceiver_connection::{{closure}} at /__w/neon/neon/pageserver/src/walreceiver/walreceiver_connection.rs:178:25 .... ``` That looks sad, but inevitable during the current approach: seems that we need to wait for old layers to arrive in order to accept new data. For that, `WalIngest::new` now started to return the `PageReconstructResult`. Sync methods from `import_datadir.rs` use `WalIngest::new` too, but both of them import WAL during timeline creation, so no layers to download are needed there, ergo the `PageReconstructResult` is converted to `anyhow::Result` with `no_ondemand_download`. 2. missing redownload during compaction work ``` 2022-12-22T16:09:51.090296Z ERROR compaction_loop{tenant_id=fb62b97553e40f949de8bdeab7f93563}:compact_timeline{timeline=4f153bf6a58fd63832f6ee175638d049}: could not compact, repartitioning keyspace failed: Layer needs downloading Stack backtrace: 0: pageserver::tenant::timeline::PageReconstructResult::no_ondemand_download at /__w/neon/neon/pageserver/src/tenant/timeline.rs:467:59 1: pageserver::pgdatadir_mapping::::collect_keyspace::{{closure}} at /__w/neon/neon/pageserver/src/pgdatadir_mapping.rs:506:41 as core::future::future::Future>::poll at /rustc/e092d0b6b43f2de967af0887873151bb1c0b18d3/library/core/src/future/mod.rs:91:19 pageserver::tenant::timeline::Timeline::repartition::{{closure}} at /__w/neon/neon/pageserver/src/tenant/timeline.rs:2161:50 as core::future::future::Future>::poll at /rustc/e092d0b6b43f2de967af0887873151bb1c0b18d3/library/core/src/future/mod.rs:91:19 2: pageserver::tenant::timeline::Timeline::compact::{{closure}} at /__w/neon/neon/pageserver/src/tenant/timeline.rs:700:14 as core::future::future::Future>::poll at /rustc/e092d0b6b43f2de967af0887873151bb1c0b18d3/library/core/src/future/mod.rs:91:19 3: as core::future::future::Future>::poll at /github/home/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-0.1.37/src/instrument.rs:272:9 4: pageserver::tenant::Tenant::compaction_iteration::{{closure}} at /__w/neon/neon/pageserver/src/tenant.rs:1232:85 as core::future::future::Future>::poll at /rustc/e092d0b6b43f2de967af0887873151bb1c0b18d3/library/core/src/future/mod.rs:91:19 pageserver::tenant_tasks::compaction_loop::{{closure}}::{{closure}} at /__w/neon/neon/pageserver/src/tenant_tasks.rs:76:62 as core::future::future::Future>::poll at /rustc/e092d0b6b43f2de967af0887873151bb1c0b18d3/library/core/src/future/mod.rs:91:19 pageserver::tenant_tasks::compaction_loop::{{closure}} at /__w/neon/neon/pageserver/src/tenant_tasks.rs:91:6 ``` --- pageserver/src/import_datadir.rs | 11 ++++++++--- pageserver/src/pgdatadir_mapping.rs | 13 ++++++------- pageserver/src/walingest.rs | 14 ++++++++------ .../src/walreceiver/walreceiver_connection.rs | 3 ++- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 76ca183c9a..588b92c13f 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -237,14 +237,19 @@ fn import_slru( /// Scan PostgreSQL WAL files in given directory and load all records between /// 'startpoint' and 'endpoint' into the repository. -fn import_wal(walpath: &Path, tline: &Timeline, startpoint: Lsn, endpoint: Lsn) -> Result<()> { +fn import_wal( + walpath: &Path, + tline: &Timeline, + startpoint: Lsn, + endpoint: Lsn, +) -> anyhow::Result<()> { let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version); let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE); let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE); let mut last_lsn = startpoint; - let mut walingest = WalIngest::new(tline, startpoint)?; + let mut walingest = WalIngest::new(tline, startpoint).no_ondemand_download()?; while last_lsn <= endpoint { // FIXME: assume postgresql tli 1 for now @@ -362,7 +367,7 @@ pub fn import_wal_from_tar( let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE); let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE); let mut last_lsn = start_lsn; - let mut walingest = WalIngest::new(tline, start_lsn)?; + let mut walingest = WalIngest::new(tline, start_lsn).no_ondemand_download()?; // Ingest wal until end_lsn info!("importing wal until {}", end_lsn); diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 793dddef01..82b1576145 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -8,7 +8,7 @@ //! use super::tenant::PageReconstructResult; use crate::keyspace::{KeySpace, KeySpaceAccum}; -use crate::tenant::Timeline; +use crate::tenant::{with_ondemand_download, Timeline}; use crate::walrecord::NeonWalRecord; use crate::{repository::*, try_no_ondemand_download}; use anyhow::Context; @@ -503,12 +503,11 @@ impl Timeline { result.add_key(relmap_file_key(spcnode, dbnode)); result.add_key(rel_dir_to_key(spcnode, dbnode)); - let mut rels: Vec = self - .list_rels(spcnode, dbnode, lsn) - .no_ondemand_download()? - .iter() - .cloned() - .collect(); + let mut rels: Vec = + with_ondemand_download(|| self.list_rels(spcnode, dbnode, lsn)) + .await? + .into_iter() + .collect(); rels.sort_unstable(); for rel in rels { let relsize_key = rel_size_to_key(rel); diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 26a77c02d4..031b80a6e0 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -33,10 +33,10 @@ use tracing::*; use crate::pgdatadir_mapping::*; use crate::tenant::PageReconstructResult; use crate::tenant::Timeline; -use crate::try_no_ondemand_download; use crate::try_page_reconstruct_result as try_prr; use crate::walrecord::*; use crate::ZERO_PAGE; +use crate::{try_no_ondemand_download, try_page_reconstruct_result}; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM}; @@ -55,14 +55,16 @@ pub struct WalIngest<'a> { } impl<'a> WalIngest<'a> { - pub fn new(timeline: &Timeline, startpoint: Lsn) -> anyhow::Result { + pub fn new(timeline: &Timeline, startpoint: Lsn) -> PageReconstructResult { // Fetch the latest checkpoint into memory, so that we can compare with it // quickly in `ingest_record` and update it when it changes. - let checkpoint_bytes = timeline.get_checkpoint(startpoint).no_ondemand_download()?; - let checkpoint = CheckPoint::decode(&checkpoint_bytes)?; + let checkpoint_bytes = try_no_ondemand_download!(timeline.get_checkpoint(startpoint)); + let checkpoint = try_page_reconstruct_result!( + CheckPoint::decode(&checkpoint_bytes).context("Failed to decode checkpoint bytes") + ); trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); - Ok(WalIngest { + PageReconstructResult::Success(WalIngest { timeline, checkpoint, checkpoint_modified: false, @@ -1122,7 +1124,7 @@ mod tests { m.put_checkpoint(ZERO_CHECKPOINT.clone())?; m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file m.commit()?; - let walingest = WalIngest::new(tline, Lsn(0x10))?; + let walingest = WalIngest::new(tline, Lsn(0x10)).no_ondemand_download()?; Ok(walingest) } diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index cc318cccc8..a98126e683 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -175,7 +175,8 @@ pub async fn handle_walreceiver_connection( let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version); - let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?; + let mut walingest = + with_ondemand_download(|| WalIngest::new(timeline.as_ref(), startpoint)).await?; while let Some(replication_message) = { select! {