From 634be4f4e06f67444524aca33c1f1b5b29b8d16c Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Thu, 6 Jul 2023 18:11:43 +0400 Subject: [PATCH] Fix async write in safekeepers. General Rust Write trait semantics (as well as its async brother) is that write definitely happens only after Write::flush(). This wasn't needed in sync where rust write calls the syscall directly, but is required in async. Also fix setting initial end_pos in walsender, sometimes it was from the future. fixes https://github.com/neondatabase/neon/issues/4518 --- safekeeper/src/control_file.rs | 6 ++++++ safekeeper/src/pull_timeline.rs | 1 + safekeeper/src/send_wal.rs | 14 +++++++++----- safekeeper/src/wal_storage.rs | 5 +++++ 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index 6c4ad24323..653e938bb7 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -191,6 +191,12 @@ impl Storage for FileStorage { control_partial_path.display() ) })?; + control_partial.flush().await.with_context(|| { + format!( + "failed to flush safekeeper state into control file at: {}", + control_partial_path.display() + ) + })?; // fsync the file if !self.conf.no_sync { diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 61ba37efaa..8d8ef6192c 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -188,6 +188,7 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result let mut response = client.get(&http_url).send().await?; while let Some(chunk) = response.chunk().await? { file.write_all(&chunk).await?; + file.flush().await?; } } diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index abca0a86b1..92a7bb703a 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -403,16 +403,18 @@ impl SafekeeperPostgresHandler { }; // take the latest commit_lsn if don't have stop_pos - let mut end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow()); + let end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow()); if end_pos < start_pos { - warn!("start_pos {} is ahead of end_pos {}", start_pos, end_pos); - end_pos = start_pos; + warn!( + "requested start_pos {} is ahead of available WAL end_pos {}", + start_pos, end_pos + ); } info!( - "starting streaming from {:?} till {:?}", - start_pos, stop_pos + "starting streaming from {:?} till {:?}, available WAL ends at {}", + start_pos, stop_pos, end_pos ); // switch to copy @@ -547,12 +549,14 @@ impl WalSender<'_, IO> { self.end_pos = *self.commit_lsn_watch_rx.borrow(); if self.end_pos > self.start_pos { // We have something to send. + trace!("got end_pos {:?}, streaming", self.end_pos); return Ok(()); } // Wait for WAL to appear, now self.end_pos == self.start_pos. if let Some(lsn) = wait_for_lsn(&mut self.commit_lsn_watch_rx, self.start_pos).await? { self.end_pos = lsn; + trace!("got end_pos {:?}, streaming", self.end_pos); return Ok(()); } diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 687e1ba6b6..61270a8d0b 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -248,6 +248,10 @@ impl PhysicalStorage { }; file.write_all(buf).await?; + // Note: flush just ensures write above reaches the OS (this is not + // needed in case of sync IO as Write::write there calls directly write + // syscall, but needed in case of async). It does *not* fsyncs the file. + file.flush().await?; if xlogoff + buf.len() == self.wal_seg_size { // If we reached the end of a WAL segment, flush and close it. @@ -716,6 +720,7 @@ async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> { count -= XLOG_BLCKSZ; } file.write_all(&ZERO_BLOCK[0..count]).await?; + file.flush().await?; Ok(()) }