Include sender in sent message

This commit is contained in:
Konstantin Knizhnik
2022-11-11 09:54:50 +02:00
parent 32e0461863
commit e6b484dbea

View File

@@ -31,10 +31,8 @@ use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender, SyncSender};
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use std::{fs, io};
@@ -63,7 +61,6 @@ use postgres_ffi::BLCKSZ;
const N_CHANNELS: usize = 16;
const CHANNEL_SIZE: usize = 1024 * 1024;
const ERR_BUF_SIZE: usize = 8192;
type ChannelId = usize;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
@@ -111,11 +108,7 @@ pub trait WalRedoManager: Send + Sync {
pub struct PostgresRedoManager {
// mutiplexor pipe: use sync_channel to allow sharing sender by multiple threads
// and limit size of buffer
sender: SyncSender<(ChannelId, Vec<u8>)>,
// set of receiver channels
receivers: Vec<Mutex<Receiver<Bytes>>>,
// atomicly incremented counter for choosing receiver
round_robin: AtomicUsize,
sender: SyncSender<(Sender<Bytes>, Vec<u8>)>,
}
/// Can this request be served by neon redo functions
@@ -204,29 +197,22 @@ impl PostgresRedoManager {
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
#[allow(clippy::type_complexity)]
let (tx, rx): (
SyncSender<(ChannelId, Vec<u8>)>,
Receiver<(ChannelId, Vec<u8>)>,
SyncSender<(Sender<Bytes>, Vec<u8>)>,
Receiver<(Sender<Bytes>, Vec<u8>)>,
) = mpsc::sync_channel(CHANNEL_SIZE);
let mut senders: Vec<Sender<Bytes>> = Vec::with_capacity(N_CHANNELS);
let mut receivers: Vec<Mutex<Receiver<Bytes>>> = Vec::with_capacity(N_CHANNELS);
for _ in 0..N_CHANNELS {
let (tx, rx) = mpsc::channel();
senders.push(tx);
receivers.push(Mutex::new(rx));
}
let _proxy = std::thread::spawn(move || {
while let Ok(mut proc) = PostgresRedoProcess::launch(conf, tenant_id) {
loop {
let (id, data) = rx.recv().unwrap();
if proc.send(id, data, &senders).is_err() {
let (sender, data) = rx.recv().unwrap();
if proc.send(sender, data).is_err() {
break;
}
while let Ok((id, data)) = rx.try_recv() {
if proc.send(id, data, &senders).is_err() {
while let Ok((sender, data)) = rx.try_recv() {
if proc.send(sender, data).is_err() {
break;
}
}
if proc.receive(&senders).is_err() {
if proc.receive().is_err() {
break;
}
}
@@ -234,11 +220,7 @@ impl PostgresRedoManager {
panic!("Failed to launch wal-redo postgres");
});
PostgresRedoManager {
sender: tx,
receivers,
round_robin: AtomicUsize::new(0),
}
PostgresRedoManager { sender: tx }
}
fn apply_wal_records(
@@ -273,10 +255,8 @@ impl PostgresRedoManager {
WAL_REDO_RECORDS_HISTOGRAM.observe(records.len() as f64);
WAL_REDO_BYTES_HISTOGRAM.observe(writebuf.len() as f64);
let id = self.round_robin.fetch_add(1, Ordering::Relaxed) % N_CHANNELS;
let rx = self.receivers[id].lock().unwrap();
self.sender.send((id, writebuf)).unwrap();
let (tx, rx) = mpsc::channel();
self.sender.send((tx, writebuf)).unwrap();
Ok(rx.recv().unwrap())
}
@@ -606,7 +586,7 @@ struct PostgresRedoProcess {
stderr: ChildStderr,
wal_redo_timeout: Duration,
// Ring buffer with channel identifiers of buffered redo requests
ring_buf: [ChannelId; N_CHANNELS],
ring_buf: [Option<Sender<Bytes>>; N_CHANNELS],
// Head position in ring buffer
ring_buf_pos: usize,
// Number of buffered requests
@@ -621,7 +601,7 @@ struct PostgresRedoProcess {
impl PostgresRedoProcess {
#[instrument(skip_all,fields(tenant_id=%self.tenant_id))]
fn receive(&mut self, senders: &[Sender<Bytes>]) -> Result<(), Error> {
fn receive(&mut self) -> Result<(), Error> {
while self.n_buffered != 0 {
let n = loop {
match nix::poll::poll(
@@ -662,13 +642,11 @@ impl PostgresRedoProcess {
self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?;
if self.page_pos == BLCKSZ as usize {
assert!(self.n_buffered != 0);
let id = self.ring_buf[self.ring_buf_pos];
let sender = self.ring_buf[self.ring_buf_pos].take().unwrap();
self.n_buffered -= 1;
self.ring_buf_pos = (self.ring_buf_pos + 1) % N_CHANNELS;
self.page_pos = 0;
senders[id]
.send(Bytes::copy_from_slice(&self.page))
.unwrap();
sender.send(Bytes::copy_from_slice(&self.page)).unwrap();
}
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
@@ -681,16 +659,11 @@ impl PostgresRedoProcess {
}
#[instrument(skip_all,fields(tenant_id=%self.tenant_id))]
fn send(
&mut self,
id: ChannelId,
data: Vec<u8>,
senders: &[Sender<Bytes>],
) -> Result<(), Error> {
fn send(&mut self, sender: Sender<Bytes>, data: Vec<u8>) -> Result<(), Error> {
let mut written = 0usize;
let data_len = data.len();
assert!(self.n_buffered < N_CHANNELS);
self.ring_buf[(self.ring_buf_pos + self.n_buffered) % N_CHANNELS] = id;
self.ring_buf[(self.ring_buf_pos + self.n_buffered) % N_CHANNELS] = Some(sender);
self.n_buffered += 1;
while written < data_len {
let n = loop {
@@ -744,13 +717,11 @@ impl PostgresRedoProcess {
self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?;
if self.page_pos == BLCKSZ as usize {
assert!(self.n_buffered != 0);
let id = self.ring_buf[self.ring_buf_pos];
let sender = self.ring_buf[self.ring_buf_pos].take().unwrap();
self.n_buffered -= 1;
self.ring_buf_pos = (self.ring_buf_pos + 1) % N_CHANNELS;
self.page_pos = 0;
senders[id]
.send(Bytes::copy_from_slice(&self.page))
.unwrap();
sender.send(Bytes::copy_from_slice(&self.page)).unwrap();
}
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
@@ -891,7 +862,7 @@ impl PostgresRedoProcess {
stdout,
stderr,
wal_redo_timeout: conf.wal_redo_timeout,
ring_buf: [0; N_CHANNELS],
ring_buf: Default::default(),
ring_buf_pos: 0,
n_buffered: 0,
page: [0u8; BLCKSZ as usize],