From 1aceea1bdd53018c395f64b29352f6fdfd6e0ca9 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Mon, 31 May 2021 18:32:41 +0300 Subject: [PATCH] Shutdown socket in ReplicationConn --- walkeeper/src/replication.rs | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 4dc0717ca1..a9b1bf1f82 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -5,7 +5,7 @@ use crate::pq_protocol::{BeMessage, FeMessage}; use crate::send_wal::SendWalConn; use crate::timeline::{Timeline, TimelineTools}; use crate::WalAcceptorConf; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, Result}; use bytes::{BufMut, Bytes, BytesMut}; use log::*; use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE}; @@ -14,9 +14,11 @@ use serde::{Deserialize, Serialize}; use std::cmp::min; use std::fs::File; use std::io::{BufReader, Read, Seek, SeekFrom, Write}; -use std::net::TcpStream; +use std::net::{Shutdown, TcpStream}; use std::path::Path; use std::sync::Arc; +use std::thread::sleep; +use std::time::Duration; use std::{str, thread}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; @@ -52,6 +54,14 @@ pub struct ReplicationConn { appname: Option, } +// A separate thread is reading keepalives from the socket. When the main one +// finishes, tell it to exit by shutting down the socket. +impl Drop for ReplicationConn { + fn drop(&mut self) { + let _res = self.stream_out.shutdown(Shutdown::Both); + } +} + impl ReplicationConn { /// Create a new `SendWal`, consuming the `Connection`. 
pub fn new(conn: SendWalConn) -> Self { @@ -131,9 +141,15 @@ impl ReplicationConn { let (mut start_pos, mut stop_pos) = Self::parse_start_stop(&cmd)?; - let wal_seg_size = self.timeline.get().get_info().server.wal_seg_size as usize; - if wal_seg_size == 0 { - bail!("Can not start replication before connecting to wal_proposer"); + let mut wal_seg_size: usize; + loop { + wal_seg_size = self.timeline.get().get_info().server.wal_seg_size as usize; + if wal_seg_size == 0 { + error!("Can not start replication before connecting to wal_proposer"); + sleep(Duration::from_secs(1)); + } else { + break; + } } let (wal_end, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, false); if start_pos == Lsn(0) {