Compare commits

...

21 Commits

Author SHA1 Message Date
Alexey Kondratov
58ee590bfd cargo fmt 2021-10-20 14:01:00 +03:00
Alexey Kondratov
78773ee2c1 Bump vendor/postgres 2021-10-20 13:59:42 +03:00
Konstantin Knizhnik
a6d096a874 Merge with main 2021-10-20 13:19:50 +03:00
Konstantin Knizhnik
1ae336c356 Fix type in comment 2021-10-19 17:42:29 +03:00
Konstantin Knizhnik
57da296356 Fix race condition in add_replica 2021-10-19 12:01:24 +03:00
Konstantin Knizhnik
4ee30b6c36 Merge with main 2021-10-08 14:47:00 +03:00
Konstantin Knizhnik
4d90f2f316 Add separate back pressure threashold for write/flush LSNs 2021-09-30 19:55:38 +03:00
Konstantin Knizhnik
fc774c819e MReduce default checkpoint distance to 64Mb 2021-09-30 17:32:57 +03:00
Konstantin Knizhnik
04a6652f16 Bump postgres version 2021-09-22 16:02:12 +03:00
Konstantin Knizhnik
25ca847295 Remove extra tracing 2021-09-22 11:07:27 +03:00
Konstantin Knizhnik
b472e28f3a Fix indentation 2021-09-22 10:40:31 +03:00
Konstantin Knizhnik
e28960b816 Fix zerialization of KeepAlive message 2021-09-22 10:40:31 +03:00
Konstantin Knizhnik
379f6b8638 Fix clippy errors 2021-09-22 10:40:31 +03:00
Konstantin Knizhnik
018408e0af Fix clippy error 2021-09-22 10:40:31 +03:00
Konstantin Knizhnik
bf5d17cbaa Request disk consistent LSN through safekeeper 2021-09-22 10:40:31 +03:00
Konstantin Knizhnik
ae2232641d Eliminate warnings 2021-09-22 10:40:22 +03:00
Konstantin Knizhnik
891633f2cd Bump postgres version 2021-09-22 10:36:14 +03:00
Konstantin Knizhnik
6056ce7602 Implement backpressure for compute node to avoid WAL overflow 2021-09-22 10:36:14 +03:00
Konstantin Knizhnik
e53dd5240a Adjust default parameters for chekpointer to avoid OOM 2021-09-22 10:35:17 +03:00
Konstantin Knizhnik
611abc5299 Use checkpoint_distance instead of OLDEST_INMEM_DISTANCE 2021-09-22 10:35:17 +03:00
Konstantin Knizhnik
09cced0855 Delay adding new layers if checpointing is too slow to avoid OOM 2021-09-22 10:35:17 +03:00
9 changed files with 165 additions and 71 deletions

View File

@@ -300,7 +300,7 @@ impl PostgresNode {
conf.append("shared_buffers", "1MB"); conf.append("shared_buffers", "1MB");
conf.append("fsync", "off"); conf.append("fsync", "off");
conf.append("max_connections", "100"); conf.append("max_connections", "100");
conf.append("wal_sender_timeout", "0"); conf.append("wal_sender_timeout", "10s");
conf.append("wal_level", "replica"); conf.append("wal_level", "replica");
conf.append("listen_addresses", &self.address.ip().to_string()); conf.append("listen_addresses", &self.address.ip().to_string());
conf.append("port", &self.address.port().to_string()); conf.append("port", &self.address.port().to_string());

View File

@@ -695,8 +695,8 @@ impl Timeline for LayeredTimeline {
.wait_for_timeout(lsn, TIMEOUT) .wait_for_timeout(lsn, TIMEOUT)
.with_context(|| { .with_context(|| {
format!( format!(
"Timed out while waiting for WAL record at LSN {} to arrive", "Timed out while waiting for WAL record at LSN {} to arrive, disk consistent LSN={}",
lsn lsn, self.get_disk_consistent_lsn()
) )
})?; })?;
@@ -910,6 +910,10 @@ impl Timeline for LayeredTimeline {
Ok(total_blocks * BLCKSZ as usize) Ok(total_blocks * BLCKSZ as usize)
} }
fn get_disk_consistent_lsn(&self) -> Lsn {
self.disk_consistent_lsn.load()
}
fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a> { fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a> {
Box::new(LayeredTimelineWriter { Box::new(LayeredTimelineWriter {
tl: self, tl: self,

View File

@@ -134,6 +134,7 @@ pub trait Timeline: Send + Sync {
fn get_last_record_lsn(&self) -> Lsn; fn get_last_record_lsn(&self) -> Lsn;
fn get_prev_record_lsn(&self) -> Lsn; fn get_prev_record_lsn(&self) -> Lsn;
fn get_start_lsn(&self) -> Lsn; fn get_start_lsn(&self) -> Lsn;
fn get_disk_consistent_lsn(&self) -> Lsn;
/// Mutate the timeline with a [`TimelineWriter`]. /// Mutate the timeline with a [`TimelineWriter`].
fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a>; fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a>;

View File

@@ -284,12 +284,14 @@ fn walreceiver_main(
if let Some(last_lsn) = status_update { if let Some(last_lsn) = status_update {
// TODO: More thought should go into what values are sent here. // TODO: More thought should go into what values are sent here.
let last_lsn = PgLsn::from(u64::from(last_lsn)); let last_lsn = PgLsn::from(u64::from(last_lsn));
let write_lsn = last_lsn; // We are using disk consistent LSN as `write_lsn`, i.e. LSN at which page server
// may guarantee persistence of all received data. Safekeeper is not free to remove
// WAL preceding `write_lsn`: it should not be requested by this page server.
let write_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn()));
let flush_lsn = last_lsn; let flush_lsn = last_lsn;
let apply_lsn = PgLsn::from(0); let apply_lsn = PgLsn::from(0);
let ts = SystemTime::now(); let ts = SystemTime::now();
const NO_REPLY: u8 = 0; const NO_REPLY: u8 = 0;
physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?; physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;
} }

View File

@@ -2,7 +2,7 @@
//! with the "START_REPLICATION" message. //! with the "START_REPLICATION" message.
use crate::send_wal::SendWalHandler; use crate::send_wal::SendWalHandler;
use crate::timeline::{Timeline, TimelineTools}; use crate::timeline::{ReplicaState, Timeline, TimelineTools};
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use bytes::Bytes; use bytes::Bytes;
use log::*; use log::*;
@@ -20,7 +20,7 @@ use std::{str, thread};
use zenith_utils::bin_ser::BeSer; use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn; use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeMessage, XLogDataBody}; use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody};
use zenith_utils::sock_split::ReadStream; use zenith_utils::sock_split::ReadStream;
pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX; pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
@@ -32,7 +32,7 @@ const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
type FullTransactionId = u64; type FullTransactionId = u64;
/// Hot standby feedback received from replica /// Hot standby feedback received from replica
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct HotStandbyFeedback { pub struct HotStandbyFeedback {
pub ts: TimestampTz, pub ts: TimestampTz,
pub xmin: FullTransactionId, pub xmin: FullTransactionId,
@@ -49,6 +49,16 @@ impl HotStandbyFeedback {
} }
} }
/// Standby status update
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StandbyReply {
pub write_lsn: Lsn, // disk consistent lSN
pub flush_lsn: Lsn, // LSN committedby quorum
pub apply_lsn: Lsn, // not used
pub reply_ts: TimestampTz,
pub reply_requested: bool,
}
/// A network connection that's speaking the replication protocol. /// A network connection that's speaking the replication protocol.
pub struct ReplicationConn { pub struct ReplicationConn {
/// This is an `Option` because we will spawn a background thread that will /// This is an `Option` because we will spawn a background thread that will
@@ -56,16 +66,15 @@ pub struct ReplicationConn {
stream_in: Option<ReadStream>, stream_in: Option<ReadStream>,
} }
// TODO: move this to crate::timeline when there's more users /// Scope guard to unregister replication connection from timeline
// TODO: design a proper Timeline mock api struct ReplicationConnGuard {
trait HsFeedbackSubscriber { replica: usize, // replica internal ID assigned by timeline
fn add_hs_feedback(&self, _feedback: HotStandbyFeedback) {} timeline: Arc<Timeline>,
} }
impl HsFeedbackSubscriber for Arc<Timeline> { impl Drop for ReplicationConnGuard {
#[inline(always)] fn drop(&mut self) {
fn add_hs_feedback(&self, feedback: HotStandbyFeedback) { self.timeline.update_replica_state(self.replica, None);
Timeline::add_hs_feedback(self, feedback);
} }
} }
@@ -79,26 +88,33 @@ impl ReplicationConn {
/// Handle incoming messages from the network. /// Handle incoming messages from the network.
/// This is spawned into the background by `handle_start_replication`. /// This is spawned into the background by `handle_start_replication`.
fn background_thread( fn background_thread(mut stream_in: impl Read, timeline: Arc<Timeline>) -> Result<()> {
mut stream_in: impl Read, let mut state = ReplicaState::new();
subscriber: impl HsFeedbackSubscriber, let replica = timeline.add_replica(state);
) -> Result<()> { let _guard = ReplicationConnGuard {
replica,
timeline: timeline.clone(),
};
// Wait for replica's feedback. // Wait for replica's feedback.
while let Some(msg) = FeMessage::read(&mut stream_in)? { while let Some(msg) = FeMessage::read(&mut stream_in)? {
match &msg { match &msg {
FeMessage::CopyData(m) => { FeMessage::CopyData(m) => {
// There's two possible data messages that the client is supposed to send here: // There's two possible data messages that the client is supposed to send here:
// `HotStandbyFeedback` and `StandbyStatusUpdate`. We only handle hot standby // `HotStandbyFeedback` and `StandbyStatusUpdate`.
// feedback.
match m.first().cloned() { match m.first().cloned() {
Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => { Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
// Note: deserializing is on m[1..] because we skip the tag byte. // Note: deserializing is on m[1..] because we skip the tag byte.
let feedback = HotStandbyFeedback::des(&m[1..]) state.hs_feedback = HotStandbyFeedback::des(&m[1..])
.context("failed to deserialize HotStandbyFeedback")?; .context("failed to deserialize HotStandbyFeedback")?;
subscriber.add_hs_feedback(feedback); timeline.update_replica_state(replica, Some(state));
}
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
let reply = StandbyReply::des(&m[1..])
.context("failed to deserialize StandbyReply")?;
state.disk_consistent_lsn = reply.write_lsn;
timeline.update_replica_state(replica, Some(state));
} }
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => (),
_ => warn!("unexpected message {:?}", msg), _ => warn!("unexpected message {:?}", msg),
} }
} }
@@ -187,7 +203,7 @@ impl ReplicationConn {
// switch to copy // switch to copy
pgb.write_message(&BeMessage::CopyBothResponse)?; pgb.write_message(&BeMessage::CopyBothResponse)?;
let mut end_pos: Lsn; let mut end_pos = Lsn(0);
let mut wal_file: Option<File> = None; let mut wal_file: Option<File> = None;
loop { loop {
@@ -202,7 +218,18 @@ impl ReplicationConn {
} else { } else {
/* normal mode */ /* normal mode */
let timeline = swh.timeline.get(); let timeline = swh.timeline.get();
end_pos = timeline.wait_for_lsn(start_pos); if let Some(lsn) = timeline.wait_for_lsn(start_pos) {
end_pos = lsn
} else {
// timeout expired: request pageserver status
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
sent_ptr: end_pos.0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.context("Failed to send KeepAlive message")?;
continue;
}
} }
if end_pos == END_REPLICATION_MARKER { if end_pos == END_REPLICATION_MARKER {
break; break;
@@ -257,18 +284,3 @@ impl ReplicationConn {
Ok(()) Ok(())
} }
} }
#[cfg(test)]
mod tests {
use super::*;
// A no-op impl for tests
impl HsFeedbackSubscriber for () {}
#[test]
fn test_replication_conn_background_thread_eof() {
// Test that background_thread recognizes EOF
let stream: &[u8] = &[];
ReplicationConn::background_thread(stream, ()).unwrap();
}
}

View File

@@ -191,6 +191,8 @@ pub struct AppendResponse {
// We report back our awareness about which WAL is committed, as this is // We report back our awareness about which WAL is committed, as this is
// a criterion for walproposer --sync mode exit // a criterion for walproposer --sync mode exit
pub commit_lsn: Lsn, pub commit_lsn: Lsn,
// Min disk consistent lsn of pageservers (portion of WAL applied and written to the disk by pageservers)
pub disk_consistent_lsn: Lsn,
pub hs_feedback: HotStandbyFeedback, pub hs_feedback: HotStandbyFeedback,
} }
@@ -458,6 +460,7 @@ where
epoch: self.s.acceptor_state.epoch, epoch: self.s.acceptor_state.epoch,
commit_lsn: Lsn(0), commit_lsn: Lsn(0),
flush_lsn: Lsn(0), flush_lsn: Lsn(0),
disk_consistent_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(), hs_feedback: HotStandbyFeedback::empty(),
}; };
return Ok(AcceptorProposerMessage::AppendResponse(resp)); return Ok(AcceptorProposerMessage::AppendResponse(resp));
@@ -567,6 +570,7 @@ where
epoch: self.s.acceptor_state.epoch, epoch: self.s.acceptor_state.epoch,
flush_lsn: self.flush_lsn, flush_lsn: self.flush_lsn,
commit_lsn: self.s.commit_lsn, commit_lsn: self.s.commit_lsn,
disk_consistent_lsn: Lsn(0),
// will be filled by caller code to avoid bothering safekeeper // will be filled by caller code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(), hs_feedback: HotStandbyFeedback::empty(),
}; };

View File

@@ -11,9 +11,9 @@ use std::collections::HashMap;
use std::fs::{self, File, OpenOptions}; use std::fs::{self, File, OpenOptions};
use std::io::{Seek, SeekFrom, Write}; use std::io::{Seek, SeekFrom, Write};
use std::sync::{Arc, Condvar, Mutex}; use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use zenith_utils::bin_ser::LeSer; use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn; use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{ZTenantId, ZTimelineId}; use zenith_utils::zid::{ZTenantId, ZTimelineId};
use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER}; use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER};
@@ -25,6 +25,35 @@ use crate::WalAcceptorConf;
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
const CONTROL_FILE_NAME: &str = "safekeeper.control"; const CONTROL_FILE_NAME: &str = "safekeeper.control";
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
/// Replica status: host standby feedback + disk consistent lsn
#[derive(Debug, Clone, Copy)]
pub struct ReplicaState {
/// combined disk_consistent_lsn of pageservers
pub disk_consistent_lsn: Lsn,
/// combined hot standby feedback from all replicas
pub hs_feedback: HotStandbyFeedback,
}
impl Default for ReplicaState {
fn default() -> Self {
Self::new()
}
}
impl ReplicaState {
pub fn new() -> ReplicaState {
ReplicaState {
disk_consistent_lsn: Lsn(u64::MAX),
hs_feedback: HotStandbyFeedback {
ts: 0,
xmin: u64::MAX,
catalog_xmin: u64::MAX,
},
}
}
}
/// Shared state associated with database instance (tenant) /// Shared state associated with database instance (tenant)
struct SharedState { struct SharedState {
@@ -33,8 +62,8 @@ struct SharedState {
/// For receiving-sending wal cooperation /// For receiving-sending wal cooperation
/// quorum commit LSN we've notified walsenders about /// quorum commit LSN we've notified walsenders about
notified_commit_lsn: Lsn, notified_commit_lsn: Lsn,
/// combined hot standby feedback from all replicas /// State of replicas
hs_feedback: HotStandbyFeedback, replicas: Vec<Option<ReplicaState>>,
} }
// A named boolean. // A named boolean.
@@ -45,6 +74,31 @@ pub enum CreateControlFile {
} }
impl SharedState { impl SharedState {
/// Get combined stateof all alive replicas
pub fn get_replicas_state(&self) -> ReplicaState {
let mut acc = ReplicaState::new();
for state in self.replicas.iter().flatten() {
acc.hs_feedback.ts = max(acc.hs_feedback.ts, state.hs_feedback.ts);
acc.hs_feedback.xmin = min(acc.hs_feedback.xmin, state.hs_feedback.xmin);
acc.hs_feedback.catalog_xmin =
min(acc.hs_feedback.catalog_xmin, state.hs_feedback.catalog_xmin);
acc.disk_consistent_lsn = Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn);
}
acc
}
/// Assign new replica ID. We choose first empty cell in the replicas vector
/// or extend the vector if there are not free items.
pub fn add_replica(&mut self, state: ReplicaState) -> usize {
if let Some(pos) = self.replicas.iter().position(|r| r.is_none()) {
self.replicas[pos] = Some(state);
return pos;
}
let pos = self.replicas.len();
self.replicas.push(Some(state));
pos
}
/// Restore SharedState from control file. Locks the control file along the /// Restore SharedState from control file. Locks the control file along the
/// way to prevent running more than one instance of safekeeper on the same /// way to prevent running more than one instance of safekeeper on the same
/// data dir. /// data dir.
@@ -74,21 +128,10 @@ impl SharedState {
Ok(Self { Ok(Self {
notified_commit_lsn: Lsn(0), notified_commit_lsn: Lsn(0),
sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state), sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state),
hs_feedback: HotStandbyFeedback { replicas: Vec::new(),
ts: 0,
xmin: u64::MAX,
catalog_xmin: u64::MAX,
},
}) })
} }
/// Accumulate hot standby feedbacks from replicas
pub fn add_hs_feedback(&mut self, feedback: HotStandbyFeedback) {
self.hs_feedback.xmin = min(self.hs_feedback.xmin, feedback.xmin);
self.hs_feedback.catalog_xmin = min(self.hs_feedback.catalog_xmin, feedback.catalog_xmin);
self.hs_feedback.ts = max(self.hs_feedback.ts, feedback.ts);
}
/// Fetch and lock control file (prevent running more than one instance of safekeeper) /// Fetch and lock control file (prevent running more than one instance of safekeeper)
/// If create=false and file doesn't exist, bails out. /// If create=false and file doesn't exist, bails out.
fn load_control_file( fn load_control_file(
@@ -178,20 +221,27 @@ impl Timeline {
} }
} }
/// Wait for an LSN to be committed. /// Timed wait for an LSN to be committed.
/// ///
/// Returns the last committed LSN, which will be at least /// Returns the last committed LSN, which will be at least
/// as high as the LSN waited for. /// as high as the LSN waited for, or None if timeout expired.
/// ///
pub fn wait_for_lsn(&self, lsn: Lsn) -> Lsn { pub fn wait_for_lsn(&self, lsn: Lsn) -> Option<Lsn> {
let mut shared_state = self.mutex.lock().unwrap(); let mut shared_state = self.mutex.lock().unwrap();
loop { loop {
let commit_lsn = shared_state.notified_commit_lsn; let commit_lsn = shared_state.notified_commit_lsn;
// This must be `>`, not `>=`. // This must be `>`, not `>=`.
if commit_lsn > lsn { if commit_lsn > lsn {
return commit_lsn; return Some(commit_lsn);
} }
shared_state = self.cond.wait(shared_state).unwrap(); let result = self
.cond
.wait_timeout(shared_state, POLL_STATE_TIMEOUT)
.unwrap();
if result.1.timed_out() {
return None;
}
shared_state = result.0
} }
} }
@@ -219,9 +269,11 @@ impl Timeline {
// commit_lsn if we are catching up safekeeper. // commit_lsn if we are catching up safekeeper.
commit_lsn = shared_state.sk.commit_lsn; commit_lsn = shared_state.sk.commit_lsn;
// if this is AppendResponse, fill in proper hot standby feedback // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg { if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg {
resp.hs_feedback = shared_state.hs_feedback.clone(); let state = shared_state.get_replicas_state();
resp.hs_feedback = state.hs_feedback;
resp.disk_consistent_lsn = state.disk_consistent_lsn;
} }
} }
// Ping wal sender that new data might be available. // Ping wal sender that new data might be available.
@@ -233,15 +285,14 @@ impl Timeline {
self.mutex.lock().unwrap().sk.s.clone() self.mutex.lock().unwrap().sk.s.clone()
} }
// Accumulate hot standby feedbacks from replicas pub fn add_replica(&self, state: ReplicaState) -> usize {
pub fn add_hs_feedback(&self, feedback: HotStandbyFeedback) {
let mut shared_state = self.mutex.lock().unwrap(); let mut shared_state = self.mutex.lock().unwrap();
shared_state.add_hs_feedback(feedback); shared_state.add_replica(state)
} }
pub fn get_hs_feedback(&self) -> HotStandbyFeedback { pub fn update_replica_state(&self, id: usize, state: Option<ReplicaState>) {
let shared_state = self.mutex.lock().unwrap(); let mut shared_state = self.mutex.lock().unwrap();
shared_state.hs_feedback.clone() shared_state.replicas[id] = state;
} }
pub fn get_end_of_wal(&self) -> (Lsn, u32) { pub fn get_end_of_wal(&self) -> (Lsn, u32) {

View File

@@ -358,6 +358,7 @@ pub enum BeMessage<'a> {
RowDescription(&'a [RowDescriptor<'a>]), RowDescription(&'a [RowDescriptor<'a>]),
XLogData(XLogDataBody<'a>), XLogData(XLogDataBody<'a>),
NoticeResponse(String), NoticeResponse(String),
KeepAlive(WalSndKeepAlive),
} }
// One row desciption in RowDescription packet. // One row desciption in RowDescription packet.
@@ -409,6 +410,13 @@ pub struct XLogDataBody<'a> {
pub data: &'a [u8], pub data: &'a [u8],
} }
#[derive(Debug)]
pub struct WalSndKeepAlive {
pub sent_ptr: u64,
pub timestamp: i64,
pub request_reply: bool,
}
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]); pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
// single text column // single text column
@@ -721,6 +729,18 @@ impl<'a> BeMessage<'a> {
}) })
.unwrap(); .unwrap();
} }
BeMessage::KeepAlive(req) => {
buf.put_u8(b'd');
write_body(buf, |buf| {
buf.put_u8(b'k');
buf.put_u64(req.sent_ptr);
buf.put_i64(req.timestamp);
buf.put_u8(if req.request_reply { 1u8 } else { 0u8 });
Ok::<_, io::Error>(())
})
.unwrap();
}
} }
Ok(()) Ok(())
} }