diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index 6c4ad24323..653e938bb7 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -191,6 +191,12 @@ impl Storage for FileStorage { control_partial_path.display() ) })?; + control_partial.flush().await.with_context(|| { + format!( + "failed to flush safekeeper state into control file at: {}", + control_partial_path.display() + ) + })?; // fsync the file if !self.conf.no_sync { diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 61ba37efaa..8d8ef6192c 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -188,6 +188,7 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result let mut response = client.get(&http_url).send().await?; while let Some(chunk) = response.chunk().await? { file.write_all(&chunk).await?; + file.flush().await?; } } diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index abca0a86b1..92a7bb703a 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -403,16 +403,18 @@ impl SafekeeperPostgresHandler { }; // take the latest commit_lsn if don't have stop_pos - let mut end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow()); + let end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow()); if end_pos < start_pos { - warn!("start_pos {} is ahead of end_pos {}", start_pos, end_pos); - end_pos = start_pos; + warn!( + "requested start_pos {} is ahead of available WAL end_pos {}", + start_pos, end_pos + ); } info!( - "starting streaming from {:?} till {:?}", - start_pos, stop_pos + "starting streaming from {:?} till {:?}, available WAL ends at {}", + start_pos, stop_pos, end_pos ); // switch to copy @@ -547,12 +549,14 @@ impl WalSender<'_, IO> { self.end_pos = *self.commit_lsn_watch_rx.borrow(); if self.end_pos > self.start_pos { // We have something to send. 
+ trace!("got end_pos {:?}, streaming", self.end_pos); return Ok(()); } // Wait for WAL to appear, now self.end_pos == self.start_pos. if let Some(lsn) = wait_for_lsn(&mut self.commit_lsn_watch_rx, self.start_pos).await? { self.end_pos = lsn; + trace!("got end_pos {:?}, streaming", self.end_pos); return Ok(()); } diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 687e1ba6b6..61270a8d0b 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -248,6 +248,10 @@ impl PhysicalStorage { }; file.write_all(buf).await?; + // Note: flush just ensures the write above reaches the OS (this is not + // needed in case of sync IO, as Write::write there directly calls the write + // syscall, but it is needed in case of async). It does *not* fsync the file. + file.flush().await?; if xlogoff + buf.len() == self.wal_seg_size { // If we reached the end of a WAL segment, flush and close it. @@ -716,6 +720,7 @@ async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> { count -= XLOG_BLCKSZ; } file.write_all(&ZERO_BLOCK[0..count]).await?; + file.flush().await?; Ok(()) }