From bbec5a13bd09a00f4de82cb3d4444c997433c78a Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 28 Apr 2021 15:26:08 +0300 Subject: [PATCH] Extract appname from startup package --- walkeeper/src/pq_protocol.rs | 8 ++++++-- walkeeper/src/wal_service.rs | 18 ++++++++++++------ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/walkeeper/src/pq_protocol.rs b/walkeeper/src/pq_protocol.rs index 57517c322f..e959548a10 100644 --- a/walkeeper/src/pq_protocol.rs +++ b/walkeeper/src/pq_protocol.rs @@ -40,6 +40,7 @@ pub struct FeStartupMessage { pub version: u32, pub kind: StartupRequestCode, pub timelineid: ZTimelineId, + pub appname: Option, } #[derive(Debug)] @@ -86,6 +87,7 @@ impl FeStartupMessage { let params = params_str.split('\0'); let mut options = false; let mut timelineid: Option = None; + let mut appname: Option = None; for p in params { if p == "options" { options = true; @@ -94,8 +96,9 @@ impl FeStartupMessage { if let Some(ztimelineid_str) = opt.strip_prefix("ztimelineid=") { // FIXME: rethrow parsing error, don't unwrap timelineid = Some(ZTimelineId::from_str(ztimelineid_str).unwrap()); - break; - } + } else if let Some(val) = opt.strip_prefix("application_name=") { + appname = Some(val.to_string()); + } } break; } @@ -111,6 +114,7 @@ impl FeStartupMessage { Ok(Some(FeMessage::StartupMessage(FeStartupMessage { version, kind, + appname, timelineid: timelineid.unwrap(), }))) } diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index dd894ee483..e9f58cfb0d 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -158,11 +158,12 @@ pub struct Timeline { #[derive(Debug)] struct Connection { timeline: Option>, - stream: TcpStream, /* Postgres connection */ - inbuf: BytesMut, /* input buffer */ - outbuf: BytesMut, /* output buffer */ - init_done: bool, /* startup packet proceeded */ - conf: WalAcceptorConf, /* wal acceptor configuration */ + stream: TcpStream, /* Postgres connection */ + inbuf: BytesMut, /* input buffer */ + outbuf: BytesMut, /* output buffer */ + init_done: bool, /* startup packet proceeded */ + appname: Option,/* assigned application name */ + conf: WalAcceptorConf, /* wal acceptor configuration */ } /* @@ -543,6 +544,7 @@ impl Connection { inbuf: BytesMut::with_capacity(10 * 1024), outbuf: BytesMut::with_capacity(10 * 1024), init_done: false, + appname: None, conf: conf.clone(), } } @@ -855,6 +857,7 @@ impl Connection { self.send().await?; self.init_done = true; self.set_timeline(m.timelineid)?; + self.appname = m.appname; } StartupRequestCode::Cancel => return Ok(()), } @@ -938,7 +941,7 @@ impl Connection { let mut caps = re.captures_iter(str::from_utf8(&cmd[..]).unwrap()); let cap = caps.next().unwrap(); let mut start_pos: XLogRecPtr = (parse_hex_str(&cap[1])? << 32) | parse_hex_str(&cap[2])?; - let stop_pos: XLogRecPtr = if let Some(cap) = caps.next() { + let mut stop_pos: XLogRecPtr = if let Some(cap) = caps.next() { (parse_hex_str(&cap[1])? << 32) | parse_hex_str(&cap[2])? } else { 0 @@ -951,6 +954,9 @@ impl Connection { if start_pos == 0 { start_pos = wal_end; } + if stop_pos == 0 && self.appname == Some("wal_proposer_recovery".to_string()) { + stop_pos = wal_end; + } info!( "Start replication from {:X}/{:>08X} till {:X}/{:>08X}", (start_pos >> 32) as u32,