Extract appname from startup package

This commit is contained in:
Konstantin Knizhnik
2021-04-28 15:26:08 +03:00
parent 421d586953
commit bbec5a13bd
2 changed files with 18 additions and 8 deletions

View File

@@ -40,6 +40,7 @@ pub struct FeStartupMessage {
pub version: u32,
pub kind: StartupRequestCode,
pub timelineid: ZTimelineId,
pub appname: Option<String>,
}
#[derive(Debug)]
@@ -86,6 +87,7 @@ impl FeStartupMessage {
let params = params_str.split('\0');
let mut options = false;
let mut timelineid: Option<ZTimelineId> = None;
let mut appname: Option<String> = None;
for p in params {
if p == "options" {
options = true;
@@ -94,8 +96,9 @@ impl FeStartupMessage {
if let Some(ztimelineid_str) = opt.strip_prefix("ztimelineid=") {
// FIXME: rethrow parsing error, don't unwrap
timelineid = Some(ZTimelineId::from_str(ztimelineid_str).unwrap());
break;
}
} else if let Some(val) = opt.strip_prefix("application_name=") {
appname = Some(val.to_string());
}
}
break;
}
@@ -111,6 +114,7 @@ impl FeStartupMessage {
Ok(Some(FeMessage::StartupMessage(FeStartupMessage {
version,
kind,
appname,
timelineid: timelineid.unwrap(),
})))
}

View File

@@ -158,11 +158,12 @@ pub struct Timeline {
#[derive(Debug)]
struct Connection {
timeline: Option<Arc<Timeline>>,
stream: TcpStream, /* Postgres connection */
inbuf: BytesMut, /* input buffer */
outbuf: BytesMut, /* output buffer */
init_done: bool, /* startup packet proceeded */
conf: WalAcceptorConf, /* wal acceptor configuration */
stream: TcpStream, /* Postgres connection */
inbuf: BytesMut, /* input buffer */
outbuf: BytesMut, /* output buffer */
init_done: bool, /* startup packet proceeded */
appname: Option<String>,/* assigned application name */
conf: WalAcceptorConf, /* wal acceptor configuration */
}
/*
@@ -543,6 +544,7 @@ impl Connection {
inbuf: BytesMut::with_capacity(10 * 1024),
outbuf: BytesMut::with_capacity(10 * 1024),
init_done: false,
appname: None,
conf: conf.clone(),
}
}
@@ -855,6 +857,7 @@ impl Connection {
self.send().await?;
self.init_done = true;
self.set_timeline(m.timelineid)?;
self.appname = m.appname;
}
StartupRequestCode::Cancel => return Ok(()),
}
@@ -938,7 +941,7 @@ impl Connection {
let mut caps = re.captures_iter(str::from_utf8(&cmd[..]).unwrap());
let cap = caps.next().unwrap();
let mut start_pos: XLogRecPtr = (parse_hex_str(&cap[1])? << 32) | parse_hex_str(&cap[2])?;
let stop_pos: XLogRecPtr = if let Some(cap) = caps.next() {
let mut stop_pos: XLogRecPtr = if let Some(cap) = caps.next() {
(parse_hex_str(&cap[1])? << 32) | parse_hex_str(&cap[2])?
} else {
0
@@ -951,6 +954,9 @@ impl Connection {
if start_pos == 0 {
start_pos = wal_end;
}
if stop_pos == 0 && self.appname == Some("wal_proposer_recovery".to_string()) {
stop_pos = wal_end;
}
info!(
"Start replication from {:X}/{:>08X} till {:X}/{:>08X}",
(start_pos >> 32) as u32,