mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
Do more on-demand downloads where needed (#3194)
The PR aims to fix two missing redownloads in a flacky test_remote_storage_upload_queue_retries[local_fs] ([example](https://neon-github-public-dev.s3.amazonaws.com/reports/pr-3190/release/3759194738/index.html#categories/80f1dcdd7c08252126be7e9f44fe84e6/8a70800f7ab13620/)) 1. missing redownload during walreceiver work ``` 2022-12-22T16:09:51.509891Z ERROR wal_connection_manager{tenant=fb62b97553e40f949de8bdeab7f93563 timeline=4f153bf6a58fd63832f6ee175638d049}: wal receiver task finished with an error: walreceiver connection handling failure Caused by: Layer needs downloading Stack backtrace: 0: pageserver::tenant::timeline::PageReconstructResult<T>::no_ondemand_download at /__w/neon/neon/pageserver/src/tenant/timeline.rs:467:59 1: pageserver::walingest::WalIngest::new at /__w/neon/neon/pageserver/src/walingest.rs:61:32 2: pageserver::walreceiver::walreceiver_connection::handle_walreceiver_connection::{{closure}} at /__w/neon/neon/pageserver/src/walreceiver/walreceiver_connection.rs:178:25 .... ``` That looks sad, but inevitable during the current approach: seems that we need to wait for old layers to arrive in order to accept new data. For that, `WalIngest::new` now started to return the `PageReconstructResult`. Sync methods from `import_datadir.rs` use `WalIngest::new` too, but both of them import WAL during timeline creation, so no layers to download are needed there, ergo the `PageReconstructResult` is converted to `anyhow::Result` with `no_ondemand_download`. 2. missing redownload during compaction work ``` 2022-12-22T16:09:51.090296Z ERROR compaction_loop{tenant_id=fb62b97553e40f949de8bdeab7f93563}:compact_timeline{timeline=4f153bf6a58fd63832f6ee175638d049}: could not compact, repartitioning keyspace failed: Layer needs downloading Stack backtrace: 0: pageserver::tenant::timeline::PageReconstructResult<T>::no_ondemand_download at /__w/neon/neon/pageserver/src/tenant/timeline.rs:467:59 1: pageserver::pgdatadir_mapping::<impl pageserver::tenant::timeline::Timeline>::collect_keyspace::{{closure}} at /__w/neon/neon/pageserver/src/pgdatadir_mapping.rs:506:41 <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll at /rustc/e092d0b6b43f2de967af0887873151bb1c0b18d3/library/core/src/future/mod.rs:91:19 pageserver::tenant::timeline::Timeline::repartition::{{closure}} at /__w/neon/neon/pageserver/src/tenant/timeline.rs:2161:50 <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll at /rustc/e092d0b6b43f2de967af0887873151bb1c0b18d3/library/core/src/future/mod.rs:91:19 2: pageserver::tenant::timeline::Timeline::compact::{{closure}} at /__w/neon/neon/pageserver/src/tenant/timeline.rs:700:14 <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll at /rustc/e092d0b6b43f2de967af0887873151bb1c0b18d3/library/core/src/future/mod.rs:91:19 3: <tracing::instrument::Instrumented<T> as core::future::future::Future>::poll at /github/home/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-0.1.37/src/instrument.rs:272:9 4: pageserver::tenant::Tenant::compaction_iteration::{{closure}} at /__w/neon/neon/pageserver/src/tenant.rs:1232:85 <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll at /rustc/e092d0b6b43f2de967af0887873151bb1c0b18d3/library/core/src/future/mod.rs:91:19 pageserver::tenant_tasks::compaction_loop::{{closure}}::{{closure}} at /__w/neon/neon/pageserver/src/tenant_tasks.rs:76:62 <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll at /rustc/e092d0b6b43f2de967af0887873151bb1c0b18d3/library/core/src/future/mod.rs:91:19 pageserver::tenant_tasks::compaction_loop::{{closure}} at /__w/neon/neon/pageserver/src/tenant_tasks.rs:91:6 ```
This commit is contained in:
@@ -237,14 +237,19 @@ fn import_slru<Reader: Read>(
|
||||
|
||||
/// Scan PostgreSQL WAL files in given directory and load all records between
|
||||
/// 'startpoint' and 'endpoint' into the repository.
|
||||
fn import_wal(walpath: &Path, tline: &Timeline, startpoint: Lsn, endpoint: Lsn) -> Result<()> {
|
||||
fn import_wal(
|
||||
walpath: &Path,
|
||||
tline: &Timeline,
|
||||
startpoint: Lsn,
|
||||
endpoint: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
|
||||
|
||||
let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
|
||||
let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
|
||||
let mut last_lsn = startpoint;
|
||||
|
||||
let mut walingest = WalIngest::new(tline, startpoint)?;
|
||||
let mut walingest = WalIngest::new(tline, startpoint).no_ondemand_download()?;
|
||||
|
||||
while last_lsn <= endpoint {
|
||||
// FIXME: assume postgresql tli 1 for now
|
||||
@@ -362,7 +367,7 @@ pub fn import_wal_from_tar<Reader: Read>(
|
||||
let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
|
||||
let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
|
||||
let mut last_lsn = start_lsn;
|
||||
let mut walingest = WalIngest::new(tline, start_lsn)?;
|
||||
let mut walingest = WalIngest::new(tline, start_lsn).no_ondemand_download()?;
|
||||
|
||||
// Ingest wal until end_lsn
|
||||
info!("importing wal until {}", end_lsn);
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
//!
|
||||
use super::tenant::PageReconstructResult;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::tenant::Timeline;
|
||||
use crate::tenant::{with_ondemand_download, Timeline};
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use crate::{repository::*, try_no_ondemand_download};
|
||||
use anyhow::Context;
|
||||
@@ -503,12 +503,11 @@ impl Timeline {
|
||||
result.add_key(relmap_file_key(spcnode, dbnode));
|
||||
result.add_key(rel_dir_to_key(spcnode, dbnode));
|
||||
|
||||
let mut rels: Vec<RelTag> = self
|
||||
.list_rels(spcnode, dbnode, lsn)
|
||||
.no_ondemand_download()?
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect();
|
||||
let mut rels: Vec<RelTag> =
|
||||
with_ondemand_download(|| self.list_rels(spcnode, dbnode, lsn))
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect();
|
||||
rels.sort_unstable();
|
||||
for rel in rels {
|
||||
let relsize_key = rel_size_to_key(rel);
|
||||
|
||||
@@ -33,10 +33,10 @@ use tracing::*;
|
||||
use crate::pgdatadir_mapping::*;
|
||||
use crate::tenant::PageReconstructResult;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::try_no_ondemand_download;
|
||||
use crate::try_page_reconstruct_result as try_prr;
|
||||
use crate::walrecord::*;
|
||||
use crate::ZERO_PAGE;
|
||||
use crate::{try_no_ondemand_download, try_page_reconstruct_result};
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
@@ -55,14 +55,16 @@ pub struct WalIngest<'a> {
|
||||
}
|
||||
|
||||
impl<'a> WalIngest<'a> {
|
||||
pub fn new(timeline: &Timeline, startpoint: Lsn) -> anyhow::Result<WalIngest> {
|
||||
pub fn new(timeline: &Timeline, startpoint: Lsn) -> PageReconstructResult<WalIngest> {
|
||||
// Fetch the latest checkpoint into memory, so that we can compare with it
|
||||
// quickly in `ingest_record` and update it when it changes.
|
||||
let checkpoint_bytes = timeline.get_checkpoint(startpoint).no_ondemand_download()?;
|
||||
let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
|
||||
let checkpoint_bytes = try_no_ondemand_download!(timeline.get_checkpoint(startpoint));
|
||||
let checkpoint = try_page_reconstruct_result!(
|
||||
CheckPoint::decode(&checkpoint_bytes).context("Failed to decode checkpoint bytes")
|
||||
);
|
||||
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
|
||||
|
||||
Ok(WalIngest {
|
||||
PageReconstructResult::Success(WalIngest {
|
||||
timeline,
|
||||
checkpoint,
|
||||
checkpoint_modified: false,
|
||||
@@ -1122,7 +1124,7 @@ mod tests {
|
||||
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
|
||||
m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file
|
||||
m.commit()?;
|
||||
let walingest = WalIngest::new(tline, Lsn(0x10))?;
|
||||
let walingest = WalIngest::new(tline, Lsn(0x10)).no_ondemand_download()?;
|
||||
|
||||
Ok(walingest)
|
||||
}
|
||||
|
||||
@@ -175,7 +175,8 @@ pub async fn handle_walreceiver_connection(
|
||||
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
|
||||
|
||||
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?;
|
||||
let mut walingest =
|
||||
with_ondemand_download(|| WalIngest::new(timeline.as_ref(), startpoint)).await?;
|
||||
|
||||
while let Some(replication_message) = {
|
||||
select! {
|
||||
|
||||
Reference in New Issue
Block a user