diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 16d815de00..f9fdd7cadf 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -31,10 +31,8 @@ use std::os::unix::prelude::CommandExt; use std::path::PathBuf; use std::process::Stdio; use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc; use std::sync::mpsc::{Receiver, Sender, SyncSender}; -use std::sync::Mutex; use std::time::Duration; use std::time::Instant; use std::{fs, io}; @@ -63,7 +61,6 @@ use postgres_ffi::BLCKSZ; const N_CHANNELS: usize = 16; const CHANNEL_SIZE: usize = 1024 * 1024; const ERR_BUF_SIZE: usize = 8192; -type ChannelId = usize; /// /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. @@ -111,11 +108,7 @@ pub trait WalRedoManager: Send + Sync { pub struct PostgresRedoManager { // mutiplexor pipe: use sync_channel to allow sharing sender by multiple threads // and limit size of buffer - sender: SyncSender<(ChannelId, Vec)>, - // set of receiver channels - receivers: Vec>>, - // atomicly incremented counter for choosing receiver - round_robin: AtomicUsize, + sender: SyncSender<(Sender, Vec)>, } /// Can this request be served by neon redo functions @@ -204,29 +197,22 @@ impl PostgresRedoManager { pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager { #[allow(clippy::type_complexity)] let (tx, rx): ( - SyncSender<(ChannelId, Vec)>, - Receiver<(ChannelId, Vec)>, + SyncSender<(Sender, Vec)>, + Receiver<(Sender, Vec)>, ) = mpsc::sync_channel(CHANNEL_SIZE); - let mut senders: Vec> = Vec::with_capacity(N_CHANNELS); - let mut receivers: Vec>> = Vec::with_capacity(N_CHANNELS); - for _ in 0..N_CHANNELS { - let (tx, rx) = mpsc::channel(); - senders.push(tx); - receivers.push(Mutex::new(rx)); - } let _proxy = std::thread::spawn(move || { while let Ok(mut proc) = PostgresRedoProcess::launch(conf, tenant_id) { loop { - let (id, data) = rx.recv().unwrap(); - if proc.send(id, data, &senders).is_err() { + let (sender, data) = rx.recv().unwrap(); + if proc.send(sender, data).is_err() { break; } - while let Ok((id, data)) = rx.try_recv() { - if proc.send(id, data, &senders).is_err() { + while let Ok((sender, data)) = rx.try_recv() { + if proc.send(sender, data).is_err() { break; } } - if proc.receive(&senders).is_err() { + if proc.receive().is_err() { break; } } @@ -234,11 +220,7 @@ impl PostgresRedoManager { panic!("Failed to launch wal-redo postgres"); }); - PostgresRedoManager { - sender: tx, - receivers, - round_robin: AtomicUsize::new(0), - } + PostgresRedoManager { sender: tx } } fn apply_wal_records( @@ -273,10 +255,8 @@ impl PostgresRedoManager { WAL_REDO_RECORDS_HISTOGRAM.observe(records.len() as f64); WAL_REDO_BYTES_HISTOGRAM.observe(writebuf.len() as f64); - - let id = self.round_robin.fetch_add(1, Ordering::Relaxed) % N_CHANNELS; - let rx = self.receivers[id].lock().unwrap(); - self.sender.send((id, writebuf)).unwrap(); + let (tx, rx) = mpsc::channel(); + self.sender.send((tx, writebuf)).unwrap(); Ok(rx.recv().unwrap()) } @@ -606,7 +586,7 @@ struct PostgresRedoProcess { stderr: ChildStderr, wal_redo_timeout: Duration, // Ring buffer with channel identifiers of buffered redo requests - ring_buf: [ChannelId; N_CHANNELS], + ring_buf: [Option>; N_CHANNELS], // Head position in ring buffer ring_buf_pos: usize, // Number of buffered requests @@ -621,7 +601,7 @@ struct PostgresRedoProcess { impl PostgresRedoProcess { #[instrument(skip_all,fields(tenant_id=%self.tenant_id))] - fn receive(&mut self, senders: &[Sender]) -> Result<(), Error> { + fn receive(&mut self) -> Result<(), Error> { while self.n_buffered != 0 { let n = loop { match nix::poll::poll( @@ -662,13 +642,11 @@ impl PostgresRedoProcess { self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?; if self.page_pos == BLCKSZ as usize { assert!(self.n_buffered != 0); - let id = self.ring_buf[self.ring_buf_pos]; + let sender = self.ring_buf[self.ring_buf_pos].take().unwrap(); self.n_buffered -= 1; self.ring_buf_pos = (self.ring_buf_pos + 1) % N_CHANNELS; self.page_pos = 0; - senders[id] - .send(Bytes::copy_from_slice(&self.page)) - .unwrap(); + sender.send(Bytes::copy_from_slice(&self.page)).unwrap(); } } else if out_revents.contains(PollFlags::POLLHUP) { return Err(Error::new( @@ -681,16 +659,11 @@ impl PostgresRedoProcess { } #[instrument(skip_all,fields(tenant_id=%self.tenant_id))] - fn send( - &mut self, - id: ChannelId, - data: Vec, - senders: &[Sender], - ) -> Result<(), Error> { + fn send(&mut self, sender: Sender, data: Vec) -> Result<(), Error> { let mut written = 0usize; let data_len = data.len(); assert!(self.n_buffered < N_CHANNELS); - self.ring_buf[(self.ring_buf_pos + self.n_buffered) % N_CHANNELS] = id; + self.ring_buf[(self.ring_buf_pos + self.n_buffered) % N_CHANNELS] = Some(sender); self.n_buffered += 1; while written < data_len { let n = loop { @@ -744,13 +717,11 @@ impl PostgresRedoProcess { self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?; if self.page_pos == BLCKSZ as usize { assert!(self.n_buffered != 0); - let id = self.ring_buf[self.ring_buf_pos]; + let sender = self.ring_buf[self.ring_buf_pos].take().unwrap(); self.n_buffered -= 1; self.ring_buf_pos = (self.ring_buf_pos + 1) % N_CHANNELS; self.page_pos = 0; - senders[id] - .send(Bytes::copy_from_slice(&self.page)) - .unwrap(); + sender.send(Bytes::copy_from_slice(&self.page)).unwrap(); } } else if out_revents.contains(PollFlags::POLLHUP) { return Err(Error::new( @@ -891,7 +862,7 @@ impl PostgresRedoProcess { stdout, stderr, wal_redo_timeout: conf.wal_redo_timeout, - ring_buf: [0; N_CHANNELS], + ring_buf: Default::default(), ring_buf_pos: 0, n_buffered: 0, page: [0u8; BLCKSZ as usize],