diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 21c6ede27e..72865ad74d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -256,52 +256,53 @@ impl PostgresRedoManager { pg_version: u32, ) -> Result { let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?; - + const MAX_RETRY_ATTEMPTS: u32 = 1; let start_time = Instant::now(); + let mut n_attempts = 0u32; + loop { + let mut proc = self.stdin.lock().unwrap(); + let lock_time = Instant::now(); - let mut proc = self.stdin.lock().unwrap(); - let lock_time = Instant::now(); + // launch the WAL redo process on first use + if proc.is_none() { + self.launch(&mut proc, pg_version)?; + } + WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); - // launch the WAL redo process on first use - if proc.is_none() { - self.launch(&mut proc, pg_version)?; - } - WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); + // Relational WAL records are applied using wal-redo-postgres + let buf_tag = BufferTag { rel, blknum }; + let result = self + .apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout) + .map_err(WalRedoError::IoError); - // Relational WAL records are applied using wal-redo-postgres - let buf_tag = BufferTag { rel, blknum }; - let result = self - .apply_wal_records(proc, buf_tag, base_img, records, wal_redo_timeout) - .map_err(WalRedoError::IoError); + let end_time = Instant::now(); + let duration = end_time.duration_since(lock_time); - let end_time = Instant::now(); - let duration = end_time.duration_since(lock_time); + let len = records.len(); + let nbytes = records.iter().fold(0, |acumulator, record| { + acumulator + + match &record.1 { + NeonWalRecord::Postgres { rec, .. } => rec.len(), + _ => unreachable!("Only PostgreSQL records are accepted in this batch"), + } + }); - let len = records.len(); - let nbytes = records.iter().fold(0, |acumulator, record| { - acumulator - + match &record.1 { - NeonWalRecord::Postgres { rec, .. } => rec.len(), - _ => unreachable!("Only PostgreSQL records are accepted in this batch"), - } - }); + WAL_REDO_TIME.observe(duration.as_secs_f64()); + WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64); + WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64); - WAL_REDO_TIME.observe(duration.as_secs_f64()); - WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64); - WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64); + debug!( + "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}", + len, + nbytes, + duration.as_micros(), + lsn + ); - debug!( - "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}", - len, - nbytes, - duration.as_micros(), - lsn - ); - - // If something went wrong, don't try to reuse the process. Kill it, and - // next request will launch a new one. - if result.is_err() { - error!( + // If something went wrong, don't try to reuse the process. Kill it, and + // next request will launch a new one. + if result.is_err() { + error!( "error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}", records.len(), records.first().map(|p| p.0).unwrap_or(Lsn(0)), @@ -310,24 +311,28 @@ impl PostgresRedoManager { base_img_lsn, lsn ); - // self.stdin only holds stdin & stderr as_raw_fd(). - // Dropping it as part of take() doesn't close them. - // The owning objects (ChildStdout and ChildStderr) are stored in - // self.stdout and self.stderr, respsectively. - // We intentionally keep them open here to avoid a race between - // currently running `apply_wal_records()` and a `launch()` call - // after we return here. - // The currently running `apply_wal_records()` must not read from - // the newly launched process. - // By keeping self.stdout and self.stderr open here, `launch()` will - // get other file descriptors for the new child's stdout and stderr, - // and hence the current `apply_wal_records()` calls will observe - // `output.stdout.as_raw_fd() != stdout_fd` . - if let Some(proc) = self.stdin.lock().unwrap().take() { - proc.child.kill_and_wait(); + // self.stdin only holds stdin & stderr as_raw_fd(). + // Dropping it as part of take() doesn't close them. + // The owning objects (ChildStdout and ChildStderr) are stored in + // self.stdout and self.stderr, respsectively. + // We intentionally keep them open here to avoid a race between + // currently running `apply_wal_records()` and a `launch()` call + // after we return here. + // The currently running `apply_wal_records()` must not read from + // the newly launched process. + // By keeping self.stdout and self.stderr open here, `launch()` will + // get other file descriptors for the new child's stdout and stderr, + // and hence the current `apply_wal_records()` calls will observe + // `output.stdout.as_raw_fd() != stdout_fd` . + if let Some(proc) = self.stdin.lock().unwrap().take() { + proc.child.kill_and_wait(); + } + } + n_attempts += 1; + if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() { + return result; } } - result } /// @@ -771,7 +776,7 @@ impl PostgresRedoManager { &self, mut input: MutexGuard>, tag: BufferTag, - base_img: Option, + base_img: &Option, records: &[(Lsn, NeonWalRecord)], wal_redo_timeout: Duration, ) -> Result { @@ -787,7 +792,7 @@ impl PostgresRedoManager { let mut writebuf: Vec = Vec::with_capacity((BLCKSZ as usize) * 3); build_begin_redo_for_block_msg(tag, &mut writebuf); if let Some(img) = base_img { - build_push_page_msg(tag, &img, &mut writebuf); + build_push_page_msg(tag, img, &mut writebuf); } for (lsn, rec) in records.iter() { if let NeonWalRecord::Postgres {