Replace ring buffer for buffered walredp requests with VecDeqeu

This commit is contained in:
Konstantin Knizhnik
2022-11-14 17:55:43 +02:00
parent e6b484dbea
commit fa69ee3f90

View File

@@ -22,6 +22,8 @@ use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use serde::Serialize;
use std::collections::VecDeque;
use std::fs;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
@@ -585,12 +587,8 @@ struct PostgresRedoProcess {
stdout: ChildStdout,
stderr: ChildStderr,
wal_redo_timeout: Duration,
// Ring buffer with channel identifiers of buffered redo requests
ring_buf: [Option<Sender<Bytes>>; N_CHANNELS],
// Head position in ring buffer
ring_buf_pos: usize,
// Number of buffered requests
n_buffered: usize,
// Double ended queue for buffered response senders
resp_deque: VecDeque<Sender<Bytes>>,
// Reconstructed page
page: [u8; BLCKSZ as usize],
// Position in reconstructed page buufer
@@ -602,7 +600,7 @@ struct PostgresRedoProcess {
impl PostgresRedoProcess {
#[instrument(skip_all,fields(tenant_id=%self.tenant_id))]
fn receive(&mut self) -> Result<(), Error> {
while self.n_buffered != 0 {
while !self.resp_deque.is_empty() {
let n = loop {
match nix::poll::poll(
&mut self.poll_fds[0..2],
@@ -641,12 +639,15 @@ impl PostgresRedoProcess {
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?;
if self.page_pos == BLCKSZ as usize {
assert!(self.n_buffered != 0);
let sender = self.ring_buf[self.ring_buf_pos].take().unwrap();
self.n_buffered -= 1;
self.ring_buf_pos = (self.ring_buf_pos + 1) % N_CHANNELS;
if let Some(sender) = self.resp_deque.pop_front() {
sender.send(Bytes::copy_from_slice(&self.page)).unwrap();
} else {
return Err(Error::new(
ErrorKind::BrokenPipe,
"Malformed output of walredo process",
));
}
self.page_pos = 0;
sender.send(Bytes::copy_from_slice(&self.page)).unwrap();
}
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
@@ -662,9 +663,7 @@ impl PostgresRedoProcess {
fn send(&mut self, sender: Sender<Bytes>, data: Vec<u8>) -> Result<(), Error> {
let mut written = 0usize;
let data_len = data.len();
assert!(self.n_buffered < N_CHANNELS);
self.ring_buf[(self.ring_buf_pos + self.n_buffered) % N_CHANNELS] = Some(sender);
self.n_buffered += 1;
self.resp_deque.push_back(sender);
while written < data_len {
let n = loop {
match nix::poll::poll(
@@ -716,12 +715,15 @@ impl PostgresRedoProcess {
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?;
if self.page_pos == BLCKSZ as usize {
assert!(self.n_buffered != 0);
let sender = self.ring_buf[self.ring_buf_pos].take().unwrap();
self.n_buffered -= 1;
self.ring_buf_pos = (self.ring_buf_pos + 1) % N_CHANNELS;
if let Some(sender) = self.resp_deque.pop_front() {
sender.send(Bytes::copy_from_slice(&self.page)).unwrap();
} else {
return Err(Error::new(
ErrorKind::BrokenPipe,
"Malformed output of walredo process",
));
}
self.page_pos = 0;
sender.send(Bytes::copy_from_slice(&self.page)).unwrap();
}
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
@@ -862,9 +864,7 @@ impl PostgresRedoProcess {
stdout,
stderr,
wal_redo_timeout: conf.wal_redo_timeout,
ring_buf: Default::default(),
ring_buf_pos: 0,
n_buffered: 0,
resp_deque: VecDeque::with_capacity(N_CHANNELS),
page: [0u8; BLCKSZ as usize],
page_pos: 0,
poll_fds,