Move walreceiver start and stop behind a struct (#3973)

The PR replaces the module's function-based walreceiver interface with a
`WalReceiver` struct that, for now, exposes a few public methods: `new`,
`start`, and `stop`.

Later, the same struct is planned to be used for retrieving walreceiver
stats (and maybe other extra data) to display alongside missing-WAL errors,
see https://github.com/neondatabase/neon/issues/2106
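
In outline, the new public surface looks like this (a condensed sketch
assembled from the diffs below, with bodies elided; see the walreceiver
file for the full definitions):

```rust
// Condensed from the diff below; not a complete, compilable listing.
pub struct WalReceiver {
    timeline: TenantTimelineId,
    timeline_ref: Weak<Timeline>,
    conf: WalReceiverConf,
    started: AtomicBool,
}

impl WalReceiver {
    pub fn new(
        timeline: TenantTimelineId,
        timeline_ref: Weak<Timeline>,
        conf: WalReceiverConf,
    ) -> Self { /* ... */ }

    // Spawns the WalReceiverManager task; fails if the receiver was
    // already started or the timeline has been dropped.
    pub fn start(
        &self,
        ctx: &RequestContext,
        broker_client: BrokerClientChannel,
    ) -> anyhow::Result<()> { /* ... */ }

    // Shuts down the WalReceiverManager tasks for this timeline.
    pub async fn stop(&self) { /* ... */ }
}
```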

Even now, though, the change required a few extra logic changes:

* with the `WalReceiver` struct added, it became easier to pass `ctx`
through and later create a `detached_child`, instead of the `todo_child`
placeholder at

bfee412701/pageserver/src/tenant/timeline.rs (L1379-L1381)

* `WalReceiver::start`, now the public API for starting the walreceiver,
can return an `Err`, which may turn the tenant `Broken`, same as the
timeline it tries to load during startup.

* `WalReceiverConf` was added to group the walreceiver parameters taken
from the pageserver's tenant config.

Kirill Bulatov authored on 2023-04-12 12:39:02 +03:00, committed by GitHub
parent 06ce83c912
commit d8939d4162
5 changed files with 268 additions and 203 deletions

View File

@@ -177,9 +177,9 @@ impl UninitializedTimeline<'_> {
///
/// The new timeline is initialized in Active state, and its background jobs are
/// started
pub fn initialize(self, _ctx: &RequestContext) -> anyhow::Result<Arc<Timeline>> {
pub fn initialize(self, ctx: &RequestContext) -> anyhow::Result<Arc<Timeline>> {
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
self.initialize_with_lock(&mut timelines, true, true)
self.initialize_with_lock(ctx, &mut timelines, true, true)
}
/// Like `initialize`, but the caller is already holding lock on Tenant::timelines.
@@ -189,6 +189,7 @@ impl UninitializedTimeline<'_> {
/// been initialized.
fn initialize_with_lock(
mut self,
ctx: &RequestContext,
timelines: &mut HashMap<TimelineId, Arc<Timeline>>,
load_layer_map: bool,
activate: bool,
@@ -229,7 +230,9 @@ impl UninitializedTimeline<'_> {
new_timeline.maybe_spawn_flush_loop();
if activate {
new_timeline.activate();
new_timeline
.activate(ctx)
.context("initializing timeline activation")?;
}
}
}
@@ -469,7 +472,7 @@ impl Tenant {
local_metadata: Option<TimelineMetadata>,
ancestor: Option<Arc<Timeline>>,
first_save: bool,
_ctx: &RequestContext,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_id;
@@ -504,7 +507,7 @@ impl Tenant {
// Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote
// But we shouldn't start walreceiver before we have all the data locally, because working walreceiver
// will ingest data which may require looking at the layers which are not yet available locally
match timeline.initialize_with_lock(&mut timelines_accessor, true, false) {
match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true, false) {
Ok(new_timeline) => new_timeline,
Err(e) => {
error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
@@ -629,7 +632,7 @@ impl Tenant {
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
#[instrument(skip(self, ctx), fields(tenant_id=%self.tenant_id))]
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
async fn attach(self: &Arc<Tenant>, ctx: RequestContext) -> anyhow::Result<()> {
// Create directory with marker file to indicate attaching state.
// The load_local_tenants() function in tenant::mgr relies on the marker file
@@ -750,7 +753,7 @@ impl Tenant {
// Start background operations and open the tenant for business.
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate()?;
self.activate(&ctx)?;
info!("Done");
@@ -1022,7 +1025,7 @@ impl Tenant {
// Start background operations and open the tenant for business.
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate()?;
self.activate(ctx)?;
info!("Done");
@@ -1358,12 +1361,7 @@ impl Tenant {
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
task_mgr::shutdown_tasks(
Some(TaskKind::WalReceiverManager),
Some(self.tenant_id),
Some(timeline_id),
)
.await;
timeline.walreceiver.stop().await;
debug!("wal receiver shutdown confirmed");
info!("waiting for timeline tasks to shutdown");
@@ -1450,7 +1448,7 @@ impl Tenant {
}
/// Changes tenant status to active, unless shutdown was already requested.
fn activate(&self) -> anyhow::Result<()> {
fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> {
let mut result = Ok(());
self.state.send_modify(|current_state| {
match *current_state {
@@ -1484,7 +1482,20 @@ impl Tenant {
tasks::start_background_loops(self.tenant_id);
for timeline in not_broken_timelines {
timeline.activate();
match timeline
.activate(ctx)
.context("timeline activation for activating tenant")
{
Ok(()) => {}
Err(e) => {
error!(
"Failed to activate timeline {}: {:#}",
timeline.timeline_id, e
);
timeline.set_state(TimelineState::Broken);
*current_state = TenantState::Broken;
}
}
}
}
}
@@ -2093,7 +2104,7 @@ impl Tenant {
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
_ctx: &RequestContext,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let src_id = src_timeline.timeline_id;
@@ -2186,7 +2197,7 @@ impl Tenant {
false,
Some(Arc::clone(src_timeline)),
)?
.initialize_with_lock(&mut timelines, true, true)?;
.initialize_with_lock(ctx, &mut timelines, true, true)?;
drop(timelines);
// Root timeline gets its layers during creation and uploads them along with the metadata.
@@ -2299,7 +2310,7 @@ impl Tenant {
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
raw_timeline.initialize_with_lock(&mut timelines, false, true)?
raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
};
info!(

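The activation path above mutates the shared tenant state through
`tokio::sync::watch::Sender::send_modify`, and a failed timeline
activation now marks the tenant `Broken` inside the same closure. A
minimal, self-contained sketch of that pattern (toy `State` enum and a
stand-in error, not the real types):

```rust
use tokio::sync::watch;

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Loading,
    Active,
    Broken,
}

fn main() {
    let (tx, rx) = watch::channel(State::Loading);

    // Stand-in for timeline.activate(ctx), which can now return an error.
    let activation_result: anyhow::Result<()> =
        Err(anyhow::anyhow!("walreceiver start failed"));

    // send_modify mutates the watched value in place and notifies
    // receivers, mirroring how Tenant::activate flips its state.
    tx.send_modify(|state| {
        *state = match &activation_result {
            Ok(()) => State::Active,
            Err(_) => State::Broken,
        };
    });

    assert_eq!(*rx.borrow(), State::Broken);
}
```
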
View File

@@ -14,6 +14,7 @@ use pageserver_api::models::{
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState,
};
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -30,7 +31,7 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::broker_client::is_broker_client_initialized;
use crate::broker_client::{get_broker_client, is_broker_client_initialized};
use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
@@ -71,10 +72,10 @@ use crate::walredo::WalRedoManager;
use crate::METADATA_FILE_NAME;
use crate::ZERO_PAGE;
use crate::{is_temporary, task_mgr};
use walreceiver::spawn_connection_manager_task;
pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
@@ -214,6 +215,7 @@ pub struct Timeline {
/// or None if WAL receiver has not received anything for this timeline
/// yet.
pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
pub walreceiver: WalReceiver,
/// Relation size cache
pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
@@ -866,10 +868,18 @@ impl Timeline {
Ok(())
}
pub fn activate(self: &Arc<Self>) {
pub fn activate(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
if is_broker_client_initialized() {
self.launch_wal_receiver(ctx, get_broker_client().clone())?;
} else if cfg!(test) {
info!("not launching WAL receiver because broker client hasn't been initialized");
} else {
anyhow::bail!("broker client not initialized");
}
self.set_state(TimelineState::Active);
self.launch_wal_receiver();
self.launch_eviction_task();
Ok(())
}
pub fn set_state(&self, new_state: TimelineState) {
@@ -1220,7 +1230,31 @@ impl Timeline {
let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
let tenant_conf_guard = tenant_conf.read().unwrap();
let wal_connect_timeout = tenant_conf_guard
.walreceiver_connect_timeout
.unwrap_or(conf.default_tenant_conf.walreceiver_connect_timeout);
let lagging_wal_timeout = tenant_conf_guard
.lagging_wal_timeout
.unwrap_or(conf.default_tenant_conf.lagging_wal_timeout);
let max_lsn_wal_lag = tenant_conf_guard
.max_lsn_wal_lag
.unwrap_or(conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);
Arc::new_cyclic(|myself| {
let walreceiver = WalReceiver::new(
TenantTimelineId::new(tenant_id, timeline_id),
Weak::clone(myself),
WalReceiverConf {
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
availability_zone: conf.availability_zone.clone(),
},
);
let mut result = Timeline {
conf,
tenant_conf,
@@ -1231,6 +1265,7 @@ impl Timeline {
layers: RwLock::new(LayerMap::default()),
walredo_mgr,
walreceiver,
remote_client: remote_client.map(Arc::new),
@@ -1350,44 +1385,17 @@ impl Timeline {
*flush_loop_state = FlushLoopState::Running;
}
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
if !is_broker_client_initialized() {
if cfg!(test) {
info!("not launching WAL receiver because broker client hasn't been initialized");
return;
} else {
panic!("broker client not initialized");
}
}
pub(super) fn launch_wal_receiver(
&self,
ctx: &RequestContext,
broker_client: BrokerClientChannel,
) -> anyhow::Result<()> {
info!(
"launching WAL receiver for timeline {} of tenant {}",
self.timeline_id, self.tenant_id
);
let tenant_conf_guard = self.tenant_conf.read().unwrap();
let lagging_wal_timeout = tenant_conf_guard
.lagging_wal_timeout
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
let walreceiver_connect_timeout = tenant_conf_guard
.walreceiver_connect_timeout
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
let max_lsn_wal_lag = tenant_conf_guard
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);
let self_clone = Arc::clone(self);
let background_ctx =
// XXX: this is a detached_child. Plumb through the ctx from call sites.
RequestContext::todo_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
spawn_connection_manager_task(
self_clone,
walreceiver_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
self.conf.availability_zone.clone(),
background_ctx,
);
self.walreceiver.start(ctx, broker_client)?;
Ok(())
}
///
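
`Timeline::new` above is built with `Arc::new_cyclic` so that the
`WalReceiver` stored inside the timeline can hold a `Weak` back-reference
to the timeline that owns it. A self-contained sketch of that construction
(toy `Owner`/`Receiver` types standing in for `Timeline` and `WalReceiver`):

```rust
use std::sync::{Arc, Weak};

struct Receiver {
    // Weak back-reference to the owner, upgraded on demand (as
    // WalReceiver::start upgrades its timeline_ref).
    owner: Weak<Owner>,
}

struct Owner {
    receiver: Receiver,
}

fn main() {
    // The closure runs before the Arc is fully constructed; the Weak it
    // receives becomes upgradeable once new_cyclic returns.
    let owner = Arc::new_cyclic(|myself: &Weak<Owner>| Owner {
        receiver: Receiver {
            owner: Weak::clone(myself),
        },
    });

    assert!(owner.receiver.owner.upgrade().is_some());
}
```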

View File

@@ -23,14 +23,133 @@
mod connection_manager;
mod walreceiver_connection;
use crate::task_mgr::WALRECEIVER_RUNTIME;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME};
use crate::tenant::timeline::walreceiver::connection_manager::{
connection_manager_loop_step, ConnectionManagerState,
};
use anyhow::Context;
use std::future::Future;
use std::num::NonZeroU64;
use std::ops::ControlFlow;
use std::sync::atomic::{self, AtomicBool};
use std::sync::{Arc, Weak};
use std::time::Duration;
use storage_broker::BrokerClientChannel;
use tokio::select;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
pub use connection_manager::spawn_connection_manager_task;
use utils::id::TenantTimelineId;
use super::Timeline;
#[derive(Clone)]
pub struct WalReceiverConf {
/// The timeout on the connection to safekeeper for WAL streaming.
pub wal_connect_timeout: Duration,
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
pub lagging_wal_timeout: Duration,
/// The Lsn lag to use to determine when the current connection is lagging too much behind and reconnect to the other one.
pub max_lsn_wal_lag: NonZeroU64,
pub auth_token: Option<Arc<String>>,
pub availability_zone: Option<String>,
}
pub struct WalReceiver {
timeline: TenantTimelineId,
timeline_ref: Weak<Timeline>,
conf: WalReceiverConf,
started: AtomicBool,
}
impl WalReceiver {
pub fn new(
timeline: TenantTimelineId,
timeline_ref: Weak<Timeline>,
conf: WalReceiverConf,
) -> Self {
Self {
timeline,
timeline_ref,
conf,
started: AtomicBool::new(false),
}
}
pub fn start(
&self,
ctx: &RequestContext,
mut broker_client: BrokerClientChannel,
) -> anyhow::Result<()> {
if self.started.load(atomic::Ordering::Acquire) {
anyhow::bail!("Wal receiver is already started");
}
let timeline = self.timeline_ref.upgrade().with_context(|| {
format!("walreceiver start on a dropped timeline {}", self.timeline)
})?;
let tenant_id = timeline.tenant_id;
let timeline_id = timeline.timeline_id;
let walreceiver_ctx =
ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
let wal_receiver_conf = self.conf.clone();
task_mgr::spawn(
WALRECEIVER_RUNTIME.handle(),
TaskKind::WalReceiverManager,
Some(tenant_id),
Some(timeline_id),
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
false,
async move {
info!("WAL receiver manager started, connecting to broker");
let mut connection_manager_state = ConnectionManagerState::new(
timeline,
wal_receiver_conf,
);
loop {
select! {
_ = task_mgr::shutdown_watcher() => {
info!("WAL receiver shutdown requested, shutting down");
connection_manager_state.shutdown().await;
return Ok(());
},
loop_step_result = connection_manager_loop_step(
&mut broker_client,
&mut connection_manager_state,
&walreceiver_ctx,
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
connection_manager_state.shutdown().await;
return Ok(());
}
},
}
}
}.instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
);
self.started.store(true, atomic::Ordering::Release);
Ok(())
}
pub async fn stop(&self) {
task_mgr::shutdown_tasks(
Some(TaskKind::WalReceiverManager),
Some(self.timeline.tenant_id),
Some(self.timeline.timeline_id),
)
.await;
self.started.store(false, atomic::Ordering::Release);
}
}
/// A handle of an asynchronous task.
/// The task has a channel that it can use to communicate its lifecycle events in a certain form, see [`TaskEvent`]
@@ -39,26 +158,26 @@ pub use connection_manager::spawn_connection_manager_task;
/// Note that the communication happens via the `watch` channel, that does not accumulate the events, replacing the old one with the newer one on submission.
/// That may lead to certain events not being observed by the listener.
#[derive(Debug)]
pub struct TaskHandle<E> {
struct TaskHandle<E> {
join_handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
events_receiver: watch::Receiver<TaskStateUpdate<E>>,
cancellation: CancellationToken,
}
pub enum TaskEvent<E> {
enum TaskEvent<E> {
Update(TaskStateUpdate<E>),
End(anyhow::Result<()>),
}
#[derive(Debug, Clone)]
pub enum TaskStateUpdate<E> {
enum TaskStateUpdate<E> {
Started,
Progress(E),
}
impl<E: Clone> TaskHandle<E> {
/// Initializes the task, starting it immediately after the creation.
pub fn spawn<Fut>(
fn spawn<Fut>(
task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, CancellationToken) -> Fut + Send + 'static,
) -> Self
where
@@ -131,7 +250,7 @@ impl<E: Clone> TaskHandle<E> {
}
/// Aborts current task, waiting for it to finish.
pub async fn shutdown(self) {
async fn shutdown(self) {
if let Some(jh) = self.join_handle {
self.cancellation.cancel();
match jh.await {

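`WalReceiver::start` above spawns a manager loop that races the shutdown
signal against `connection_manager_loop_step` and keeps looping on
`ControlFlow::Continue`. A runnable sketch of that loop shape, with a
hypothetical `do_step` and a `watch` channel standing in for
`task_mgr::shutdown_watcher()`:

```rust
use std::ops::ControlFlow;
use tokio::{select, sync::watch};

// Hypothetical stand-in for one connection_manager_loop_step call.
async fn do_step(count: &mut u32) -> ControlFlow<(), ()> {
    *count += 1;
    if *count < 3 {
        ControlFlow::Continue(())
    } else {
        ControlFlow::Break(())
    }
}

#[tokio::main]
async fn main() {
    // Stand-in for task_mgr::shutdown_watcher(): resolves once shutdown
    // is signalled on the channel.
    let (_shutdown_tx, mut shutdown_rx) = watch::channel(false);
    let mut steps = 0;
    loop {
        select! {
            _ = shutdown_rx.changed() => {
                println!("shutdown requested, shutting down");
                return;
            }
            step = do_step(&mut steps) => match step {
                ControlFlow::Continue(()) => continue,
                ControlFlow::Break(()) => {
                    println!("loop ended after {steps} steps");
                    return;
                }
            },
        }
    }
}
```
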
View File

@@ -11,11 +11,9 @@
use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
use super::TaskStateUpdate;
use crate::broker_client::get_broker_client;
use super::{TaskStateUpdate, WalReceiverConf};
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::WALRECEIVER_RUNTIME;
use crate::task_mgr::{self, TaskKind};
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
@@ -38,75 +36,17 @@ use utils::{
use super::{walreceiver_connection::WalConnectionStatus, TaskEvent, TaskHandle};
/// Spawns the loop to take care of the timeline's WAL streaming connection.
pub fn spawn_connection_manager_task(
timeline: Arc<Timeline>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
auth_token: Option<Arc<String>>,
availability_zone: Option<String>,
ctx: RequestContext,
) {
let mut broker_client = get_broker_client().clone();
let tenant_id = timeline.tenant_id;
let timeline_id = timeline.timeline_id;
task_mgr::spawn(
WALRECEIVER_RUNTIME.handle(),
TaskKind::WalReceiverManager,
Some(tenant_id),
Some(timeline_id),
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
false,
async move {
info!("WAL receiver manager started, connecting to broker");
let mut walreceiver_state = WalreceiverState::new(
timeline,
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
auth_token,
availability_zone,
);
loop {
select! {
_ = task_mgr::shutdown_watcher() => {
info!("WAL receiver shutdown requested, shutting down");
walreceiver_state.shutdown().await;
return Ok(());
},
loop_step_result = connection_manager_loop_step(
&mut broker_client,
&mut walreceiver_state,
&ctx,
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
walreceiver_state.shutdown().await;
return Ok(());
}
},
}
}
}
.instrument(
info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id),
),
);
}
/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
/// Based on the updates, decides whether to start, keep or stop a WAL receiver task.
/// If storage broker subscription is cancelled, exits.
async fn connection_manager_loop_step(
pub(super) async fn connection_manager_loop_step(
broker_client: &mut BrokerClientChannel,
walreceiver_state: &mut WalreceiverState,
connection_manager_state: &mut ConnectionManagerState,
ctx: &RequestContext,
) -> ControlFlow<(), ()> {
let mut timeline_state_updates = walreceiver_state.timeline.subscribe_for_state_updates();
let mut timeline_state_updates = connection_manager_state
.timeline
.subscribe_for_state_updates();
match wait_for_active_timeline(&mut timeline_state_updates).await {
ControlFlow::Continue(()) => {}
@@ -117,8 +57,8 @@ async fn connection_manager_loop_step(
}
let id = TenantTimelineId {
tenant_id: walreceiver_state.timeline.tenant_id,
timeline_id: walreceiver_state.timeline.timeline_id,
tenant_id: connection_manager_state.timeline.tenant_id,
timeline_id: connection_manager_state.timeline.timeline_id,
};
// Subscribe to the broker updates. Stream shares underlying TCP connection
@@ -128,7 +68,7 @@ async fn connection_manager_loop_step(
info!("Subscribed for broker timeline updates");
loop {
let time_until_next_retry = walreceiver_state.time_until_next_retry();
let time_until_next_retry = connection_manager_state.time_until_next_retry();
// These things are happening concurrently:
//
@@ -141,12 +81,12 @@ async fn connection_manager_loop_step(
// - timeline state changes to something that does not allow walreceiver to run concurrently
select! {
Some(wal_connection_update) = async {
match walreceiver_state.wal_connection.as_mut() {
match connection_manager_state.wal_connection.as_mut() {
Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
None => None,
}
} => {
let wal_connection = walreceiver_state.wal_connection.as_mut()
let wal_connection = connection_manager_state.wal_connection.as_mut()
.expect("Should have a connection, as checked by the corresponding select! guard");
match wal_connection_update {
TaskEvent::Update(TaskStateUpdate::Started) => {},
@@ -156,7 +96,7 @@ async fn connection_manager_loop_step(
// from this safekeeper. This is good enough to clean unsuccessful
// retries history and allow reconnecting to this safekeeper without
// sleeping for a long time.
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
}
wal_connection.status = new_status;
}
@@ -165,7 +105,7 @@ async fn connection_manager_loop_step(
Ok(()) => debug!("WAL receiving task finished"),
Err(e) => error!("wal receiver task finished with an error: {e:?}"),
}
walreceiver_state.drop_old_connection(false).await;
connection_manager_state.drop_old_connection(false).await;
},
}
},
@@ -173,7 +113,7 @@ async fn connection_manager_loop_step(
// Got a new update from the broker
broker_update = broker_subscription.message() => {
match broker_update {
Ok(Some(broker_update)) => walreceiver_state.register_timeline_update(broker_update),
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
Err(e) => {
error!("broker subscription failed: {e}");
return ControlFlow::Continue(());
@@ -187,12 +127,12 @@ async fn connection_manager_loop_step(
new_event = async {
loop {
if walreceiver_state.timeline.current_state() == TimelineState::Loading {
if connection_manager_state.timeline.current_state() == TimelineState::Loading {
warn!("wal connection manager should only be launched after timeline has become active");
}
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = walreceiver_state.timeline.current_state();
let new_state = connection_manager_state.timeline.current_state();
match new_state {
// we're already active as walreceiver, no need to reactivate
TimelineState::Active => continue,
@@ -234,9 +174,9 @@ async fn connection_manager_loop_step(
} => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
}
if let Some(new_candidate) = walreceiver_state.next_connection_candidate() {
if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
info!("Switching to new connection candidate: {new_candidate:?}");
walreceiver_state
connection_manager_state
.change_connection(new_candidate, ctx)
.await
}
@@ -314,25 +254,17 @@ const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
struct WalreceiverState {
pub(super) struct ConnectionManagerState {
id: TenantTimelineId,
/// Use pageserver data about the timeline to filter out some of the safekeepers.
timeline: Arc<Timeline>,
/// The timeout on the connection to safekeeper for WAL streaming.
wal_connect_timeout: Duration,
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
lagging_wal_timeout: Duration,
/// The Lsn lag to use to determine when the current connection is lagging too much behind and reconnect to the other one.
max_lsn_wal_lag: NonZeroU64,
conf: WalReceiverConf,
/// Current connection to safekeeper for WAL streaming.
wal_connection: Option<WalConnection>,
/// Info about retries and unsuccessful attempts to connect to safekeepers.
wal_connection_retries: HashMap<NodeId, RetryInfo>,
/// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
auth_token: Option<Arc<String>>,
availability_zone: Option<String>,
}
/// Current connection data.
@@ -375,15 +307,8 @@ struct BrokerSkTimeline {
latest_update: NaiveDateTime,
}
impl WalreceiverState {
fn new(
timeline: Arc<Timeline>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
auth_token: Option<Arc<String>>,
availability_zone: Option<String>,
) -> Self {
impl ConnectionManagerState {
pub(super) fn new(timeline: Arc<Timeline>, conf: WalReceiverConf) -> Self {
let id = TenantTimelineId {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
@@ -391,14 +316,10 @@ impl WalreceiverState {
Self {
id,
timeline,
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
conf,
wal_connection: None,
wal_stream_candidates: HashMap::new(),
wal_connection_retries: HashMap::new(),
auth_token,
availability_zone,
}
}
@@ -407,7 +328,7 @@ impl WalreceiverState {
self.drop_old_connection(true).await;
let id = self.id;
let connect_timeout = self.wal_connect_timeout;
let connect_timeout = self.conf.wal_connect_timeout;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
TaskKind::WalReceiverConnectionHandler,
@@ -563,7 +484,7 @@ impl WalreceiverState {
(now - existing_wal_connection.status.latest_connection_update).to_std()
{
// Drop connection if we haven't received keepalive message for a while.
if latest_interaciton > self.wal_connect_timeout {
if latest_interaciton > self.conf.wal_connect_timeout {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connconf: new_wal_source_connconf,
@@ -573,7 +494,7 @@ impl WalreceiverState {
existing_wal_connection.status.latest_connection_update,
),
check_time: now,
threshold: self.wal_connect_timeout,
threshold: self.conf.wal_connect_timeout,
},
});
}
@@ -589,7 +510,7 @@ impl WalreceiverState {
// Check if the new candidate has much more WAL than the current one.
match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
Some(new_sk_lsn_advantage) => {
if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() {
if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connconf: new_wal_source_connconf,
@@ -597,16 +518,16 @@ impl WalreceiverState {
reason: ReconnectReason::LaggingWal {
current_commit_lsn,
new_commit_lsn,
threshold: self.max_lsn_wal_lag,
threshold: self.conf.max_lsn_wal_lag,
},
});
}
// If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
// and the current one is not, switch to the new one.
if self.availability_zone.is_some()
if self.conf.availability_zone.is_some()
&& existing_wal_connection.availability_zone
!= self.availability_zone
&& self.availability_zone == new_availability_zone
!= self.conf.availability_zone
&& self.conf.availability_zone == new_availability_zone
{
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
@@ -677,7 +598,7 @@ impl WalreceiverState {
if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
if candidate_commit_lsn > current_commit_lsn
&& waiting_for_new_wal > self.lagging_wal_timeout
&& waiting_for_new_wal > self.conf.lagging_wal_timeout
{
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
@@ -691,7 +612,7 @@ impl WalreceiverState {
existing_wal_connection.status.latest_wal_update,
),
check_time: now,
threshold: self.lagging_wal_timeout,
threshold: self.conf.lagging_wal_timeout,
},
});
}
@@ -757,11 +678,11 @@ impl WalreceiverState {
match wal_stream_connection_config(
self.id,
info.safekeeper_connstr.as_ref(),
match &self.auth_token {
match &self.conf.auth_token {
None => None,
Some(x) => Some(x),
},
self.availability_zone.as_deref(),
self.conf.availability_zone.as_deref(),
) {
Ok(connstr) => Some((*sk_id, info, connstr)),
Err(e) => {
@@ -775,7 +696,7 @@ impl WalreceiverState {
/// Remove candidates which haven't sent broker updates for a while.
fn cleanup_old_candidates(&mut self) {
let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
let lagging_wal_timeout = self.lagging_wal_timeout;
let lagging_wal_timeout = self.conf.lagging_wal_timeout;
self.wal_stream_candidates.retain(|node_id, broker_info| {
if let Ok(time_since_latest_broker_update) =
@@ -799,7 +720,7 @@ impl WalreceiverState {
}
}
async fn shutdown(mut self) {
pub(super) async fn shutdown(mut self) {
if let Some(wal_connection) = self.wal_connection.take() {
wal_connection.connection_task.shutdown().await;
}
@@ -903,7 +824,7 @@ mod tests {
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?;
let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
state.wal_connection = None;
@@ -914,7 +835,7 @@ mod tests {
(
NodeId(3),
dummy_broker_sk_timeline(
1 + state.max_lsn_wal_lag.get(),
1 + state.conf.max_lsn_wal_lag.get(),
"delay_over_threshold",
delay_over_threshold,
),
@@ -948,7 +869,7 @@ mod tests {
streaming_lsn: Some(Lsn(current_lsn)),
};
state.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: connected_sk_id,
@@ -966,7 +887,7 @@ mod tests {
(
connected_sk_id,
dummy_broker_sk_timeline(
current_lsn + state.max_lsn_wal_lag.get() * 2,
current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
DUMMY_SAFEKEEPER_HOST,
now,
),
@@ -978,7 +899,7 @@ mod tests {
(
NodeId(2),
dummy_broker_sk_timeline(
current_lsn + state.max_lsn_wal_lag.get() / 2,
current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
"not_enough_advanced_lsn",
now,
),
@@ -1003,7 +924,11 @@ mod tests {
state.wal_connection = None;
state.wal_stream_candidates = HashMap::from([(
NodeId(0),
dummy_broker_sk_timeline(1 + state.max_lsn_wal_lag.get(), DUMMY_SAFEKEEPER_HOST, now),
dummy_broker_sk_timeline(
1 + state.conf.max_lsn_wal_lag.get(),
DUMMY_SAFEKEEPER_HOST,
now,
),
)]);
let only_candidate = state
@@ -1101,7 +1026,7 @@ mod tests {
let now = Utc::now().naive_utc();
let connected_sk_id = NodeId(0);
let new_lsn = Lsn(current_lsn.0 + state.max_lsn_wal_lag.get() + 1);
let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
let connection_status = WalConnectionStatus {
is_connected: true,
@@ -1146,7 +1071,7 @@ mod tests {
ReconnectReason::LaggingWal {
current_commit_lsn: current_lsn,
new_commit_lsn: new_lsn,
threshold: state.max_lsn_wal_lag
threshold: state.conf.max_lsn_wal_lag
},
"Should select bigger WAL safekeeper if it starts to lag enough"
);
@@ -1165,7 +1090,7 @@ mod tests {
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
let wal_connect_timeout = chrono::Duration::from_std(state.wal_connect_timeout)?;
let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
let time_over_threshold =
Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
@@ -1208,7 +1133,7 @@ mod tests {
..
} => {
assert_eq!(last_keep_alive, Some(time_over_threshold));
assert_eq!(threshold, state.lagging_wal_timeout);
assert_eq!(threshold, state.conf.lagging_wal_timeout);
}
unexpected => panic!("Unexpected reason: {unexpected:?}"),
}
@@ -1228,7 +1153,7 @@ mod tests {
let new_lsn = Lsn(100_100).align();
let now = Utc::now().naive_utc();
let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?;
let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
let time_over_threshold =
Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
@@ -1275,7 +1200,7 @@ mod tests {
assert_eq!(current_commit_lsn, current_lsn);
assert_eq!(candidate_commit_lsn, new_lsn);
assert_eq!(last_wal_interaction, Some(time_over_threshold));
assert_eq!(threshold, state.lagging_wal_timeout);
assert_eq!(threshold, state.conf.lagging_wal_timeout);
}
unexpected => panic!("Unexpected reason: {unexpected:?}"),
}
@@ -1289,27 +1214,29 @@ mod tests {
const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
async fn dummy_state(harness: &TenantHarness<'_>) -> WalreceiverState {
async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.expect("Failed to create an empty timeline for dummy wal connection manager");
let timeline = timeline.initialize(&ctx).unwrap();
WalreceiverState {
ConnectionManagerState {
id: TenantTimelineId {
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
},
timeline,
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
conf: WalReceiverConf {
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
auth_token: None,
availability_zone: None,
},
wal_connection: None,
wal_stream_candidates: HashMap::new(),
wal_connection_retries: HashMap::new(),
auth_token: None,
availability_zone: None,
}
}
@@ -1321,7 +1248,7 @@ mod tests {
let harness = TenantHarness::create("switch_to_same_availability_zone")?;
let mut state = dummy_state(&harness).await;
state.availability_zone = test_az.clone();
state.conf.availability_zone = test_az.clone();
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();

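Among the reconnect heuristics above, the `LaggingWal` check switches
safekeepers once a candidate's commit LSN is far enough ahead of the
current connection. Reduced to its core (plain `u64`s instead of `Lsn`,
hypothetical function name):

```rust
/// Hypothetical reduction of the LaggingWal branch in
/// next_connection_candidate: switch only when the candidate is ahead of
/// the current connection by at least conf.max_lsn_wal_lag.
fn candidate_lags_enough(
    current_commit_lsn: u64,
    new_commit_lsn: u64,
    max_lsn_wal_lag: u64,
) -> bool {
    match new_commit_lsn.checked_sub(current_commit_lsn) {
        Some(advantage) => advantage >= max_lsn_wal_lag,
        // The candidate is behind the current connection.
        None => false,
    }
}

fn main() {
    assert!(candidate_lags_enough(100, 1_200, 1_000));
    assert!(!candidate_lags_enough(100, 600, 1_000));
}
```
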
View File

@@ -42,7 +42,7 @@ use utils::lsn::Lsn;
/// Status of the connection.
#[derive(Debug, Clone, Copy)]
pub struct WalConnectionStatus {
pub(super) struct WalConnectionStatus {
/// If we were able to initiate a postgres connection, this means that safekeeper process is at least running.
pub is_connected: bool,
/// Defines a healthy connection as one on which pageserver received WAL from safekeeper
@@ -60,7 +60,7 @@ pub struct WalConnectionStatus {
/// Open a connection to the given safekeeper and receive WAL, sending back progress
/// messages as we go.
pub async fn handle_walreceiver_connection(
pub(super) async fn handle_walreceiver_connection(
timeline: Arc<Timeline>,
wal_source_connconf: PgConnectionConfig,
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
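
The last file only narrows visibility: items that used to be `pub` become
`pub(super)`, reachable from the parent `walreceiver` module but no longer
from the rest of the crate. In miniature:

```rust
#![allow(dead_code)]

mod walreceiver {
    mod walreceiver_connection {
        // pub(super): visible to the parent `walreceiver` module,
        // but not to the rest of the crate.
        pub(super) struct WalConnectionStatus {
            pub is_connected: bool,
        }
    }

    fn visible_here() {
        // The parent module can still construct and use the type.
        let status = walreceiver_connection::WalConnectionStatus { is_connected: true };
        assert!(status.is_connected);
    }
}

fn main() {
    // walreceiver::walreceiver_connection::WalConnectionStatus is not
    // nameable from here.
}
```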