diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index f9fdd7cadf..1ef60c7a94 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -22,6 +22,8 @@ use byteorder::{ByteOrder, LittleEndian}; use bytes::{BufMut, Bytes, BytesMut}; use nix::poll::*; use serde::Serialize; +use std::collections::VecDeque; +use std::fs; use std::fs::OpenOptions; use std::io::prelude::*; use std::io::{Error, ErrorKind}; @@ -585,12 +587,8 @@ struct PostgresRedoProcess { stdout: ChildStdout, stderr: ChildStderr, wal_redo_timeout: Duration, - // Ring buffer with channel identifiers of buffered redo requests - ring_buf: [Option>; N_CHANNELS], - // Head position in ring buffer - ring_buf_pos: usize, - // Number of buffered requests - n_buffered: usize, + // Double ended queue for buffered response senders + resp_deque: VecDeque>, // Reconstructed page page: [u8; BLCKSZ as usize], // Position in reconstructed page buufer @@ -602,7 +600,7 @@ struct PostgresRedoProcess { impl PostgresRedoProcess { #[instrument(skip_all,fields(tenant_id=%self.tenant_id))] fn receive(&mut self) -> Result<(), Error> { - while self.n_buffered != 0 { + while !self.resp_deque.is_empty() { let n = loop { match nix::poll::poll( &mut self.poll_fds[0..2], @@ -641,12 +639,15 @@ impl PostgresRedoProcess { if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?; if self.page_pos == BLCKSZ as usize { - assert!(self.n_buffered != 0); - let sender = self.ring_buf[self.ring_buf_pos].take().unwrap(); - self.n_buffered -= 1; - self.ring_buf_pos = (self.ring_buf_pos + 1) % N_CHANNELS; + if let Some(sender) = self.resp_deque.pop_front() { + sender.send(Bytes::copy_from_slice(&self.page)).unwrap(); + } else { + return Err(Error::new( + ErrorKind::BrokenPipe, + "Malformed output of walredo process", + )); + } self.page_pos = 0; - sender.send(Bytes::copy_from_slice(&self.page)).unwrap(); } } else if out_revents.contains(PollFlags::POLLHUP) { return Err(Error::new( @@ -662,9 +663,7 @@ impl PostgresRedoProcess { fn send(&mut self, sender: Sender, data: Vec) -> Result<(), Error> { let mut written = 0usize; let data_len = data.len(); - assert!(self.n_buffered < N_CHANNELS); - self.ring_buf[(self.ring_buf_pos + self.n_buffered) % N_CHANNELS] = Some(sender); - self.n_buffered += 1; + self.resp_deque.push_back(sender); while written < data_len { let n = loop { match nix::poll::poll( @@ -716,12 +715,15 @@ impl PostgresRedoProcess { if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?; if self.page_pos == BLCKSZ as usize { - assert!(self.n_buffered != 0); - let sender = self.ring_buf[self.ring_buf_pos].take().unwrap(); - self.n_buffered -= 1; - self.ring_buf_pos = (self.ring_buf_pos + 1) % N_CHANNELS; + if let Some(sender) = self.resp_deque.pop_front() { + sender.send(Bytes::copy_from_slice(&self.page)).unwrap(); + } else { + return Err(Error::new( + ErrorKind::BrokenPipe, + "Malformed output of walredo process", + )); + } self.page_pos = 0; - sender.send(Bytes::copy_from_slice(&self.page)).unwrap(); } } else if out_revents.contains(PollFlags::POLLHUP) { return Err(Error::new( @@ -862,9 +864,7 @@ impl PostgresRedoProcess { stdout, stderr, wal_redo_timeout: conf.wal_redo_timeout, - ring_buf: Default::default(), - ring_buf_pos: 0, - n_buffered: 0, + resp_deque: VecDeque::with_capacity(N_CHANNELS), page: [0u8; BLCKSZ as usize], page_pos: 0, poll_fds,