mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
Add a test for EOF in walkeeper's background thread
It would be nice to have a proper Timeline mock api, but this time we'll get by with what we have.
This commit is contained in:
@@ -42,6 +42,19 @@ pub struct ReplicationConn {
|
||||
stream_in: Option<BufReader<TcpStream>>,
|
||||
}
|
||||
|
||||
// TODO: move this to crate::timeline when there's more users
|
||||
// TODO: design a proper Timeline mock api
|
||||
trait HsFeedbackSubscriber {
|
||||
fn add_hs_feedback(&self, _feedback: HotStandbyFeedback) {}
|
||||
}
|
||||
|
||||
impl HsFeedbackSubscriber for Arc<Timeline> {
|
||||
#[inline(always)]
|
||||
fn add_hs_feedback(&self, feedback: HotStandbyFeedback) {
|
||||
Timeline::add_hs_feedback(self, feedback);
|
||||
}
|
||||
}
|
||||
|
||||
impl ReplicationConn {
|
||||
/// Create a new `ReplicationConn`
|
||||
pub fn new(pgb: &mut PostgresBackend) -> Self {
|
||||
@@ -52,13 +65,16 @@ impl ReplicationConn {
|
||||
|
||||
/// Handle incoming messages from the network.
|
||||
/// This is spawned into the background by `handle_start_replication`.
|
||||
fn background_thread(mut stream_in: impl Read, timeline: Arc<Timeline>) -> Result<()> {
|
||||
fn background_thread(
|
||||
mut stream_in: impl Read,
|
||||
subscriber: impl HsFeedbackSubscriber,
|
||||
) -> Result<()> {
|
||||
// Wait for replica's feedback.
|
||||
while let Some(msg) = FeMessage::read(&mut stream_in)? {
|
||||
match msg {
|
||||
FeMessage::CopyData(m) => {
|
||||
let feedback = HotStandbyFeedback::des(&m)?;
|
||||
timeline.add_hs_feedback(feedback);
|
||||
subscriber.add_hs_feedback(feedback);
|
||||
}
|
||||
_ => {
|
||||
// We only handle `CopyData` messages. Anything else is ignored.
|
||||
@@ -212,3 +228,15 @@ impl ReplicationConn {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
mod tests {
|
||||
// A no-op impl for tests
|
||||
impl super::HsFeedbackSubscriber for () {}
|
||||
|
||||
#[test]
|
||||
fn test_replication_conn_background_thread_eof() {
|
||||
// Test that background_thread recognizes EOF
|
||||
let stream: &[u8] = &[];
|
||||
super::ReplicationConn::background_thread(stream, ()).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user