Fix wal safekeeper's reply to IDENTIFY_SYSTEM command.

The PostgreSQL FE/BE RowDescription message was built incorrectly,
the colums were sent in wrong order, and the command tag was missing
NULL-terminator. With these fixes, 'psql' understands the reply and
shows it correctly.
This commit is contained in:
Heikki Linnakangas
2021-04-16 19:53:11 +03:00
parent c5d56ffe22
commit 583f64768f
2 changed files with 17 additions and 16 deletions

View File

@@ -146,20 +146,20 @@ impl<'a> BeMessage<'a> {
BeMessage::RowDescription(rows) => {
buf.put_u8(b'T');
let total_len: u32 = rows
.iter()
.fold(0, |acc, row| acc + row.name.len() as u32 + 3 * (4 + 2));
buf.put_u32(4 + 2 + total_len);
let mut body = BytesMut::new();
body.put_i16(rows.len() as i16); // # of fields
for row in rows.iter() {
buf.put_i16(row.name.len() as i16);
buf.put_slice(row.name);
buf.put_i32(0); /* table oid */
buf.put_i16(0); /* attnum */
buf.put_u32(row.typoid);
buf.put_i16(row.typlen);
buf.put_i32(-1); /* typmod */
buf.put_i16(0); /* format code */
body.put_slice(row.name);
body.put_i32(0); /* table oid */
body.put_i16(0); /* attnum */
body.put_u32(row.typoid);
body.put_i16(row.typlen);
body.put_i32(-1); /* typmod */
body.put_i16(0); /* format code */
}
buf.put_i32((4 + body.len()) as i32); // # of bytes, including len field itself
buf.put(body);
}
BeMessage::DataRow(vals) => {

View File

@@ -366,7 +366,7 @@ pub fn thread_main(conf: WalAcceptorConf) {
info!("Starting wal acceptor on {}", conf.listen_addr);
runtime.block_on(async {
let _unused = main_loop(&conf).await;
main_loop(&conf).await.unwrap();
});
}
@@ -443,7 +443,7 @@ impl System {
return shared_state.hs_feedback;
}
// Load and lock control file (prevent running more than one instance of safekeeper
// Load and lock control file (prevent running more than one instance of safekeeper)
fn load_control_file(&self, conf: &WalAcceptorConf) {
let control_file_path = conf
.data_dir
@@ -678,6 +678,7 @@ impl Connection {
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
if let Err(e) = self.request_callback().await {
// Do not treate it as fatal error and continue work
// FIXME: we should retry after a while...
error!("Failed to send callme request to pageserver: {}", e);
}
@@ -893,11 +894,11 @@ impl Connection {
);
BeMessage::write(
&mut self.outbuf,
&BeMessage::DataRow(&[Some(lsn_bytes), Some(tli_bytes), Some(sysid_bytes), None]),
&BeMessage::DataRow(&[Some(sysid_bytes), Some(tli_bytes), Some(lsn_bytes), None]),
);
BeMessage::write(
&mut self.outbuf,
&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"),
&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM\0"),
);
BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery);
self.send().await?;