diff --git a/walkeeper/src/pq_protocol.rs b/walkeeper/src/pq_protocol.rs index 286d563b73..299b830d5e 100644 --- a/walkeeper/src/pq_protocol.rs +++ b/walkeeper/src/pq_protocol.rs @@ -146,20 +146,20 @@ impl<'a> BeMessage<'a> { BeMessage::RowDescription(rows) => { buf.put_u8(b'T'); - let total_len: u32 = rows - .iter() - .fold(0, |acc, row| acc + row.name.len() as u32 + 3 * (4 + 2)); - buf.put_u32(4 + 2 + total_len); + + let mut body = BytesMut::new(); + body.put_i16(rows.len() as i16); // # of fields for row in rows.iter() { - buf.put_i16(row.name.len() as i16); - buf.put_slice(row.name); - buf.put_i32(0); /* table oid */ - buf.put_i16(0); /* attnum */ - buf.put_u32(row.typoid); - buf.put_i16(row.typlen); - buf.put_i32(-1); /* typmod */ - buf.put_i16(0); /* format code */ + body.put_slice(row.name); + body.put_i32(0); /* table oid */ + body.put_i16(0); /* attnum */ + body.put_u32(row.typoid); + body.put_i16(row.typlen); + body.put_i32(-1); /* typmod */ + body.put_i16(0); /* format code */ } + buf.put_i32((4 + body.len()) as i32); // # of bytes, including len field itself + buf.put(body); } BeMessage::DataRow(vals) => { diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index 5570781123..6e17f41f06 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -366,7 +366,7 @@ pub fn thread_main(conf: WalAcceptorConf) { info!("Starting wal acceptor on {}", conf.listen_addr); runtime.block_on(async { - let _unused = main_loop(&conf).await; + main_loop(&conf).await.unwrap(); }); } @@ -443,7 +443,7 @@ impl System { return shared_state.hs_feedback; } - // Load and lock control file (prevent running more than one instance of safekeeper + // Load and lock control file (prevent running more than one instance of safekeeper) fn load_control_file(&self, conf: &WalAcceptorConf) { let control_file_path = conf .data_dir @@ -678,6 +678,7 @@ impl Connection { // Add far as replication in postgres is initiated by receiver, we should use callme mechanism if let Err(e) = self.request_callback().await { // Do not treate it as fatal error and continue work + // FIXME: we should retry after a while... error!("Failed to send callme request to pageserver: {}", e); } @@ -893,11 +894,11 @@ impl Connection { ); BeMessage::write( &mut self.outbuf, - &BeMessage::DataRow(&[Some(lsn_bytes), Some(tli_bytes), Some(sysid_bytes), None]), + &BeMessage::DataRow(&[Some(sysid_bytes), Some(tli_bytes), Some(lsn_bytes), None]), ); BeMessage::write( &mut self.outbuf, - &BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"), + &BeMessage::CommandComplete(b"IDENTIFY_SYSTEM\0"), ); BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery); self.send().await?;