safekeeper: send AppendResponse on segment flush (#9692)
## Problem

When processing pipelined `AppendRequest`s, we explicitly flush the WAL every second and return an `AppendResponse`. However, the WAL is also implicitly flushed on segment bounds, and this does not result in an `AppendResponse`. Because of this, concurrent transactions may take up to 1 second to commit, and writes may take up to 1 second before they are sent to the pageserver.

## Summary of changes

Advance `flush_lsn` when a WAL segment is closed and flushed, and emit an `AppendResponse`. To accommodate this, track the `flush_lsn` in addition to the `flush_record_lsn`.
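For intuition, here is a minimal, self-contained sketch of the bookkeeping this change introduces. This is not the safekeeper's real code: `Lsn` is simplified to `u64`, the segment size is a toy constant, and a queue of record ends stands in for the WAL decoder.

```rust
use std::collections::VecDeque;

// Toy segment size, for illustration only.
const SEG_SIZE: u64 = 16 * 1024 * 1024;

/// Minimal model of the four LSN trackers discussed in this commit.
#[derive(Default)]
struct LsnTracker {
    write_lsn: u64,                     // last byte written to disk
    flush_lsn: u64,                     // last byte known durable
    flush_record_lsn: u64,              // end of last record known durable
    pending_record_ends: VecDeque<u64>, // complete records not yet durable
}

impl LsnTracker {
    /// Write `len` bytes. A write that crosses a segment bound closes the
    /// old segment, which is fsync'd on close, advancing `flush_lsn`.
    fn write(&mut self, len: u64, ends_record: bool) {
        let old_seg = self.write_lsn / SEG_SIZE;
        self.write_lsn += len;
        if ends_record {
            self.pending_record_ends.push_back(self.write_lsn);
        }
        if self.write_lsn / SEG_SIZE > old_seg {
            // segment closed: everything up to the bound is durable
            self.flush_lsn = (self.write_lsn / SEG_SIZE) * SEG_SIZE;
        }
        // A record becomes durable once its end is at or below flush_lsn;
        // this is the point where an AppendResponse can be emitted.
        while let Some(&end) = self.pending_record_ends.front() {
            if end > self.flush_lsn {
                break;
            }
            self.flush_record_lsn = end;
            self.pending_record_ends.pop_front();
        }
    }
}

fn main() {
    let mut t = LsnTracker::default();
    t.write(SEG_SIZE - 8, true); // record ends just before the segment bound
    assert_eq!(t.flush_record_lsn, 0); // nothing flushed yet
    t.write(16, true); // crosses the bound, closing the first segment
    assert_eq!(t.flush_lsn, SEG_SIZE);
    assert_eq!(t.flush_record_lsn, SEG_SIZE - 8); // first record is durable
}
```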
```diff
@@ -562,6 +562,9 @@ impl WalAcceptor {
             // Don't flush the WAL on every append, only periodically via flush_ticker.
             // This batches multiple appends per fsync. If the channel is empty after
             // sending the reply, we'll schedule an immediate flush.
+            //
+            // Note that a flush can still happen on segment bounds, which will result
+            // in an AppendResponse.
             if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
                 msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
                 dirty = true;
```
```diff
@@ -947,6 +947,7 @@ where
         // while first connection still gets some packets later. It might be
         // better to not log this as error! above.
         let write_lsn = self.wal_store.write_lsn();
+        let flush_lsn = self.wal_store.flush_lsn();
         if write_lsn > msg.h.begin_lsn {
             bail!(
                 "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
```
```diff
@@ -1004,7 +1005,9 @@ where
         );
 
         // If flush_lsn hasn't updated, AppendResponse is not very useful.
-        if !require_flush {
+        // This is the common case for !require_flush, but a flush can still
+        // happen on segment bounds.
+        if !require_flush && flush_lsn == self.flush_lsn() {
             return Ok(None);
         }
```
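In other words, an `AppendResponse` is now emitted when either an explicit flush was requested or `flush_lsn` moved while handling the append. A compact paraphrase of that condition (hypothetical free function, toy `u64` LSNs, not the real signature):

```rust
/// Whether this append should produce an AppendResponse: either the
/// walproposer asked for a flush, or the WAL got flushed implicitly
/// (e.g. on a segment bound) during the append.
fn should_respond(require_flush: bool, flush_lsn_before: u64, flush_lsn_after: u64) -> bool {
    require_flush || flush_lsn_after != flush_lsn_before
}

fn main() {
    assert!(should_respond(true, 100, 100)); // explicit flush requested
    assert!(should_respond(false, 100, 150)); // implicit flush on segment close
    assert!(!should_respond(false, 100, 100)); // common fast path: no response
}
```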
```diff
@@ -113,6 +113,13 @@ pub struct PhysicalStorage {
     /// non-aligned chunks of data.
     write_record_lsn: Lsn,
 
+    /// The last LSN flushed to disk. May be in the middle of a record.
+    ///
+    /// NB: when the rest of the system refers to `flush_lsn`, it usually
+    /// actually refers to `flush_record_lsn`. This ambiguity can be dangerous
+    /// and should be resolved.
+    flush_lsn: Lsn,
+
     /// The LSN of the last WAL record flushed to disk.
     flush_record_lsn: Lsn,
```
```diff
@@ -211,6 +218,7 @@ impl PhysicalStorage {
             system_id: state.server.system_id,
             write_lsn,
             write_record_lsn: write_lsn,
+            flush_lsn,
             flush_record_lsn: flush_lsn,
             decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
             file: None,
```
```diff
@@ -295,8 +303,9 @@ impl PhysicalStorage {
         }
     }
 
-    /// Write WAL bytes, which are known to be located in a single WAL segment.
-    async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
+    /// Write WAL bytes, which are known to be located in a single WAL segment. Returns true if the
+    /// segment was completed, closed, and flushed to disk.
+    async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<bool> {
         let mut file = if let Some(file) = self.file.take() {
             file
         } else {
```
```diff
@@ -320,20 +329,24 @@ impl PhysicalStorage {
             let (wal_file_path, wal_file_partial_path) =
                 wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
             fs::rename(wal_file_partial_path, wal_file_path).await?;
+            Ok(true)
         } else {
             // otherwise, file can be reused later
             self.file = Some(file);
+            Ok(false)
         }
-
-        Ok(())
     }
 
     /// Writes WAL to the segment files, until everything is writed. If some segments
     /// are fully written, they are flushed to disk. The last (partial) segment can
     /// be flushed separately later.
     ///
-    /// Updates `write_lsn`.
+    /// Updates `write_lsn` and `flush_lsn`.
     async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
+        // TODO: this shouldn't be possible, except possibly with write_lsn == 0.
+        // Rename this method to `append_exact`, and make it append-only, removing
+        // the `pos` parameter and this check. For this reason, we don't update
+        // `flush_lsn` here.
         if self.write_lsn != pos {
             // need to flush the file before discarding it
             if let Some(file) = self.file.take() {
```
```diff
@@ -355,9 +368,13 @@ impl PhysicalStorage {
                 buf.len()
             };
 
-            self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
+            let flushed = self
+                .write_in_segment(segno, xlogoff, &buf[..bytes_write])
                 .await?;
             self.write_lsn += bytes_write as u64;
+            if flushed {
+                self.flush_lsn = self.write_lsn;
+            }
             buf = &buf[bytes_write..];
         }
 
```
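The shape of that write loop, reduced to a dependency-free sketch (hypothetical `Storage` type with no real I/O; the real `PhysicalStorage` writes through a file handle and renames `.partial` segments). The point is how the boolean returned by the per-segment write advances `flush_lsn`:

```rust
const SEG_SIZE: usize = 16 * 1024 * 1024; // toy segment size

struct Storage {
    write_lsn: u64,
    flush_lsn: u64,
}

impl Storage {
    /// Stand-in for the real per-segment write: here we only report whether
    /// the chunk reached the end of the segment, which closes and flushes it.
    fn write_in_segment(&mut self, xlogoff: usize, buf: &[u8]) -> bool {
        xlogoff + buf.len() == SEG_SIZE
    }

    /// Split a write across segment bounds, advancing flush_lsn whenever a
    /// chunk completes (and therefore flushes) a segment.
    fn write_exact(&mut self, mut buf: &[u8]) {
        while !buf.is_empty() {
            let xlogoff = self.write_lsn as usize % SEG_SIZE;
            // write no further than the end of the current segment
            let n = buf.len().min(SEG_SIZE - xlogoff);
            let flushed = self.write_in_segment(xlogoff, &buf[..n]);
            self.write_lsn += n as u64;
            if flushed {
                self.flush_lsn = self.write_lsn;
            }
            buf = &buf[n..];
        }
    }
}

fn main() {
    let mut s = Storage { write_lsn: 0, flush_lsn: 0 };
    s.write_exact(&vec![0u8; SEG_SIZE + 100]); // spans one segment bound
    assert_eq!(s.flush_lsn, SEG_SIZE as u64); // first segment is durable
    assert_eq!(s.write_lsn, (SEG_SIZE + 100) as u64);
}
```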
```diff
@@ -371,6 +388,9 @@ impl Storage for PhysicalStorage {
         self.write_lsn
     }
     /// flush_lsn returns LSN of last durably stored WAL record.
+    ///
+    /// TODO: flush_lsn() returns flush_record_lsn, but write_lsn() returns write_lsn: confusing.
+    #[allow(clippy::misnamed_getters)]
     fn flush_lsn(&self) -> Lsn {
         self.flush_record_lsn
     }
```
```diff
@@ -424,8 +444,9 @@ impl Storage for PhysicalStorage {
         self.metrics.observe_write_seconds(write_seconds);
         self.metrics.observe_write_bytes(buf.len());
 
-        // figure out last record's end lsn for reporting (if we got the
-        // whole record)
+        // Figure out the last record's end LSN and update `write_record_lsn`
+        // (if we got a whole record). The write may also have closed and
+        // flushed a segment, so update `flush_record_lsn` as well.
         if self.decoder.available() != startpos {
             info!(
                 "restart decoder from {} to {}",
```
```diff
@@ -436,12 +457,15 @@ impl Storage for PhysicalStorage {
             self.decoder = WalStreamDecoder::new(startpos, pg_version);
         }
         self.decoder.feed_bytes(buf);
-        loop {
-            match self.decoder.poll_decode()? {
-                None => break, // no full record yet
-                Some((lsn, _rec)) => {
-                    self.write_record_lsn = lsn;
-                }
-            }
-        }
+
+        if self.write_record_lsn <= self.flush_lsn {
+            // We may have flushed a previously written record.
+            self.flush_record_lsn = self.write_record_lsn;
+        }
+        while let Some((lsn, _rec)) = self.decoder.poll_decode()? {
+            self.write_record_lsn = lsn;
+            if lsn <= self.flush_lsn {
+                self.flush_record_lsn = lsn;
+            }
+        }
 
```
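A worked example of why both updates are needed (toy numbers, plain `u64` LSNs, not the safekeeper's code). Record A ends at 100 and record B at 200; a segment-closing write advanced `flush_lsn` to 150, so A is durable but B is not:

```rust
fn main() {
    let flush_lsn: u64 = 150;
    let mut write_record_lsn: u64 = 0;
    let mut flush_record_lsn: u64 = 0;

    // The decode loop: each completed record advances write_record_lsn, and
    // is marked durable only if its end is at or below flush_lsn.
    for record_end in [100u64, 200] {
        write_record_lsn = record_end;
        if record_end <= flush_lsn {
            flush_record_lsn = record_end;
        }
    }
    assert_eq!((write_record_lsn, flush_record_lsn), (200, 100));

    // The pre-loop check covers the other case: a later write closes a
    // segment (flush_lsn jumps past 200) without decoding any new record.
    let flush_lsn = 16 * 1024 * 1024; // e.g. a segment bound
    if write_record_lsn <= flush_lsn {
        // we may have flushed a previously written record
        flush_record_lsn = write_record_lsn;
    }
    assert_eq!(flush_record_lsn, 200);
    println!("ok: flush_record_lsn tracks durable record ends");
}
```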
```diff
@@ -458,19 +482,17 @@ impl Storage for PhysicalStorage {
             self.fdatasync_file(&unflushed_file).await?;
             self.file = Some(unflushed_file);
         } else {
-            // We have unflushed data (write_lsn != flush_lsn), but no file.
-            // This should only happen if last file was fully written and flushed,
-            // but haven't updated flush_lsn yet.
-            if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
-                bail!(
-                    "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
-                    self.write_lsn,
-                    self.flush_record_lsn
-                );
-            }
+            // We have unflushed data (write_lsn != flush_lsn), but no file. This
+            // shouldn't happen, since the segment is flushed on close.
+            bail!(
+                "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
+                self.write_lsn,
+                self.flush_record_lsn
+            );
         }
 
         // everything is flushed now, let's update flush_lsn
+        self.flush_lsn = self.write_lsn;
         self.flush_record_lsn = self.write_record_lsn;
         Ok(())
     }
```
```diff
@@ -517,6 +539,7 @@ impl Storage for PhysicalStorage {
         self.pending_wal_truncation = true;
 
         self.write_lsn = end_pos;
+        self.flush_lsn = end_pos;
         self.write_record_lsn = end_pos;
         self.flush_record_lsn = end_pos;
 
```