Compare commits

...

4 Commits

Author SHA1 Message Date
Joonas Koivunen
ab0ecf5548 feat: request logging 2023-04-29 16:57:41 +03:00
Joonas Koivunen
b0ea9175fb fix: exiting from process while running drops
Calling std::process::exit on signals or when one task exits will not
run any drops, nor will it drop the only runtime.
2023-04-29 14:25:54 +03:00
Joonas Koivunen
ed4fd15b7c fix: strip extra runtimes 2023-04-29 12:56:08 +03:00
Arseny Sher
2356a24379 Switch safekeepers to async.
This is a full switch, fs io operations are also tokio ones, working through
thread pool. Similar to pageserver, we have multiple runtimes for easier `top`
usage and isolation.

Notable points:
- Now that the guts of safekeeper.rs are full of .await's, we need to be very
  careful not to drop a task at a random point, leaving the timeline in an
  unclear state. Currently the only writer is walreceiver and we don't have
  top-level cancellation there, so we are good. But to be safe we should
  probably add a fuse that panics if a task is dropped while an operation on
  a timeline is in progress.
- Timeline lock is Tokio one now, as we do disk IO under it.
- Collecting metrics got a crutch: since the prometheus Collector is
  synchronous, there is now a special task that, once per scrape period,
  copies data from under the async lock to a sync one where the collector
  can take it.
- Anything involving closures becomes significantly more complicated, since
  async fns are already a kind of closure and true async closures are still
  unstable.
- The main thread now tracks the other main tasks, which got much easier.
- The only synchronous place left is the initial data loading, since
  otherwise clippy complains about the timeline map lock being held across
  await points -- which is not a problem here because it happens only in the
  single-threaded runtime of the main thread. But keeping it sync doesn't
  hurt either.

I'm concerned about the performance of thread-pool IO offloading, async
traits, and the many await points; but we can try and see how it goes.

fixes https://github.com/neondatabase/neon/issues/3036
fixes https://github.com/neondatabase/neon/issues/3966
2023-04-29 07:00:24 +04:00
21 changed files with 617 additions and 589 deletions

View File

@@ -1,19 +1,18 @@
use crate::auth::{Claims, JwtAuth}; use crate::auth::{Claims, JwtAuth};
use crate::http::error; use crate::http::error;
use anyhow::{anyhow, Context}; use anyhow::Context;
use hyper::header::{HeaderName, AUTHORIZATION}; use hyper::header::{HeaderName, AUTHORIZATION};
use hyper::http::HeaderValue; use hyper::http::HeaderValue;
use hyper::Method; use hyper::Method;
use hyper::{header::CONTENT_TYPE, Body, Request, Response, Server}; use hyper::{header::CONTENT_TYPE, Body, Request, Response};
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder}; use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use routerify::ext::RequestExt; use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder, RouterService}; use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tokio::task::JoinError; use tokio::task::JoinError;
use tracing::{self, debug, info, info_span, warn, Instrument}; use tracing::{self, debug, info, info_span, warn, Instrument};
use std::future::Future; use std::future::Future;
use std::net::TcpListener;
use std::str::FromStr; use std::str::FromStr;
use super::error::ApiError; use super::error::ApiError;
@@ -341,40 +340,6 @@ pub fn check_permission_with(
} }
} }
///
/// Start listening for HTTP requests on given socket.
///
/// 'shutdown_future' can be used to stop. If the Future becomes
/// ready, we stop listening for new requests, and the function returns.
///
pub fn serve_thread_main<S>(
router_builder: RouterBuilder<hyper::Body, ApiError>,
listener: TcpListener,
shutdown_future: S,
) -> anyhow::Result<()>
where
S: Future<Output = ()> + Send + Sync,
{
info!("Starting an HTTP endpoint at {}", listener.local_addr()?);
// Create a Service from the router above to handle incoming requests.
let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap();
// Enter a single-threaded tokio runtime bound to the current thread
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let _guard = runtime.enter();
let server = Server::from_tcp(listener)?
.serve(service)
.with_graceful_shutdown(shutdown_future);
runtime.block_on(server)?;
Ok(())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@@ -3,15 +3,16 @@
// //
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use clap::Parser; use clap::Parser;
use futures::FutureExt;
use remote_storage::RemoteStorageConfig; use remote_storage::RemoteStorageConfig;
use tokio::signal::unix::{signal, SignalKind};
use tokio::task::JoinError;
use toml_edit::Document; use toml_edit::Document;
use utils::signals::ShutdownSignals;
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::{ErrorKind, Write}; use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::thread;
use std::time::Duration; use std::time::Duration;
use storage_broker::Uri; use storage_broker::Uri;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@@ -35,7 +36,6 @@ use safekeeper::SafeKeeperConf;
use storage_broker::DEFAULT_ENDPOINT; use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::JwtAuth; use utils::auth::JwtAuth;
use utils::{ use utils::{
http::endpoint,
id::NodeId, id::NodeId,
logging::{self, LogFormat}, logging::{self, LogFormat},
project_git_version, project_git_version,
@@ -120,7 +120,8 @@ struct Args {
log_format: String, log_format: String,
} }
fn main() -> anyhow::Result<()> { #[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let args = Args::parse(); let args = Args::parse();
if let Some(addr) = args.dump_control_file { if let Some(addr) = args.dump_control_file {
@@ -180,7 +181,6 @@ fn main() -> anyhow::Result<()> {
heartbeat_timeout: args.heartbeat_timeout, heartbeat_timeout: args.heartbeat_timeout,
remote_storage: args.remote_storage, remote_storage: args.remote_storage,
max_offloader_lag_bytes: args.max_offloader_lag, max_offloader_lag_bytes: args.max_offloader_lag,
backup_runtime_threads: args.wal_backup_threads,
wal_backup_enabled: !args.disable_wal_backup, wal_backup_enabled: !args.disable_wal_backup,
auth, auth,
}; };
@@ -190,10 +190,10 @@ fn main() -> anyhow::Result<()> {
Some(GIT_VERSION.into()), Some(GIT_VERSION.into()),
&[("node_id", &conf.my_id.to_string())], &[("node_id", &conf.my_id.to_string())],
); );
start_safekeeper(conf) start_safekeeper(conf).await
} }
fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { async fn start_safekeeper(conf: SafeKeeperConf) -> anyhow::Result<()> {
// Prevent running multiple safekeepers on the same directory // Prevent running multiple safekeepers on the same directory
let lock_file_path = conf.workdir.join(PID_FILE_NAME); let lock_file_path = conf.workdir.join(PID_FILE_NAME);
let lock_file = let lock_file =
@@ -204,14 +204,18 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
// we need to release the lock file only when the current process is gone // we need to release the lock file only when the current process is gone
std::mem::forget(lock_file); std::mem::forget(lock_file);
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| { info!("starting safekeeper WAL service on {}", conf.listen_pg_addr);
error!("failed to bind to address {}: {}", conf.listen_http_addr, e); let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
e e
})?; })?;
info!("starting safekeeper on {}", conf.listen_pg_addr); info!(
let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| { "starting safekeeper HTTP service on {}",
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e); conf.listen_http_addr
);
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
e e
})?; })?;
@@ -220,71 +224,90 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let timeline_collector = safekeeper::metrics::TimelineCollector::new(); let timeline_collector = safekeeper::metrics::TimelineCollector::new();
metrics::register_internal(Box::new(timeline_collector))?; metrics::register_internal(Box::new(timeline_collector))?;
let mut threads = vec![];
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100); let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
// Load all timelines from disk to memory. // Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?; GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;
let conf_ = conf.clone(); fn named_should_never_return(
threads.push( name: &'static str,
thread::Builder::new() unexpected: Result<Result<(), anyhow::Error>, JoinError>,
.name("http_endpoint_thread".into()) ) -> anyhow::Result<()> {
.spawn(|| { let res = match unexpected {
let router = http::make_router(conf_); Ok(Ok(())) => Err(anyhow::anyhow!("unexpected Ok(()) return")),
endpoint::serve_thread_main( Ok(Err(e)) => Err(e),
router, Err(e) => Err(anyhow::Error::new(e)),
http_listener, };
std::future::pending(), // never shut down
)
.unwrap();
})?,
);
let conf_cloned = conf.clone(); // was not able to get this working with `enum Void {}`
let safekeeper_thread = thread::Builder::new() res.with_context(|| format!("task {name} unexpectedly joined"))
.name("WAL service thread".into()) }
.spawn(|| wal_service::thread_main(conf_cloned, pg_listener))
.unwrap();
threads.push(safekeeper_thread);
let conf_ = conf.clone(); let conf_ = conf.clone();
threads.push( let wal_service_handle = tokio::spawn(wal_service::task_main(conf_, pg_listener))
thread::Builder::new() // wrap with task name for error reporting
.name("broker thread".into()) .map(|res| named_should_never_return("WAL service main", res));
.spawn(|| {
broker::thread_main(conf_);
})?,
);
let conf_ = conf.clone(); let conf_ = conf.clone();
threads.push( let http_handle = tokio::spawn(http::task_main(conf_, http_listener))
thread::Builder::new() .map(|res| named_should_never_return("HTTP service main", res));
.name("WAL removal thread".into())
.spawn(|| {
remove_wal::thread_main(conf_);
})?,
);
threads.push( let conf_ = conf.clone();
thread::Builder::new() let broker_task_handle =
.name("WAL backup launcher thread".into()) tokio::spawn(broker::task_main(conf_).instrument(info_span!("broker")))
.spawn(move || { .map(|res| named_should_never_return("broker main", res));
wal_backup::wal_backup_launcher_thread_main(conf, wal_backup_launcher_rx);
})?, let conf_ = conf.clone();
); let wal_remover_handle = tokio::spawn(remove_wal::task_main(conf_))
.map(|res| named_should_never_return("WAL remover", res));
let conf_ = conf.clone();
let wal_backup_handle = tokio::spawn(wal_backup::wal_backup_launcher_task_main(
conf_,
wal_backup_launcher_rx,
))
.map(|res| named_should_never_return("WAL backup launcher", res));
let metrics_shifter_handle = tokio::spawn(safekeeper::metrics::metrics_shifter())
.map(|res| named_should_never_return("metrics shifter", res));
set_build_info_metric(GIT_VERSION); set_build_info_metric(GIT_VERSION);
// TODO: put more thoughts into handling of failed threads
// We should catch & die if they are in trouble.
// On any shutdown signal, log receival and exit. Additionally, handling // TODO: update tokio-stream, convert to real async Stream with
// SIGQUIT prevents coredump. // SignalStream, map it to obtain missing signal name, combine streams into
ShutdownSignals::handle(|signal| { // single stream we can easily sit on.
info!("received {}, terminating", signal.name()); let mut sigquit_stream = signal(SignalKind::quit())?;
std::process::exit(0); let mut sigint_stream = signal(SignalKind::interrupt())?;
}) let mut sigterm_stream = signal(SignalKind::terminate())?;
let tasks = async move {
tokio::try_join!(
wal_service_handle,
http_handle,
broker_task_handle,
wal_remover_handle,
wal_backup_handle,
metrics_shifter_handle
)
};
tokio::select! {
res = tasks => {
// this will be the first reason to stop a safekeeper, but not necessarily the only one
// which will get to happen before we exit
match res {
Ok(_) => unreachable!("because of named_should_never_return, we can never end up here, cannot use ! yet"),
Err(e) => return Err(e),
}
}
// On any shutdown signal, log receival and exit. Additionally, handling
// SIGQUIT prevents coredump.
_ = sigquit_stream.recv() => info!("received SIGQUIT, terminating"),
_ = sigint_stream.recv() => info!("received SIGINT, terminating"),
_ = sigterm_stream.recv() => info!("received SIGTERM, terminating")
}
Ok(())
} }
/// Determine safekeeper id. /// Determine safekeeper id.

View File

@@ -15,7 +15,7 @@ use storage_broker::Request;
use std::time::Duration; use std::time::Duration;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep}; use tokio::time::sleep;
use tracing::*; use tracing::*;
use crate::GlobalTimelines; use crate::GlobalTimelines;
@@ -24,20 +24,6 @@ use crate::SafeKeeperConf;
const RETRY_INTERVAL_MSEC: u64 = 1000; const RETRY_INTERVAL_MSEC: u64 = 1000;
const PUSH_INTERVAL_MSEC: u64 = 1000; const PUSH_INTERVAL_MSEC: u64 = 1000;
pub fn thread_main(conf: SafeKeeperConf) {
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _enter = info_span!("broker").entered();
info!("started, broker endpoint {:?}", conf.broker_endpoint);
runtime.block_on(async {
main_loop(conf).await;
});
}
/// Push once in a while data about all active timelines to the broker. /// Push once in a while data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let mut client = BrokerServiceClient::connect(conf.broker_endpoint.clone()).await?; let mut client = BrokerServiceClient::connect(conf.broker_endpoint.clone()).await?;
@@ -49,10 +35,15 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// is under plain mutex. That's ok, all this code is not performance // is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while // sensitive and there is no risk of deadlock as we don't await while
// lock is held. // lock is held.
let mut active_tlis = GlobalTimelines::get_all(); let all_tlis = GlobalTimelines::get_all();
active_tlis.retain(|tli| tli.is_active()); for tli in &all_tlis {
for tli in &active_tlis { // filtering alternative futures::stream::iter(all_tlis)
let sk_info = tli.get_safekeeper_info(&conf); // .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::<Vec<_>>().await;
// doesn't look better, and I'm not sure how to do that without collect.
if !tli.is_active().await {
continue;
}
let sk_info = tli.get_safekeeper_info(&conf).await;
yield sk_info; yield sk_info;
} }
sleep(push_interval).await; sleep(push_interval).await;
@@ -97,10 +88,13 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
bail!("end of stream"); bail!("end of stream");
} }
async fn main_loop(conf: SafeKeeperConf) { pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
info!("started, broker endpoint {:?}", conf.broker_endpoint);
let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC)); let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None; let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None; let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
// Selecting on JoinHandles requires some squats; is there a better way to // Selecting on JoinHandles requires some squats; is there a better way to
// reap tasks individually? // reap tasks individually?

View File

@@ -2,9 +2,10 @@
use anyhow::{bail, ensure, Context, Result}; use anyhow::{bail, ensure, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use tokio::fs::{self, File};
use tokio::io::AsyncWriteExt;
use std::fs::{self, File, OpenOptions}; use std::io::Read;
use std::io::{Read, Write};
use std::ops::Deref; use std::ops::Deref;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@@ -25,9 +26,10 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
/// Storage should keep actual state inside of it. It should implement Deref /// Storage should keep actual state inside of it. It should implement Deref
/// trait to access state fields and have persist method for updating that state. /// trait to access state fields and have persist method for updating that state.
#[async_trait::async_trait]
pub trait Storage: Deref<Target = SafeKeeperState> { pub trait Storage: Deref<Target = SafeKeeperState> {
/// Persist safekeeper state on disk and update internal state. /// Persist safekeeper state on disk and update internal state.
fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; async fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
} }
#[derive(Debug)] #[derive(Debug)]
@@ -74,7 +76,7 @@ impl FileStorage {
/// Check the magic/version in the on-disk data and deserialize it, if possible. /// Check the magic/version in the on-disk data and deserialize it, if possible.
fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> { fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
// Read the version independent part // Read the version independent part
let magic = buf.read_u32::<LittleEndian>()?; let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
if magic != SK_MAGIC { if magic != SK_MAGIC {
bail!( bail!(
"bad control file magic: {:X}, expected {:X}", "bad control file magic: {:X}, expected {:X}",
@@ -82,7 +84,7 @@ impl FileStorage {
SK_MAGIC SK_MAGIC
); );
} }
let version = buf.read_u32::<LittleEndian>()?; let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
if version == SK_FORMAT_VERSION { if version == SK_FORMAT_VERSION {
let res = SafeKeeperState::des(buf)?; let res = SafeKeeperState::des(buf)?;
return Ok(res); return Ok(res);
@@ -102,7 +104,7 @@ impl FileStorage {
/// Read in the control file. /// Read in the control file.
pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> { pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
let mut control_file = OpenOptions::new() let mut control_file = std::fs::OpenOptions::new()
.read(true) .read(true)
.write(true) .write(true)
.open(&control_file_path) .open(&control_file_path)
@@ -151,30 +153,31 @@ impl Deref for FileStorage {
} }
} }
#[async_trait::async_trait]
impl Storage for FileStorage { impl Storage for FileStorage {
/// persists state durably to underlying storage /// persists state durably to underlying storage
/// for description see https://lwn.net/Articles/457667/ /// for description see https://lwn.net/Articles/457667/
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer(); let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
// write data to safekeeper.control.partial // write data to safekeeper.control.partial
let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL); let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
let mut control_partial = File::create(&control_partial_path).with_context(|| { let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
format!( format!(
"failed to create partial control file at: {}", "failed to create partial control file at: {}",
&control_partial_path.display() &control_partial_path.display()
) )
})?; })?;
let mut buf: Vec<u8> = Vec::new(); let mut buf: Vec<u8> = Vec::new();
buf.write_u32::<LittleEndian>(SK_MAGIC)?; WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
buf.write_u32::<LittleEndian>(SK_FORMAT_VERSION)?; WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
s.ser_into(&mut buf)?; s.ser_into(&mut buf)?;
// calculate checksum before resize // calculate checksum before resize
let checksum = crc32c::crc32c(&buf); let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes()); buf.extend_from_slice(&checksum.to_le_bytes());
control_partial.write_all(&buf).with_context(|| { control_partial.write_all(&buf).await.with_context(|| {
format!( format!(
"failed to write safekeeper state into control file at: {}", "failed to write safekeeper state into control file at: {}",
control_partial_path.display() control_partial_path.display()
@@ -183,7 +186,7 @@ impl Storage for FileStorage {
// fsync the file // fsync the file
if !self.conf.no_sync { if !self.conf.no_sync {
control_partial.sync_all().with_context(|| { control_partial.sync_all().await.with_context(|| {
format!( format!(
"failed to sync partial control file at {}", "failed to sync partial control file at {}",
control_partial_path.display() control_partial_path.display()
@@ -194,21 +197,22 @@ impl Storage for FileStorage {
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME); let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
// rename should be atomic // rename should be atomic
fs::rename(&control_partial_path, &control_path)?; fs::rename(&control_partial_path, &control_path).await?;
// this sync is not required by any standard but postgres does this (see durable_rename) // this sync is not required by any standard but postgres does this (see durable_rename)
if !self.conf.no_sync { if !self.conf.no_sync {
File::open(&control_path) let new_f = File::open(&control_path).await?;
.and_then(|f| f.sync_all()) new_f.sync_all().await.with_context(|| {
.with_context(|| { format!(
format!( "failed to sync control file at: {}",
"failed to sync control file at: {}", &control_path.display()
&control_path.display() )
) })?;
})?;
// fsync the directory (linux specific) // fsync the directory (linux specific)
File::open(&self.timeline_dir) let tli_dir = File::open(&self.timeline_dir).await?;
.and_then(|f| f.sync_all()) tli_dir
.sync_all()
.await
.context("failed to sync control file directory")?; .context("failed to sync control file directory")?;
} }
@@ -224,7 +228,6 @@ mod test {
use super::*; use super::*;
use crate::{safekeeper::SafeKeeperState, SafeKeeperConf}; use crate::{safekeeper::SafeKeeperState, SafeKeeperConf};
use anyhow::Result; use anyhow::Result;
use std::fs;
use utils::{id::TenantTimelineId, lsn::Lsn}; use utils::{id::TenantTimelineId, lsn::Lsn};
fn stub_conf() -> SafeKeeperConf { fn stub_conf() -> SafeKeeperConf {
@@ -235,59 +238,75 @@ mod test {
} }
} }
fn load_from_control_file( async fn load_from_control_file(
conf: &SafeKeeperConf, conf: &SafeKeeperConf,
ttid: &TenantTimelineId, ttid: &TenantTimelineId,
) -> Result<(FileStorage, SafeKeeperState)> { ) -> Result<(FileStorage, SafeKeeperState)> {
fs::create_dir_all(conf.timeline_dir(ttid)).expect("failed to create timeline dir"); fs::create_dir_all(conf.timeline_dir(ttid))
.await
.expect("failed to create timeline dir");
Ok(( Ok((
FileStorage::restore_new(ttid, conf)?, FileStorage::restore_new(ttid, conf)?,
FileStorage::load_control_file_conf(conf, ttid)?, FileStorage::load_control_file_conf(conf, ttid)?,
)) ))
} }
fn create( async fn create(
conf: &SafeKeeperConf, conf: &SafeKeeperConf,
ttid: &TenantTimelineId, ttid: &TenantTimelineId,
) -> Result<(FileStorage, SafeKeeperState)> { ) -> Result<(FileStorage, SafeKeeperState)> {
fs::create_dir_all(conf.timeline_dir(ttid)).expect("failed to create timeline dir"); fs::create_dir_all(conf.timeline_dir(ttid))
.await
.expect("failed to create timeline dir");
let state = SafeKeeperState::empty(); let state = SafeKeeperState::empty();
let storage = FileStorage::create_new(ttid, conf, state.clone())?; let storage = FileStorage::create_new(ttid, conf, state.clone())?;
Ok((storage, state)) Ok((storage, state))
} }
#[test] #[tokio::test]
fn test_read_write_safekeeper_state() { async fn test_read_write_safekeeper_state() {
let conf = stub_conf(); let conf = stub_conf();
let ttid = TenantTimelineId::generate(); let ttid = TenantTimelineId::generate();
{ {
let (mut storage, mut state) = create(&conf, &ttid).expect("failed to create state"); let (mut storage, mut state) =
create(&conf, &ttid).await.expect("failed to create state");
// change something // change something
state.commit_lsn = Lsn(42); state.commit_lsn = Lsn(42);
storage.persist(&state).expect("failed to persist state"); storage
.persist(&state)
.await
.expect("failed to persist state");
} }
let (_, state) = load_from_control_file(&conf, &ttid).expect("failed to read state"); let (_, state) = load_from_control_file(&conf, &ttid)
.await
.expect("failed to read state");
assert_eq!(state.commit_lsn, Lsn(42)); assert_eq!(state.commit_lsn, Lsn(42));
} }
#[test] #[tokio::test]
fn test_safekeeper_state_checksum_mismatch() { async fn test_safekeeper_state_checksum_mismatch() {
let conf = stub_conf(); let conf = stub_conf();
let ttid = TenantTimelineId::generate(); let ttid = TenantTimelineId::generate();
{ {
let (mut storage, mut state) = create(&conf, &ttid).expect("failed to read state"); let (mut storage, mut state) =
create(&conf, &ttid).await.expect("failed to read state");
// change something // change something
state.commit_lsn = Lsn(42); state.commit_lsn = Lsn(42);
storage.persist(&state).expect("failed to persist state"); storage
.persist(&state)
.await
.expect("failed to persist state");
} }
let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME); let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME);
let mut data = fs::read(&control_path).unwrap(); let mut data = fs::read(&control_path).await.unwrap();
data[0] += 1; // change the first byte of the file to fail checksum validation data[0] += 1; // change the first byte of the file to fail checksum validation
fs::write(&control_path, &data).expect("failed to write control file"); fs::write(&control_path, &data)
.await
.expect("failed to write control file");
match load_from_control_file(&conf, &ttid) { match load_from_control_file(&conf, &ttid).await {
Err(err) => assert!(err Err(err) => assert!(err
.to_string() .to_string()
.contains("safekeeper control file checksum mismatch")), .contains("safekeeper control file checksum mismatch")),

View File

@@ -121,7 +121,7 @@ pub struct FileInfo {
} }
/// Build debug dump response, using the provided [`Args`] filters. /// Build debug dump response, using the provided [`Args`] filters.
pub fn build(args: Args) -> Result<Response> { pub async fn build(args: Args) -> Result<Response> {
let start_time = Utc::now(); let start_time = Utc::now();
let timelines_count = GlobalTimelines::timelines_count(); let timelines_count = GlobalTimelines::timelines_count();
@@ -155,7 +155,7 @@ pub fn build(args: Args) -> Result<Response> {
} }
let control_file = if args.dump_control_file { let control_file = if args.dump_control_file {
let mut state = tli.get_state().1; let mut state = tli.get_state().await.1;
if !args.dump_term_history { if !args.dump_term_history {
state.acceptor_state.term_history = TermHistory(vec![]); state.acceptor_state.term_history = TermHistory(vec![]);
} }
@@ -165,7 +165,7 @@ pub fn build(args: Args) -> Result<Response> {
}; };
let memory = if args.dump_memory { let memory = if args.dump_memory {
Some(tli.memory_dump()) Some(tli.memory_dump().await)
} else { } else {
None None
}; };

View File

@@ -241,14 +241,14 @@ impl SafekeeperPostgresHandler {
let lsn = if self.is_walproposer_recovery() { let lsn = if self.is_walproposer_recovery() {
// walproposer should get all local WAL until flush_lsn // walproposer should get all local WAL until flush_lsn
tli.get_flush_lsn() tli.get_flush_lsn().await
} else { } else {
// other clients shouldn't get any uncommitted WAL // other clients shouldn't get any uncommitted WAL
tli.get_state().0.commit_lsn tli.get_state().await.0.commit_lsn
} }
.to_string(); .to_string();
let sysid = tli.get_state().1.server.system_id.to_string(); let sysid = tli.get_state().await.1.server.system_id.to_string();
let lsn_bytes = lsn.as_bytes(); let lsn_bytes = lsn.as_bytes();
let tli = PG_TLI.to_string(); let tli = PG_TLI.to_string();
let tli_bytes = tli.as_bytes(); let tli_bytes = tli.as_bytes();

View File

@@ -2,3 +2,18 @@ pub mod routes;
pub use routes::make_router; pub use routes::make_router;
pub use safekeeper_api::models; pub use safekeeper_api::models;
use crate::SafeKeeperConf;
pub async fn task_main(
conf: SafeKeeperConf,
http_listener: std::net::TcpListener,
) -> anyhow::Result<()> {
let router = make_router(conf)
.build()
.map_err(|err| anyhow::anyhow!(err))?;
let service = utils::http::RouterService::new(router).unwrap();
let server = hyper::Server::from_tcp(http_listener)?;
server.serve(service).await?;
Ok(()) // unreachable
}

View File

@@ -13,7 +13,6 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::task::JoinError;
use crate::safekeeper::ServerInfo; use crate::safekeeper::ServerInfo;
use crate::safekeeper::Term; use crate::safekeeper::Term;
@@ -116,8 +115,8 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
check_permission(&request, Some(ttid.tenant_id))?; check_permission(&request, Some(ttid.tenant_id))?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?; let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let (inmem, state) = tli.get_state(); let (inmem, state) = tli.get_state().await;
let flush_lsn = tli.get_flush_lsn(); let flush_lsn = tli.get_flush_lsn().await;
let epoch = state.acceptor_state.get_epoch(flush_lsn); let epoch = state.acceptor_state.get_epoch(flush_lsn);
let term_history = state let term_history = state
@@ -232,13 +231,11 @@ async fn timeline_delete_force_handler(
); );
check_permission(&request, Some(ttid.tenant_id))?; check_permission(&request, Some(ttid.tenant_id))?;
ensure_no_body(&mut request).await?; ensure_no_body(&mut request).await?;
let resp = tokio::task::spawn_blocking(move || { // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better // error handling here when we're able to.
// error handling here when we're able to. let resp = GlobalTimelines::delete_force(&ttid)
GlobalTimelines::delete_force(&ttid).map_err(ApiError::InternalServerError) .await
}) .map_err(ApiError::InternalServerError)?;
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
json_response(StatusCode::OK, resp) json_response(StatusCode::OK, resp)
} }
@@ -250,14 +247,11 @@ async fn tenant_delete_force_handler(
let tenant_id = parse_request_param(&request, "tenant_id")?; let tenant_id = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?; check_permission(&request, Some(tenant_id))?;
ensure_no_body(&mut request).await?; ensure_no_body(&mut request).await?;
let delete_info = tokio::task::spawn_blocking(move || { // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons; // Using an `InternalServerError` should be fixed when the types support it
// Using an `InternalServerError` should be fixed when the types support it let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
GlobalTimelines::delete_force_all_for_tenant(&tenant_id) .await
.map_err(ApiError::InternalServerError) .map_err(ApiError::InternalServerError)?;
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
json_response( json_response(
StatusCode::OK, StatusCode::OK,
delete_info delete_info
@@ -353,11 +347,9 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
timeline_id, timeline_id,
}; };
let resp = tokio::task::spawn_blocking(move || { let resp = debug_dump::build(args)
debug_dump::build(args).map_err(ApiError::InternalServerError) .await
}) .map_err(ApiError::InternalServerError)?;
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
// TODO: use streaming response // TODO: use streaming response
json_response(StatusCode::OK, resp) json_response(StatusCode::OK, resp)
@@ -365,6 +357,8 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
/// Safekeeper http router. /// Safekeeper http router.
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> { pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
use utils::http::endpoint::RequestSpan;
let mut router = endpoint::make_router(); let mut router = endpoint::make_router();
if conf.auth.is_some() { if conf.auth.is_some() {
router = router.middleware(auth_middleware(|request| { router = router.middleware(auth_middleware(|request| {
@@ -386,29 +380,34 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
router router
.data(Arc::new(conf)) .data(Arc::new(conf))
.data(auth) .data(auth)
.get("/v1/status", status_handler) .get("/v1/status", |r| RequestSpan(status_handler).handle(r))
// Will be used in the future instead of implicit timeline creation // Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/timeline", timeline_create_handler) .post("/v1/tenant/timeline", |r| {
.get( RequestSpan(timeline_create_handler).handle(r)
"/v1/tenant/:tenant_id/timeline/:timeline_id", })
timeline_status_handler, .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
) RequestSpan(timeline_status_handler).handle(r)
.delete( })
"/v1/tenant/:tenant_id/timeline/:timeline_id", .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
timeline_delete_force_handler, RequestSpan(timeline_delete_force_handler).handle(r)
) })
.delete("/v1/tenant/:tenant_id", tenant_delete_force_handler) .delete("/v1/tenant/:tenant_id", |r| {
.post("/v1/pull_timeline", timeline_pull_handler) RequestSpan(tenant_delete_force_handler).handle(r)
})
.post("/v1/pull_timeline", |r| {
RequestSpan(timeline_pull_handler).handle(r)
})
.get( .get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename", "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
timeline_files_handler, |r| RequestSpan(timeline_files_handler).handle(r),
) )
// for tests // for tests
.post( .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
"/v1/record_safekeeper_info/:tenant_id/:timeline_id", RequestSpan(record_safekeeper_info).handle(r)
record_safekeeper_info, })
) .get("/v1/debug_dump", |r| {
.get("/v1/debug_dump", dump_debug_handler) RequestSpan(dump_debug_handler).handle(r)
})
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -73,12 +73,12 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
// if send_proposer_elected is true, we need to update local history // if send_proposer_elected is true, we need to update local history
if append_request.send_proposer_elected { if append_request.send_proposer_elected {
send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn)?; send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
} }
let inserted_wal = append_logical_message(&tli, append_request)?; let inserted_wal = append_logical_message(&tli, append_request).await?;
let response = AppendResult { let response = AppendResult {
state: tli.get_state().1, state: tli.get_state().await.1,
inserted_wal, inserted_wal,
}; };
let response_data = serde_json::to_vec(&response) let response_data = serde_json::to_vec(&response)
@@ -114,9 +114,9 @@ async fn prepare_safekeeper(
.await .await
} }
fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::Result<()> { async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::Result<()> {
// add new term to existing history // add new term to existing history
let history = tli.get_state().1.acceptor_state.term_history; let history = tli.get_state().await.1.acceptor_state.term_history;
let history = history.up_to(lsn.checked_sub(1u64).unwrap()); let history = history.up_to(lsn.checked_sub(1u64).unwrap());
let mut history_entries = history.0; let mut history_entries = history.0;
history_entries.push(TermSwitchEntry { term, lsn }); history_entries.push(TermSwitchEntry { term, lsn });
@@ -129,7 +129,7 @@ fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::R
timeline_start_lsn: lsn, timeline_start_lsn: lsn,
}); });
tli.process_msg(&proposer_elected_request)?; tli.process_msg(&proposer_elected_request).await?;
Ok(()) Ok(())
} }
@@ -142,12 +142,12 @@ pub struct InsertedWAL {
/// Extend local WAL with new LogicalMessage record. To do that, /// Extend local WAL with new LogicalMessage record. To do that,
/// create AppendRequest with new WAL and pass it to safekeeper. /// create AppendRequest with new WAL and pass it to safekeeper.
pub fn append_logical_message( pub async fn append_logical_message(
tli: &Arc<Timeline>, tli: &Arc<Timeline>,
msg: &AppendLogicalMessage, msg: &AppendLogicalMessage,
) -> anyhow::Result<InsertedWAL> { ) -> anyhow::Result<InsertedWAL> {
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message); let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
let sk_state = tli.get_state().1; let sk_state = tli.get_state().await.1;
let begin_lsn = msg.begin_lsn; let begin_lsn = msg.begin_lsn;
let end_lsn = begin_lsn + wal_data.len() as u64; let end_lsn = begin_lsn + wal_data.len() as u64;
@@ -171,7 +171,7 @@ pub fn append_logical_message(
wal_data: Bytes::from(wal_data), wal_data: Bytes::from(wal_data),
}); });
let response = tli.process_msg(&append_request)?; let response = tli.process_msg(&append_request).await?;
let append_response = match response { let append_response = match response {
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp, Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,

View File

@@ -36,7 +36,6 @@ pub mod defaults {
DEFAULT_PG_LISTEN_PORT, DEFAULT_PG_LISTEN_PORT,
}; };
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms"; pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20); pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
} }
@@ -60,7 +59,6 @@ pub struct SafeKeeperConf {
pub heartbeat_timeout: Duration, pub heartbeat_timeout: Duration,
pub remote_storage: Option<RemoteStorageConfig>, pub remote_storage: Option<RemoteStorageConfig>,
pub max_offloader_lag_bytes: u64, pub max_offloader_lag_bytes: u64,
pub backup_runtime_threads: Option<usize>,
pub wal_backup_enabled: bool, pub wal_backup_enabled: bool,
pub auth: Option<Arc<JwtAuth>>, pub auth: Option<Arc<JwtAuth>>,
} }
@@ -91,7 +89,6 @@ impl SafeKeeperConf {
.parse() .parse()
.expect("failed to parse default broker endpoint"), .expect("failed to parse default broker endpoint"),
broker_keepalive_interval: Duration::from_secs(5), broker_keepalive_interval: Duration::from_secs(5),
backup_runtime_threads: None,
wal_backup_enabled: true, wal_backup_enabled: true,
auth: None, auth: None,
heartbeat_timeout: Duration::new(5, 0), heartbeat_timeout: Duration::new(5, 0),

View File

@@ -2,11 +2,12 @@
use std::{ use std::{
sync::{Arc, RwLock}, sync::{Arc, RwLock},
time::{Instant, SystemTime}, time::{Duration, Instant, SystemTime},
}; };
use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS}; use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
use anyhow::Result; use anyhow::Result;
use futures::Future;
use metrics::{ use metrics::{
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts}, core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
proto::MetricFamily, proto::MetricFamily,
@@ -15,6 +16,7 @@ use metrics::{
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use postgres_ffi::XLogSegNo; use postgres_ffi::XLogSegNo;
use tokio::time::interval;
use utils::pageserver_feedback::PageserverFeedback; use utils::pageserver_feedback::PageserverFeedback;
use utils::{id::TenantTimelineId, lsn::Lsn}; use utils::{id::TenantTimelineId, lsn::Lsn};
@@ -221,14 +223,17 @@ impl WalStorageMetrics {
} }
} }
/// Accepts a closure that returns a result, and returns the duration of the closure. /// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
pub fn time_io_closure(closure: impl FnOnce() -> Result<()>) -> Result<f64> { pub async fn time_io_closure<E: Into<anyhow::Error>>(
closure: impl Future<Output = Result<(), E>>,
) -> Result<f64> {
let start = std::time::Instant::now(); let start = std::time::Instant::now();
closure()?; closure.await.map_err(|e| e.into())?;
Ok(start.elapsed().as_secs_f64()) Ok(start.elapsed().as_secs_f64())
} }
/// Metrics for a single timeline. /// Metrics for a single timeline.
#[derive(Clone)]
pub struct FullTimelineInfo { pub struct FullTimelineInfo {
pub ttid: TenantTimelineId, pub ttid: TenantTimelineId,
pub ps_feedback: PageserverFeedback, pub ps_feedback: PageserverFeedback,
@@ -611,3 +616,18 @@ impl Collector for TimelineCollector {
mfs mfs
} }
} }
/// Prometheus crate Collector interface is sync, and all safekeeper code is
/// async. To bridge the gap, this function wakes once in scrape interval and
/// copies metrics from under async lock to sync where collection can take it.
pub async fn metrics_shifter() -> anyhow::Result<()> {
let scrape_interval = Duration::from_secs(30);
let mut interval = interval(scrape_interval);
loop {
interval.tick().await;
let timelines = GlobalTimelines::get_all();
for tli in timelines {
tli.set_info_for_metrics().await;
}
}
}

View File

@@ -231,7 +231,7 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
info!( info!(
"Loaded timeline {}, flush_lsn={}", "Loaded timeline {}, flush_lsn={}",
ttid, ttid,
tli.get_flush_lsn() tli.get_flush_lsn().await
); );
Ok(Response { Ok(Response {

View File

@@ -18,15 +18,14 @@ use postgres_backend::QueryError;
use pq_proto::BeMessage; use pq_proto::BeMessage;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
use tokio::io::AsyncWrite; use tokio::io::AsyncWrite;
use tokio::sync::mpsc::channel; use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::task::spawn_blocking; use tokio::task;
use tokio::task::JoinHandle;
use tokio::time::Duration; use tokio::time::Duration;
use tokio::time::Instant; use tokio::time::Instant;
use tracing::*; use tracing::*;
@@ -97,7 +96,7 @@ impl SafekeeperPostgresHandler {
Err(res.expect_err("no error with WalAcceptor not spawn")) Err(res.expect_err("no error with WalAcceptor not spawn"))
} }
Some(handle) => { Some(handle) => {
let wal_acceptor_res = handle.join(); let wal_acceptor_res = handle.await;
// If there was any network error, return it. // If there was any network error, return it.
res?; res?;
@@ -107,7 +106,7 @@ impl SafekeeperPostgresHandler {
Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination
Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))), Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!( Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
"WalAcceptor thread panicked", "WalAcceptor task panicked",
))), ))),
} }
} }
@@ -154,10 +153,12 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
} }
}; };
*self.acceptor_handle = Some( *self.acceptor_handle = Some(WalAcceptor::spawn(
WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, self.conn_id) tli.clone(),
.context("spawn WalAcceptor thread")?, msg_rx,
); reply_tx,
self.conn_id,
));
// Forward all messages to WalAcceptor // Forward all messages to WalAcceptor
read_network_loop(self.pgb_reader, msg_tx, next_msg).await read_network_loop(self.pgb_reader, msg_tx, next_msg).await
@@ -226,28 +227,19 @@ impl WalAcceptor {
msg_rx: Receiver<ProposerAcceptorMessage>, msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>, reply_tx: Sender<AcceptorProposerMessage>,
conn_id: ConnectionId, conn_id: ConnectionId,
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> { ) -> JoinHandle<anyhow::Result<()>> {
let thread_name = format!("WAL acceptor {}", tli.ttid); task::spawn(async move {
thread::Builder::new() let mut wa = WalAcceptor {
.name(thread_name) tli,
.spawn(move || -> anyhow::Result<()> { msg_rx,
let mut wa = WalAcceptor { reply_tx,
tli, };
msg_rx,
reply_tx,
};
let runtime = tokio::runtime::Builder::new_current_thread() let span_ttid = wa.tli.ttid; // satisfy borrow checker
.enable_all() wa.run()
.build()?; .instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid))
.await
let span_ttid = wa.tli.ttid; // satisfy borrow checker })
runtime.block_on(
wa.run()
.instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid)),
)
})
.map_err(anyhow::Error::from)
} }
/// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed; /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
@@ -281,7 +273,7 @@ impl WalAcceptor {
while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg { while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request); let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
if let Some(reply) = self.tli.process_msg(&noflush_msg)? { if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
if self.reply_tx.send(reply).await.is_err() { if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated return Ok(()); // chan closed, streaming terminated
} }
@@ -300,10 +292,12 @@ impl WalAcceptor {
} }
// flush all written WAL to the disk // flush all written WAL to the disk
self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)? self.tli
.process_msg(&ProposerAcceptorMessage::FlushWAL)
.await?
} else { } else {
// process message other than AppendRequest // process message other than AppendRequest
self.tli.process_msg(&next_msg)? self.tli.process_msg(&next_msg).await?
}; };
if let Some(reply) = reply_msg { if let Some(reply) = reply_msg {
@@ -326,8 +320,8 @@ impl Drop for ComputeConnectionGuard {
let tli = self.timeline.clone(); let tli = self.timeline.clone();
// tokio forbids to call blocking_send inside the runtime, and see // tokio forbids to call blocking_send inside the runtime, and see
// comments in on_compute_disconnect why we call blocking_send. // comments in on_compute_disconnect why we call blocking_send.
spawn_blocking(move || { tokio::spawn(async move {
if let Err(e) = tli.on_compute_disconnect() { if let Err(e) = tli.on_compute_disconnect().await {
error!("failed to unregister compute connection: {}", e); error!("failed to unregister compute connection: {}", e);
} }
}); });

View File

@@ -1,26 +1,29 @@
//! Thread removing old WAL. //! Thread removing old WAL.
use std::{thread, time::Duration}; use std::time::Duration;
use tokio::time::sleep;
use tracing::*; use tracing::*;
use crate::{GlobalTimelines, SafeKeeperConf}; use crate::{GlobalTimelines, SafeKeeperConf};
pub fn thread_main(conf: SafeKeeperConf) { pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
let wal_removal_interval = Duration::from_millis(5000); let wal_removal_interval = Duration::from_millis(5000);
loop { loop {
let tlis = GlobalTimelines::get_all(); let tlis = GlobalTimelines::get_all();
for tli in &tlis { for tli in &tlis {
if !tli.is_active() { if !tli.is_active().await {
continue; continue;
} }
let ttid = tli.ttid; let ttid = tli.ttid;
let _enter = if let Err(e) = tli
info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id).entered(); .remove_old_wal(conf.wal_backup_enabled)
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) { .instrument(info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id))
warn!("failed to remove WAL: {}", e); .await
{
error!("failed to remove WAL: {}", e);
} }
} }
thread::sleep(wal_removal_interval) sleep(wal_removal_interval).await;
} }
} }

View File

@@ -567,25 +567,27 @@ where
/// Process message from proposer and possibly form reply. Concurrent /// Process message from proposer and possibly form reply. Concurrent
/// callers must exclude each other. /// callers must exclude each other.
pub fn process_msg( pub async fn process_msg(
&mut self, &mut self,
msg: &ProposerAcceptorMessage, msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> { ) -> Result<Option<AcceptorProposerMessage>> {
match msg { match msg {
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg), ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg), ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg), ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg, true), ProposerAcceptorMessage::AppendRequest(msg) => {
ProposerAcceptorMessage::NoFlushAppendRequest(msg) => { self.handle_append_request(msg, true).await
self.handle_append_request(msg, false)
} }
ProposerAcceptorMessage::FlushWAL => self.handle_flush(), ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
self.handle_append_request(msg, false).await
}
ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
} }
} }
/// Handle initial message from proposer: check its sanity and send my /// Handle initial message from proposer: check its sanity and send my
/// current term. /// current term.
fn handle_greeting( async fn handle_greeting(
&mut self, &mut self,
msg: &ProposerGreeting, msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> { ) -> Result<Option<AcceptorProposerMessage>> {
@@ -647,7 +649,7 @@ where
if msg.pg_version != UNKNOWN_SERVER_VERSION { if msg.pg_version != UNKNOWN_SERVER_VERSION {
state.server.pg_version = msg.pg_version; state.server.pg_version = msg.pg_version;
} }
self.state.persist(&state)?; self.state.persist(&state).await?;
} }
info!( info!(
@@ -662,7 +664,7 @@ where
} }
/// Give vote for the given term, if we haven't done that previously. /// Give vote for the given term, if we haven't done that previously.
fn handle_vote_request( async fn handle_vote_request(
&mut self, &mut self,
msg: &VoteRequest, msg: &VoteRequest,
) -> Result<Option<AcceptorProposerMessage>> { ) -> Result<Option<AcceptorProposerMessage>> {
@@ -676,7 +678,7 @@ where
// handle_elected instead. Currently not a big deal, as proposer is the // handle_elected instead. Currently not a big deal, as proposer is the
// only source of WAL; with peer2peer recovery it would be more // only source of WAL; with peer2peer recovery it would be more
// important. // important.
self.wal_store.flush_wal()?; self.wal_store.flush_wal().await?;
// initialize with refusal // initialize with refusal
let mut resp = VoteResponse { let mut resp = VoteResponse {
term: self.state.acceptor_state.term, term: self.state.acceptor_state.term,
@@ -690,7 +692,7 @@ where
let mut state = self.state.clone(); let mut state = self.state.clone();
state.acceptor_state.term = msg.term; state.acceptor_state.term = msg.term;
// persist vote before sending it out // persist vote before sending it out
self.state.persist(&state)?; self.state.persist(&state).await?;
resp.term = self.state.acceptor_state.term; resp.term = self.state.acceptor_state.term;
resp.vote_given = true as u64; resp.vote_given = true as u64;
@@ -713,12 +715,15 @@ where
ar ar
} }
fn handle_elected(&mut self, msg: &ProposerElected) -> Result<Option<AcceptorProposerMessage>> { async fn handle_elected(
&mut self,
msg: &ProposerElected,
) -> Result<Option<AcceptorProposerMessage>> {
info!("received ProposerElected {:?}", msg); info!("received ProposerElected {:?}", msg);
if self.state.acceptor_state.term < msg.term { if self.state.acceptor_state.term < msg.term {
let mut state = self.state.clone(); let mut state = self.state.clone();
state.acceptor_state.term = msg.term; state.acceptor_state.term = msg.term;
self.state.persist(&state)?; self.state.persist(&state).await?;
} }
// If our term is higher, ignore the message (next feedback will inform the compute) // If our term is higher, ignore the message (next feedback will inform the compute)
@@ -748,7 +753,7 @@ where
// intersection of our history and history from msg // intersection of our history and history from msg
// truncate wal, update the LSNs // truncate wal, update the LSNs
self.wal_store.truncate_wal(msg.start_streaming_at)?; self.wal_store.truncate_wal(msg.start_streaming_at).await?;
// and now adopt term history from proposer // and now adopt term history from proposer
{ {
@@ -782,7 +787,7 @@ where
self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn); self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
state.acceptor_state.term_history = msg.term_history.clone(); state.acceptor_state.term_history = msg.term_history.clone();
self.persist_control_file(state)?; self.persist_control_file(state).await?;
} }
info!("start receiving WAL since {:?}", msg.start_streaming_at); info!("start receiving WAL since {:?}", msg.start_streaming_at);
@@ -794,7 +799,7 @@ where
/// ///
/// Note: it is assumed that 'WAL we have is from the right term' check has /// Note: it is assumed that 'WAL we have is from the right term' check has
/// already been done outside. /// already been done outside.
fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> { async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
// Both peers and walproposer communicate this value, we might already // Both peers and walproposer communicate this value, we might already
// have a fresher (higher) version. // have a fresher (higher) version.
candidate = max(candidate, self.inmem.commit_lsn); candidate = max(candidate, self.inmem.commit_lsn);
@@ -816,29 +821,29 @@ where
// that we receive new epoch_start_lsn, and we still need to sync // that we receive new epoch_start_lsn, and we still need to sync
// control file in this case. // control file in this case.
if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn { if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
self.persist_control_file(self.state.clone())?; self.persist_control_file(self.state.clone()).await?;
} }
Ok(()) Ok(())
} }
/// Persist control file to disk, called only after timeline creation (bootstrap). /// Persist control file to disk, called only after timeline creation (bootstrap).
pub fn persist(&mut self) -> Result<()> { pub async fn persist(&mut self) -> Result<()> {
self.persist_control_file(self.state.clone()) self.persist_control_file(self.state.clone()).await
} }
/// Persist in-memory state to the disk, taking other data from state. /// Persist in-memory state to the disk, taking other data from state.
fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> { async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
state.commit_lsn = self.inmem.commit_lsn; state.commit_lsn = self.inmem.commit_lsn;
state.backup_lsn = self.inmem.backup_lsn; state.backup_lsn = self.inmem.backup_lsn;
state.peer_horizon_lsn = self.inmem.peer_horizon_lsn; state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
state.proposer_uuid = self.inmem.proposer_uuid; state.proposer_uuid = self.inmem.proposer_uuid;
self.state.persist(&state) self.state.persist(&state).await
} }
/// Handle request to append WAL. /// Handle request to append WAL.
#[allow(clippy::comparison_chain)] #[allow(clippy::comparison_chain)]
fn handle_append_request( async fn handle_append_request(
&mut self, &mut self,
msg: &AppendRequest, msg: &AppendRequest,
require_flush: bool, require_flush: bool,
@@ -861,17 +866,19 @@ where
// do the job // do the job
if !msg.wal_data.is_empty() { if !msg.wal_data.is_empty() {
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?; self.wal_store
.write_wal(msg.h.begin_lsn, &msg.wal_data)
.await?;
} }
// flush wal to the disk, if required // flush wal to the disk, if required
if require_flush { if require_flush {
self.wal_store.flush_wal()?; self.wal_store.flush_wal().await?;
} }
// Update commit_lsn. // Update commit_lsn.
if msg.h.commit_lsn != Lsn(0) { if msg.h.commit_lsn != Lsn(0) {
self.update_commit_lsn(msg.h.commit_lsn)?; self.update_commit_lsn(msg.h.commit_lsn).await?;
} }
// Value calculated by walproposer can always lag: // Value calculated by walproposer can always lag:
// - safekeepers can forget inmem value and send to proposer lower // - safekeepers can forget inmem value and send to proposer lower
@@ -887,7 +894,7 @@ where
if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64) if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
< self.inmem.peer_horizon_lsn < self.inmem.peer_horizon_lsn
{ {
self.persist_control_file(self.state.clone())?; self.persist_control_file(self.state.clone()).await?;
} }
trace!( trace!(
@@ -909,15 +916,15 @@ where
} }
/// Flush WAL to disk. Return AppendResponse with latest LSNs. /// Flush WAL to disk. Return AppendResponse with latest LSNs.
fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> { async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
self.wal_store.flush_wal()?; self.wal_store.flush_wal().await?;
Ok(Some(AcceptorProposerMessage::AppendResponse( Ok(Some(AcceptorProposerMessage::AppendResponse(
self.append_response(), self.append_response(),
))) )))
} }
/// Update timeline state with peer safekeeper data. /// Update timeline state with peer safekeeper data.
pub fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> { pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
let mut sync_control_file = false; let mut sync_control_file = false;
if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) { if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
@@ -925,7 +932,7 @@ where
// commit_lsn if our history matches (is part of) history of advanced // commit_lsn if our history matches (is part of) history of advanced
// commit_lsn provider. // commit_lsn provider.
if sk_info.last_log_term == self.get_epoch() { if sk_info.last_log_term == self.get_epoch() {
self.update_commit_lsn(Lsn(sk_info.commit_lsn))?; self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
} }
} }
@@ -952,7 +959,7 @@ where
// persisting cf -- that is not much needed currently. We could do // persisting cf -- that is not much needed currently. We could do
// that by storing Arc to walsenders in Safekeeper. // that by storing Arc to walsenders in Safekeeper.
state.remote_consistent_lsn = new_remote_consistent_lsn; state.remote_consistent_lsn = new_remote_consistent_lsn;
self.persist_control_file(state)?; self.persist_control_file(state).await?;
} }
Ok(()) Ok(())
} }
@@ -976,6 +983,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use futures::future::BoxFuture;
use postgres_ffi::WAL_SEGMENT_SIZE; use postgres_ffi::WAL_SEGMENT_SIZE;
use super::*; use super::*;
@@ -987,8 +995,9 @@ mod tests {
persisted_state: SafeKeeperState, persisted_state: SafeKeeperState,
} }
#[async_trait::async_trait]
impl control_file::Storage for InMemoryState { impl control_file::Storage for InMemoryState {
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
self.persisted_state = s.clone(); self.persisted_state = s.clone();
Ok(()) Ok(())
} }
@@ -1014,27 +1023,28 @@ mod tests {
lsn: Lsn, lsn: Lsn,
} }
#[async_trait::async_trait]
impl wal_storage::Storage for DummyWalStore { impl wal_storage::Storage for DummyWalStore {
fn flush_lsn(&self) -> Lsn { fn flush_lsn(&self) -> Lsn {
self.lsn self.lsn
} }
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
self.lsn = startpos + buf.len() as u64; self.lsn = startpos + buf.len() as u64;
Ok(()) Ok(())
} }
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
self.lsn = end_pos; self.lsn = end_pos;
Ok(()) Ok(())
} }
fn flush_wal(&mut self) -> Result<()> { async fn flush_wal(&mut self) -> Result<()> {
Ok(()) Ok(())
} }
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> { fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
Box::new(move |_segno_up_to: XLogSegNo| Ok(())) Box::pin(async { Ok(()) })
} }
fn get_metrics(&self) -> crate::metrics::WalStorageMetrics { fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
@@ -1042,8 +1052,8 @@ mod tests {
} }
} }
#[test] #[tokio::test]
fn test_voting() { async fn test_voting() {
let storage = InMemoryState { let storage = InMemoryState {
persisted_state: test_sk_state(), persisted_state: test_sk_state(),
}; };
@@ -1052,7 +1062,7 @@ mod tests {
// check voting for 1 is ok // check voting for 1 is ok
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
let mut vote_resp = sk.process_msg(&vote_request); let mut vote_resp = sk.process_msg(&vote_request).await;
match vote_resp.unwrap() { match vote_resp.unwrap() {
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0), Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
r => panic!("unexpected response: {:?}", r), r => panic!("unexpected response: {:?}", r),
@@ -1067,15 +1077,15 @@ mod tests {
sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap(); sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();
// and ensure voting second time for 1 is not ok // and ensure voting second time for 1 is not ok
vote_resp = sk.process_msg(&vote_request); vote_resp = sk.process_msg(&vote_request).await;
match vote_resp.unwrap() { match vote_resp.unwrap() {
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0), Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
r => panic!("unexpected response: {:?}", r), r => panic!("unexpected response: {:?}", r),
} }
} }
#[test] #[tokio::test]
fn test_epoch_switch() { async fn test_epoch_switch() {
let storage = InMemoryState { let storage = InMemoryState {
persisted_state: test_sk_state(), persisted_state: test_sk_state(),
}; };
@@ -1107,10 +1117,13 @@ mod tests {
timeline_start_lsn: Lsn(0), timeline_start_lsn: Lsn(0),
}; };
sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.await
.unwrap(); .unwrap();
// check that AppendRequest before epochStartLsn doesn't switch epoch // check that AppendRequest before epochStartLsn doesn't switch epoch
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); let resp = sk
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await;
assert!(resp.is_ok()); assert!(resp.is_ok());
assert_eq!(sk.get_epoch(), 0); assert_eq!(sk.get_epoch(), 0);
@@ -1121,9 +1134,11 @@ mod tests {
h: ar_hdr, h: ar_hdr,
wal_data: Bytes::from_static(b"b"), wal_data: Bytes::from_static(b"b"),
}; };
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); let resp = sk
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await;
assert!(resp.is_ok()); assert!(resp.is_ok());
sk.wal_store.truncate_wal(Lsn(3)).unwrap(); // imitate the complete record at 3 %) sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
assert_eq!(sk.get_epoch(), 1); assert_eq!(sk.get_epoch(), 1);
} }
} }

View File

@@ -394,7 +394,7 @@ impl SafekeeperPostgresHandler {
// on this safekeeper itself. That's ok as (old) proposer will never be // on this safekeeper itself. That's ok as (old) proposer will never be
// able to commit such WAL. // able to commit such WAL.
let stop_pos: Option<Lsn> = if self.is_walproposer_recovery() { let stop_pos: Option<Lsn> = if self.is_walproposer_recovery() {
let wal_end = tli.get_flush_lsn(); let wal_end = tli.get_flush_lsn().await;
Some(wal_end) Some(wal_end)
} else { } else {
None None
@@ -409,7 +409,7 @@ impl SafekeeperPostgresHandler {
// switch to copy // switch to copy
pgb.write_message(&BeMessage::CopyBothResponse).await?; pgb.write_message(&BeMessage::CopyBothResponse).await?;
let (_, persisted_state) = tli.get_state(); let (_, persisted_state) = tli.get_state().await;
let wal_reader = WalReader::new( let wal_reader = WalReader::new(
self.conf.workdir.clone(), self.conf.workdir.clone(),
self.conf.timeline_dir(&tli.ttid), self.conf.timeline_dir(&tli.ttid),
@@ -540,7 +540,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
.walsenders .walsenders
.get_ws_remote_consistent_lsn(self.ws_guard.id) .get_ws_remote_consistent_lsn(self.ws_guard.id)
{ {
if self.tli.should_walsender_stop(remote_consistent_lsn) { if self.tli.should_walsender_stop(remote_consistent_lsn).await {
// Terminate if there is nothing more to send. // Terminate if there is nothing more to send.
return Err(CopyStreamHandlerEnd::ServerInitiated(format!( return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes", "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",

View File

@@ -2,12 +2,13 @@
//! to glue together SafeKeeper and all other background services. //! to glue together SafeKeeper and all other background services.
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use parking_lot::{Mutex, MutexGuard};
use postgres_ffi::XLogSegNo; use postgres_ffi::XLogSegNo;
use tokio::fs;
use std::cmp::max; use std::cmp::max;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{Mutex, MutexGuard};
use tokio::{ use tokio::{
sync::{mpsc::Sender, watch}, sync::{mpsc::Sender, watch},
time::Instant, time::Instant,
@@ -287,8 +288,9 @@ pub struct Timeline {
commit_lsn_watch_tx: watch::Sender<Lsn>, commit_lsn_watch_tx: watch::Sender<Lsn>,
commit_lsn_watch_rx: watch::Receiver<Lsn>, commit_lsn_watch_rx: watch::Receiver<Lsn>,
/// Safekeeper and other state, that should remain consistent and synchronized /// Safekeeper and other state, that should remain consistent and
/// with the disk. /// synchronized with the disk. This is tokio mutex as we write WAL to disk
/// while holding it, ensuring that consensus checks are in order.
mutex: Mutex<SharedState>, mutex: Mutex<SharedState>,
walsenders: Arc<WalSenders>, walsenders: Arc<WalSenders>,
@@ -301,6 +303,9 @@ pub struct Timeline {
/// Directory where timeline state is stored. /// Directory where timeline state is stored.
pub timeline_dir: PathBuf, pub timeline_dir: PathBuf,
/// Data prepared for collection by synchronous prometheus scraper.
metrics_data: std::sync::Mutex<Option<FullTimelineInfo>>,
} }
impl Timeline { impl Timeline {
@@ -328,6 +333,7 @@ impl Timeline {
cancellation_rx, cancellation_rx,
cancellation_tx, cancellation_tx,
timeline_dir: conf.timeline_dir(&ttid), timeline_dir: conf.timeline_dir(&ttid),
metrics_data: std::sync::Mutex::new(None),
}) })
} }
@@ -354,6 +360,7 @@ impl Timeline {
cancellation_rx, cancellation_rx,
cancellation_tx, cancellation_tx,
timeline_dir: conf.timeline_dir(&ttid), timeline_dir: conf.timeline_dir(&ttid),
metrics_data: std::sync::Mutex::new(None),
}) })
} }
@@ -362,8 +369,8 @@ impl Timeline {
/// ///
/// Bootstrap is transactional, so if it fails, created files will be deleted, /// Bootstrap is transactional, so if it fails, created files will be deleted,
/// and state on disk should remain unchanged. /// and state on disk should remain unchanged.
pub fn bootstrap(&self, shared_state: &mut MutexGuard<SharedState>) -> Result<()> { pub async fn bootstrap(&self, shared_state: &mut MutexGuard<'_, SharedState>) -> Result<()> {
match std::fs::metadata(&self.timeline_dir) { match fs::metadata(&self.timeline_dir).await {
Ok(_) => { Ok(_) => {
// Timeline directory exists on disk, we should leave state unchanged // Timeline directory exists on disk, we should leave state unchanged
// and return error. // and return error.
@@ -376,53 +383,51 @@ impl Timeline {
} }
// Create timeline directory. // Create timeline directory.
std::fs::create_dir_all(&self.timeline_dir)?; fs::create_dir_all(&self.timeline_dir).await?;
// Write timeline to disk and TODO: start background tasks. // Write timeline to disk and TODO: start background tasks.
match || -> Result<()> { if let Err(e) = shared_state.sk.persist().await {
shared_state.sk.persist()?; // Bootstrap failed, cancel timeline and remove timeline directory.
// TODO: add more initialization steps here self.cancel(shared_state);
self.update_status(shared_state);
Ok(())
}() {
Ok(_) => Ok(()),
Err(e) => {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel(shared_state);
if let Err(fs_err) = std::fs::remove_dir_all(&self.timeline_dir) { if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
warn!( warn!(
"failed to remove timeline {} directory after bootstrap failure: {}", "failed to remove timeline {} directory after bootstrap failure: {}",
self.ttid, fs_err self.ttid, fs_err
); );
}
Err(e)
} }
return Err(e);
} }
// TODO: add more initialization steps here
self.update_status(shared_state);
Ok(())
} }
/// Delete timeline from disk completely, by removing timeline directory. Background /// Delete timeline from disk completely, by removing timeline directory. Background
/// timeline activities will stop eventually. /// timeline activities will stop eventually.
pub fn delete_from_disk( pub async fn delete_from_disk(
&self, &self,
shared_state: &mut MutexGuard<SharedState>, shared_state: &mut MutexGuard<'_, SharedState>,
) -> Result<(bool, bool)> { ) -> Result<(bool, bool)> {
let was_active = shared_state.active; let was_active = shared_state.active;
self.cancel(shared_state); self.cancel(shared_state);
let dir_existed = delete_dir(&self.timeline_dir)?; let dir_existed = delete_dir(&self.timeline_dir).await?;
Ok((dir_existed, was_active)) Ok((dir_existed, was_active))
} }
/// Cancel timeline to prevent further usage. Background tasks will stop /// Cancel timeline to prevent further usage. Background tasks will stop
/// eventually after receiving cancellation signal. /// eventually after receiving cancellation signal.
fn cancel(&self, shared_state: &mut MutexGuard<SharedState>) { ///
/// Note that we can't notify backup launcher here while holding
/// shared_state lock, as this is a potential deadlock: caller is
/// responsible for that. Generally we should probably make WAL backup tasks
/// to shut down on their own, checking once in a while whether it is the
/// time.
fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
info!("timeline {} is cancelled", self.ttid); info!("timeline {} is cancelled", self.ttid);
let _ = self.cancellation_tx.send(true); let _ = self.cancellation_tx.send(true);
let res = self.wal_backup_launcher_tx.blocking_send(self.ttid);
if let Err(e) = res {
error!("Failed to send stop signal to wal_backup_launcher: {}", e);
}
// Close associated FDs. Nobody will be able to touch timeline data once // Close associated FDs. Nobody will be able to touch timeline data once
// it is cancelled, so WAL storage won't be opened again. // it is cancelled, so WAL storage won't be opened again.
shared_state.sk.wal_store.close(); shared_state.sk.wal_store.close();
@@ -434,8 +439,8 @@ impl Timeline {
} }
/// Take a writing mutual exclusive lock on timeline shared_state. /// Take a writing mutual exclusive lock on timeline shared_state.
pub fn write_shared_state(&self) -> MutexGuard<SharedState> { pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
self.mutex.lock() self.mutex.lock().await
} }
fn update_status(&self, shared_state: &mut SharedState) -> bool { fn update_status(&self, shared_state: &mut SharedState) -> bool {
@@ -451,7 +456,7 @@ impl Timeline {
let is_wal_backup_action_pending: bool; let is_wal_backup_action_pending: bool;
{ {
let mut shared_state = self.write_shared_state(); let mut shared_state = self.write_shared_state().await;
shared_state.num_computes += 1; shared_state.num_computes += 1;
is_wal_backup_action_pending = self.update_status(&mut shared_state); is_wal_backup_action_pending = self.update_status(&mut shared_state);
} }
@@ -465,22 +470,17 @@ impl Timeline {
/// De-register compute connection, shutting down timeline activity if /// De-register compute connection, shutting down timeline activity if
/// pageserver doesn't need catchup. /// pageserver doesn't need catchup.
pub fn on_compute_disconnect(&self) -> Result<()> { pub async fn on_compute_disconnect(&self) -> Result<()> {
let is_wal_backup_action_pending: bool; let is_wal_backup_action_pending: bool;
{ {
let mut shared_state = self.write_shared_state(); let mut shared_state = self.write_shared_state().await;
shared_state.num_computes -= 1; shared_state.num_computes -= 1;
is_wal_backup_action_pending = self.update_status(&mut shared_state); is_wal_backup_action_pending = self.update_status(&mut shared_state);
} }
// Wake up wal backup launcher, if it is time to stop the offloading. // Wake up wal backup launcher, if it is time to stop the offloading.
if is_wal_backup_action_pending { if is_wal_backup_action_pending {
// Can fail only if channel to a static thread got closed, which is not normal at all. // Can fail only if channel to a static thread got closed, which is not normal at all.
// self.wal_backup_launcher_tx.send(self.ttid).await?;
// Note: this is blocking_send because on_compute_disconnect is called in Drop, there is
// no async Drop and we use current thread runtimes. With current thread rt spawning
// task in drop impl is racy, as thread along with runtime might finish before the task.
// This should be switched send.await when/if we go to full async.
self.wal_backup_launcher_tx.blocking_send(self.ttid)?;
} }
Ok(()) Ok(())
} }
@@ -490,11 +490,11 @@ impl Timeline {
/// computes. While there might be nothing to stream already, we learn about /// computes. While there might be nothing to stream already, we learn about
/// remote_consistent_lsn update through replication feedback, and we want /// remote_consistent_lsn update through replication feedback, and we want
/// to stop pushing to the broker if pageserver is fully caughtup. /// to stop pushing to the broker if pageserver is fully caughtup.
pub fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool { pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
if self.is_cancelled() { if self.is_cancelled() {
return true; return true;
} }
let shared_state = self.write_shared_state(); let shared_state = self.write_shared_state().await;
if shared_state.num_computes == 0 { if shared_state.num_computes == 0 {
return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn; reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn;
@@ -504,12 +504,12 @@ impl Timeline {
/// Returns whether s3 offloading is required and sets current status as /// Returns whether s3 offloading is required and sets current status as
/// matching it. /// matching it.
pub fn wal_backup_attend(&self) -> bool { pub async fn wal_backup_attend(&self) -> bool {
if self.is_cancelled() { if self.is_cancelled() {
return false; return false;
} }
self.write_shared_state().wal_backup_attend() self.write_shared_state().await.wal_backup_attend()
} }
/// Returns commit_lsn watch channel. /// Returns commit_lsn watch channel.
@@ -518,7 +518,7 @@ impl Timeline {
} }
/// Pass arrived message to the safekeeper. /// Pass arrived message to the safekeeper.
pub fn process_msg( pub async fn process_msg(
&self, &self,
msg: &ProposerAcceptorMessage, msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> { ) -> Result<Option<AcceptorProposerMessage>> {
@@ -529,8 +529,8 @@ impl Timeline {
let mut rmsg: Option<AcceptorProposerMessage>; let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn; let commit_lsn: Lsn;
{ {
let mut shared_state = self.write_shared_state(); let mut shared_state = self.write_shared_state().await;
rmsg = shared_state.sk.process_msg(msg)?; rmsg = shared_state.sk.process_msg(msg).await?;
// if this is AppendResponse, fill in proper pageserver and hot // if this is AppendResponse, fill in proper pageserver and hot
// standby feedback. // standby feedback.
@@ -547,37 +547,37 @@ impl Timeline {
} }
/// Returns wal_seg_size. /// Returns wal_seg_size.
pub fn get_wal_seg_size(&self) -> usize { pub async fn get_wal_seg_size(&self) -> usize {
self.write_shared_state().get_wal_seg_size() self.write_shared_state().await.get_wal_seg_size()
} }
/// Returns true only if the timeline is loaded and active. /// Returns true only if the timeline is loaded and active.
pub fn is_active(&self) -> bool { pub async fn is_active(&self) -> bool {
if self.is_cancelled() { if self.is_cancelled() {
return false; return false;
} }
self.write_shared_state().active self.write_shared_state().await.active
} }
/// Returns state of the timeline. /// Returns state of the timeline.
pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) { pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
let state = self.write_shared_state(); let state = self.write_shared_state().await;
(state.sk.inmem.clone(), state.sk.state.clone()) (state.sk.inmem.clone(), state.sk.state.clone())
} }
/// Returns latest backup_lsn. /// Returns latest backup_lsn.
pub fn get_wal_backup_lsn(&self) -> Lsn { pub async fn get_wal_backup_lsn(&self) -> Lsn {
self.write_shared_state().sk.inmem.backup_lsn self.write_shared_state().await.sk.inmem.backup_lsn
} }
/// Sets backup_lsn to the given value. /// Sets backup_lsn to the given value.
pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> { pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
if self.is_cancelled() { if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid)); bail!(TimelineError::Cancelled(self.ttid));
} }
let mut state = self.write_shared_state(); let mut state = self.write_shared_state().await;
state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn); state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn);
// we should check whether to shut down offloader, but this will be done // we should check whether to shut down offloader, but this will be done
// soon by peer communication anyway. // soon by peer communication anyway.
@@ -585,8 +585,8 @@ impl Timeline {
} }
/// Get safekeeper info for broadcasting to broker and other peers. /// Get safekeeper info for broadcasting to broker and other peers.
pub fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo { pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
let shared_state = self.write_shared_state(); let shared_state = self.write_shared_state().await;
shared_state.get_safekeeper_info( shared_state.get_safekeeper_info(
&self.ttid, &self.ttid,
conf, conf,
@@ -605,8 +605,8 @@ impl Timeline {
let is_wal_backup_action_pending: bool; let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn; let commit_lsn: Lsn;
{ {
let mut shared_state = self.write_shared_state(); let mut shared_state = self.write_shared_state().await;
shared_state.sk.record_safekeeper_info(&sk_info)?; shared_state.sk.record_safekeeper_info(&sk_info).await?;
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now()); let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info); shared_state.peers_info.upsert(&peer_info);
is_wal_backup_action_pending = self.update_status(&mut shared_state); is_wal_backup_action_pending = self.update_status(&mut shared_state);
@@ -623,8 +623,8 @@ impl Timeline {
/// Get our latest view of alive peers status on the timeline. /// Get our latest view of alive peers status on the timeline.
/// We pass our own info through the broker as well, so when we don't have connection /// We pass our own info through the broker as well, so when we don't have connection
/// to the broker returned vec is empty. /// to the broker returned vec is empty.
pub fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> { pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.write_shared_state(); let shared_state = self.write_shared_state().await;
let now = Instant::now(); let now = Instant::now();
shared_state shared_state
.peers_info .peers_info
@@ -641,47 +641,47 @@ impl Timeline {
} }
/// Returns flush_lsn. /// Returns flush_lsn.
pub fn get_flush_lsn(&self) -> Lsn { pub async fn get_flush_lsn(&self) -> Lsn {
self.write_shared_state().sk.wal_store.flush_lsn() self.write_shared_state().await.sk.wal_store.flush_lsn()
} }
/// Delete WAL segments from disk that are no longer needed. This is determined /// Delete WAL segments from disk that are no longer needed. This is determined
/// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn. /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
pub fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> { pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
if self.is_cancelled() { if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid)); bail!(TimelineError::Cancelled(self.ttid));
} }
let horizon_segno: XLogSegNo; let horizon_segno: XLogSegNo;
let remover: Box<dyn Fn(u64) -> Result<(), anyhow::Error>>; let remover = {
{ let shared_state = self.write_shared_state().await;
let shared_state = self.write_shared_state();
horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled); horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
remover = shared_state.sk.wal_store.remove_up_to();
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno { if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
return Ok(()); return Ok(()); // nothing to do
} }
let remover = shared_state.sk.wal_store.remove_up_to(horizon_segno - 1);
// release the lock before removing // release the lock before removing
} remover
};
// delete old WAL files // delete old WAL files
remover(horizon_segno - 1)?; remover.await?;
// update last_removed_segno // update last_removed_segno
let mut shared_state = self.write_shared_state(); let mut shared_state = self.write_shared_state().await;
shared_state.last_removed_segno = horizon_segno; shared_state.last_removed_segno = horizon_segno;
Ok(()) Ok(())
} }
/// Returns full timeline info, required for the metrics. If the timeline is /// Gather timeline data for metrics. If the timeline is not active, returns
/// not active, returns None instead. /// None, we do not collect these.
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> { async fn gather_info_for_metrics(&self) -> Option<FullTimelineInfo> {
if self.is_cancelled() { if self.is_cancelled() {
return None; return None;
} }
let ps_feedback = self.walsenders.get_ps_feedback(); let ps_feedback = self.walsenders.get_ps_feedback();
let state = self.write_shared_state(); let state = self.write_shared_state().await;
if state.active { if state.active {
Some(FullTimelineInfo { Some(FullTimelineInfo {
ttid: self.ttid, ttid: self.ttid,
@@ -702,9 +702,20 @@ impl Timeline {
} }
} }
/// Gathers timeline data for metrics and puts it into .metrics_data
pub async fn set_info_for_metrics(&self) {
let metrics_data = self.gather_info_for_metrics().await;
*self.metrics_data.lock().unwrap() = metrics_data;
}
/// Synchronous method returning current metrics data.
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
self.metrics_data.lock().unwrap().clone()
}
/// Returns in-memory timeline state to build a full debug dump. /// Returns in-memory timeline state to build a full debug dump.
pub fn memory_dump(&self) -> debug_dump::Memory { pub async fn memory_dump(&self) -> debug_dump::Memory {
let state = self.write_shared_state(); let state = self.write_shared_state().await;
let (write_lsn, write_record_lsn, flush_lsn, file_open) = let (write_lsn, write_record_lsn, flush_lsn, file_open) =
state.sk.wal_store.internal_state(); state.sk.wal_store.internal_state();
@@ -728,8 +739,8 @@ impl Timeline {
} }
/// Deletes directory and it's contents. Returns false if directory does not exist. /// Deletes directory and it's contents. Returns false if directory does not exist.
fn delete_dir(path: &PathBuf) -> Result<bool> { async fn delete_dir(path: &PathBuf) -> Result<bool> {
match std::fs::remove_dir_all(path) { match fs::remove_dir_all(path).await {
Ok(_) => Ok(true), Ok(_) => Ok(true),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
Err(e) => Err(e.into()), Err(e) => Err(e.into()),

View File

@@ -113,9 +113,17 @@ impl GlobalTimelines {
Ok(()) Ok(())
} }
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir errors if any. /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
/// errors if any.
///
/// Note: This function (and all reading/loading below) is sync because
/// timelines are loaded while holding GlobalTimelinesState lock. Which is
/// fine as this is called only from single threaded main runtime on boot,
/// but clippy complains anyway, and suppressing that isn't trivial as async
/// is the keyword, ha. That only other user is pull_timeline.rs for which
/// being blocked is not that bad, and we can do spawn_blocking.
fn load_tenant_timelines( fn load_tenant_timelines(
state: &mut MutexGuard<GlobalTimelinesState>, state: &mut MutexGuard<'_, GlobalTimelinesState>,
tenant_id: TenantId, tenant_id: TenantId,
) -> Result<()> { ) -> Result<()> {
let timelines_dir = state.get_conf().tenant_dir(&tenant_id); let timelines_dir = state.get_conf().tenant_dir(&tenant_id);
@@ -220,7 +228,7 @@ impl GlobalTimelines {
// Take a lock and finish the initialization holding this mutex. No other threads // Take a lock and finish the initialization holding this mutex. No other threads
// can interfere with creation after we will insert timeline into the map. // can interfere with creation after we will insert timeline into the map.
{ {
let mut shared_state = timeline.write_shared_state(); let mut shared_state = timeline.write_shared_state().await;
// We can get a race condition here in case of concurrent create calls, but only // We can get a race condition here in case of concurrent create calls, but only
// in theory. create() will return valid timeline on the next try. // in theory. create() will return valid timeline on the next try.
@@ -232,7 +240,7 @@ impl GlobalTimelines {
// Write the new timeline to the disk and start background workers. // Write the new timeline to the disk and start background workers.
// Bootstrap is transactional, so if it fails, the timeline will be deleted, // Bootstrap is transactional, so if it fails, the timeline will be deleted,
// and the state on disk should remain unchanged. // and the state on disk should remain unchanged.
if let Err(e) = timeline.bootstrap(&mut shared_state) { if let Err(e) = timeline.bootstrap(&mut shared_state).await {
// Note: the most likely reason for bootstrap failure is that the timeline // Note: the most likely reason for bootstrap failure is that the timeline
// directory already exists on disk. This happens when timeline is corrupted // directory already exists on disk. This happens when timeline is corrupted
// and wasn't loaded from disk on startup because of that. We want to preserve // and wasn't loaded from disk on startup because of that. We want to preserve
@@ -294,15 +302,16 @@ impl GlobalTimelines {
} }
/// Cancels timeline, then deletes the corresponding data directory. /// Cancels timeline, then deletes the corresponding data directory.
pub fn delete_force(ttid: &TenantTimelineId) -> Result<TimelineDeleteForceResult> { pub async fn delete_force(ttid: &TenantTimelineId) -> Result<TimelineDeleteForceResult> {
let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid); let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
match tli_res { match tli_res {
Ok(timeline) => { Ok(timeline) => {
// Take a lock and finish the deletion holding this mutex. // Take a lock and finish the deletion holding this mutex.
let mut shared_state = timeline.write_shared_state(); let mut shared_state = timeline.write_shared_state().await;
info!("deleting timeline {}", ttid); info!("deleting timeline {}", ttid);
let (dir_existed, was_active) = timeline.delete_from_disk(&mut shared_state)?; let (dir_existed, was_active) =
timeline.delete_from_disk(&mut shared_state).await?;
// Remove timeline from the map. // Remove timeline from the map.
// FIXME: re-enable it once we fix the issue with recreation of deleted timelines // FIXME: re-enable it once we fix the issue with recreation of deleted timelines
@@ -335,7 +344,7 @@ impl GlobalTimelines {
/// the tenant had, `true` if a timeline was active. There may be a race if new timelines are /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
/// created simultaneously. In that case the function will return error and the caller should /// created simultaneously. In that case the function will return error and the caller should
/// retry tenant deletion again later. /// retry tenant deletion again later.
pub fn delete_force_all_for_tenant( pub async fn delete_force_all_for_tenant(
tenant_id: &TenantId, tenant_id: &TenantId,
) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> { ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
info!("deleting all timelines for tenant {}", tenant_id); info!("deleting all timelines for tenant {}", tenant_id);
@@ -345,7 +354,7 @@ impl GlobalTimelines {
let mut deleted = HashMap::new(); let mut deleted = HashMap::new();
for tli in &to_delete { for tli in &to_delete {
match Self::delete_force(&tli.ttid) { match Self::delete_force(&tli.ttid).await {
Ok(result) => { Ok(result) => {
deleted.insert(tli.ttid, result); deleted.insert(tli.ttid, result);
} }

View File

@@ -15,7 +15,6 @@ use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI}; use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::{GenericRemoteStorage, RemotePath}; use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::fs::File; use tokio::fs::File;
use tokio::runtime::Builder;
use tokio::select; use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -33,30 +32,16 @@ use once_cell::sync::OnceCell;
const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10; const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000; const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
pub fn wal_backup_launcher_thread_main(
conf: SafeKeeperConf,
wal_backup_launcher_rx: Receiver<TenantTimelineId>,
) {
let mut builder = Builder::new_multi_thread();
if let Some(num_threads) = conf.backup_runtime_threads {
builder.worker_threads(num_threads);
}
let rt = builder
.enable_all()
.build()
.expect("failed to create wal backup runtime");
rt.block_on(async {
wal_backup_launcher_main_loop(conf, wal_backup_launcher_rx).await;
});
}
/// Check whether wal backup is required for timeline. If yes, mark that launcher is /// Check whether wal backup is required for timeline. If yes, mark that launcher is
/// aware of current status and return the timeline. /// aware of current status and return the timeline.
fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> { async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
GlobalTimelines::get(ttid) match GlobalTimelines::get(ttid).ok() {
.ok() Some(tli) => {
.filter(|tli| tli.wal_backup_attend()) tli.wal_backup_attend().await;
Some(tli)
}
None => None,
}
} }
struct WalBackupTaskHandle { struct WalBackupTaskHandle {
@@ -140,8 +125,8 @@ async fn update_task(
ttid: TenantTimelineId, ttid: TenantTimelineId,
entry: &mut WalBackupTimelineEntry, entry: &mut WalBackupTimelineEntry,
) { ) {
let alive_peers = entry.timeline.get_peers(conf); let alive_peers = entry.timeline.get_peers(conf).await;
let wal_backup_lsn = entry.timeline.get_wal_backup_lsn(); let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
let (offloader, election_dbg_str) = let (offloader, election_dbg_str) =
determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf); determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
let elected_me = Some(conf.my_id) == offloader; let elected_me = Some(conf.my_id) == offloader;
@@ -174,10 +159,10 @@ const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
/// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup /// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
/// tasks. Having this in separate task simplifies locking, allows to reap /// tasks. Having this in separate task simplifies locking, allows to reap
/// panics and separate elections from offloading itself. /// panics and separate elections from offloading itself.
async fn wal_backup_launcher_main_loop( pub async fn wal_backup_launcher_task_main(
conf: SafeKeeperConf, conf: SafeKeeperConf,
mut wal_backup_launcher_rx: Receiver<TenantTimelineId>, mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
) { ) -> anyhow::Result<()> {
info!( info!(
"WAL backup launcher started, remote config {:?}", "WAL backup launcher started, remote config {:?}",
conf.remote_storage conf.remote_storage
@@ -205,7 +190,7 @@ async fn wal_backup_launcher_main_loop(
if conf.remote_storage.is_none() || !conf.wal_backup_enabled { if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
continue; /* just drain the channel and do nothing */ continue; /* just drain the channel and do nothing */
} }
let timeline = is_wal_backup_required(ttid); let timeline = is_wal_backup_required(ttid).await;
// do we need to do anything at all? // do we need to do anything at all?
if timeline.is_some() != tasks.contains_key(&ttid) { if timeline.is_some() != tasks.contains_key(&ttid) {
if let Some(timeline) = timeline { if let Some(timeline) = timeline {
@@ -258,7 +243,7 @@ async fn backup_task_main(
let tli = res.unwrap(); let tli = res.unwrap();
let mut wb = WalBackupTask { let mut wb = WalBackupTask {
wal_seg_size: tli.get_wal_seg_size(), wal_seg_size: tli.get_wal_seg_size().await,
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(), commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
timeline: tli, timeline: tli,
timeline_dir, timeline_dir,
@@ -314,7 +299,7 @@ impl WalBackupTask {
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */ continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
} }
// Perhaps peers advanced the position, check shmem value. // Perhaps peers advanced the position, check shmem value.
backup_lsn = self.timeline.get_wal_backup_lsn(); backup_lsn = self.timeline.get_wal_backup_lsn().await;
if backup_lsn.segment_number(self.wal_seg_size) if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size) >= commit_lsn.segment_number(self.wal_seg_size)
{ {
@@ -366,6 +351,7 @@ pub async fn backup_lsn_range(
let new_backup_lsn = s.end_lsn; let new_backup_lsn = s.end_lsn;
timeline timeline
.set_wal_backup_lsn(new_backup_lsn) .set_wal_backup_lsn(new_backup_lsn)
.await
.context("setting wal_backup_lsn")?; .context("setting wal_backup_lsn")?;
*backup_lsn = new_backup_lsn; *backup_lsn = new_backup_lsn;
} }

View File

@@ -4,7 +4,7 @@
//! //!
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use postgres_backend::QueryError; use postgres_backend::QueryError;
use std::{future, thread, time::Duration}; use std::{future, time::Duration};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutReader; use tokio_io_timeout::TimeoutReader;
use tracing::*; use tracing::*;
@@ -16,104 +16,82 @@ use crate::SafeKeeperConf;
use postgres_backend::{AuthType, PostgresBackend}; use postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread. /// Accept incoming TCP connections and spawn them into a background thread.
pub fn thread_main(conf: SafeKeeperConf, pg_listener: std::net::TcpListener) { pub async fn task_main(
let runtime = tokio::runtime::Builder::new_current_thread() conf: SafeKeeperConf,
.enable_all() pg_listener: std::net::TcpListener,
.build() ) -> anyhow::Result<()> {
.context("create runtime") // Tokio's from_std won't do this for us, per its comment.
// todo catch error in main thread pg_listener.set_nonblocking(true)?;
.expect("failed to create runtime");
runtime let listener = tokio::net::TcpListener::from_std(pg_listener)?;
.block_on(async move { let mut connection_count: ConnectionCount = 0;
// Tokio's from_std won't do this for us, per its comment.
pg_listener.set_nonblocking(true)?;
let listener = tokio::net::TcpListener::from_std(pg_listener)?;
let mut connection_count: ConnectionCount = 0;
loop { loop {
match listener.accept().await { let (socket, peer_addr) = listener.accept().await.context("accept")?;
Ok((socket, peer_addr)) => { debug!("accepted connection from {}", peer_addr);
debug!("accepted connection from {}", peer_addr); let conf = conf.clone();
let conf = conf.clone(); let conn_id = issue_connection_id(&mut connection_count);
let conn_id = issue_connection_id(&mut connection_count);
let _ = thread::Builder::new() tokio::spawn(async move {
.name("WAL service thread".into()) if let Err(err) = handle_socket(socket, conf, conn_id)
.spawn(move || { .instrument(info_span!("", cid = %conn_id))
if let Err(err) = handle_socket(socket, conf, conn_id) { .await
error!("connection handler exited: {}", err); {
} error!("connection handler exited: {}", err);
})
.unwrap();
}
Err(e) => error!("Failed to accept connection: {}", e),
}
} }
#[allow(unreachable_code)] // hint compiler the closure return type });
Ok::<(), anyhow::Error>(()) }
})
.expect("listener failed")
} }
/// This is run by `thread_main` above, inside a background thread. /// This is run by `task_main` above, inside a background thread.
/// ///
fn handle_socket( async fn handle_socket(
socket: TcpStream, socket: TcpStream,
conf: SafeKeeperConf, conf: SafeKeeperConf,
conn_id: ConnectionId, conn_id: ConnectionId,
) -> Result<(), QueryError> { ) -> Result<(), QueryError> {
let _enter = info_span!("", cid = %conn_id).entered();
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
socket.set_nodelay(true)?; socket.set_nodelay(true)?;
let peer_addr = socket.peer_addr()?; let peer_addr = socket.peer_addr()?;
// TimeoutReader wants async runtime during creation. // Set timeout on reading from the socket. It prevents hanged up connection
runtime.block_on(async move { // if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by
// Set timeout on reading from the socket. It prevents hanged up connection // default, and tokio doesn't provide ability to set it out of the box.
// if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by let mut socket = TimeoutReader::new(socket);
// default, and tokio doesn't provide ability to set it out of the box. let wal_service_timeout = Duration::from_secs(60 * 10);
let mut socket = TimeoutReader::new(socket); socket.set_timeout(Some(wal_service_timeout));
let wal_service_timeout = Duration::from_secs(60 * 10); // pin! is here because TimeoutReader (due to storing sleep future inside)
socket.set_timeout(Some(wal_service_timeout)); // is not Unpin, and all pgbackend/framed/tokio dependencies require stream
// pin! is here because TimeoutReader (due to storing sleep future inside) // to be Unpin. Which is reasonable, as indeed something like TimeoutReader
// is not Unpin, and all pgbackend/framed/tokio dependencies require stream // shouldn't be moved.
// to be Unpin. Which is reasonable, as indeed something like TimeoutReader tokio::pin!(socket);
// shouldn't be moved.
tokio::pin!(socket);
let traffic_metrics = TrafficMetrics::new(); let traffic_metrics = TrafficMetrics::new();
if let Some(current_az) = conf.availability_zone.as_deref() { if let Some(current_az) = conf.availability_zone.as_deref() {
traffic_metrics.set_sk_az(current_az); traffic_metrics.set_sk_az(current_az);
} }
let socket = MeasuredStream::new( let socket = MeasuredStream::new(
socket, socket,
|cnt| { |cnt| {
traffic_metrics.observe_read(cnt); traffic_metrics.observe_read(cnt);
}, },
|cnt| { |cnt| {
traffic_metrics.observe_write(cnt); traffic_metrics.observe_write(cnt);
}, },
); );
let auth_type = match conf.auth { let auth_type = match conf.auth {
None => AuthType::Trust, None => AuthType::Trust,
Some(_) => AuthType::NeonJWT, Some(_) => AuthType::NeonJWT,
}; };
let mut conn_handler = let mut conn_handler =
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone())); SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
// libpq protocol between safekeeper and walproposer / pageserver // libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown. // We don't use shutdown.
pgbackend pgbackend
.run(&mut conn_handler, future::pending::<()>) .run(&mut conn_handler, future::pending::<()>)
.await .await
})
} }
/// Unique WAL service connection ids are logged in spans for observability. /// Unique WAL service connection ids are logged in spans for observability.

View File

@@ -8,54 +8,47 @@
//! Note that last file has `.partial` suffix, that's different from postgres. //! Note that last file has `.partial` suffix, that's different from postgres.
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use remote_storage::RemotePath; use bytes::Bytes;
use futures::future::BoxFuture;
use std::io::{self, Seek, SeekFrom};
use std::pin::Pin;
use tokio::io::AsyncRead;
use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName}; use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
use postgres_ffi::{XLogSegNo, PG_TLI}; use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
use std::cmp::{max, min}; use std::cmp::{max, min};
use std::io::{self, SeekFrom};
use bytes::Bytes;
use std::fs::{self, remove_file, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin;
use tokio::fs::{self, remove_file, File, OpenOptions};
use tokio::io::{AsyncRead, AsyncWriteExt};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tracing::*; use tracing::*;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::metrics::{time_io_closure, WalStorageMetrics}; use crate::metrics::{time_io_closure, WalStorageMetrics};
use crate::safekeeper::SafeKeeperState; use crate::safekeeper::SafeKeeperState;
use crate::wal_backup::read_object; use crate::wal_backup::read_object;
use crate::SafeKeeperConf; use crate::SafeKeeperConf;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::XLogFileName; use postgres_ffi::XLogFileName;
use postgres_ffi::XLOG_BLCKSZ; use postgres_ffi::XLOG_BLCKSZ;
use postgres_ffi::waldecoder::WalStreamDecoder;
use pq_proto::SystemId; use pq_proto::SystemId;
use tokio::io::{AsyncReadExt, AsyncSeekExt}; use utils::{id::TenantTimelineId, lsn::Lsn};
#[async_trait::async_trait]
pub trait Storage { pub trait Storage {
/// LSN of last durably stored WAL record. /// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn; fn flush_lsn(&self) -> Lsn;
/// Write piece of WAL from buf to disk, but not necessarily sync it. /// Write piece of WAL from buf to disk, but not necessarily sync it.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>; async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
/// Truncate WAL at specified LSN, which must be the end of WAL record. /// Truncate WAL at specified LSN, which must be the end of WAL record.
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>; async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;
/// Durably store WAL on disk, up to the last written WAL record. /// Durably store WAL on disk, up to the last written WAL record.
fn flush_wal(&mut self) -> Result<()>; async fn flush_wal(&mut self) -> Result<()>;
/// Remove all segments <= given segno. Returns closure as we want to do /// Remove all segments <= given segno. Returns function doing that as we
/// that without timeline lock. /// want to perform it without timeline lock.
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>>; fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;
/// Release resources associated with the storage -- technically, close FDs. /// Release resources associated with the storage -- technically, close FDs.
/// Currently we don't remove timelines until restart (#3146), so need to /// Currently we don't remove timelines until restart (#3146), so need to
@@ -178,33 +171,37 @@ impl PhysicalStorage {
} }
/// Call fdatasync if config requires so. /// Call fdatasync if config requires so.
fn fdatasync_file(&mut self, file: &mut File) -> Result<()> { async fn fdatasync_file(&mut self, file: &mut File) -> Result<()> {
if !self.conf.no_sync { if !self.conf.no_sync {
self.metrics self.metrics
.observe_flush_seconds(time_io_closure(|| Ok(file.sync_data()?))?); .observe_flush_seconds(time_io_closure(file.sync_data()).await?);
} }
Ok(()) Ok(())
} }
/// Call fsync if config requires so. /// Call fsync if config requires so.
fn fsync_file(&mut self, file: &mut File) -> Result<()> { async fn fsync_file(&mut self, file: &mut File) -> Result<()> {
if !self.conf.no_sync { if !self.conf.no_sync {
self.metrics self.metrics
.observe_flush_seconds(time_io_closure(|| Ok(file.sync_all()?))?); .observe_flush_seconds(time_io_closure(file.sync_all()).await?);
} }
Ok(()) Ok(())
} }
/// Open or create WAL segment file. Caller must call seek to the wanted position. /// Open or create WAL segment file. Caller must call seek to the wanted position.
/// Returns `file` and `is_partial`. /// Returns `file` and `is_partial`.
fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> { async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
let (wal_file_path, wal_file_partial_path) = let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
// Try to open already completed segment // Try to open already completed segment
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
Ok((file, false)) Ok((file, false))
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) { } else if let Ok(file) = OpenOptions::new()
.write(true)
.open(&wal_file_partial_path)
.await
{
// Try to open existing partial file // Try to open existing partial file
Ok((file, true)) Ok((file, true))
} else { } else {
@@ -213,35 +210,36 @@ impl PhysicalStorage {
.create(true) .create(true)
.write(true) .write(true)
.open(&wal_file_partial_path) .open(&wal_file_partial_path)
.await
.with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?; .with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
write_zeroes(&mut file, self.wal_seg_size)?; write_zeroes(&mut file, self.wal_seg_size).await?;
self.fsync_file(&mut file)?; self.fsync_file(&mut file).await?;
Ok((file, true)) Ok((file, true))
} }
} }
/// Write WAL bytes, which are known to be located in a single WAL segment. /// Write WAL bytes, which are known to be located in a single WAL segment.
fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> { async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
let mut file = if let Some(file) = self.file.take() { let mut file = if let Some(file) = self.file.take() {
file file
} else { } else {
let (mut file, is_partial) = self.open_or_create(segno)?; let (mut file, is_partial) = self.open_or_create(segno).await?;
assert!(is_partial, "unexpected write into non-partial segment file"); assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64))?; file.seek(SeekFrom::Start(xlogoff as u64)).await?;
file file
}; };
file.write_all(buf)?; file.write_all(buf).await?;
if xlogoff + buf.len() == self.wal_seg_size { if xlogoff + buf.len() == self.wal_seg_size {
// If we reached the end of a WAL segment, flush and close it. // If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&mut file)?; self.fdatasync_file(&mut file).await?;
// Rename partial file to completed file // Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) = let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_partial_path, wal_file_path)?; fs::rename(wal_file_partial_path, wal_file_path).await?;
} else { } else {
// otherwise, file can be reused later // otherwise, file can be reused later
self.file = Some(file); self.file = Some(file);
@@ -255,11 +253,11 @@ impl PhysicalStorage {
/// be flushed separately later. /// be flushed separately later.
/// ///
/// Updates `write_lsn`. /// Updates `write_lsn`.
fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> { async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
if self.write_lsn != pos { if self.write_lsn != pos {
// need to flush the file before discarding it // need to flush the file before discarding it
if let Some(mut file) = self.file.take() { if let Some(mut file) = self.file.take() {
self.fdatasync_file(&mut file)?; self.fdatasync_file(&mut file).await?;
} }
self.write_lsn = pos; self.write_lsn = pos;
@@ -277,7 +275,8 @@ impl PhysicalStorage {
buf.len() buf.len()
}; };
self.write_in_segment(segno, xlogoff, &buf[..bytes_write])?; self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
.await?;
self.write_lsn += bytes_write as u64; self.write_lsn += bytes_write as u64;
buf = &buf[bytes_write..]; buf = &buf[bytes_write..];
} }
@@ -286,6 +285,7 @@ impl PhysicalStorage {
} }
} }
#[async_trait::async_trait]
impl Storage for PhysicalStorage { impl Storage for PhysicalStorage {
/// flush_lsn returns LSN of last durably stored WAL record. /// flush_lsn returns LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn { fn flush_lsn(&self) -> Lsn {
@@ -293,7 +293,7 @@ impl Storage for PhysicalStorage {
} }
/// Write WAL to disk. /// Write WAL to disk.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
// Disallow any non-sequential writes, which can result in gaps or overwrites. // Disallow any non-sequential writes, which can result in gaps or overwrites.
// If we need to move the pointer, use truncate_wal() instead. // If we need to move the pointer, use truncate_wal() instead.
if self.write_lsn > startpos { if self.write_lsn > startpos {
@@ -311,7 +311,7 @@ impl Storage for PhysicalStorage {
); );
} }
let write_seconds = time_io_closure(|| self.write_exact(startpos, buf))?; let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
// WAL is written, updating write metrics // WAL is written, updating write metrics
self.metrics.observe_write_seconds(write_seconds); self.metrics.observe_write_seconds(write_seconds);
self.metrics.observe_write_bytes(buf.len()); self.metrics.observe_write_bytes(buf.len());
@@ -340,14 +340,14 @@ impl Storage for PhysicalStorage {
Ok(()) Ok(())
} }
fn flush_wal(&mut self) -> Result<()> { async fn flush_wal(&mut self) -> Result<()> {
if self.flush_record_lsn == self.write_record_lsn { if self.flush_record_lsn == self.write_record_lsn {
// no need to do extra flush // no need to do extra flush
return Ok(()); return Ok(());
} }
if let Some(mut unflushed_file) = self.file.take() { if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?; self.fdatasync_file(&mut unflushed_file).await?;
self.file = Some(unflushed_file); self.file = Some(unflushed_file);
} else { } else {
// We have unflushed data (write_lsn != flush_lsn), but no file. // We have unflushed data (write_lsn != flush_lsn), but no file.
@@ -369,7 +369,7 @@ impl Storage for PhysicalStorage {
/// Truncate written WAL by removing all WAL segments after the given LSN. /// Truncate written WAL by removing all WAL segments after the given LSN.
/// end_pos must point to the end of the WAL record. /// end_pos must point to the end of the WAL record.
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
// Streaming must not create a hole, so truncate cannot be called on non-written lsn // Streaming must not create a hole, so truncate cannot be called on non-written lsn
if self.write_lsn != Lsn(0) && end_pos > self.write_lsn { if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
bail!( bail!(
@@ -381,27 +381,27 @@ impl Storage for PhysicalStorage {
// Close previously opened file, if any // Close previously opened file, if any
if let Some(mut unflushed_file) = self.file.take() { if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?; self.fdatasync_file(&mut unflushed_file).await?;
} }
let xlogoff = end_pos.segment_offset(self.wal_seg_size); let xlogoff = end_pos.segment_offset(self.wal_seg_size);
let segno = end_pos.segment_number(self.wal_seg_size); let segno = end_pos.segment_number(self.wal_seg_size);
// Remove all segments after the given LSN. // Remove all segments after the given LSN.
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno)?; remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
let (mut file, is_partial) = self.open_or_create(segno)?; let (mut file, is_partial) = self.open_or_create(segno).await?;
// Fill end with zeroes // Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64))?; file.seek(SeekFrom::Start(xlogoff as u64)).await?;
write_zeroes(&mut file, self.wal_seg_size - xlogoff)?; write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
self.fdatasync_file(&mut file)?; self.fdatasync_file(&mut file).await?;
if !is_partial { if !is_partial {
// Make segment partial once again // Make segment partial once again
let (wal_file_path, wal_file_partial_path) = let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_path, wal_file_partial_path)?; fs::rename(wal_file_path, wal_file_partial_path).await?;
} }
// Update LSNs // Update LSNs
@@ -411,11 +411,11 @@ impl Storage for PhysicalStorage {
Ok(()) Ok(())
} }
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> { fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
let timeline_dir = self.timeline_dir.clone(); let timeline_dir = self.timeline_dir.clone();
let wal_seg_size = self.wal_seg_size; let wal_seg_size = self.wal_seg_size;
Box::new(move |segno_up_to: XLogSegNo| { Box::pin(async move {
remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to) remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
}) })
} }
@@ -430,7 +430,7 @@ impl Storage for PhysicalStorage {
} }
/// Remove all WAL segments in timeline_dir that match the given predicate. /// Remove all WAL segments in timeline_dir that match the given predicate.
fn remove_segments_from_disk( async fn remove_segments_from_disk(
timeline_dir: &Path, timeline_dir: &Path,
wal_seg_size: usize, wal_seg_size: usize,
remove_predicate: impl Fn(XLogSegNo) -> bool, remove_predicate: impl Fn(XLogSegNo) -> bool,
@@ -439,8 +439,8 @@ fn remove_segments_from_disk(
let mut min_removed = u64::MAX; let mut min_removed = u64::MAX;
let mut max_removed = u64::MIN; let mut max_removed = u64::MIN;
for entry in fs::read_dir(timeline_dir)? { let mut entries = fs::read_dir(timeline_dir).await?;
let entry = entry?; while let Some(entry) = entries.next_entry().await? {
let entry_path = entry.path(); let entry_path = entry.path();
let fname = entry_path.file_name().unwrap(); let fname = entry_path.file_name().unwrap();
@@ -451,7 +451,7 @@ fn remove_segments_from_disk(
} }
let (segno, _) = XLogFromFileName(fname_str, wal_seg_size); let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
if remove_predicate(segno) { if remove_predicate(segno) {
remove_file(entry_path)?; remove_file(entry_path).await?;
n_removed += 1; n_removed += 1;
min_removed = min(min_removed, segno); min_removed = min(min_removed, segno);
max_removed = max(max_removed, segno); max_removed = max(max_removed, segno);
@@ -682,12 +682,12 @@ impl WalReader {
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
/// Helper for filling file with zeroes. /// Helper for filling file with zeroes.
fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> { async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
while count >= XLOG_BLCKSZ { while count >= XLOG_BLCKSZ {
file.write_all(ZERO_BLOCK)?; file.write_all(ZERO_BLOCK).await?;
count -= XLOG_BLCKSZ; count -= XLOG_BLCKSZ;
} }
file.write_all(&ZERO_BLOCK[0..count])?; file.write_all(&ZERO_BLOCK[0..count]).await?;
Ok(()) Ok(())
} }