Compare commits

..

1 Commits

Author SHA1 Message Date
Stas Kelvich
892723cc7e Local auth renew proxy 2025-03-22 03:31:54 +02:00
24 changed files with 774 additions and 625 deletions

View File

@@ -1,18 +1,19 @@
use crate::auth::{Claims, JwtAuth};
use crate::http::error;
use anyhow::Context;
use anyhow::{anyhow, Context};
use hyper::header::{HeaderName, AUTHORIZATION};
use hyper::http::HeaderValue;
use hyper::Method;
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
use hyper::{header::CONTENT_TYPE, Body, Request, Response, Server};
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use routerify::{Middleware, RequestInfo, Router, RouterBuilder, RouterService};
use tokio::task::JoinError;
use tracing::{self, debug, info, info_span, warn, Instrument};
use std::future::Future;
use std::net::TcpListener;
use std::str::FromStr;
use super::error::ApiError;
@@ -340,6 +341,40 @@ pub fn check_permission_with(
}
}
///
/// Start listening for HTTP requests on given socket.
///
/// 'shutdown_future' can be used to stop. If the Future becomes
/// ready, we stop listening for new requests, and the function returns.
///
pub fn serve_thread_main<S>(
router_builder: RouterBuilder<hyper::Body, ApiError>,
listener: TcpListener,
shutdown_future: S,
) -> anyhow::Result<()>
where
S: Future<Output = ()> + Send + Sync,
{
info!("Starting an HTTP endpoint at {}", listener.local_addr()?);
// Create a Service from the router above to handle incoming requests.
let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap();
// Enter a single-threaded tokio runtime bound to the current thread
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let _guard = runtime.enter();
let server = Server::from_tcp(listener)?
.serve(service)
.with_graceful_shutdown(shutdown_future);
runtime.block_on(server)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -0,0 +1,183 @@
use std::{net::SocketAddr, sync::Arc};
use tokio::{io::AsyncWriteExt, net::TcpListener};
use anyhow::Context;
use clap::{self, Arg};
use futures::TryFutureExt;
use proxy::{
auth::{self, AuthFlow},
cancellation::CancelMap,
compute::ConnCfg,
console::messages::MetricsAuxInfo,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_postgres::config::SslMode;
use tokio_util::sync::CancellationToken;
use utils::project_git_version;
use tracing::{error, info, warn};
project_git_version!(GIT_VERSION);
fn cli() -> clap::Command {
clap::Command::new("Auth renew proxy")
.disable_help_flag(true)
.version(GIT_VERSION)
.arg(
Arg::new("listen")
.short('l')
.long("listen")
.help("listen for incoming client connections on ip:port")
.default_value("127.0.0.1:4432"),
)
.arg(
Arg::new("dest-host")
.long("dest-host")
.help("destination hosts")
.required(true),
)
.arg(
Arg::new("dest-port")
.long("dest-port")
.help("destination port")
.default_value("5432"),
)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = proxy::logging::init().await?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let args = cli().get_matches();
let dest_host: String = args.get_one::<String>("dest-host").unwrap().parse()?;
let dest_port: u16 = args.get_one::<String>("dest-port").unwrap().parse()?;
let listen_address: SocketAddr = args.get_one::<String>("listen").unwrap().parse()?;
// Start listening for incoming client connections
info!("Starting proxy on {listen_address}");
let proxy_listener = TcpListener::bind(listen_address).await?;
let cancellation_token = CancellationToken::new();
let main = proxy::flatten_err(tokio::spawn(task_main(
Arc::new(dest_host),
dest_port,
proxy_listener,
cancellation_token.clone(),
)));
let signals_task = proxy::flatten_err(tokio::spawn(proxy::handle_signals(cancellation_token)));
tokio::select! {
res = main => { res?; },
res = signals_task => { res?; },
}
Ok(())
}
async fn task_main(
dest_host: Arc<String>,
dest_port: u16,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
}
// When set for the server socket, the keepalive setting
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let mut connections = tokio::task::JoinSet::new();
let cancel_map = Arc::new(CancelMap::default());
loop {
tokio::select! {
accept_result = listener.accept() => {
let (socket, peer_addr) = accept_result?;
info!("accepted postgres client connection from {peer_addr}");
let cancel_map = Arc::clone(&cancel_map);
let dest_host = Arc::clone(&dest_host);
connections.spawn(
async move {
info!("spawned a task for {peer_addr}");
socket
.set_nodelay(true)
.context("failed to set socket option")?;
handle_client(dest_host, dest_port, &cancel_map, socket).await
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
}),
);
}
_ = cancellation_token.cancelled() => {
drop(listener);
break;
}
}
}
// Drain connections
while let Some(res) = connections.join_next().await {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
Ok(())
}
async fn handle_client(
dest_host: Arc<String>,
dest_port: u16,
cancel_map: &CancelMap,
stream: impl AsyncRead + AsyncWrite + Unpin,
) -> anyhow::Result<()> {
let do_handshake = proxy::proxy::handshake(stream, None, cancel_map);
let (mut stream, params) = match do_handshake.await? {
Some(x) => x,
None => return Ok(()), // it's a cancellation request
};
// Here we force plain test auth for the client and using received password to authenticate
// to the destination server. Instead we can always trust the client and take the password / JWT
// each time we get a connection.
let password = AuthFlow::new(&mut stream)
.begin(auth::CleartextPassword)
.await?
.authenticate()
.await?;
let mut conn_cfg = ConnCfg::new();
conn_cfg.set_startup_params(&params);
conn_cfg.password(password);
conn_cfg.host(dest_host.as_str());
conn_cfg.port(dest_port);
conn_cfg.ssl_mode(SslMode::Require);
info!("destination: {:?}:{}", dest_host, dest_port);
let mut conn = conn_cfg
.connect(false)
.or_else(|e| stream.throw_error(e))
.await?;
cancel_map
.with_session(|session| async {
proxy::proxy::prepare_client_connection(&conn, false, session, &mut stream).await?;
let (stream, read_buf) = stream.into_inner();
conn.stream.write_all(&read_buf).await?;
let metrics_aux: MetricsAuxInfo = Default::default();
proxy::proxy::proxy_pass(stream, conn.stream, &metrics_aux).await
})
.await
}

View File

@@ -1,4 +1,3 @@
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{
filter::{EnvFilter, LevelFilter},
prelude::*,
@@ -22,13 +21,8 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
.with_writer(std::io::stderr)
.with_target(false);
let otlp_layer = tracing_utils::init_tracing("proxy")
.await
.map(OpenTelemetryLayer::new);
tracing_subscriber::registry()
.with(env_filter)
.with(otlp_layer)
.with(fmt_layer)
.try_init()?;

View File

@@ -213,7 +213,7 @@ async fn handle_client(
/// It's easier to work with owned `stream` here as we need to upgrade it to TLS;
/// we also take an extra care of propagating only the select handshake errors to client.
#[tracing::instrument(skip_all)]
async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
stream: S,
mut tls: Option<&TlsConfig>,
cancel_map: &CancelMap,
@@ -350,7 +350,7 @@ async fn connect_to_compute(
/// Finish client connection initialization: confirm auth success, send params, etc.
#[tracing::instrument(skip_all)]
async fn prepare_client_connection(
pub async fn prepare_client_connection(
node: &compute::PostgresConnection,
reported_auth_ok: bool,
session: cancellation::Session<'_>,

View File

@@ -3,16 +3,15 @@
//
use anyhow::{bail, Context, Result};
use clap::Parser;
use futures::FutureExt;
use remote_storage::RemoteStorageConfig;
use tokio::signal::unix::{signal, SignalKind};
use tokio::task::JoinError;
use toml_edit::Document;
use utils::signals::ShutdownSignals;
use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use storage_broker::Uri;
use tokio::sync::mpsc;
@@ -36,6 +35,7 @@ use safekeeper::SafeKeeperConf;
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::JwtAuth;
use utils::{
http::endpoint,
id::NodeId,
logging::{self, LogFormat},
project_git_version,
@@ -120,8 +120,7 @@ struct Args {
log_format: String,
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
fn main() -> anyhow::Result<()> {
let args = Args::parse();
if let Some(addr) = args.dump_control_file {
@@ -181,6 +180,7 @@ async fn main() -> anyhow::Result<()> {
heartbeat_timeout: args.heartbeat_timeout,
remote_storage: args.remote_storage,
max_offloader_lag_bytes: args.max_offloader_lag,
backup_runtime_threads: args.wal_backup_threads,
wal_backup_enabled: !args.disable_wal_backup,
auth,
};
@@ -190,10 +190,10 @@ async fn main() -> anyhow::Result<()> {
Some(GIT_VERSION.into()),
&[("node_id", &conf.my_id.to_string())],
);
start_safekeeper(conf).await
start_safekeeper(conf)
}
async fn start_safekeeper(conf: SafeKeeperConf) -> anyhow::Result<()> {
fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
// Prevent running multiple safekeepers on the same directory
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
let lock_file =
@@ -204,18 +204,14 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> anyhow::Result<()> {
// we need to release the lock file only when the current process is gone
std::mem::forget(lock_file);
info!("starting safekeeper WAL service on {}", conf.listen_pg_addr);
let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
e
})?;
info!(
"starting safekeeper HTTP service on {}",
conf.listen_http_addr
);
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
info!("starting safekeeper on {}", conf.listen_pg_addr);
let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
e
})?;
@@ -224,90 +220,71 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> anyhow::Result<()> {
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
metrics::register_internal(Box::new(timeline_collector))?;
let mut threads = vec![];
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;
fn named_should_never_return(
name: &'static str,
unexpected: Result<Result<(), anyhow::Error>, JoinError>,
) -> anyhow::Result<()> {
let res = match unexpected {
Ok(Ok(())) => Err(anyhow::anyhow!("unexpected Ok(()) return")),
Ok(Err(e)) => Err(e),
Err(e) => Err(anyhow::Error::new(e)),
};
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(|| {
let router = http::make_router(conf_);
endpoint::serve_thread_main(
router,
http_listener,
std::future::pending(), // never shut down
)
.unwrap();
})?,
);
// was not able to get this working with `enum Void {}`
res.with_context(|| format!("task {name} unexpectedly joined"))
}
let conf_cloned = conf.clone();
let safekeeper_thread = thread::Builder::new()
.name("WAL service thread".into())
.spawn(|| wal_service::thread_main(conf_cloned, pg_listener))
.unwrap();
threads.push(safekeeper_thread);
let conf_ = conf.clone();
let wal_service_handle = tokio::spawn(wal_service::task_main(conf_, pg_listener))
// wrap with task name for error reporting
.map(|res| named_should_never_return("WAL service main", res));
threads.push(
thread::Builder::new()
.name("broker thread".into())
.spawn(|| {
broker::thread_main(conf_);
})?,
);
let conf_ = conf.clone();
let http_handle = tokio::spawn(http::task_main(conf_, http_listener))
.map(|res| named_should_never_return("HTTP service main", res));
threads.push(
thread::Builder::new()
.name("WAL removal thread".into())
.spawn(|| {
remove_wal::thread_main(conf_);
})?,
);
let conf_ = conf.clone();
let broker_task_handle =
tokio::spawn(broker::task_main(conf_).instrument(info_span!("broker")))
.map(|res| named_should_never_return("broker main", res));
let conf_ = conf.clone();
let wal_remover_handle = tokio::spawn(remove_wal::task_main(conf_))
.map(|res| named_should_never_return("WAL remover", res));
let conf_ = conf.clone();
let wal_backup_handle = tokio::spawn(wal_backup::wal_backup_launcher_task_main(
conf_,
wal_backup_launcher_rx,
))
.map(|res| named_should_never_return("WAL backup launcher", res));
let metrics_shifter_handle = tokio::spawn(safekeeper::metrics::metrics_shifter())
.map(|res| named_should_never_return("metrics shifter", res));
threads.push(
thread::Builder::new()
.name("WAL backup launcher thread".into())
.spawn(move || {
wal_backup::wal_backup_launcher_thread_main(conf, wal_backup_launcher_rx);
})?,
);
set_build_info_metric(GIT_VERSION);
// TODO: put more thoughts into handling of failed threads
// We should catch & die if they are in trouble.
// TODO: update tokio-stream, convert to real async Stream with
// SignalStream, map it to obtain missing signal name, combine streams into
// single stream we can easily sit on.
let mut sigquit_stream = signal(SignalKind::quit())?;
let mut sigint_stream = signal(SignalKind::interrupt())?;
let mut sigterm_stream = signal(SignalKind::terminate())?;
let tasks = async move {
tokio::try_join!(
wal_service_handle,
http_handle,
broker_task_handle,
wal_remover_handle,
wal_backup_handle,
metrics_shifter_handle
)
};
tokio::select! {
res = tasks => {
// this will be the first reason to stop a safekeeper, but not necessarily the only one
// which will get to happen before we exit
match res {
Ok(_) => unreachable!("because of named_should_never_return, we can never end up here, cannot use ! yet"),
Err(e) => return Err(e),
}
}
// On any shutdown signal, log receival and exit. Additionally, handling
// SIGQUIT prevents coredump.
_ = sigquit_stream.recv() => info!("received SIGQUIT, terminating"),
_ = sigint_stream.recv() => info!("received SIGINT, terminating"),
_ = sigterm_stream.recv() => info!("received SIGTERM, terminating")
}
Ok(())
// On any shutdown signal, log receival and exit. Additionally, handling
// SIGQUIT prevents coredump.
ShutdownSignals::handle(|signal| {
info!("received {}, terminating", signal.name());
std::process::exit(0);
})
}
/// Determine safekeeper id.

View File

@@ -15,7 +15,7 @@ use storage_broker::Request;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tokio::{runtime, time::sleep};
use tracing::*;
use crate::GlobalTimelines;
@@ -24,6 +24,20 @@ use crate::SafeKeeperConf;
const RETRY_INTERVAL_MSEC: u64 = 1000;
const PUSH_INTERVAL_MSEC: u64 = 1000;
pub fn thread_main(conf: SafeKeeperConf) {
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _enter = info_span!("broker").entered();
info!("started, broker endpoint {:?}", conf.broker_endpoint);
runtime.block_on(async {
main_loop(conf).await;
});
}
/// Push once in a while data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let mut client = BrokerServiceClient::connect(conf.broker_endpoint.clone()).await?;
@@ -35,15 +49,10 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let all_tlis = GlobalTimelines::get_all();
for tli in &all_tlis {
// filtering alternative futures::stream::iter(all_tlis)
// .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::<Vec<_>>().await;
// doesn't look better, and I'm not sure how to do that without collect.
if !tli.is_active().await {
continue;
}
let sk_info = tli.get_safekeeper_info(&conf).await;
let mut active_tlis = GlobalTimelines::get_all();
active_tlis.retain(|tli| tli.is_active());
for tli in &active_tlis {
let sk_info = tli.get_safekeeper_info(&conf);
yield sk_info;
}
sleep(push_interval).await;
@@ -88,13 +97,10 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
bail!("end of stream");
}
pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
info!("started, broker endpoint {:?}", conf.broker_endpoint);
async fn main_loop(conf: SafeKeeperConf) {
let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
// Selecting on JoinHandles requires some squats; is there a better way to
// reap tasks individually?

View File

@@ -2,10 +2,9 @@
use anyhow::{bail, ensure, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use tokio::fs::{self, File};
use tokio::io::AsyncWriteExt;
use std::io::Read;
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Write};
use std::ops::Deref;
use std::path::{Path, PathBuf};
@@ -26,10 +25,9 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
/// Storage should keep actual state inside of it. It should implement Deref
/// trait to access state fields and have persist method for updating that state.
#[async_trait::async_trait]
pub trait Storage: Deref<Target = SafeKeeperState> {
/// Persist safekeeper state on disk and update internal state.
async fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
}
#[derive(Debug)]
@@ -76,7 +74,7 @@ impl FileStorage {
/// Check the magic/version in the on-disk data and deserialize it, if possible.
fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
// Read the version independent part
let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
let magic = buf.read_u32::<LittleEndian>()?;
if magic != SK_MAGIC {
bail!(
"bad control file magic: {:X}, expected {:X}",
@@ -84,7 +82,7 @@ impl FileStorage {
SK_MAGIC
);
}
let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
let version = buf.read_u32::<LittleEndian>()?;
if version == SK_FORMAT_VERSION {
let res = SafeKeeperState::des(buf)?;
return Ok(res);
@@ -104,7 +102,7 @@ impl FileStorage {
/// Read in the control file.
pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
let mut control_file = std::fs::OpenOptions::new()
let mut control_file = OpenOptions::new()
.read(true)
.write(true)
.open(&control_file_path)
@@ -153,31 +151,30 @@ impl Deref for FileStorage {
}
}
#[async_trait::async_trait]
impl Storage for FileStorage {
/// persists state durably to underlying storage
/// for description see https://lwn.net/Articles/457667/
async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
// write data to safekeeper.control.partial
let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
let mut control_partial = File::create(&control_partial_path).with_context(|| {
format!(
"failed to create partial control file at: {}",
&control_partial_path.display()
)
})?;
let mut buf: Vec<u8> = Vec::new();
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
buf.write_u32::<LittleEndian>(SK_MAGIC)?;
buf.write_u32::<LittleEndian>(SK_FORMAT_VERSION)?;
s.ser_into(&mut buf)?;
// calculate checksum before resize
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
control_partial.write_all(&buf).await.with_context(|| {
control_partial.write_all(&buf).with_context(|| {
format!(
"failed to write safekeeper state into control file at: {}",
control_partial_path.display()
@@ -186,7 +183,7 @@ impl Storage for FileStorage {
// fsync the file
if !self.conf.no_sync {
control_partial.sync_all().await.with_context(|| {
control_partial.sync_all().with_context(|| {
format!(
"failed to sync partial control file at {}",
control_partial_path.display()
@@ -197,22 +194,21 @@ impl Storage for FileStorage {
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
// rename should be atomic
fs::rename(&control_partial_path, &control_path).await?;
fs::rename(&control_partial_path, &control_path)?;
// this sync is not required by any standard but postgres does this (see durable_rename)
if !self.conf.no_sync {
let new_f = File::open(&control_path).await?;
new_f.sync_all().await.with_context(|| {
format!(
"failed to sync control file at: {}",
&control_path.display()
)
})?;
File::open(&control_path)
.and_then(|f| f.sync_all())
.with_context(|| {
format!(
"failed to sync control file at: {}",
&control_path.display()
)
})?;
// fsync the directory (linux specific)
let tli_dir = File::open(&self.timeline_dir).await?;
tli_dir
.sync_all()
.await
File::open(&self.timeline_dir)
.and_then(|f| f.sync_all())
.context("failed to sync control file directory")?;
}
@@ -228,6 +224,7 @@ mod test {
use super::*;
use crate::{safekeeper::SafeKeeperState, SafeKeeperConf};
use anyhow::Result;
use std::fs;
use utils::{id::TenantTimelineId, lsn::Lsn};
fn stub_conf() -> SafeKeeperConf {
@@ -238,75 +235,59 @@ mod test {
}
}
async fn load_from_control_file(
fn load_from_control_file(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<(FileStorage, SafeKeeperState)> {
fs::create_dir_all(conf.timeline_dir(ttid))
.await
.expect("failed to create timeline dir");
fs::create_dir_all(conf.timeline_dir(ttid)).expect("failed to create timeline dir");
Ok((
FileStorage::restore_new(ttid, conf)?,
FileStorage::load_control_file_conf(conf, ttid)?,
))
}
async fn create(
fn create(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<(FileStorage, SafeKeeperState)> {
fs::create_dir_all(conf.timeline_dir(ttid))
.await
.expect("failed to create timeline dir");
fs::create_dir_all(conf.timeline_dir(ttid)).expect("failed to create timeline dir");
let state = SafeKeeperState::empty();
let storage = FileStorage::create_new(ttid, conf, state.clone())?;
Ok((storage, state))
}
#[tokio::test]
async fn test_read_write_safekeeper_state() {
#[test]
fn test_read_write_safekeeper_state() {
let conf = stub_conf();
let ttid = TenantTimelineId::generate();
{
let (mut storage, mut state) =
create(&conf, &ttid).await.expect("failed to create state");
let (mut storage, mut state) = create(&conf, &ttid).expect("failed to create state");
// change something
state.commit_lsn = Lsn(42);
storage
.persist(&state)
.await
.expect("failed to persist state");
storage.persist(&state).expect("failed to persist state");
}
let (_, state) = load_from_control_file(&conf, &ttid)
.await
.expect("failed to read state");
let (_, state) = load_from_control_file(&conf, &ttid).expect("failed to read state");
assert_eq!(state.commit_lsn, Lsn(42));
}
#[tokio::test]
async fn test_safekeeper_state_checksum_mismatch() {
#[test]
fn test_safekeeper_state_checksum_mismatch() {
let conf = stub_conf();
let ttid = TenantTimelineId::generate();
{
let (mut storage, mut state) =
create(&conf, &ttid).await.expect("failed to read state");
let (mut storage, mut state) = create(&conf, &ttid).expect("failed to read state");
// change something
state.commit_lsn = Lsn(42);
storage
.persist(&state)
.await
.expect("failed to persist state");
storage.persist(&state).expect("failed to persist state");
}
let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME);
let mut data = fs::read(&control_path).await.unwrap();
let mut data = fs::read(&control_path).unwrap();
data[0] += 1; // change the first byte of the file to fail checksum validation
fs::write(&control_path, &data)
.await
.expect("failed to write control file");
fs::write(&control_path, &data).expect("failed to write control file");
match load_from_control_file(&conf, &ttid).await {
match load_from_control_file(&conf, &ttid) {
Err(err) => assert!(err
.to_string()
.contains("safekeeper control file checksum mismatch")),

View File

@@ -121,7 +121,7 @@ pub struct FileInfo {
}
/// Build debug dump response, using the provided [`Args`] filters.
pub async fn build(args: Args) -> Result<Response> {
pub fn build(args: Args) -> Result<Response> {
let start_time = Utc::now();
let timelines_count = GlobalTimelines::timelines_count();
@@ -155,7 +155,7 @@ pub async fn build(args: Args) -> Result<Response> {
}
let control_file = if args.dump_control_file {
let mut state = tli.get_state().await.1;
let mut state = tli.get_state().1;
if !args.dump_term_history {
state.acceptor_state.term_history = TermHistory(vec![]);
}
@@ -165,7 +165,7 @@ pub async fn build(args: Args) -> Result<Response> {
};
let memory = if args.dump_memory {
Some(tli.memory_dump().await)
Some(tli.memory_dump())
} else {
None
};

View File

@@ -241,14 +241,14 @@ impl SafekeeperPostgresHandler {
let lsn = if self.is_walproposer_recovery() {
// walproposer should get all local WAL until flush_lsn
tli.get_flush_lsn().await
tli.get_flush_lsn()
} else {
// other clients shouldn't get any uncommitted WAL
tli.get_state().await.0.commit_lsn
tli.get_state().0.commit_lsn
}
.to_string();
let sysid = tli.get_state().await.1.server.system_id.to_string();
let sysid = tli.get_state().1.server.system_id.to_string();
let lsn_bytes = lsn.as_bytes();
let tli = PG_TLI.to_string();
let tli_bytes = tli.as_bytes();

View File

@@ -2,18 +2,3 @@ pub mod routes;
pub use routes::make_router;
pub use safekeeper_api::models;
use crate::SafeKeeperConf;
pub async fn task_main(
conf: SafeKeeperConf,
http_listener: std::net::TcpListener,
) -> anyhow::Result<()> {
let router = make_router(conf)
.build()
.map_err(|err| anyhow::anyhow!(err))?;
let service = utils::http::RouterService::new(router).unwrap();
let server = hyper::Server::from_tcp(http_listener)?;
server.serve(service).await?;
Ok(()) // unreachable
}

View File

@@ -13,6 +13,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::task::JoinError;
use crate::safekeeper::ServerInfo;
use crate::safekeeper::Term;
@@ -115,8 +116,8 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
check_permission(&request, Some(ttid.tenant_id))?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let (inmem, state) = tli.get_state().await;
let flush_lsn = tli.get_flush_lsn().await;
let (inmem, state) = tli.get_state();
let flush_lsn = tli.get_flush_lsn();
let epoch = state.acceptor_state.get_epoch(flush_lsn);
let term_history = state
@@ -231,11 +232,13 @@ async fn timeline_delete_force_handler(
);
check_permission(&request, Some(ttid.tenant_id))?;
ensure_no_body(&mut request).await?;
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
// error handling here when we're able to.
let resp = GlobalTimelines::delete_force(&ttid)
.await
.map_err(ApiError::InternalServerError)?;
let resp = tokio::task::spawn_blocking(move || {
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
// error handling here when we're able to.
GlobalTimelines::delete_force(&ttid).map_err(ApiError::InternalServerError)
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
json_response(StatusCode::OK, resp)
}
@@ -247,11 +250,14 @@ async fn tenant_delete_force_handler(
let tenant_id = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
ensure_no_body(&mut request).await?;
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
// Using an `InternalServerError` should be fixed when the types support it
let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
.await
.map_err(ApiError::InternalServerError)?;
let delete_info = tokio::task::spawn_blocking(move || {
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
// Using an `InternalServerError` should be fixed when the types support it
GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
.map_err(ApiError::InternalServerError)
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
json_response(
StatusCode::OK,
delete_info
@@ -347,9 +353,11 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
timeline_id,
};
let resp = debug_dump::build(args)
.await
.map_err(ApiError::InternalServerError)?;
let resp = tokio::task::spawn_blocking(move || {
debug_dump::build(args).map_err(ApiError::InternalServerError)
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
// TODO: use streaming response
json_response(StatusCode::OK, resp)
@@ -357,8 +365,6 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
/// Safekeeper http router.
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
use utils::http::endpoint::RequestSpan;
let mut router = endpoint::make_router();
if conf.auth.is_some() {
router = router.middleware(auth_middleware(|request| {
@@ -380,34 +386,29 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
router
.data(Arc::new(conf))
.data(auth)
.get("/v1/status", |r| RequestSpan(status_handler).handle(r))
.get("/v1/status", status_handler)
// Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/timeline", |r| {
RequestSpan(timeline_create_handler).handle(r)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
RequestSpan(timeline_status_handler).handle(r)
})
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
RequestSpan(timeline_delete_force_handler).handle(r)
})
.delete("/v1/tenant/:tenant_id", |r| {
RequestSpan(tenant_delete_force_handler).handle(r)
})
.post("/v1/pull_timeline", |r| {
RequestSpan(timeline_pull_handler).handle(r)
})
.post("/v1/tenant/timeline", timeline_create_handler)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_status_handler,
)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_delete_force_handler,
)
.delete("/v1/tenant/:tenant_id", tenant_delete_force_handler)
.post("/v1/pull_timeline", timeline_pull_handler)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
|r| RequestSpan(timeline_files_handler).handle(r),
timeline_files_handler,
)
// for tests
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
RequestSpan(record_safekeeper_info).handle(r)
})
.get("/v1/debug_dump", |r| {
RequestSpan(dump_debug_handler).handle(r)
})
.post(
"/v1/record_safekeeper_info/:tenant_id/:timeline_id",
record_safekeeper_info,
)
.get("/v1/debug_dump", dump_debug_handler)
}
#[cfg(test)]

View File

@@ -73,12 +73,12 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
// if send_proposer_elected is true, we need to update local history
if append_request.send_proposer_elected {
send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn)?;
}
let inserted_wal = append_logical_message(&tli, append_request).await?;
let inserted_wal = append_logical_message(&tli, append_request)?;
let response = AppendResult {
state: tli.get_state().await.1,
state: tli.get_state().1,
inserted_wal,
};
let response_data = serde_json::to_vec(&response)
@@ -114,9 +114,9 @@ async fn prepare_safekeeper(
.await
}
async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::Result<()> {
fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::Result<()> {
// add new term to existing history
let history = tli.get_state().await.1.acceptor_state.term_history;
let history = tli.get_state().1.acceptor_state.term_history;
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
let mut history_entries = history.0;
history_entries.push(TermSwitchEntry { term, lsn });
@@ -129,7 +129,7 @@ async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> any
timeline_start_lsn: lsn,
});
tli.process_msg(&proposer_elected_request).await?;
tli.process_msg(&proposer_elected_request)?;
Ok(())
}
@@ -142,12 +142,12 @@ pub struct InsertedWAL {
/// Extend local WAL with new LogicalMessage record. To do that,
/// create AppendRequest with new WAL and pass it to safekeeper.
pub async fn append_logical_message(
pub fn append_logical_message(
tli: &Arc<Timeline>,
msg: &AppendLogicalMessage,
) -> anyhow::Result<InsertedWAL> {
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
let sk_state = tli.get_state().await.1;
let sk_state = tli.get_state().1;
let begin_lsn = msg.begin_lsn;
let end_lsn = begin_lsn + wal_data.len() as u64;
@@ -171,7 +171,7 @@ pub async fn append_logical_message(
wal_data: Bytes::from(wal_data),
});
let response = tli.process_msg(&append_request).await?;
let response = tli.process_msg(&append_request)?;
let append_response = match response {
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,

View File

@@ -36,6 +36,7 @@ pub mod defaults {
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
}
@@ -59,6 +60,7 @@ pub struct SafeKeeperConf {
pub heartbeat_timeout: Duration,
pub remote_storage: Option<RemoteStorageConfig>,
pub max_offloader_lag_bytes: u64,
pub backup_runtime_threads: Option<usize>,
pub wal_backup_enabled: bool,
pub auth: Option<Arc<JwtAuth>>,
}
@@ -89,6 +91,7 @@ impl SafeKeeperConf {
.parse()
.expect("failed to parse default broker endpoint"),
broker_keepalive_interval: Duration::from_secs(5),
backup_runtime_threads: None,
wal_backup_enabled: true,
auth: None,
heartbeat_timeout: Duration::new(5, 0),

View File

@@ -2,12 +2,11 @@
use std::{
sync::{Arc, RwLock},
time::{Duration, Instant, SystemTime},
time::{Instant, SystemTime},
};
use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
use anyhow::Result;
use futures::Future;
use metrics::{
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
proto::MetricFamily,
@@ -16,7 +15,6 @@ use metrics::{
use once_cell::sync::Lazy;
use postgres_ffi::XLogSegNo;
use tokio::time::interval;
use utils::pageserver_feedback::PageserverFeedback;
use utils::{id::TenantTimelineId, lsn::Lsn};
@@ -223,17 +221,14 @@ impl WalStorageMetrics {
}
}
/// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
pub async fn time_io_closure<E: Into<anyhow::Error>>(
closure: impl Future<Output = Result<(), E>>,
) -> Result<f64> {
/// Accepts a closure that returns a result, and returns the duration of the closure.
pub fn time_io_closure(closure: impl FnOnce() -> Result<()>) -> Result<f64> {
let start = std::time::Instant::now();
closure.await.map_err(|e| e.into())?;
closure()?;
Ok(start.elapsed().as_secs_f64())
}
/// Metrics for a single timeline.
#[derive(Clone)]
pub struct FullTimelineInfo {
pub ttid: TenantTimelineId,
pub ps_feedback: PageserverFeedback,
@@ -616,18 +611,3 @@ impl Collector for TimelineCollector {
mfs
}
}
/// Prometheus crate Collector interface is sync, and all safekeeper code is
/// async. To bridge the gap, this function wakes once in scrape interval and
/// copies metrics from under async lock to sync where collection can take it.
pub async fn metrics_shifter() -> anyhow::Result<()> {
let scrape_interval = Duration::from_secs(30);
let mut interval = interval(scrape_interval);
loop {
interval.tick().await;
let timelines = GlobalTimelines::get_all();
for tli in timelines {
tli.set_info_for_metrics().await;
}
}
}

View File

@@ -231,7 +231,7 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
info!(
"Loaded timeline {}, flush_lsn={}",
ttid,
tli.get_flush_lsn().await
tli.get_flush_lsn()
);
Ok(Response {

View File

@@ -18,14 +18,15 @@ use postgres_backend::QueryError;
use pq_proto::BeMessage;
use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::task;
use tokio::task::JoinHandle;
use tokio::task::spawn_blocking;
use tokio::time::Duration;
use tokio::time::Instant;
use tracing::*;
@@ -96,7 +97,7 @@ impl SafekeeperPostgresHandler {
Err(res.expect_err("no error with WalAcceptor not spawn"))
}
Some(handle) => {
let wal_acceptor_res = handle.await;
let wal_acceptor_res = handle.join();
// If there was any network error, return it.
res?;
@@ -106,7 +107,7 @@ impl SafekeeperPostgresHandler {
Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination
Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
"WalAcceptor task panicked",
"WalAcceptor thread panicked",
))),
}
}
@@ -153,12 +154,10 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
}
};
*self.acceptor_handle = Some(WalAcceptor::spawn(
tli.clone(),
msg_rx,
reply_tx,
self.conn_id,
));
*self.acceptor_handle = Some(
WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, self.conn_id)
.context("spawn WalAcceptor thread")?,
);
// Forward all messages to WalAcceptor
read_network_loop(self.pgb_reader, msg_tx, next_msg).await
@@ -227,19 +226,28 @@ impl WalAcceptor {
msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>,
conn_id: ConnectionId,
) -> JoinHandle<anyhow::Result<()>> {
task::spawn(async move {
let mut wa = WalAcceptor {
tli,
msg_rx,
reply_tx,
};
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
let thread_name = format!("WAL acceptor {}", tli.ttid);
thread::Builder::new()
.name(thread_name)
.spawn(move || -> anyhow::Result<()> {
let mut wa = WalAcceptor {
tli,
msg_rx,
reply_tx,
};
let span_ttid = wa.tli.ttid; // satisfy borrow checker
wa.run()
.instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid))
.await
})
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let span_ttid = wa.tli.ttid; // satisfy borrow checker
runtime.block_on(
wa.run()
.instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid)),
)
})
.map_err(anyhow::Error::from)
}
/// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
@@ -273,7 +281,7 @@ impl WalAcceptor {
while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
if let Some(reply) = self.tli.process_msg(&noflush_msg)? {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
}
@@ -292,12 +300,10 @@ impl WalAcceptor {
}
// flush all written WAL to the disk
self.tli
.process_msg(&ProposerAcceptorMessage::FlushWAL)
.await?
self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)?
} else {
// process message other than AppendRequest
self.tli.process_msg(&next_msg).await?
self.tli.process_msg(&next_msg)?
};
if let Some(reply) = reply_msg {
@@ -320,8 +326,8 @@ impl Drop for ComputeConnectionGuard {
let tli = self.timeline.clone();
// tokio forbids to call blocking_send inside the runtime, and see
// comments in on_compute_disconnect why we call blocking_send.
tokio::spawn(async move {
if let Err(e) = tli.on_compute_disconnect().await {
spawn_blocking(move || {
if let Err(e) = tli.on_compute_disconnect() {
error!("failed to unregister compute connection: {}", e);
}
});

View File

@@ -1,29 +1,26 @@
//! Thread removing old WAL.
use std::time::Duration;
use std::{thread, time::Duration};
use tokio::time::sleep;
use tracing::*;
use crate::{GlobalTimelines, SafeKeeperConf};
pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
pub fn thread_main(conf: SafeKeeperConf) {
let wal_removal_interval = Duration::from_millis(5000);
loop {
let tlis = GlobalTimelines::get_all();
for tli in &tlis {
if !tli.is_active().await {
if !tli.is_active() {
continue;
}
let ttid = tli.ttid;
if let Err(e) = tli
.remove_old_wal(conf.wal_backup_enabled)
.instrument(info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id))
.await
{
error!("failed to remove WAL: {}", e);
let _enter =
info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id).entered();
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) {
warn!("failed to remove WAL: {}", e);
}
}
sleep(wal_removal_interval).await;
thread::sleep(wal_removal_interval)
}
}

View File

@@ -567,27 +567,25 @@ where
/// Process message from proposer and possibly form reply. Concurrent
/// callers must exclude each other.
pub async fn process_msg(
pub fn process_msg(
&mut self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
match msg {
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
ProposerAcceptorMessage::AppendRequest(msg) => {
self.handle_append_request(msg, true).await
}
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg),
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg),
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg),
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg, true),
ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
self.handle_append_request(msg, false).await
self.handle_append_request(msg, false)
}
ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
ProposerAcceptorMessage::FlushWAL => self.handle_flush(),
}
}
/// Handle initial message from proposer: check its sanity and send my
/// current term.
async fn handle_greeting(
fn handle_greeting(
&mut self,
msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> {
@@ -649,7 +647,7 @@ where
if msg.pg_version != UNKNOWN_SERVER_VERSION {
state.server.pg_version = msg.pg_version;
}
self.state.persist(&state).await?;
self.state.persist(&state)?;
}
info!(
@@ -664,7 +662,7 @@ where
}
/// Give vote for the given term, if we haven't done that previously.
async fn handle_vote_request(
fn handle_vote_request(
&mut self,
msg: &VoteRequest,
) -> Result<Option<AcceptorProposerMessage>> {
@@ -678,7 +676,7 @@ where
// handle_elected instead. Currently not a big deal, as proposer is the
// only source of WAL; with peer2peer recovery it would be more
// important.
self.wal_store.flush_wal().await?;
self.wal_store.flush_wal()?;
// initialize with refusal
let mut resp = VoteResponse {
term: self.state.acceptor_state.term,
@@ -692,7 +690,7 @@ where
let mut state = self.state.clone();
state.acceptor_state.term = msg.term;
// persist vote before sending it out
self.state.persist(&state).await?;
self.state.persist(&state)?;
resp.term = self.state.acceptor_state.term;
resp.vote_given = true as u64;
@@ -715,15 +713,12 @@ where
ar
}
async fn handle_elected(
&mut self,
msg: &ProposerElected,
) -> Result<Option<AcceptorProposerMessage>> {
fn handle_elected(&mut self, msg: &ProposerElected) -> Result<Option<AcceptorProposerMessage>> {
info!("received ProposerElected {:?}", msg);
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.clone();
state.acceptor_state.term = msg.term;
self.state.persist(&state).await?;
self.state.persist(&state)?;
}
// If our term is higher, ignore the message (next feedback will inform the compute)
@@ -753,7 +748,7 @@ where
// intersection of our history and history from msg
// truncate wal, update the LSNs
self.wal_store.truncate_wal(msg.start_streaming_at).await?;
self.wal_store.truncate_wal(msg.start_streaming_at)?;
// and now adopt term history from proposer
{
@@ -787,7 +782,7 @@ where
self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
state.acceptor_state.term_history = msg.term_history.clone();
self.persist_control_file(state).await?;
self.persist_control_file(state)?;
}
info!("start receiving WAL since {:?}", msg.start_streaming_at);
@@ -799,7 +794,7 @@ where
///
/// Note: it is assumed that 'WAL we have is from the right term' check has
/// already been done outside.
async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
// Both peers and walproposer communicate this value, we might already
// have a fresher (higher) version.
candidate = max(candidate, self.inmem.commit_lsn);
@@ -821,29 +816,29 @@ where
// that we receive new epoch_start_lsn, and we still need to sync
// control file in this case.
if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
self.persist_control_file(self.state.clone()).await?;
self.persist_control_file(self.state.clone())?;
}
Ok(())
}
/// Persist control file to disk, called only after timeline creation (bootstrap).
pub async fn persist(&mut self) -> Result<()> {
self.persist_control_file(self.state.clone()).await
pub fn persist(&mut self) -> Result<()> {
self.persist_control_file(self.state.clone())
}
/// Persist in-memory state to the disk, taking other data from state.
async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
state.commit_lsn = self.inmem.commit_lsn;
state.backup_lsn = self.inmem.backup_lsn;
state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
state.proposer_uuid = self.inmem.proposer_uuid;
self.state.persist(&state).await
self.state.persist(&state)
}
/// Handle request to append WAL.
#[allow(clippy::comparison_chain)]
async fn handle_append_request(
fn handle_append_request(
&mut self,
msg: &AppendRequest,
require_flush: bool,
@@ -866,19 +861,17 @@ where
// do the job
if !msg.wal_data.is_empty() {
self.wal_store
.write_wal(msg.h.begin_lsn, &msg.wal_data)
.await?;
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
}
// flush wal to the disk, if required
if require_flush {
self.wal_store.flush_wal().await?;
self.wal_store.flush_wal()?;
}
// Update commit_lsn.
if msg.h.commit_lsn != Lsn(0) {
self.update_commit_lsn(msg.h.commit_lsn).await?;
self.update_commit_lsn(msg.h.commit_lsn)?;
}
// Value calculated by walproposer can always lag:
// - safekeepers can forget inmem value and send to proposer lower
@@ -894,7 +887,7 @@ where
if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
< self.inmem.peer_horizon_lsn
{
self.persist_control_file(self.state.clone()).await?;
self.persist_control_file(self.state.clone())?;
}
trace!(
@@ -916,15 +909,15 @@ where
}
/// Flush WAL to disk. Return AppendResponse with latest LSNs.
async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
self.wal_store.flush_wal().await?;
fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
self.wal_store.flush_wal()?;
Ok(Some(AcceptorProposerMessage::AppendResponse(
self.append_response(),
)))
}
/// Update timeline state with peer safekeeper data.
pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
pub fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
let mut sync_control_file = false;
if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
@@ -932,7 +925,7 @@ where
// commit_lsn if our history matches (is part of) history of advanced
// commit_lsn provider.
if sk_info.last_log_term == self.get_epoch() {
self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
self.update_commit_lsn(Lsn(sk_info.commit_lsn))?;
}
}
@@ -959,7 +952,7 @@ where
// persisting cf -- that is not much needed currently. We could do
// that by storing Arc to walsenders in Safekeeper.
state.remote_consistent_lsn = new_remote_consistent_lsn;
self.persist_control_file(state).await?;
self.persist_control_file(state)?;
}
Ok(())
}
@@ -983,7 +976,6 @@ where
#[cfg(test)]
mod tests {
use futures::future::BoxFuture;
use postgres_ffi::WAL_SEGMENT_SIZE;
use super::*;
@@ -995,9 +987,8 @@ mod tests {
persisted_state: SafeKeeperState,
}
#[async_trait::async_trait]
impl control_file::Storage for InMemoryState {
async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
self.persisted_state = s.clone();
Ok(())
}
@@ -1023,28 +1014,27 @@ mod tests {
lsn: Lsn,
}
#[async_trait::async_trait]
impl wal_storage::Storage for DummyWalStore {
fn flush_lsn(&self) -> Lsn {
self.lsn
}
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
self.lsn = startpos + buf.len() as u64;
Ok(())
}
async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
self.lsn = end_pos;
Ok(())
}
async fn flush_wal(&mut self) -> Result<()> {
fn flush_wal(&mut self) -> Result<()> {
Ok(())
}
fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
Box::pin(async { Ok(()) })
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
Box::new(move |_segno_up_to: XLogSegNo| Ok(()))
}
fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
@@ -1052,8 +1042,8 @@ mod tests {
}
}
#[tokio::test]
async fn test_voting() {
#[test]
fn test_voting() {
let storage = InMemoryState {
persisted_state: test_sk_state(),
};
@@ -1062,7 +1052,7 @@ mod tests {
// check voting for 1 is ok
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
let mut vote_resp = sk.process_msg(&vote_request).await;
let mut vote_resp = sk.process_msg(&vote_request);
match vote_resp.unwrap() {
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
r => panic!("unexpected response: {:?}", r),
@@ -1077,15 +1067,15 @@ mod tests {
sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();
// and ensure voting second time for 1 is not ok
vote_resp = sk.process_msg(&vote_request).await;
vote_resp = sk.process_msg(&vote_request);
match vote_resp.unwrap() {
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
r => panic!("unexpected response: {:?}", r),
}
}
#[tokio::test]
async fn test_epoch_switch() {
#[test]
fn test_epoch_switch() {
let storage = InMemoryState {
persisted_state: test_sk_state(),
};
@@ -1117,13 +1107,10 @@ mod tests {
timeline_start_lsn: Lsn(0),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.await
.unwrap();
// check that AppendRequest before epochStartLsn doesn't switch epoch
let resp = sk
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await;
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));
assert!(resp.is_ok());
assert_eq!(sk.get_epoch(), 0);
@@ -1134,11 +1121,9 @@ mod tests {
h: ar_hdr,
wal_data: Bytes::from_static(b"b"),
};
let resp = sk
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await;
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));
assert!(resp.is_ok());
sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
sk.wal_store.truncate_wal(Lsn(3)).unwrap(); // imitate the complete record at 3 %)
assert_eq!(sk.get_epoch(), 1);
}
}

View File

@@ -394,7 +394,7 @@ impl SafekeeperPostgresHandler {
// on this safekeeper itself. That's ok as (old) proposer will never be
// able to commit such WAL.
let stop_pos: Option<Lsn> = if self.is_walproposer_recovery() {
let wal_end = tli.get_flush_lsn().await;
let wal_end = tli.get_flush_lsn();
Some(wal_end)
} else {
None
@@ -409,7 +409,7 @@ impl SafekeeperPostgresHandler {
// switch to copy
pgb.write_message(&BeMessage::CopyBothResponse).await?;
let (_, persisted_state) = tli.get_state().await;
let (_, persisted_state) = tli.get_state();
let wal_reader = WalReader::new(
self.conf.workdir.clone(),
self.conf.timeline_dir(&tli.ttid),
@@ -540,7 +540,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
.walsenders
.get_ws_remote_consistent_lsn(self.ws_guard.id)
{
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
if self.tli.should_walsender_stop(remote_consistent_lsn) {
// Terminate if there is nothing more to send.
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",

View File

@@ -2,13 +2,12 @@
//! to glue together SafeKeeper and all other background services.
use anyhow::{anyhow, bail, Result};
use parking_lot::{Mutex, MutexGuard};
use postgres_ffi::XLogSegNo;
use tokio::fs;
use std::cmp::max;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{Mutex, MutexGuard};
use tokio::{
sync::{mpsc::Sender, watch},
time::Instant,
@@ -288,9 +287,8 @@ pub struct Timeline {
commit_lsn_watch_tx: watch::Sender<Lsn>,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
/// Safekeeper and other state, that should remain consistent and
/// synchronized with the disk. This is tokio mutex as we write WAL to disk
/// while holding it, ensuring that consensus checks are in order.
/// Safekeeper and other state, that should remain consistent and synchronized
/// with the disk.
mutex: Mutex<SharedState>,
walsenders: Arc<WalSenders>,
@@ -303,9 +301,6 @@ pub struct Timeline {
/// Directory where timeline state is stored.
pub timeline_dir: PathBuf,
/// Data prepared for collection by synchronous prometheus scraper.
metrics_data: std::sync::Mutex<Option<FullTimelineInfo>>,
}
impl Timeline {
@@ -333,7 +328,6 @@ impl Timeline {
cancellation_rx,
cancellation_tx,
timeline_dir: conf.timeline_dir(&ttid),
metrics_data: std::sync::Mutex::new(None),
})
}
@@ -360,7 +354,6 @@ impl Timeline {
cancellation_rx,
cancellation_tx,
timeline_dir: conf.timeline_dir(&ttid),
metrics_data: std::sync::Mutex::new(None),
})
}
@@ -369,8 +362,8 @@ impl Timeline {
///
/// Bootstrap is transactional, so if it fails, created files will be deleted,
/// and state on disk should remain unchanged.
pub async fn bootstrap(&self, shared_state: &mut MutexGuard<'_, SharedState>) -> Result<()> {
match fs::metadata(&self.timeline_dir).await {
pub fn bootstrap(&self, shared_state: &mut MutexGuard<SharedState>) -> Result<()> {
match std::fs::metadata(&self.timeline_dir) {
Ok(_) => {
// Timeline directory exists on disk, we should leave state unchanged
// and return error.
@@ -383,51 +376,53 @@ impl Timeline {
}
// Create timeline directory.
fs::create_dir_all(&self.timeline_dir).await?;
std::fs::create_dir_all(&self.timeline_dir)?;
// Write timeline to disk and TODO: start background tasks.
if let Err(e) = shared_state.sk.persist().await {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel(shared_state);
match || -> Result<()> {
shared_state.sk.persist()?;
// TODO: add more initialization steps here
self.update_status(shared_state);
Ok(())
}() {
Ok(_) => Ok(()),
Err(e) => {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel(shared_state);
if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
warn!(
"failed to remove timeline {} directory after bootstrap failure: {}",
self.ttid, fs_err
);
if let Err(fs_err) = std::fs::remove_dir_all(&self.timeline_dir) {
warn!(
"failed to remove timeline {} directory after bootstrap failure: {}",
self.ttid, fs_err
);
}
Err(e)
}
return Err(e);
}
// TODO: add more initialization steps here
self.update_status(shared_state);
Ok(())
}
/// Delete timeline from disk completely, by removing timeline directory. Background
/// timeline activities will stop eventually.
pub async fn delete_from_disk(
pub fn delete_from_disk(
&self,
shared_state: &mut MutexGuard<'_, SharedState>,
shared_state: &mut MutexGuard<SharedState>,
) -> Result<(bool, bool)> {
let was_active = shared_state.active;
self.cancel(shared_state);
let dir_existed = delete_dir(&self.timeline_dir).await?;
let dir_existed = delete_dir(&self.timeline_dir)?;
Ok((dir_existed, was_active))
}
/// Cancel timeline to prevent further usage. Background tasks will stop
/// eventually after receiving cancellation signal.
///
/// Note that we can't notify backup launcher here while holding
/// shared_state lock, as this is a potential deadlock: caller is
/// responsible for that. Generally we should probably make WAL backup tasks
/// to shut down on their own, checking once in a while whether it is the
/// time.
fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
fn cancel(&self, shared_state: &mut MutexGuard<SharedState>) {
info!("timeline {} is cancelled", self.ttid);
let _ = self.cancellation_tx.send(true);
let res = self.wal_backup_launcher_tx.blocking_send(self.ttid);
if let Err(e) = res {
error!("Failed to send stop signal to wal_backup_launcher: {}", e);
}
// Close associated FDs. Nobody will be able to touch timeline data once
// it is cancelled, so WAL storage won't be opened again.
shared_state.sk.wal_store.close();
@@ -439,8 +434,8 @@ impl Timeline {
}
/// Take a writing mutual exclusive lock on timeline shared_state.
pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
self.mutex.lock().await
pub fn write_shared_state(&self) -> MutexGuard<SharedState> {
self.mutex.lock()
}
fn update_status(&self, shared_state: &mut SharedState) -> bool {
@@ -456,7 +451,7 @@ impl Timeline {
let is_wal_backup_action_pending: bool;
{
let mut shared_state = self.write_shared_state().await;
let mut shared_state = self.write_shared_state();
shared_state.num_computes += 1;
is_wal_backup_action_pending = self.update_status(&mut shared_state);
}
@@ -470,17 +465,22 @@ impl Timeline {
/// De-register compute connection, shutting down timeline activity if
/// pageserver doesn't need catchup.
pub async fn on_compute_disconnect(&self) -> Result<()> {
pub fn on_compute_disconnect(&self) -> Result<()> {
let is_wal_backup_action_pending: bool;
{
let mut shared_state = self.write_shared_state().await;
let mut shared_state = self.write_shared_state();
shared_state.num_computes -= 1;
is_wal_backup_action_pending = self.update_status(&mut shared_state);
}
// Wake up wal backup launcher, if it is time to stop the offloading.
if is_wal_backup_action_pending {
// Can fail only if channel to a static thread got closed, which is not normal at all.
self.wal_backup_launcher_tx.send(self.ttid).await?;
//
// Note: this is blocking_send because on_compute_disconnect is called in Drop, there is
// no async Drop and we use current thread runtimes. With current thread rt spawning
// task in drop impl is racy, as thread along with runtime might finish before the task.
// This should be switched send.await when/if we go to full async.
self.wal_backup_launcher_tx.blocking_send(self.ttid)?;
}
Ok(())
}
@@ -490,11 +490,11 @@ impl Timeline {
/// computes. While there might be nothing to stream already, we learn about
/// remote_consistent_lsn update through replication feedback, and we want
/// to stop pushing to the broker if pageserver is fully caughtup.
pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
pub fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
if self.is_cancelled() {
return true;
}
let shared_state = self.write_shared_state().await;
let shared_state = self.write_shared_state();
if shared_state.num_computes == 0 {
return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn;
@@ -504,12 +504,12 @@ impl Timeline {
/// Returns whether s3 offloading is required and sets current status as
/// matching it.
pub async fn wal_backup_attend(&self) -> bool {
pub fn wal_backup_attend(&self) -> bool {
if self.is_cancelled() {
return false;
}
self.write_shared_state().await.wal_backup_attend()
self.write_shared_state().wal_backup_attend()
}
/// Returns commit_lsn watch channel.
@@ -518,7 +518,7 @@ impl Timeline {
}
/// Pass arrived message to the safekeeper.
pub async fn process_msg(
pub fn process_msg(
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
@@ -529,8 +529,8 @@ impl Timeline {
let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn;
{
let mut shared_state = self.write_shared_state().await;
rmsg = shared_state.sk.process_msg(msg).await?;
let mut shared_state = self.write_shared_state();
rmsg = shared_state.sk.process_msg(msg)?;
// if this is AppendResponse, fill in proper pageserver and hot
// standby feedback.
@@ -547,37 +547,37 @@ impl Timeline {
}
/// Returns wal_seg_size.
pub async fn get_wal_seg_size(&self) -> usize {
self.write_shared_state().await.get_wal_seg_size()
pub fn get_wal_seg_size(&self) -> usize {
self.write_shared_state().get_wal_seg_size()
}
/// Returns true only if the timeline is loaded and active.
pub async fn is_active(&self) -> bool {
pub fn is_active(&self) -> bool {
if self.is_cancelled() {
return false;
}
self.write_shared_state().await.active
self.write_shared_state().active
}
/// Returns state of the timeline.
pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
let state = self.write_shared_state().await;
pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
let state = self.write_shared_state();
(state.sk.inmem.clone(), state.sk.state.clone())
}
/// Returns latest backup_lsn.
pub async fn get_wal_backup_lsn(&self) -> Lsn {
self.write_shared_state().await.sk.inmem.backup_lsn
pub fn get_wal_backup_lsn(&self) -> Lsn {
self.write_shared_state().sk.inmem.backup_lsn
}
/// Sets backup_lsn to the given value.
pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
let mut state = self.write_shared_state().await;
let mut state = self.write_shared_state();
state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn);
// we should check whether to shut down offloader, but this will be done
// soon by peer communication anyway.
@@ -585,8 +585,8 @@ impl Timeline {
}
/// Get safekeeper info for broadcasting to broker and other peers.
pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
let shared_state = self.write_shared_state().await;
pub fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
let shared_state = self.write_shared_state();
shared_state.get_safekeeper_info(
&self.ttid,
conf,
@@ -605,8 +605,8 @@ impl Timeline {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
let mut shared_state = self.write_shared_state().await;
shared_state.sk.record_safekeeper_info(&sk_info).await?;
let mut shared_state = self.write_shared_state();
shared_state.sk.record_safekeeper_info(&sk_info)?;
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
is_wal_backup_action_pending = self.update_status(&mut shared_state);
@@ -623,8 +623,8 @@ impl Timeline {
/// Get our latest view of alive peers status on the timeline.
/// We pass our own info through the broker as well, so when we don't have connection
/// to the broker returned vec is empty.
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.write_shared_state().await;
pub fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.write_shared_state();
let now = Instant::now();
shared_state
.peers_info
@@ -641,47 +641,47 @@ impl Timeline {
}
/// Returns flush_lsn.
pub async fn get_flush_lsn(&self) -> Lsn {
self.write_shared_state().await.sk.wal_store.flush_lsn()
pub fn get_flush_lsn(&self) -> Lsn {
self.write_shared_state().sk.wal_store.flush_lsn()
}
/// Delete WAL segments from disk that are no longer needed. This is determined
/// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
pub fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
let horizon_segno: XLogSegNo;
let remover = {
let shared_state = self.write_shared_state().await;
let remover: Box<dyn Fn(u64) -> Result<(), anyhow::Error>>;
{
let shared_state = self.write_shared_state();
horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
remover = shared_state.sk.wal_store.remove_up_to();
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
return Ok(()); // nothing to do
return Ok(());
}
let remover = shared_state.sk.wal_store.remove_up_to(horizon_segno - 1);
// release the lock before removing
remover
};
}
// delete old WAL files
remover.await?;
remover(horizon_segno - 1)?;
// update last_removed_segno
let mut shared_state = self.write_shared_state().await;
let mut shared_state = self.write_shared_state();
shared_state.last_removed_segno = horizon_segno;
Ok(())
}
/// Gather timeline data for metrics. If the timeline is not active, returns
/// None, we do not collect these.
async fn gather_info_for_metrics(&self) -> Option<FullTimelineInfo> {
/// Returns full timeline info, required for the metrics. If the timeline is
/// not active, returns None instead.
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
if self.is_cancelled() {
return None;
}
let ps_feedback = self.walsenders.get_ps_feedback();
let state = self.write_shared_state().await;
let state = self.write_shared_state();
if state.active {
Some(FullTimelineInfo {
ttid: self.ttid,
@@ -702,20 +702,9 @@ impl Timeline {
}
}
/// Gathers timeline data for metrics and puts it into .metrics_data
pub async fn set_info_for_metrics(&self) {
let metrics_data = self.gather_info_for_metrics().await;
*self.metrics_data.lock().unwrap() = metrics_data;
}
/// Synchronous method returning current metrics data.
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
self.metrics_data.lock().unwrap().clone()
}
/// Returns in-memory timeline state to build a full debug dump.
pub async fn memory_dump(&self) -> debug_dump::Memory {
let state = self.write_shared_state().await;
pub fn memory_dump(&self) -> debug_dump::Memory {
let state = self.write_shared_state();
let (write_lsn, write_record_lsn, flush_lsn, file_open) =
state.sk.wal_store.internal_state();
@@ -739,8 +728,8 @@ impl Timeline {
}
/// Deletes directory and it's contents. Returns false if directory does not exist.
async fn delete_dir(path: &PathBuf) -> Result<bool> {
match fs::remove_dir_all(path).await {
fn delete_dir(path: &PathBuf) -> Result<bool> {
match std::fs::remove_dir_all(path) {
Ok(_) => Ok(true),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
Err(e) => Err(e.into()),

View File

@@ -113,17 +113,9 @@ impl GlobalTimelines {
Ok(())
}
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
/// errors if any.
///
/// Note: This function (and all reading/loading below) is sync because
/// timelines are loaded while holding GlobalTimelinesState lock. Which is
/// fine as this is called only from single threaded main runtime on boot,
/// but clippy complains anyway, and suppressing that isn't trivial as async
/// is the keyword, ha. That only other user is pull_timeline.rs for which
/// being blocked is not that bad, and we can do spawn_blocking.
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir errors if any.
fn load_tenant_timelines(
state: &mut MutexGuard<'_, GlobalTimelinesState>,
state: &mut MutexGuard<GlobalTimelinesState>,
tenant_id: TenantId,
) -> Result<()> {
let timelines_dir = state.get_conf().tenant_dir(&tenant_id);
@@ -228,7 +220,7 @@ impl GlobalTimelines {
// Take a lock and finish the initialization holding this mutex. No other threads
// can interfere with creation after we will insert timeline into the map.
{
let mut shared_state = timeline.write_shared_state().await;
let mut shared_state = timeline.write_shared_state();
// We can get a race condition here in case of concurrent create calls, but only
// in theory. create() will return valid timeline on the next try.
@@ -240,7 +232,7 @@ impl GlobalTimelines {
// Write the new timeline to the disk and start background workers.
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
// and the state on disk should remain unchanged.
if let Err(e) = timeline.bootstrap(&mut shared_state).await {
if let Err(e) = timeline.bootstrap(&mut shared_state) {
// Note: the most likely reason for bootstrap failure is that the timeline
// directory already exists on disk. This happens when timeline is corrupted
// and wasn't loaded from disk on startup because of that. We want to preserve
@@ -302,16 +294,15 @@ impl GlobalTimelines {
}
/// Cancels timeline, then deletes the corresponding data directory.
pub async fn delete_force(ttid: &TenantTimelineId) -> Result<TimelineDeleteForceResult> {
pub fn delete_force(ttid: &TenantTimelineId) -> Result<TimelineDeleteForceResult> {
let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
match tli_res {
Ok(timeline) => {
// Take a lock and finish the deletion holding this mutex.
let mut shared_state = timeline.write_shared_state().await;
let mut shared_state = timeline.write_shared_state();
info!("deleting timeline {}", ttid);
let (dir_existed, was_active) =
timeline.delete_from_disk(&mut shared_state).await?;
let (dir_existed, was_active) = timeline.delete_from_disk(&mut shared_state)?;
// Remove timeline from the map.
// FIXME: re-enable it once we fix the issue with recreation of deleted timelines
@@ -344,7 +335,7 @@ impl GlobalTimelines {
/// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
/// created simultaneously. In that case the function will return error and the caller should
/// retry tenant deletion again later.
pub async fn delete_force_all_for_tenant(
pub fn delete_force_all_for_tenant(
tenant_id: &TenantId,
) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
info!("deleting all timelines for tenant {}", tenant_id);
@@ -354,7 +345,7 @@ impl GlobalTimelines {
let mut deleted = HashMap::new();
for tli in &to_delete {
match Self::delete_force(&tli.ttid).await {
match Self::delete_force(&tli.ttid) {
Ok(result) => {
deleted.insert(tli.ttid, result);
}

View File

@@ -15,6 +15,7 @@ use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::fs::File;
use tokio::runtime::Builder;
use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -32,16 +33,30 @@ use once_cell::sync::OnceCell;
const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
pub fn wal_backup_launcher_thread_main(
conf: SafeKeeperConf,
wal_backup_launcher_rx: Receiver<TenantTimelineId>,
) {
let mut builder = Builder::new_multi_thread();
if let Some(num_threads) = conf.backup_runtime_threads {
builder.worker_threads(num_threads);
}
let rt = builder
.enable_all()
.build()
.expect("failed to create wal backup runtime");
rt.block_on(async {
wal_backup_launcher_main_loop(conf, wal_backup_launcher_rx).await;
});
}
/// Check whether wal backup is required for timeline. If yes, mark that launcher is
/// aware of current status and return the timeline.
async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
match GlobalTimelines::get(ttid).ok() {
Some(tli) => {
tli.wal_backup_attend().await;
Some(tli)
}
None => None,
}
fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
GlobalTimelines::get(ttid)
.ok()
.filter(|tli| tli.wal_backup_attend())
}
struct WalBackupTaskHandle {
@@ -125,8 +140,8 @@ async fn update_task(
ttid: TenantTimelineId,
entry: &mut WalBackupTimelineEntry,
) {
let alive_peers = entry.timeline.get_peers(conf).await;
let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
let alive_peers = entry.timeline.get_peers(conf);
let wal_backup_lsn = entry.timeline.get_wal_backup_lsn();
let (offloader, election_dbg_str) =
determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
let elected_me = Some(conf.my_id) == offloader;
@@ -159,10 +174,10 @@ const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
/// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
/// tasks. Having this in separate task simplifies locking, allows to reap
/// panics and separate elections from offloading itself.
pub async fn wal_backup_launcher_task_main(
async fn wal_backup_launcher_main_loop(
conf: SafeKeeperConf,
mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
) -> anyhow::Result<()> {
) {
info!(
"WAL backup launcher started, remote config {:?}",
conf.remote_storage
@@ -190,7 +205,7 @@ pub async fn wal_backup_launcher_task_main(
if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
continue; /* just drain the channel and do nothing */
}
let timeline = is_wal_backup_required(ttid).await;
let timeline = is_wal_backup_required(ttid);
// do we need to do anything at all?
if timeline.is_some() != tasks.contains_key(&ttid) {
if let Some(timeline) = timeline {
@@ -243,7 +258,7 @@ async fn backup_task_main(
let tli = res.unwrap();
let mut wb = WalBackupTask {
wal_seg_size: tli.get_wal_seg_size().await,
wal_seg_size: tli.get_wal_seg_size(),
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
timeline: tli,
timeline_dir,
@@ -299,7 +314,7 @@ impl WalBackupTask {
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
}
// Perhaps peers advanced the position, check shmem value.
backup_lsn = self.timeline.get_wal_backup_lsn().await;
backup_lsn = self.timeline.get_wal_backup_lsn();
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
@@ -351,7 +366,6 @@ pub async fn backup_lsn_range(
let new_backup_lsn = s.end_lsn;
timeline
.set_wal_backup_lsn(new_backup_lsn)
.await
.context("setting wal_backup_lsn")?;
*backup_lsn = new_backup_lsn;
}

View File

@@ -4,7 +4,7 @@
//!
use anyhow::{Context, Result};
use postgres_backend::QueryError;
use std::{future, time::Duration};
use std::{future, thread, time::Duration};
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutReader;
use tracing::*;
@@ -16,82 +16,104 @@ use crate::SafeKeeperConf;
use postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread.
pub async fn task_main(
conf: SafeKeeperConf,
pg_listener: std::net::TcpListener,
) -> anyhow::Result<()> {
// Tokio's from_std won't do this for us, per its comment.
pg_listener.set_nonblocking(true)?;
pub fn thread_main(conf: SafeKeeperConf, pg_listener: std::net::TcpListener) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("create runtime")
// todo catch error in main thread
.expect("failed to create runtime");
let listener = tokio::net::TcpListener::from_std(pg_listener)?;
let mut connection_count: ConnectionCount = 0;
runtime
.block_on(async move {
// Tokio's from_std won't do this for us, per its comment.
pg_listener.set_nonblocking(true)?;
let listener = tokio::net::TcpListener::from_std(pg_listener)?;
let mut connection_count: ConnectionCount = 0;
loop {
let (socket, peer_addr) = listener.accept().await.context("accept")?;
debug!("accepted connection from {}", peer_addr);
let conf = conf.clone();
let conn_id = issue_connection_id(&mut connection_count);
loop {
match listener.accept().await {
Ok((socket, peer_addr)) => {
debug!("accepted connection from {}", peer_addr);
let conf = conf.clone();
let conn_id = issue_connection_id(&mut connection_count);
tokio::spawn(async move {
if let Err(err) = handle_socket(socket, conf, conn_id)
.instrument(info_span!("", cid = %conn_id))
.await
{
error!("connection handler exited: {}", err);
let _ = thread::Builder::new()
.name("WAL service thread".into())
.spawn(move || {
if let Err(err) = handle_socket(socket, conf, conn_id) {
error!("connection handler exited: {}", err);
}
})
.unwrap();
}
Err(e) => error!("Failed to accept connection: {}", e),
}
}
});
}
#[allow(unreachable_code)] // hint compiler the closure return type
Ok::<(), anyhow::Error>(())
})
.expect("listener failed")
}
/// This is run by `task_main` above, inside a background thread.
/// This is run by `thread_main` above, inside a background thread.
///
async fn handle_socket(
fn handle_socket(
socket: TcpStream,
conf: SafeKeeperConf,
conn_id: ConnectionId,
) -> Result<(), QueryError> {
let _enter = info_span!("", cid = %conn_id).entered();
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
socket.set_nodelay(true)?;
let peer_addr = socket.peer_addr()?;
// Set timeout on reading from the socket. It prevents hanged up connection
// if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by
// default, and tokio doesn't provide ability to set it out of the box.
let mut socket = TimeoutReader::new(socket);
let wal_service_timeout = Duration::from_secs(60 * 10);
socket.set_timeout(Some(wal_service_timeout));
// pin! is here because TimeoutReader (due to storing sleep future inside)
// is not Unpin, and all pgbackend/framed/tokio dependencies require stream
// to be Unpin. Which is reasonable, as indeed something like TimeoutReader
// shouldn't be moved.
tokio::pin!(socket);
// TimeoutReader wants async runtime during creation.
runtime.block_on(async move {
// Set timeout on reading from the socket. It prevents hanged up connection
// if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by
// default, and tokio doesn't provide ability to set it out of the box.
let mut socket = TimeoutReader::new(socket);
let wal_service_timeout = Duration::from_secs(60 * 10);
socket.set_timeout(Some(wal_service_timeout));
// pin! is here because TimeoutReader (due to storing sleep future inside)
// is not Unpin, and all pgbackend/framed/tokio dependencies require stream
// to be Unpin. Which is reasonable, as indeed something like TimeoutReader
// shouldn't be moved.
tokio::pin!(socket);
let traffic_metrics = TrafficMetrics::new();
if let Some(current_az) = conf.availability_zone.as_deref() {
traffic_metrics.set_sk_az(current_az);
}
let traffic_metrics = TrafficMetrics::new();
if let Some(current_az) = conf.availability_zone.as_deref() {
traffic_metrics.set_sk_az(current_az);
}
let socket = MeasuredStream::new(
socket,
|cnt| {
traffic_metrics.observe_read(cnt);
},
|cnt| {
traffic_metrics.observe_write(cnt);
},
);
let socket = MeasuredStream::new(
socket,
|cnt| {
traffic_metrics.observe_read(cnt);
},
|cnt| {
traffic_metrics.observe_write(cnt);
},
);
let auth_type = match conf.auth {
None => AuthType::Trust,
Some(_) => AuthType::NeonJWT,
};
let mut conn_handler =
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.
pgbackend
.run(&mut conn_handler, future::pending::<()>)
.await
let auth_type = match conf.auth {
None => AuthType::Trust,
Some(_) => AuthType::NeonJWT,
};
let mut conn_handler =
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.
pgbackend
.run(&mut conn_handler, future::pending::<()>)
.await
})
}
/// Unique WAL service connection ids are logged in spans for observability.

View File

@@ -8,47 +8,54 @@
//! Note that last file has `.partial` suffix, that's different from postgres.
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use futures::future::BoxFuture;
use remote_storage::RemotePath;
use std::io::{self, Seek, SeekFrom};
use std::pin::Pin;
use tokio::io::AsyncRead;
use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
use std::cmp::{max, min};
use std::io::{self, SeekFrom};
use bytes::Bytes;
use std::fs::{self, remove_file, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use tokio::fs::{self, remove_file, File, OpenOptions};
use tokio::io::{AsyncRead, AsyncWriteExt};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tracing::*;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::metrics::{time_io_closure, WalStorageMetrics};
use crate::safekeeper::SafeKeeperState;
use crate::wal_backup::read_object;
use crate::SafeKeeperConf;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::XLogFileName;
use postgres_ffi::XLOG_BLCKSZ;
use pq_proto::SystemId;
use utils::{id::TenantTimelineId, lsn::Lsn};
#[async_trait::async_trait]
use postgres_ffi::waldecoder::WalStreamDecoder;
use pq_proto::SystemId;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
pub trait Storage {
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;
/// Write piece of WAL from buf to disk, but not necessarily sync it.
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
/// Truncate WAL at specified LSN, which must be the end of WAL record.
async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;
/// Durably store WAL on disk, up to the last written WAL record.
async fn flush_wal(&mut self) -> Result<()>;
fn flush_wal(&mut self) -> Result<()>;
/// Remove all segments <= given segno. Returns function doing that as we
/// want to perform it without timeline lock.
fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;
/// Remove all segments <= given segno. Returns closure as we want to do
/// that without timeline lock.
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>>;
/// Release resources associated with the storage -- technically, close FDs.
/// Currently we don't remove timelines until restart (#3146), so need to
@@ -171,37 +178,33 @@ impl PhysicalStorage {
}
/// Call fdatasync if config requires so.
async fn fdatasync_file(&mut self, file: &mut File) -> Result<()> {
fn fdatasync_file(&mut self, file: &mut File) -> Result<()> {
if !self.conf.no_sync {
self.metrics
.observe_flush_seconds(time_io_closure(file.sync_data()).await?);
.observe_flush_seconds(time_io_closure(|| Ok(file.sync_data()?))?);
}
Ok(())
}
/// Call fsync if config requires so.
async fn fsync_file(&mut self, file: &mut File) -> Result<()> {
fn fsync_file(&mut self, file: &mut File) -> Result<()> {
if !self.conf.no_sync {
self.metrics
.observe_flush_seconds(time_io_closure(file.sync_all()).await?);
.observe_flush_seconds(time_io_closure(|| Ok(file.sync_all()?))?);
}
Ok(())
}
/// Open or create WAL segment file. Caller must call seek to the wanted position.
/// Returns `file` and `is_partial`.
async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
// Try to open already completed segment
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
Ok((file, false))
} else if let Ok(file) = OpenOptions::new()
.write(true)
.open(&wal_file_partial_path)
.await
{
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
// Try to open existing partial file
Ok((file, true))
} else {
@@ -210,36 +213,35 @@ impl PhysicalStorage {
.create(true)
.write(true)
.open(&wal_file_partial_path)
.await
.with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
write_zeroes(&mut file, self.wal_seg_size).await?;
self.fsync_file(&mut file).await?;
write_zeroes(&mut file, self.wal_seg_size)?;
self.fsync_file(&mut file)?;
Ok((file, true))
}
}
/// Write WAL bytes, which are known to be located in a single WAL segment.
async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
let mut file = if let Some(file) = self.file.take() {
file
} else {
let (mut file, is_partial) = self.open_or_create(segno).await?;
let (mut file, is_partial) = self.open_or_create(segno)?;
assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
file.seek(SeekFrom::Start(xlogoff as u64))?;
file
};
file.write_all(buf).await?;
file.write_all(buf)?;
if xlogoff + buf.len() == self.wal_seg_size {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&mut file).await?;
self.fdatasync_file(&mut file)?;
// Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_partial_path, wal_file_path).await?;
fs::rename(wal_file_partial_path, wal_file_path)?;
} else {
// otherwise, file can be reused later
self.file = Some(file);
@@ -253,11 +255,11 @@ impl PhysicalStorage {
/// be flushed separately later.
///
/// Updates `write_lsn`.
async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
if self.write_lsn != pos {
// need to flush the file before discarding it
if let Some(mut file) = self.file.take() {
self.fdatasync_file(&mut file).await?;
self.fdatasync_file(&mut file)?;
}
self.write_lsn = pos;
@@ -275,8 +277,7 @@ impl PhysicalStorage {
buf.len()
};
self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
.await?;
self.write_in_segment(segno, xlogoff, &buf[..bytes_write])?;
self.write_lsn += bytes_write as u64;
buf = &buf[bytes_write..];
}
@@ -285,7 +286,6 @@ impl PhysicalStorage {
}
}
#[async_trait::async_trait]
impl Storage for PhysicalStorage {
/// flush_lsn returns LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn {
@@ -293,7 +293,7 @@ impl Storage for PhysicalStorage {
}
/// Write WAL to disk.
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
// Disallow any non-sequential writes, which can result in gaps or overwrites.
// If we need to move the pointer, use truncate_wal() instead.
if self.write_lsn > startpos {
@@ -311,7 +311,7 @@ impl Storage for PhysicalStorage {
);
}
let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
let write_seconds = time_io_closure(|| self.write_exact(startpos, buf))?;
// WAL is written, updating write metrics
self.metrics.observe_write_seconds(write_seconds);
self.metrics.observe_write_bytes(buf.len());
@@ -340,14 +340,14 @@ impl Storage for PhysicalStorage {
Ok(())
}
async fn flush_wal(&mut self) -> Result<()> {
fn flush_wal(&mut self) -> Result<()> {
if self.flush_record_lsn == self.write_record_lsn {
// no need to do extra flush
return Ok(());
}
if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file).await?;
self.fdatasync_file(&mut unflushed_file)?;
self.file = Some(unflushed_file);
} else {
// We have unflushed data (write_lsn != flush_lsn), but no file.
@@ -369,7 +369,7 @@ impl Storage for PhysicalStorage {
/// Truncate written WAL by removing all WAL segments after the given LSN.
/// end_pos must point to the end of the WAL record.
async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
bail!(
@@ -381,27 +381,27 @@ impl Storage for PhysicalStorage {
// Close previously opened file, if any
if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file).await?;
self.fdatasync_file(&mut unflushed_file)?;
}
let xlogoff = end_pos.segment_offset(self.wal_seg_size);
let segno = end_pos.segment_number(self.wal_seg_size);
// Remove all segments after the given LSN.
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno)?;
let (mut file, is_partial) = self.open_or_create(segno).await?;
let (mut file, is_partial) = self.open_or_create(segno)?;
// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
self.fdatasync_file(&mut file).await?;
file.seek(SeekFrom::Start(xlogoff as u64))?;
write_zeroes(&mut file, self.wal_seg_size - xlogoff)?;
self.fdatasync_file(&mut file)?;
if !is_partial {
// Make segment partial once again
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_path, wal_file_partial_path).await?;
fs::rename(wal_file_path, wal_file_partial_path)?;
}
// Update LSNs
@@ -411,11 +411,11 @@ impl Storage for PhysicalStorage {
Ok(())
}
fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
let timeline_dir = self.timeline_dir.clone();
let wal_seg_size = self.wal_seg_size;
Box::pin(async move {
remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
Box::new(move |segno_up_to: XLogSegNo| {
remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to)
})
}
@@ -430,7 +430,7 @@ impl Storage for PhysicalStorage {
}
/// Remove all WAL segments in timeline_dir that match the given predicate.
async fn remove_segments_from_disk(
fn remove_segments_from_disk(
timeline_dir: &Path,
wal_seg_size: usize,
remove_predicate: impl Fn(XLogSegNo) -> bool,
@@ -439,8 +439,8 @@ async fn remove_segments_from_disk(
let mut min_removed = u64::MAX;
let mut max_removed = u64::MIN;
let mut entries = fs::read_dir(timeline_dir).await?;
while let Some(entry) = entries.next_entry().await? {
for entry in fs::read_dir(timeline_dir)? {
let entry = entry?;
let entry_path = entry.path();
let fname = entry_path.file_name().unwrap();
@@ -451,7 +451,7 @@ async fn remove_segments_from_disk(
}
let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
if remove_predicate(segno) {
remove_file(entry_path).await?;
remove_file(entry_path)?;
n_removed += 1;
min_removed = min(min_removed, segno);
max_removed = max(max_removed, segno);
@@ -682,12 +682,12 @@ impl WalReader {
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
/// Helper for filling file with zeroes.
async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
while count >= XLOG_BLCKSZ {
file.write_all(ZERO_BLOCK).await?;
file.write_all(ZERO_BLOCK)?;
count -= XLOG_BLCKSZ;
}
file.write_all(&ZERO_BLOCK[0..count]).await?;
file.write_all(&ZERO_BLOCK[0..count])?;
Ok(())
}