diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 06a14d8a41..e78f96074e 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -352,7 +352,6 @@ impl PostgresNode { // This isn't really a supported configuration, but can be useful for // testing. conf.append("synchronous_standby_names", "pageserver"); - conf.append("neon.callmemaybe_connstring", &self.connstr()); } let mut file = File::create(self.pgdata().join("postgresql.conf"))?; diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 24cdbce8f3..a8f21406fb 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::io::Write; use std::net::TcpStream; +use std::num::NonZeroU64; use std::path::PathBuf; use std::process::Command; use std::time::Duration; @@ -11,6 +12,7 @@ use nix::errno::Errno; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; use pageserver::http::models::{TenantConfigRequest, TenantCreateRequest, TimelineCreateRequest}; +use pageserver::tenant_mgr::TenantInfo; use pageserver::timelines::TimelineInfo; use postgres::{Config, NoTls}; use reqwest::blocking::{Client, RequestBuilder, Response}; @@ -26,7 +28,6 @@ use utils::{ use crate::local_env::LocalEnv; use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile}; -use pageserver::tenant_mgr::TenantInfo; #[derive(Error, Debug)] pub enum PageserverHttpError { @@ -37,6 +38,12 @@ pub enum PageserverHttpError { Response(String), } +impl From for PageserverHttpError { + fn from(e: anyhow::Error) -> Self { + Self::Response(e.to_string()) + } +} + type Result = result::Result; pub trait ResponseErrorMessageExt: Sized { @@ -410,6 +417,15 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose()?, pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()), + walreceiver_connect_timeout: settings + .get("walreceiver_connect_timeout") + .map(|x| x.to_string()), + lagging_wal_timeout: settings.get("lagging_wal_timeout").map(|x| x.to_string()), + max_lsn_wal_lag: settings + .get("max_lsn_wal_lag") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?, }) .send()? .error_from_body()? 
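The optional numeric settings above are parsed with a `map`/`transpose`/`context` chain (and the next hunk replaces the remaining `.unwrap()` calls with the same pattern), so a malformed value yields a descriptive error instead of a panic. A minimal, self-contained sketch of that pattern; the `parse_optional_setting` helper and the concrete key names here are illustrative, not part of the patch:

```rust
use std::collections::HashMap;
use std::num::NonZeroU64;

use anyhow::Context; // same error-context helper the surrounding code relies on

/// Parses an optional numeric setting: an absent key stays `None`,
/// a present-but-invalid value becomes a descriptive error instead of a panic.
fn parse_optional_setting(
    settings: &HashMap<&str, &str>,
    key: &str,
) -> anyhow::Result<Option<NonZeroU64>> {
    settings
        .get(key)
        .map(|x| x.parse::<NonZeroU64>())
        // Option<Result<T, E>> -> Result<Option<T>, E>
        .transpose()
        .with_context(|| format!("Failed to parse '{key}' as non zero integer"))
}

fn main() -> anyhow::Result<()> {
    let mut settings = HashMap::new();
    settings.insert("max_lsn_wal_lag", "1000000");

    assert_eq!(
        parse_optional_setting(&settings, "max_lsn_wal_lag")?,
        NonZeroU64::new(1_000_000)
    );
    // A missing key is not an error, it simply stays `None`.
    assert_eq!(parse_optional_setting(&settings, "gc_horizon")?, None);
    Ok(())
}
```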
@@ -433,22 +449,41 @@ impl PageServerNode { tenant_id, checkpoint_distance: settings .get("checkpoint_distance") - .map(|x| x.parse::().unwrap()), + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'checkpoint_distance' as an integer")?, compaction_target_size: settings .get("compaction_target_size") - .map(|x| x.parse::().unwrap()), + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'compaction_target_size' as an integer")?, compaction_period: settings.get("compaction_period").map(|x| x.to_string()), compaction_threshold: settings .get("compaction_threshold") - .map(|x| x.parse::().unwrap()), + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'compaction_threshold' as an integer")?, gc_horizon: settings .get("gc_horizon") - .map(|x| x.parse::().unwrap()), + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'gc_horizon' as an integer")?, gc_period: settings.get("gc_period").map(|x| x.to_string()), image_creation_threshold: settings .get("image_creation_threshold") - .map(|x| x.parse::().unwrap()), + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'image_creation_threshold' as non zero integer")?, pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()), + walreceiver_connect_timeout: settings + .get("walreceiver_connect_timeout") + .map(|x| x.to_string()), + lagging_wal_timeout: settings.get("lagging_wal_timeout").map(|x| x.to_string()), + max_lsn_wal_lag: settings + .get("max_lsn_wal_lag") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?, }) .send()? .error_from_body()?; diff --git a/docs/settings.md b/docs/settings.md index 0ca7223faa..4d828f22bc 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -31,7 +31,7 @@ broker_endpoints = ['some://etcd'] # [remote_storage] ``` -The config above shows default values for all basic pageserver settings, besides `broker_endpoints`: that one has to be set by the user, +The config above shows default values for all basic pageserver settings, besides `broker_endpoints`: that one has to be set by the user, see the corresponding section below. Pageserver uses default values for all files that are missing in the config, so it's not a hard error to leave the config blank. Yet, it validates the config values it can (e.g. postgres install dir) and errors if the validation fails, refusing to start. @@ -54,7 +54,7 @@ Note that TOML distinguishes between strings and integers, the former require si A list of endpoints (etcd currently) to connect and pull the information from. Mandatory, does not have a default, since requires etcd to be started as a separate process, -and its connection url should be specified separately. +and its connection url should be specified separately. #### broker_etcd_prefix @@ -111,6 +111,20 @@ L0 delta layer threshold for L1 image layer creation. Default is 3. WAL retention duration for PITR branching. Default is 30 days. +#### walreceiver_connect_timeout + +Time to wait while establishing the WAL receiver connection to a safekeeper before the attempt is considered failed. + +#### lagging_wal_timeout + +If the pageserver receives no WAL updates from the connected safekeeper for longer than this, the connection is considered stalled +and the pageserver switches to another safekeeper, so that it does not lag behind. + +#### max_lsn_wal_lag + +Maximum allowed difference between the WAL Lsn of the currently connected safekeeper and the latest WAL Lsn available on other safekeepers: +if the connected safekeeper falls behind by more than this, the pageserver switches to a different one.
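The duration settings accept humantime-style strings, and `max_lsn_wal_lag` is parsed as a non-zero integer. A small sketch of how such values are interpreted, using the default values this patch introduces in `tenant_config.rs`; the `humantime` crate is assumed, as the pageserver already uses it for these options:

```rust
use std::num::NonZeroU64;
use std::time::Duration;

fn main() {
    // Duration-typed settings accept humantime strings such as "2 seconds" or "10 seconds".
    let connect_timeout: Duration =
        humantime::parse_duration("2 seconds").expect("valid humantime string");
    assert_eq!(connect_timeout, Duration::from_secs(2));

    let lagging_wal_timeout: Duration =
        humantime::parse_duration("10 seconds").expect("valid humantime string");
    assert_eq!(lagging_wal_timeout, Duration::from_secs(10));

    // max_lsn_wal_lag is a byte distance between Lsns and must be non-zero.
    let max_lsn_wal_lag = NonZeroU64::new(1_000_000).expect("non-zero value");
    assert_eq!(max_lsn_wal_lag.get(), 1_000_000);

    // A zero value is rejected, which is why the pageserver parses it as NonZeroU64.
    assert!(NonZeroU64::new(0).is_none());
}
```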
+ #### initial_superuser_name Name of the initial superuser role, passed to initdb when a new tenant diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs index 7fe142502b..0bfce66a5d 100644 --- a/libs/etcd_broker/src/lib.rs +++ b/libs/etcd_broker/src/lib.rs @@ -31,7 +31,7 @@ struct SafekeeperTimeline { /// Published data about safekeeper's timeline. Fields made optional for easy migrations. #[serde_as] -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct SkTimelineInfo { /// Term of the last entry. pub last_log_term: Option, @@ -55,7 +55,9 @@ pub struct SkTimelineInfo { #[serde(default)] pub peer_horizon_lsn: Option, #[serde(default)] - pub safekeeper_connection_string: Option, + pub safekeeper_connstr: Option, + #[serde(default)] + pub pageserver_connstr: Option, } #[derive(Debug, thiserror::Error)] diff --git a/libs/utils/src/postgres_backend.rs b/libs/utils/src/postgres_backend.rs index 5fdb1ff9d2..ff71423122 100644 --- a/libs/utils/src/postgres_backend.rs +++ b/libs/utils/src/postgres_backend.rs @@ -336,11 +336,11 @@ impl PostgresBackend { let have_tls = self.tls_config.is_some(); match msg { FeMessage::StartupPacket(m) => { - trace!("got startup message {:?}", m); + trace!("got startup message {m:?}"); match m { FeStartupPacket::SslRequest => { - info!("SSL requested"); + debug!("SSL requested"); self.write_message(&BeMessage::EncryptionResponse(have_tls))?; if have_tls { @@ -349,7 +349,7 @@ impl PostgresBackend { } } FeStartupPacket::GssEncRequest => { - info!("GSS requested"); + debug!("GSS requested"); self.write_message(&BeMessage::EncryptionResponse(false))?; } FeStartupPacket::StartupMessage { .. } => { @@ -433,12 +433,7 @@ impl PostgresBackend { // full cause of the error, not just the top-level context + its trace. // We don't want to send that in the ErrorResponse though, // because it's not relevant to the compute node logs. - if query_string.starts_with("callmemaybe") { - // FIXME avoid printing a backtrace for tenant x not found errors until this is properly fixed - error!("query handler for '{}' failed: {}", query_string, e); - } else { - error!("query handler for '{}' failed: {:?}", query_string, e); - } + error!("query handler for '{}' failed: {:?}", query_string, e); self.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?; // TODO: untangle convoluted control flow if e.to_string().contains("failed to run") { diff --git a/libs/utils/src/zid.rs b/libs/utils/src/zid.rs index 0ef174da4d..6da5355f61 100644 --- a/libs/utils/src/zid.rs +++ b/libs/utils/src/zid.rs @@ -193,7 +193,7 @@ pub struct ZTenantId(ZId); zid_newtype!(ZTenantId); // A pair uniquely identifying Zenith instance. 
-#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct ZTenantTimelineId { pub tenant_id: ZTenantId, pub timeline_id: ZTimelineId, diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index d78d3622c4..298addb838 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [features] # It is simpler infra-wise to have failpoints enabled by default -# It shouldn't affect perf in any way because failpoints +# It shouldn't affect performance in any way because failpoints # are not placed in hot code paths default = ["failpoints"] profiling = ["pprof"] diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index f44b0846a8..01b626e046 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -480,6 +480,21 @@ impl PageServerConf { if let Some(pitr_interval) = item.get("pitr_interval") { t_conf.pitr_interval = Some(parse_toml_duration("pitr_interval", pitr_interval)?); } + if let Some(walreceiver_connect_timeout) = item.get("walreceiver_connect_timeout") { + t_conf.walreceiver_connect_timeout = Some(parse_toml_duration( + "walreceiver_connect_timeout", + walreceiver_connect_timeout, + )?); + } + if let Some(lagging_wal_timeout) = item.get("lagging_wal_timeout") { + t_conf.lagging_wal_timeout = Some(parse_toml_duration( + "lagging_wal_timeout", + lagging_wal_timeout, + )?); + } + if let Some(max_lsn_wal_lag) = item.get("max_lsn_wal_lag") { + t_conf.max_lsn_wal_lag = Some(parse_toml_from_str("max_lsn_wal_lag", max_lsn_wal_lag)?); + } Ok(t_conf) } diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index e00ccda2a1..c947cebcb6 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -1,3 +1,5 @@ +use std::num::NonZeroU64; + use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use utils::{ @@ -33,6 +35,9 @@ pub struct TenantCreateRequest { pub gc_period: Option, pub image_creation_threshold: Option, pub pitr_interval: Option, + pub walreceiver_connect_timeout: Option, + pub lagging_wal_timeout: Option, + pub max_lsn_wal_lag: Option, } #[serde_as] @@ -68,6 +73,9 @@ pub struct TenantConfigRequest { pub gc_period: Option, pub image_creation_threshold: Option, pub pitr_interval: Option, + pub walreceiver_connect_timeout: Option, + pub lagging_wal_timeout: Option, + pub max_lsn_wal_lag: Option, } impl TenantConfigRequest { @@ -82,6 +90,21 @@ impl TenantConfigRequest { gc_period: None, image_creation_threshold: None, pitr_interval: None, + walreceiver_connect_timeout: None, + lagging_wal_timeout: None, + max_lsn_wal_lag: None, } } } + +/// A WAL receiver's data stored inside the global `WAL_RECEIVERS`. +/// We keep one WAL receiver active per timeline. 
+#[serde_as] +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct WalReceiverEntry { + pub wal_producer_connstr: Option, + #[serde_as(as = "Option")] + pub last_received_msg_lsn: Option, + /// the timestamp (in microseconds) of the last received message + pub last_received_msg_ts: Option, +} diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index bb650a34ed..a1198051a8 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -229,23 +229,16 @@ async fn wal_receiver_get_handler(request: Request) -> Result) -> Result, ApiError> { @@ -402,6 +395,19 @@ async fn tenant_create_handler(mut request: Request) -> Result) -> Result Duration { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .walreceiver_connect_timeout + .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout) + } + + pub fn get_lagging_wal_timeout(&self) -> Duration { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .lagging_wal_timeout + .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout) + } + + pub fn get_max_lsn_wal_lag(&self) -> NonZeroU64 { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .max_lsn_wal_lag + .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag) + } + pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) -> Result<()> { let mut tenant_conf = self.tenant_conf.write().unwrap(); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 4f0fca4797..df43b8c0df 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -7,7 +7,6 @@ // *status* -- show actual info about this pageserver, // *pagestream* -- enter mode where smgr and pageserver talk with their // custom protocol. 
-// *callmemaybe $url* -- ask pageserver to start walreceiver on $url // use anyhow::{bail, ensure, Context, Result}; @@ -38,7 +37,6 @@ use crate::repository::Timeline; use crate::tenant_mgr; use crate::thread_mgr; use crate::thread_mgr::ThreadKind; -use crate::walreceiver; use crate::CheckpointConfig; use metrics::{register_histogram_vec, HistogramVec}; use postgres_ffi::xlog_utils::to_pg_timestamp; @@ -716,30 +714,6 @@ impl postgres_backend::Handler for PageServerHandler { // Check that the timeline exists self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?; - pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with("callmemaybe ") { - // callmemaybe - // TODO lazy static - let re = Regex::new(r"^callmemaybe ([[:xdigit:]]+) ([[:xdigit:]]+) (.*)$").unwrap(); - let caps = re - .captures(query_string) - .with_context(|| format!("invalid callmemaybe: '{}'", query_string))?; - - let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; - let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; - let connstr = caps.get(3).unwrap().as_str().to_owned(); - - self.check_permission(Some(tenantid))?; - - let _enter = - info_span!("callmemaybe", timeline = %timelineid, tenant = %tenantid).entered(); - - // Check that the timeline exists - tenant_mgr::get_local_timeline_with_load(tenantid, timelineid) - .context("Cannot load local timeline")?; - - walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr)?; - pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.to_ascii_lowercase().starts_with("set ") { // important because psycopg2 executes "SET datestyle TO 'ISO'" diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 5bf128e66b..9d5056cd16 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -469,6 +469,9 @@ pub mod repo_harness { gc_period: Some(tenant_conf.gc_period), image_creation_threshold: Some(tenant_conf.image_creation_threshold), pitr_interval: Some(tenant_conf.pitr_interval), + walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout), + lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout), + max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag), } } } diff --git a/pageserver/src/tenant_config.rs b/pageserver/src/tenant_config.rs index 9bf223e59e..f68a820e95 100644 --- a/pageserver/src/tenant_config.rs +++ b/pageserver/src/tenant_config.rs @@ -10,6 +10,7 @@ //! use crate::config::PageServerConf; use serde::{Deserialize, Serialize}; +use std::num::NonZeroU64; use std::path::PathBuf; use std::time::Duration; use utils::zid::ZTenantId; @@ -34,6 +35,9 @@ pub mod defaults { pub const DEFAULT_GC_PERIOD: &str = "100 s"; pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3; pub const DEFAULT_PITR_INTERVAL: &str = "30 days"; + pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds"; + pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds"; + pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1_000_000; } /// Per-tenant configuration options @@ -68,6 +72,17 @@ pub struct TenantConf { // Page versions older than this are garbage collected away. #[serde(with = "humantime_serde")] pub pitr_interval: Duration, + /// Maximum amount of time to wait while opening a connection to receive wal, before erroring. 
+ #[serde(with = "humantime_serde")] + pub walreceiver_connect_timeout: Duration, + /// Considers safekeepers stalled after no WAL updates were received longer than this threshold. + /// A stalled safekeeper will be changed to a newer one when it appears. + #[serde(with = "humantime_serde")] + pub lagging_wal_timeout: Duration, + /// Considers safekeepers lagging when their WAL is behind another safekeeper for more than this threshold. + /// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update, + /// to avoid eager reconnects. + pub max_lsn_wal_lag: NonZeroU64, } /// Same as TenantConf, but this struct preserves the information about @@ -85,6 +100,11 @@ pub struct TenantConfOpt { pub image_creation_threshold: Option, #[serde(with = "humantime_serde")] pub pitr_interval: Option, + #[serde(with = "humantime_serde")] + pub walreceiver_connect_timeout: Option, + #[serde(with = "humantime_serde")] + pub lagging_wal_timeout: Option, + pub max_lsn_wal_lag: Option, } impl TenantConfOpt { @@ -108,6 +128,13 @@ impl TenantConfOpt { .image_creation_threshold .unwrap_or(global_conf.image_creation_threshold), pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval), + walreceiver_connect_timeout: self + .walreceiver_connect_timeout + .unwrap_or(global_conf.walreceiver_connect_timeout), + lagging_wal_timeout: self + .lagging_wal_timeout + .unwrap_or(global_conf.lagging_wal_timeout), + max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag), } } @@ -136,6 +163,15 @@ impl TenantConfOpt { if let Some(pitr_interval) = other.pitr_interval { self.pitr_interval = Some(pitr_interval); } + if let Some(walreceiver_connect_timeout) = other.walreceiver_connect_timeout { + self.walreceiver_connect_timeout = Some(walreceiver_connect_timeout); + } + if let Some(lagging_wal_timeout) = other.lagging_wal_timeout { + self.lagging_wal_timeout = Some(lagging_wal_timeout); + } + if let Some(max_lsn_wal_lag) = other.max_lsn_wal_lag { + self.max_lsn_wal_lag = Some(max_lsn_wal_lag); + } } } @@ -155,6 +191,14 @@ impl TenantConf { image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD, pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL) .expect("cannot parse default PITR interval"), + walreceiver_connect_timeout: humantime::parse_duration( + DEFAULT_WALRECEIVER_CONNECT_TIMEOUT, + ) + .expect("cannot parse default walreceiver connect timeout"), + lagging_wal_timeout: humantime::parse_duration(DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT) + .expect("cannot parse default walreceiver lagging wal timeout"), + max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG) + .expect("cannot parse default max walreceiver Lsn wal lag"), } } @@ -175,6 +219,16 @@ impl TenantConf { gc_period: Duration::from_secs(10), image_creation_threshold: defaults::DEFAULT_IMAGE_CREATION_THRESHOLD, pitr_interval: Duration::from_secs(60 * 60), + walreceiver_connect_timeout: humantime::parse_duration( + defaults::DEFAULT_WALRECEIVER_CONNECT_TIMEOUT, + ) + .unwrap(), + lagging_wal_timeout: humantime::parse_duration( + defaults::DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT, + ) + .unwrap(), + max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG) + .unwrap(), } } } diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index cc35d79d16..c48b021d1f 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -8,11 +8,10 @@ use crate::repository::{Repository, TimelineSyncStatusUpdate}; use 
crate::storage_sync::index::RemoteIndex; use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData}; use crate::tenant_config::TenantConfOpt; -use crate::thread_mgr; use crate::thread_mgr::ThreadKind; -use crate::timelines; use crate::timelines::CreateRepo; use crate::walredo::PostgresRedoManager; +use crate::{thread_mgr, timelines, walreceiver}; use crate::{DatadirTimelineImpl, RepositoryImpl}; use anyhow::{bail, Context}; use serde::{Deserialize, Serialize}; @@ -21,23 +20,30 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt; use std::sync::Arc; +use tokio::sync::mpsc; use tracing::*; use utils::lsn::Lsn; -use utils::zid::{ZTenantId, ZTimelineId}; +use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; mod tenants_state { + use anyhow::ensure; use std::{ collections::HashMap, sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, }; + use tokio::sync::mpsc; + use tracing::{debug, error}; use utils::zid::ZTenantId; - use crate::tenant_mgr::Tenant; + use crate::tenant_mgr::{LocalTimelineUpdate, Tenant}; lazy_static::lazy_static! { static ref TENANTS: RwLock> = RwLock::new(HashMap::new()); + /// Sends updates to the local timelines (creation and deletion) to the WAL receiver, + /// so that it can enable/disable corresponding processes. + static ref TIMELINE_UPDATE_SENDER: RwLock>> = RwLock::new(None); } pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap> { @@ -51,6 +57,39 @@ mod tenants_state { .write() .expect("Failed to write() tenants lock, it got poisoned") } + + pub(super) fn set_timeline_update_sender( + timeline_updates_sender: mpsc::UnboundedSender, + ) -> anyhow::Result<()> { + let mut sender_guard = TIMELINE_UPDATE_SENDER + .write() + .expect("Failed to write() timeline_update_sender lock, it got poisoned"); + ensure!(sender_guard.is_none(), "Timeline update sender already set"); + *sender_guard = Some(timeline_updates_sender); + Ok(()) + } + + pub(super) fn try_send_timeline_update(update: LocalTimelineUpdate) { + match TIMELINE_UPDATE_SENDER + .read() + .expect("Failed to read() timeline_update_sender lock, it got poisoned") + .as_ref() + { + Some(sender) => { + if let Err(e) = sender.send(update) { + error!("Failed to send timeline update: {}", e); + } + } + None => debug!("Timeline update sender is not enabled, cannot send update {update:?}"), + } + } + + pub(super) fn stop_timeline_update_sender() { + TIMELINE_UPDATE_SENDER + .write() + .expect("Failed to write() timeline_update_sender lock, it got poisoned") + .take(); + } } struct Tenant { @@ -87,10 +126,10 @@ pub enum TenantState { impl fmt::Display for TenantState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - TenantState::Active => f.write_str("Active"), - TenantState::Idle => f.write_str("Idle"), - TenantState::Stopping => f.write_str("Stopping"), - TenantState::Broken => f.write_str("Broken"), + Self::Active => f.write_str("Active"), + Self::Idle => f.write_str("Idle"), + Self::Stopping => f.write_str("Stopping"), + Self::Broken => f.write_str("Broken"), } } } @@ -99,6 +138,11 @@ impl fmt::Display for TenantState { /// Timelines that are only partially available locally (remote storage has more data than this pageserver) /// are scheduled for download and added to the repository once download is completed. 
pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result { + let (timeline_updates_sender, timeline_updates_receiver) = + mpsc::unbounded_channel::(); + tenants_state::set_timeline_update_sender(timeline_updates_sender)?; + walreceiver::init_wal_receiver_main_thread(conf, timeline_updates_receiver)?; + let SyncStartupData { remote_index, local_timeline_init_statuses, @@ -113,16 +157,27 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result), +} + +impl std::fmt::Debug for LocalTimelineUpdate { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Detach(ttid) => f.debug_tuple("Remove").field(ttid).finish(), + Self::Attach(ttid, _) => f.debug_tuple("Add").field(ttid).finish(), + } + } +} + /// Updates tenants' repositories, changing their timelines state in memory. pub fn apply_timeline_sync_status_updates( conf: &'static PageServerConf, @@ -160,6 +215,7 @@ pub fn apply_timeline_sync_status_updates( /// Shut down all tenants. This runs as part of pageserver shutdown. /// pub fn shutdown_all_tenants() { + tenants_state::stop_timeline_update_sender(); let mut m = tenants_state::write_tenants(); let mut tenantids = Vec::new(); for (tenantid, tenant) in m.iter_mut() { @@ -173,7 +229,7 @@ pub fn shutdown_all_tenants() { } drop(m); - thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiver), None, None); + thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiverManager), None, None); thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None); thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None); @@ -247,32 +303,49 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option { Some(tenants_state::read_tenants().get(&tenantid)?.state) } -/// -/// Change the state of a tenant to Active and launch its compactor and GC -/// threads. If the tenant was already in Active state or Stopping, does nothing. -/// -pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> { +pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow::Result<()> { let mut m = tenants_state::write_tenants(); let tenant = m .get_mut(&tenant_id) .with_context(|| format!("Tenant not found for id {tenant_id}"))?; + let old_state = tenant.state; + tenant.state = new_state; + drop(m); - info!("activating tenant {tenant_id}"); - - match tenant.state { - // If the tenant is already active, nothing to do. 
- TenantState::Active => {} - - // If it's Idle, launch the compactor and GC threads - TenantState::Idle => { - thread_mgr::spawn( + match (old_state, new_state) { + (TenantState::Broken, TenantState::Broken) + | (TenantState::Active, TenantState::Active) + | (TenantState::Idle, TenantState::Idle) + | (TenantState::Stopping, TenantState::Stopping) => { + debug!("tenant {tenant_id} already in state {new_state}"); + } + (TenantState::Broken, ignored) => { + debug!("Ignoring {ignored} since tenant {tenant_id} is in broken state"); + } + (_, TenantState::Broken) => { + debug!("Setting tenant {tenant_id} status to broken"); + } + (TenantState::Stopping, ignored) => { + debug!("Ignoring {ignored} since tenant {tenant_id} is in stopping state"); + } + (TenantState::Idle, TenantState::Active) => { + info!("activating tenant {tenant_id}"); + let compactor_spawn_result = thread_mgr::spawn( ThreadKind::Compactor, Some(tenant_id), None, "Compactor thread", false, move || crate::tenant_threads::compact_loop(tenant_id), - )?; + ); + if compactor_spawn_result.is_err() { + let mut m = tenants_state::write_tenants(); + m.get_mut(&tenant_id) + .with_context(|| format!("Tenant not found for id {tenant_id}"))? + .state = old_state; + drop(m); + } + compactor_spawn_result?; let gc_spawn_result = thread_mgr::spawn( ThreadKind::GarbageCollector, @@ -286,21 +359,31 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> { .with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}")); if let Err(e) = &gc_spawn_result { + let mut m = tenants_state::write_tenants(); + m.get_mut(&tenant_id) + .with_context(|| format!("Tenant not found for id {tenant_id}"))? + .state = old_state; + drop(m); error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}"); thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None); return gc_spawn_result; } - tenant.state = TenantState::Active; } - - TenantState::Stopping => { - // don't re-activate it if it's being stopped + (TenantState::Idle, TenantState::Stopping) => { + info!("stopping idle tenant {tenant_id}"); } - - TenantState::Broken => { - // cannot activate + (TenantState::Active, TenantState::Stopping | TenantState::Idle) => { + info!("stopping tenant {tenant_id} threads due to new state {new_state}"); + thread_mgr::shutdown_threads( + Some(ThreadKind::WalReceiverManager), + Some(tenant_id), + None, + ); + thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), Some(tenant_id), None); + thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None); } } + Ok(()) } @@ -325,15 +408,15 @@ pub fn get_local_timeline_with_load( .with_context(|| format!("Tenant {tenant_id} not found"))?; if let Some(page_tline) = tenant.local_timelines.get(&timeline_id) { - return Ok(Arc::clone(page_tline)); + Ok(Arc::clone(page_tline)) + } else { + let page_tline = load_local_timeline(&tenant.repo, timeline_id) + .with_context(|| format!("Failed to load local timeline for tenant {tenant_id}"))?; + tenant + .local_timelines + .insert(timeline_id, Arc::clone(&page_tline)); + Ok(page_tline) } - - let page_tline = load_local_timeline(&tenant.repo, timeline_id) - .with_context(|| format!("Failed to load local timeline for tenant {tenant_id}"))?; - tenant - .local_timelines - .insert(timeline_id, Arc::clone(&page_tline)); - Ok(page_tline) } pub fn detach_timeline( @@ -351,6 +434,9 @@ pub fn detach_timeline( .detach_timeline(timeline_id) .context("Failed to detach inmem tenant timeline")?; 
tenant.local_timelines.remove(&timeline_id); + tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach( + ZTenantTimelineId::new(tenant_id, timeline_id), + )); } None => bail!("Tenant {tenant_id} not found in local tenant state"), } @@ -379,6 +465,12 @@ fn load_local_timeline( repartition_distance, )); page_tline.init_logical_size()?; + + tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach( + ZTenantTimelineId::new(repo.tenant_id(), timeline_id), + Arc::clone(&page_tline), + )); + Ok(page_tline) } diff --git a/pageserver/src/thread_mgr.rs b/pageserver/src/thread_mgr.rs index 473cddda58..8264bdd97c 100644 --- a/pageserver/src/thread_mgr.rs +++ b/pageserver/src/thread_mgr.rs @@ -91,8 +91,8 @@ pub enum ThreadKind { // associated with one later, after receiving a command from the client. PageRequestHandler, - // Thread that connects to a safekeeper to fetch WAL for one timeline. - WalReceiver, + // Main walreceiver manager thread that ensures that every timeline spawns a connection to a safekeeper, to fetch WAL. + WalReceiverManager, // Thread that handles compaction of all timelines for a tenant. Compactor, diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index b8f349af8f..df8dd2fc29 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -1,61 +1,77 @@ +//! The WAL receiver manages an open connection to a safekeeper and streams WAL from it. +//! To do so, the current implementation needs to do the following: //! -//! WAL receiver connects to the WAL safekeeper service, streams WAL, -//! decodes records and saves them in the repository for the correct -//! timeline. +//! * track the timelines that it needs to stream WAL into. +//! Pageserver is able to dynamically (un)load tenants on attach and detach, +//! hence the WAL receiver needs to react to such events. //! -//! We keep one WAL receiver active per timeline. +//! * get a broker subscription and stream data from it to determine when a timeline needs WAL streaming. +//! For that, it watches specific keys in the etcd broker and pulls the relevant data periodically. +//! The data is produced by safekeepers, which push it periodically and pull it to synchronize with each other. +//! Without this data, no WAL streaming is currently possible. +//! +//! Only one active WAL streaming connection is allowed at a time. +//! The connection is supposed to be updated periodically, based on safekeeper timeline data. +//! +//! * handle the actual connection and WAL streaming +//! +//! Handling happens dynamically: portions of WAL are processed and registered in the server as they arrive. +//! Along with the registration, metadata is written to track WAL streaming progress, and that metadata is consulted when choosing a safekeeper to connect to. +//! +//! ## Implementation details +//! +//! The WAL receiver's implementation consists of 3 kinds of nested loops, separately handling the logic from the bullets above: +//! +//! * [`init_wal_receiver_main_thread`], the wal receiver main thread, containing the async control loop: it handles timeline addition/removal and interruption of the whole thread. +//! The loop is infallible, always trying to continue with new tasks; the only place where it can fail is its initialization. +//! All of the code inside the loop is either async or a spawn_blocking wrapper around the sync code. +//! +//! * [`timeline_wal_broker_loop_step`], a broker task, handling the etcd broker subscription and polling, safekeeper selection logic and [re]connects. +//!
On every subsequent broker/wal streamer connection attempt, the loop steps are forced to wait for some time before running, +//! increasing with the number of attempts (capped at some fixed value). +//! This is done endlessly, to ensure we don't miss WAL streaming when it becomes available on one of the safekeepers. +//! +//! Apart from the broker management, it keeps the wal streaming connection open to the safekeeper with the most advanced timeline state. +//! The connection can be closed from the safekeeper side (with or without an error) and can be cancelled from the pageserver side from time to time. +//! +//! * [`connection_handler::handle_walreceiver_connection`], a wal streaming task, opening the libpq connection and reading the data out of it to the end. +//! It periodically reports progress, to share some of the data via the external HTTP API and to ensure we're able to switch connections when needed. +//! +//! Every task is cancellable via its separate cancellation channel, +//! and cancellation/drop of any such task's dependency (broker subscription or the data source channel) triggers the corresponding task cancellation as well. + +mod connection_handler; use crate::config::PageServerConf; -use crate::repository::{Repository, Timeline}; -use crate::tenant_mgr; -use crate::thread_mgr; +use crate::http::models::WalReceiverEntry; +use crate::repository::Timeline; +use crate::tenant_mgr::{self, LocalTimelineUpdate, TenantState}; use crate::thread_mgr::ThreadKind; -use crate::walingest::WalIngest; -use anyhow::{bail, Context, Error, Result}; -use bytes::BytesMut; -use fail::fail_point; -use lazy_static::lazy_static; -use postgres_ffi::waldecoder::*; -use postgres_protocol::message::backend::ReplicationMessage; -use postgres_types::PgLsn; -use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; +use crate::{thread_mgr, DatadirTimelineImpl}; +use anyhow::{ensure, Context}; +use chrono::{NaiveDateTime, Utc}; +use etcd_broker::{Client, SkTimelineInfo, SkTimelineSubscription, SkTimelineSubscriptionKind}; +use itertools::Itertools; +use once_cell::sync::Lazy; use std::cell::Cell; -use std::collections::HashMap; -use std::str::FromStr; -use std::sync::Mutex; +use std::collections::{hash_map, HashMap, HashSet}; +use std::num::NonZeroU64; +use std::ops::ControlFlow; +use std::sync::Arc; use std::thread_local; -use std::time::SystemTime; -use tokio::pin; -use tokio_postgres::replication::ReplicationStream; -use tokio_postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; -use tokio_stream::StreamExt; -use tracing::*; -use utils::{ - lsn::Lsn, - pq_proto::ZenithFeedback, - zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}, +use std::time::Duration; +use tokio::select; +use tokio::{ + sync::{mpsc, watch, RwLock}, + task::JoinHandle, }; +use tracing::*; +use url::Url; +use utils::lsn::Lsn; +use utils::pq_proto::ZenithFeedback; +use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId}; -/// -/// A WAL receiver's data stored inside the global `WAL_RECEIVERS`. -/// We keep one WAL receiver active per timeline. -/// -#[serde_as] -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct WalReceiverEntry { - thread_id: u64, - wal_producer_connstr: String, - #[serde_as(as = "Option")] - last_received_msg_lsn: Option, - /// the timestamp (in microseconds) of the last received message - last_received_msg_ts: Option, -} - -lazy_static!
{ - static ref WAL_RECEIVERS: Mutex> = - Mutex::new(HashMap::new()); -} +use self::connection_handler::{WalConnectionEvent, WalReceiverConnection}; thread_local! { // Boolean that is true only for WAL receiver threads @@ -64,375 +80,1133 @@ thread_local! { pub(crate) static IS_WAL_RECEIVER: Cell = Cell::new(false); } -fn drop_wal_receiver(tenantid: ZTenantId, timelineid: ZTimelineId) { - let mut receivers = WAL_RECEIVERS.lock().unwrap(); - receivers.remove(&(tenantid, timelineid)); -} +/// WAL receiver state for sharing with the outside world. +/// Only entries for timelines currently available in pageserver are stored. +static WAL_RECEIVER_ENTRIES: Lazy>> = + Lazy::new(|| RwLock::new(HashMap::new())); -// Launch a new WAL receiver, or tell one that's running about change in connection string -pub fn launch_wal_receiver( - conf: &'static PageServerConf, - tenantid: ZTenantId, - timelineid: ZTimelineId, - wal_producer_connstr: &str, -) -> Result<()> { - let mut receivers = WAL_RECEIVERS.lock().unwrap(); - - match receivers.get_mut(&(tenantid, timelineid)) { - Some(receiver) => { - debug!("wal receiver already running, updating connection string"); - receiver.wal_producer_connstr = wal_producer_connstr.into(); - } - None => { - let thread_id = thread_mgr::spawn( - ThreadKind::WalReceiver, - Some(tenantid), - Some(timelineid), - "WAL receiver thread", - false, - move || { - IS_WAL_RECEIVER.with(|c| c.set(true)); - thread_main(conf, tenantid, timelineid); - Ok(()) - }, - )?; - - let receiver = WalReceiverEntry { - thread_id, - wal_producer_connstr: wal_producer_connstr.into(), - last_received_msg_lsn: None, - last_received_msg_ts: None, - }; - receivers.insert((tenantid, timelineid), receiver); - - // Update tenant state and start tenant threads, if they are not running yet. - tenant_mgr::activate_tenant(tenantid)?; - } - }; - Ok(()) -} - -/// Look up a WAL receiver's data in the global `WAL_RECEIVERS` -pub fn get_wal_receiver_entry( +/// Gets the public WAL streaming entry for a certain timeline. +pub async fn get_wal_receiver_entry( tenant_id: ZTenantId, timeline_id: ZTimelineId, ) -> Option { - let receivers = WAL_RECEIVERS.lock().unwrap(); - receivers.get(&(tenant_id, timeline_id)).cloned() + WAL_RECEIVER_ENTRIES + .read() + .await + .get(&ZTenantTimelineId::new(tenant_id, timeline_id)) + .cloned() } -// -// This is the entry point for the WAL receiver thread. -// -fn thread_main(conf: &'static PageServerConf, tenant_id: ZTenantId, timeline_id: ZTimelineId) { - let _enter = info_span!("WAL receiver", timeline = %timeline_id, tenant = %tenant_id).entered(); - info!("WAL receiver thread started"); - - // Look up the current WAL producer address - let wal_producer_connstr = { - match get_wal_receiver_entry(tenant_id, timeline_id) { - Some(e) => e.wal_producer_connstr, - None => { - info!( - "Unable to create the WAL receiver thread: no WAL receiver entry found for tenant {} and timeline {}", - tenant_id, timeline_id - ); - return; - } - } - }; - - // Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server, - // and start streaming WAL from it. 
- let res = walreceiver_main(conf, tenant_id, timeline_id, &wal_producer_connstr); - - // TODO cleanup info messages - if let Err(e) = res { - info!("WAL streaming connection failed ({})", e); - } else { - info!( - "walreceiver disconnected tenant {}, timelineid {}", - tenant_id, timeline_id - ); - } - - // Drop it from list of active WAL_RECEIVERS - // so that next callmemaybe request launched a new thread - drop_wal_receiver(tenant_id, timeline_id); -} - -fn walreceiver_main( - _conf: &PageServerConf, - tenant_id: ZTenantId, - timeline_id: ZTimelineId, - wal_producer_connstr: &str, -) -> anyhow::Result<(), Error> { - // Connect to the database in replication mode. - info!("connecting to {:?}", wal_producer_connstr); - let connect_cfg = format!( - "{} application_name=pageserver replication=true", - wal_producer_connstr +/// Sets up the main WAL receiver thread that manages the rest of the subtasks inside of it, per timeline. +/// See comments in [`wal_receiver_main_thread_loop_step`] for more details on per timeline activities. +pub fn init_wal_receiver_main_thread( + conf: &'static PageServerConf, + mut timeline_updates_receiver: mpsc::UnboundedReceiver, +) -> anyhow::Result<()> { + let etcd_endpoints = conf.broker_endpoints.clone(); + ensure!( + !etcd_endpoints.is_empty(), + "Cannot start wal receiver: etcd endpoints are empty" ); - - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - let (mut replication_client, connection) = - runtime.block_on(tokio_postgres::connect(&connect_cfg, NoTls))?; - // This is from tokio-postgres docs, but it is a bit weird in our case because we extensively use block_on - runtime.spawn(async move { - if let Err(e) = connection.await { - error!("connection error: {}", e); - } - }); - - info!("connected!"); - - // Immediately increment the gauge, then create a job to decrement it on thread exit. - // One of the pros of `defer!` is that this will *most probably* - // get called, even in presence of panics. - let gauge = crate::LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]); - gauge.inc(); - scopeguard::defer! { - gauge.dec(); - } - - let identify = runtime.block_on(identify_system(&mut replication_client))?; - info!("{:?}", identify); - let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); - let mut caught_up = false; - - let repo = tenant_mgr::get_repository_for_tenant(tenant_id) - .with_context(|| format!("no repository found for tenant {}", tenant_id))?; - let timeline = - tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id).with_context(|| { - format!( - "local timeline {} not found for tenant {}", - timeline_id, tenant_id - ) - })?; - let remote_index = repo.get_remote_index(); - - // - // Start streaming the WAL, from where we left off previously. - // - // If we had previously received WAL up to some point in the middle of a WAL record, we - // better start from the end of last full WAL record, not in the middle of one. - let mut last_rec_lsn = timeline.get_last_record_lsn(); - let mut startpoint = last_rec_lsn; - - if startpoint == Lsn(0) { - bail!("No previous WAL position"); - } - - // There might be some padding after the last full record, skip it. 
- startpoint += startpoint.calc_padding(8u32); - + let broker_prefix = &conf.broker_etcd_prefix; info!( - "last_record_lsn {} starting replication from {}, server is at {}...", - last_rec_lsn, startpoint, end_of_wal + "Starting wal receiver main thread, etdc endpoints: {}", + etcd_endpoints.iter().map(Url::to_string).join(", ") ); - let query = format!("START_REPLICATION PHYSICAL {}", startpoint); + let runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("wal-receiver-runtime-thread") + .worker_threads(40) + .enable_all() + .on_thread_start(|| IS_WAL_RECEIVER.with(|c| c.set(true))) + .build() + .context("Failed to create storage sync runtime")?; + let etcd_client = runtime + .block_on(etcd_broker::Client::connect(etcd_endpoints, None)) + .context("Failed to connect to etcd")?; - let copy_stream = runtime.block_on(replication_client.copy_both_simple(&query))?; - let physical_stream = ReplicationStream::new(copy_stream); - pin!(physical_stream); - - let mut waldecoder = WalStreamDecoder::new(startpoint); - - let mut walingest = WalIngest::new(&*timeline, startpoint)?; - - while let Some(replication_message) = runtime.block_on(async { - let shutdown_watcher = thread_mgr::shutdown_watcher(); - tokio::select! { - // check for shutdown first - biased; - _ = shutdown_watcher => { - info!("walreceiver interrupted"); - None - } - replication_message = physical_stream.next() => replication_message, - } - }) { - let replication_message = replication_message?; - let status_update = match replication_message { - ReplicationMessage::XLogData(xlog_data) => { - // Pass the WAL data to the decoder, and see if we can decode - // more records as a result. - let data = xlog_data.data(); - let startlsn = Lsn::from(xlog_data.wal_start()); - let endlsn = startlsn + data.len() as u64; - - trace!("received XLogData between {} and {}", startlsn, endlsn); - - waldecoder.feed_bytes(data); - - while let Some((lsn, recdata)) = waldecoder.poll_decode()? { - let _enter = info_span!("processing record", lsn = %lsn).entered(); - - // It is important to deal with the aligned records as lsn in getPage@LSN is - // aligned and can be several bytes bigger. Without this alignment we are - // at risk of hitting a deadlock. - anyhow::ensure!(lsn.is_aligned()); - - walingest.ingest_record(&timeline, recdata, lsn)?; - - fail_point!("walreceiver-after-ingest"); - - last_rec_lsn = lsn; + thread_mgr::spawn( + ThreadKind::WalReceiverManager, + None, + None, + "WAL receiver manager main thread", + true, + move || { + runtime.block_on(async move { + let mut local_timeline_wal_receivers = HashMap::new(); + loop { + select! { + _ = thread_mgr::shutdown_watcher() => { + info!("Shutdown signal received"); + shutdown_all_wal_connections(&mut local_timeline_wal_receivers).await; + break; + }, + _ = wal_receiver_main_thread_loop_step( + broker_prefix, + &etcd_client, + &mut timeline_updates_receiver, + &mut local_timeline_wal_receivers, + ) => {}, + } } + }.instrument(info_span!("wal_receiver_main"))); - if !caught_up && endlsn >= end_of_wal { - info!("caught up at LSN {}", endlsn); - caught_up = true; + info!("Wal receiver main thread stopped"); + Ok(()) + }, + ) + .map(|_thread_id| ()) + .context("Failed to spawn wal receiver main thread") +} + +/// A step to process timeline attach/detach events to enable/disable the corresponding WAL receiver machinery. +/// In addition to WAL streaming management, the step ensures that corresponding tenant has its service threads enabled or disabled. 
+/// This is done here, since only walreceiver knows when a certain tenant has no streaming enabled. +/// +/// Cannot fail, should always try to process the next timeline event even if the other one was not processed properly. +async fn wal_receiver_main_thread_loop_step<'a>( + broker_prefix: &'a str, + etcd_client: &'a Client, + timeline_updates_receiver: &'a mut mpsc::UnboundedReceiver, + local_timeline_wal_receivers: &'a mut HashMap< + ZTenantId, + HashMap, + >, +) { + // Only react on updates from [`tenant_mgr`] on local timeline attach/detach. + match timeline_updates_receiver.recv().await { + Some(update) => { + info!("Processing timeline update: {update:?}"); + match update { + // Timeline got detached, stop all related tasks and remove public timeline data. + LocalTimelineUpdate::Detach(id) => { + match local_timeline_wal_receivers.get_mut(&id.tenant_id) { + Some(wal_receivers) => { + if let hash_map::Entry::Occupied(mut o) = wal_receivers.entry(id.timeline_id) { + if let Err(e) = o.get_mut().shutdown(id).await { + error!("Failed to shut down timeline {id} wal receiver handle: {e:#}"); + return; + } else { + o.remove(); + } + } + if wal_receivers.is_empty() { + if let Err(e) = change_tenant_state(id.tenant_id, TenantState::Idle).await { + error!("Failed to make tenant idle for id {id}: {e:#}"); + } + } + } + None => warn!("Timeline {id} does not have a tenant entry in wal receiver main thread"), + }; + { + WAL_RECEIVER_ENTRIES.write().await.remove(&id); + } } + // Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly. + LocalTimelineUpdate::Attach(new_id, new_timeline) => { + let timelines = local_timeline_wal_receivers + .entry(new_id.tenant_id) + .or_default(); - timeline.tline.check_checkpoint_distance()?; + if timelines.is_empty() { + if let Err(e) = + change_tenant_state(new_id.tenant_id, TenantState::Active).await + { + error!("Failed to make tenant active for id {new_id}: {e:#}"); + return; + } + } - Some(endlsn) - } + let vacant_timeline_entry = match timelines.entry(new_id.timeline_id) { + hash_map::Entry::Occupied(_) => { + debug!("Attepted to readd an existing timeline {new_id}, ignoring"); + return; + } + hash_map::Entry::Vacant(v) => v, + }; - ReplicationMessage::PrimaryKeepAlive(keepalive) => { - let wal_end = keepalive.wal_end(); - let timestamp = keepalive.timestamp(); - let reply_requested = keepalive.reply() != 0; + let (wal_connect_timeout, lagging_wal_timeout, max_lsn_wal_lag) = + match fetch_tenant_settings(new_id.tenant_id).await { + Ok(settings) => settings, + Err(e) => { + error!("Failed to fetch tenant settings for id {new_id}: {e:#}"); + return; + } + }; - trace!( - "received PrimaryKeepAlive(wal_end: {}, timestamp: {:?} reply: {})", - wal_end, - timestamp, - reply_requested, - ); - - if reply_requested { - Some(last_rec_lsn) - } else { - None - } - } - - _ => None, - }; - - if let Some(last_lsn) = status_update { - let timeline_remote_consistent_lsn = runtime.block_on(async { - remote_index - .read() - .await - // here we either do not have this timeline in remote index - // or there were no checkpoints for it yet - .timeline_entry(&ZTenantTimelineId { - tenant_id, - timeline_id, - }) - .map(|remote_timeline| remote_timeline.metadata.disk_consistent_lsn()) - .unwrap_or(Lsn(0)) // no checkpoint was uploaded - }); - - // The last LSN we processed. It is not guaranteed to survive pageserver crash. 
- let write_lsn = u64::from(last_lsn); - // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data - let flush_lsn = u64::from(timeline.tline.get_disk_consistent_lsn()); - // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash - // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`. - let apply_lsn = u64::from(timeline_remote_consistent_lsn); - let ts = SystemTime::now(); - - // Update the current WAL receiver's data stored inside the global hash table `WAL_RECEIVERS` - { - let mut receivers = WAL_RECEIVERS.lock().unwrap(); - let entry = match receivers.get_mut(&(tenant_id, timeline_id)) { - Some(e) => e, - None => { - anyhow::bail!( - "no WAL receiver entry found for tenant {} and timeline {}", - tenant_id, - timeline_id + { + WAL_RECEIVER_ENTRIES.write().await.insert( + new_id, + WalReceiverEntry { + wal_producer_connstr: None, + last_received_msg_lsn: None, + last_received_msg_ts: None, + }, ); } - }; - entry.last_received_msg_lsn = Some(last_lsn); - entry.last_received_msg_ts = Some( - ts.duration_since(SystemTime::UNIX_EPOCH) - .expect("Received message time should be before UNIX EPOCH!") - .as_micros(), - ); + let (cancellation_sender, mut cancellation_receiver) = watch::channel(()); + let mut wal_connection_manager = WalConnectionManager { + id: new_id, + timeline: Arc::clone(&new_timeline), + wal_connect_timeout, + lagging_wal_timeout, + max_lsn_wal_lag, + wal_connection_data: None, + wal_connection_attempt: 0, + }; + + let broker_prefix = broker_prefix.to_string(); + let mut loop_client = etcd_client.clone(); + let broker_join_handle = tokio::spawn(async move { + info!("WAL receiver broker started, connecting to etcd"); + let mut cancellation = cancellation_receiver.clone(); + loop { + select! { + _ = cancellation.changed() => { + info!("Wal broker loop cancelled, shutting down"); + break; + }, + step_result = timeline_wal_broker_loop_step( + &broker_prefix, + &mut loop_client, + &mut wal_connection_manager, + &mut cancellation_receiver, + ) => match step_result { + Ok(ControlFlow::Break(())) => { + break; + } + Ok(ControlFlow::Continue(())) => {} + Err(e) => warn!("Error during wal receiver main thread step for timeline {new_id}: {e:#}"), + } + } + } + }.instrument(info_span!("timeline", id = %new_id))); + + vacant_timeline_entry.insert(TimelineWalBrokerLoopHandle { + broker_join_handle, + cancellation_sender, + }); + } } + } + None => { + info!("Local timeline update channel closed"); + shutdown_all_wal_connections(local_timeline_wal_receivers).await; + } + } +} - // Send zenith feedback message. - // Regular standby_status_update fields are put into this message. - let zenith_status_update = ZenithFeedback { - current_timeline_size: timeline.get_current_logical_size() as u64, - ps_writelsn: write_lsn, - ps_flushlsn: flush_lsn, - ps_applylsn: apply_lsn, - ps_replytime: ts, - }; +async fn fetch_tenant_settings( + tenant_id: ZTenantId, +) -> anyhow::Result<(Duration, Duration, NonZeroU64)> { + tokio::task::spawn_blocking(move || { + let repo = tenant_mgr::get_repository_for_tenant(tenant_id) + .with_context(|| format!("no repository found for tenant {tenant_id}"))?; + Ok::<_, anyhow::Error>(( + repo.get_wal_receiver_connect_timeout(), + repo.get_lagging_wal_timeout(), + repo.get_max_lsn_wal_lag(), + )) + }) + .await + .with_context(|| format!("Failed to join on tenant {tenant_id} settings fetch task"))? 
+} - debug!("zenith_status_update {:?}", zenith_status_update); +async fn change_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow::Result<()> { + tokio::task::spawn_blocking(move || { + tenant_mgr::set_tenant_state(tenant_id, new_state) + .with_context(|| format!("Failed to activate tenant {tenant_id}")) + }) + .await + .with_context(|| format!("Failed to spawn activation task for tenant {tenant_id}"))? +} - let mut data = BytesMut::new(); - zenith_status_update.serialize(&mut data)?; - runtime.block_on( - physical_stream - .as_mut() - .zenith_status_update(data.len() as u64, &data), - )?; +async fn exponential_backoff(n: u32, base: f64, max_seconds: f64) { + if n == 0 { + return; + } + let seconds_to_wait = base.powf(f64::from(n) - 1.0).min(max_seconds); + info!("Backoff: waiting {seconds_to_wait} seconds before proceeding with the task"); + tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await; +} + +async fn shutdown_all_wal_connections( + local_timeline_wal_receivers: &mut HashMap< + ZTenantId, + HashMap, + >, +) { + info!("Shutting down all WAL connections"); + let mut broker_join_handles = Vec::new(); + for (tenant_id, timelines) in local_timeline_wal_receivers.drain() { + for (timeline_id, handles) in timelines { + handles.cancellation_sender.send(()).ok(); + broker_join_handles.push(( + ZTenantTimelineId::new(tenant_id, timeline_id), + handles.broker_join_handle, + )); } } - Ok(()) -} - -/// Data returned from the postgres `IDENTIFY_SYSTEM` command -/// -/// See the [postgres docs] for more details. -/// -/// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html -#[derive(Debug)] -// As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as -// unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900 -#[allow(dead_code)] -pub struct IdentifySystem { - systemid: u64, - timeline: u32, - xlogpos: PgLsn, - dbname: Option, -} - -/// There was a problem parsing the response to -/// a postgres IDENTIFY_SYSTEM command. -#[derive(Debug, thiserror::Error)] -#[error("IDENTIFY_SYSTEM parse error")] -pub struct IdentifyError; - -/// Run the postgres `IDENTIFY_SYSTEM` command -pub async fn identify_system(client: &mut Client) -> Result { - let query_str = "IDENTIFY_SYSTEM"; - let response = client.simple_query(query_str).await?; - - // get(N) from row, then parse it as some destination type. - fn get_parse(row: &SimpleQueryRow, idx: usize) -> Result - where - T: FromStr, + let mut tenants = HashSet::with_capacity(broker_join_handles.len()); + for (id, broker_join_handle) in broker_join_handles { + tenants.insert(id.tenant_id); + debug!("Waiting for wal broker for timeline {id} to finish"); + if let Err(e) = broker_join_handle.await { + error!("Failed to join on wal broker for timeline {id}: {e}"); + } + } + if let Err(e) = tokio::task::spawn_blocking(move || { + for tenant_id in tenants { + if let Err(e) = tenant_mgr::set_tenant_state(tenant_id, TenantState::Idle) { + error!("Failed to make tenant {tenant_id} idle: {e:?}"); + } + } + }) + .await { - let val = row.get(idx).ok_or(IdentifyError)?; - val.parse::().or(Err(IdentifyError)) - } - - // extract the row contents into an IdentifySystem struct. - // written as a closure so I can use ? for Option here. 
- if let Some(SimpleQueryMessage::Row(first_row)) = response.get(0) { - Ok(IdentifySystem { - systemid: get_parse(first_row, 0)?, - timeline: get_parse(first_row, 1)?, - xlogpos: get_parse(first_row, 2)?, - dbname: get_parse(first_row, 3).ok(), - }) - } else { - Err(IdentifyError.into()) + error!("Failed to spawn a task to make all tenants idle: {e:?}"); + } +} + +/// Broker WAL loop handle to cancel the loop safely when needed. +struct TimelineWalBrokerLoopHandle { + broker_join_handle: JoinHandle<()>, + cancellation_sender: watch::Sender<()>, +} + +impl TimelineWalBrokerLoopHandle { + /// Stops the broker loop, waiting for its current task to finish. + async fn shutdown(&mut self, id: ZTenantTimelineId) -> anyhow::Result<()> { + self.cancellation_sender.send(()).context( + "Unexpected: cancellation sender is dropped before the receiver in the loop is", + )?; + debug!("Waiting for wal receiver for timeline {id} to finish"); + let handle = &mut self.broker_join_handle; + handle + .await + .with_context(|| format!("Failed to join the wal reveiver broker for timeline {id}")) + } +} + +/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker. +/// Based on the updates, desides whether to start, keep or stop a WAL receiver task. +async fn timeline_wal_broker_loop_step( + broker_prefix: &str, + etcd_client: &mut Client, + wal_connection_manager: &mut WalConnectionManager, + cancellation: &mut watch::Receiver<()>, +) -> anyhow::Result> { + let id = wal_connection_manager.id; + + // Endlessly try to subscribe for broker updates for a given timeline. + // If there are no safekeepers to maintain the lease, the timeline subscription will be inavailable in the broker and the operation will fail constantly. + // This is ok, pageservers should anyway try subscribing (with some backoff) since it's the only way they can get the timeline WAL anyway. + let mut broker_subscription: SkTimelineSubscription; + let mut attempt = 0; + loop { + select! { + _ = cancellation.changed() => { + info!("Subscription backoff cancelled, shutting down"); + return Ok(ControlFlow::Break(())); + }, + _ = exponential_backoff(attempt, 2.0, 60.0) => {}, + } + attempt += 1; + + select! { + _ = cancellation.changed() => { + info!("Broker subscription loop cancelled, shutting down"); + return Ok(ControlFlow::Break(())); + }, + new_subscription = etcd_broker::subscribe_to_safekeeper_timeline_updates( + etcd_client, + SkTimelineSubscriptionKind::timeline(broker_prefix.to_owned(), id), + ) + .instrument(info_span!("etcd_subscription")) => match new_subscription { + Ok(new_subscription) => { + broker_subscription = new_subscription; + break; + } + Err(e) => { + warn!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in etcd: {e:#}"); + continue; + } + }, + + } + } + + info!("Subscribed for etcd timeline changes, considering walreceiver connections"); + + loop { + select! { + // the order of the polls is especially important here, since the first task to complete gets selected and the others get dropped (cancelled). + // place more frequetly updated tasks below to ensure the "slow" tasks are also reacted to. + biased; + // first, the cancellations are checked, to ensure we exit eagerly + _ = cancellation.changed() => { + info!("Broker loop cancelled, shutting down"); + break; + } + // then, we check for new events from the WAL connection: the existing connection should either return some progress data, + // or block, allowing other tasks in this `select!` to run first. 
+            //
+            // We set a "timebomb" in the polling method: it waits long enough and cancels the entire loop if nothing happens during the wait.
+            // The wait is only initiated when no data (or a "channel closed" notification) is received from the connection, and it ends with the break flow return.
+            // While waiting, more broker events are expected to be retrieved from etcd (currently, every safekeeper posts ~1 message/second).
+            // The timebomb ensures that we don't get stuck for too long on any of the WAL/etcd event polling, and instead restart the subscription entirely.
+            //
+            // We cannot return here eagerly on no WAL task data, since the result would get selected too early, not allowing etcd tasks to be polled properly.
+            // We cannot move etcd tasks above this select, since they finish very frequently and WAL events might get ignored.
+            // We need WAL events to periodically update the external data, so we cannot simply await the task result on the handle here.
+            wal_receiver_poll_result = wal_connection_manager.poll_connection_event_or_cancel() => match wal_receiver_poll_result {
+                ControlFlow::Break(()) => break,
+                ControlFlow::Continue(()) => {},
+            },
+            // finally, if no other task completed, get another broker update and possibly reconnect
+            updates = broker_subscription.fetch_data() => match updates {
+                Some(mut all_timeline_updates) => {
+                    if let Some(subscribed_timeline_updates) = all_timeline_updates.remove(&id) {
+                        match wal_connection_manager.select_connection_candidate(subscribed_timeline_updates) {
+                            Some(candidate) => {
+                                info!("Switching to a different safekeeper {} for timeline {id}, reason: {:?}", candidate.safekeeper_id, candidate.reason);
+                                wal_connection_manager.change_connection(candidate.safekeeper_id, candidate.wal_producer_connstr).await;
+                            },
+                            None => {}
+                        }
+                    }
+                },
+                None => {
+                    info!("Subscription source end was dropped, no more updates are possible, shutting down");
+                    break;
+                },
+            },
+        }
+    }
+
+    info!("Waiting for the current connection to close");
+    wal_connection_manager.close_connection().await;
+    broker_subscription
+        .cancel()
+        .await
+        .with_context(|| format!("Failed to cancel timeline {id} subscription in etcd"))?;
+    Ok(ControlFlow::Continue(()))
+}
+
+/// All data needed to run the endless broker loop and keep the WAL streaming connection alive, if possible.
+struct WalConnectionManager {
+    id: ZTenantTimelineId,
+    timeline: Arc<DatadirTimelineImpl>,
+    wal_connect_timeout: Duration,
+    lagging_wal_timeout: Duration,
+    max_lsn_wal_lag: NonZeroU64,
+    wal_connection_attempt: u32,
+    wal_connection_data: Option<WalConnectionData>,
+}
+
+#[derive(Debug)]
+struct WalConnectionData {
+    safekeeper_id: NodeId,
+    connection: WalReceiverConnection,
+    connection_init_time: NaiveDateTime,
+    last_wal_receiver_data: Option<(ZenithFeedback, NaiveDateTime)>,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+struct NewWalConnectionCandidate {
+    safekeeper_id: NodeId,
+    wal_producer_connstr: String,
+    reason: ReconnectReason,
+}
+
+/// Stores the reason why the WAL connection was switched, for further debugging purposes.
+#[derive(Debug, PartialEq, Eq)]
+enum ReconnectReason {
+    NoExistingConnection,
+    LaggingWal {
+        current_lsn: Lsn,
+        new_lsn: Lsn,
+        threshold: NonZeroU64,
+    },
+    NoWalTimeout {
+        last_wal_interaction: NaiveDateTime,
+        check_time: NaiveDateTime,
+        threshold: Duration,
+    },
+}
+
+impl WalConnectionManager {
+    /// Tries to get more data from the WAL connection.
+    /// If the WAL connection channel is dropped or no data is retrieved, a "timebomb" future is started to break the existing broker subscription.
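+    /// (Both of those cases currently wait 30 seconds, see the `sleep` calls below, before giving up and returning `ControlFlow::Break`.)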
+    /// This future is intended to be polled inside the `select!` loop above, where a lengthy wait is normally cancelled by another branch completing first.
+    /// If it is not, it is better to cancel the entire "stuck" loop and start over.
+    async fn poll_connection_event_or_cancel(&mut self) -> ControlFlow<(), ()> {
+        let (connection_data, wal_receiver_event) = match self.wal_connection_data.as_mut() {
+            Some(connection_data) => match connection_data.connection.next_event().await {
+                Some(event) => (connection_data, event),
+                None => {
+                    warn!("WAL receiver event source stopped sending messages, waiting for other events to arrive");
+                    tokio::time::sleep(Duration::from_secs(30)).await;
+                    warn!("WAL receiver event source stayed silent for 30s without being interrupted, aborting the loop");
+                    return ControlFlow::Break(());
+                }
+            },
+            None => {
+                tokio::time::sleep(Duration::from_secs(30)).await;
+                warn!("WAL receiver without a connection spent sleeping 30s without being interrupted, aborting the loop");
+                return ControlFlow::Break(());
+            }
+        };
+
+        match wal_receiver_event {
+            WalConnectionEvent::Started => {
+                self.wal_connection_attempt = 0;
+            }
+            WalConnectionEvent::NewWal(new_wal_data) => {
+                self.wal_connection_attempt = 0;
+                connection_data.last_wal_receiver_data =
+                    Some((new_wal_data, Utc::now().naive_utc()));
+            }
+            WalConnectionEvent::End(wal_receiver_result) => {
+                match wal_receiver_result {
+                    Ok(()) => {
+                        info!("WAL receiver task finished, reconnecting");
+                        self.wal_connection_attempt = 0;
+                    }
+                    Err(e) => {
+                        error!("WAL receiver task failed: {e:#}, reconnecting");
+                        self.wal_connection_attempt += 1;
+                    }
+                }
+                self.close_connection().await;
+            }
+        }
+
+        ControlFlow::Continue(())
+    }
+
+    /// Shuts down the current connection (if any), waiting for it to finish.
+    async fn close_connection(&mut self) {
+        if let Some(data) = self.wal_connection_data.as_mut() {
+            match data.connection.shutdown().await {
+                Err(e) => {
+                    error!("Failed to shutdown wal receiver connection: {e:#}");
+                }
+                Ok(()) => self.wal_connection_data = None,
+            }
+        }
+    }
+
+    /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
+    async fn change_connection(
+        &mut self,
+        new_safekeeper_id: NodeId,
+        new_wal_producer_connstr: String,
+    ) {
+        self.close_connection().await;
+        self.wal_connection_data = Some(WalConnectionData {
+            safekeeper_id: new_safekeeper_id,
+            connection: WalReceiverConnection::open(
+                self.id,
+                new_safekeeper_id,
+                new_wal_producer_connstr,
+                self.wal_connect_timeout,
+            ),
+            connection_init_time: Utc::now().naive_utc(),
+            last_wal_receiver_data: None,
+        });
+    }
+
+    /// Checks the current state against every fetched safekeeper state of a given timeline.
+    /// Returns a new candidate if the current connection is lagging behind, or `None` otherwise.
+    /// The current rules for approving new candidates:
+    /// * pick the safekeeper with the biggest `commit_lsn` that is ahead of the pageserver's latest Lsn for the timeline
+    /// * if that candidate is a different safekeeper, reconnect when either
+    ///   * no WAL updates arrived from the current connection for a certain time (counted either from the connection time or from the last received event), or
+    ///   * the candidate's `commit_lsn` is ahead of the WAL the pageserver has already received by more than a configured threshold
+    ///
+    /// This way we keep up with the most up-to-date safekeeper without jumping from one safekeeper to another too frequently.
+    /// Both thresholds are configured per tenant.
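+    ///
+    /// A purely illustrative example (the numbers are made up): with `max_lsn_wal_lag = 300_000` and the
+    /// last feedback reporting `ps_writelsn = 1_000_000`, a different safekeeper advertising
+    /// `commit_lsn = 1_100_000` does not trigger a switch on its own (a lag of 100_000 is below the
+    /// threshold), while one advertising `commit_lsn = 1_400_000` yields `ReconnectReason::LaggingWal`.
+    /// Independently, if the current connection produced no events for longer than `lagging_wal_timeout`,
+    /// the best other candidate is returned with `ReconnectReason::NoWalTimeout`.
+    ///
+    /// A sketch of the intended call site (hypothetical variable names; the real caller is the broker loop above):
+    /// ```ignore
+    /// // `updates` is the `HashMap<NodeId, SkTimelineInfo>` fetched from etcd for this timeline.
+    /// if let Some(candidate) = manager.select_connection_candidate(updates) {
+    ///     manager.change_connection(candidate.safekeeper_id, candidate.wal_producer_connstr).await;
+    /// }
+    /// ```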
+ fn select_connection_candidate( + &self, + safekeeper_timelines: HashMap, + ) -> Option { + let (&new_sk_id, new_sk_timeline, new_wal_producer_connstr) = safekeeper_timelines + .iter() + .filter(|(_, info)| { + info.commit_lsn > Some(self.timeline.tline.get_last_record_lsn()) + }) + .filter_map(|(sk_id, info)| { + match wal_stream_connection_string( + self.id, + info.safekeeper_connstr.as_deref()?, + info.pageserver_connstr.as_deref()?, + ) { + Ok(connstr) => Some((sk_id, info, connstr)), + Err(e) => { + error!("Failed to create wal receiver connection string from broker data of safekeeper node {sk_id}: {e:#}"); + None + } + } + }) + .max_by_key(|(_, info, _)| info.commit_lsn)?; + + match self.wal_connection_data.as_ref() { + None => Some(NewWalConnectionCandidate { + safekeeper_id: new_sk_id, + wal_producer_connstr: new_wal_producer_connstr, + reason: ReconnectReason::NoExistingConnection, + }), + Some(current_connection) => { + if current_connection.safekeeper_id == new_sk_id { + None + } else { + self.reason_to_reconnect(current_connection, new_sk_timeline) + .map(|reason| NewWalConnectionCandidate { + safekeeper_id: new_sk_id, + wal_producer_connstr: new_wal_producer_connstr, + reason, + }) + } + } + } + } + + fn reason_to_reconnect( + &self, + current_connection: &WalConnectionData, + new_sk_timeline: &SkTimelineInfo, + ) -> Option { + let last_sk_interaction_time = match current_connection.last_wal_receiver_data.as_ref() { + Some((last_wal_receiver_data, data_submission_time)) => { + let new_lsn = new_sk_timeline.commit_lsn?; + match new_lsn.0.checked_sub(last_wal_receiver_data.ps_writelsn) + { + Some(sk_lsn_advantage) => { + if sk_lsn_advantage >= self.max_lsn_wal_lag.get() { + return Some(ReconnectReason::LaggingWal { current_lsn: Lsn(last_wal_receiver_data.ps_writelsn), new_lsn, threshold: self.max_lsn_wal_lag }); + } + } + None => debug!("Best SK candidate has its commit Lsn behind the current timeline's latest consistent Lsn"), + } + *data_submission_time + } + None => current_connection.connection_init_time, + }; + + let now = Utc::now().naive_utc(); + match (now - last_sk_interaction_time).to_std() { + Ok(last_interaction) => { + if last_interaction > self.lagging_wal_timeout { + return Some(ReconnectReason::NoWalTimeout { + last_wal_interaction: last_sk_interaction_time, + check_time: now, + threshold: self.lagging_wal_timeout, + }); + } + } + Err(_e) => { + warn!("Last interaction with safekeeper {} happened in the future, ignoring the candidate. 
Interaction time: {last_sk_interaction_time}, now: {now}", + current_connection.safekeeper_id); + } + } + None + } +} + +fn wal_stream_connection_string( + ZTenantTimelineId { + tenant_id, + timeline_id, + }: ZTenantTimelineId, + listen_pg_addr_str: &str, + pageserver_connstr: &str, +) -> anyhow::Result { + let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db"); + let me_conf = sk_connstr + .parse::() + .with_context(|| { + format!("Failed to parse pageserver connection string '{sk_connstr}' as a postgres one") + })?; + let (host, port) = utils::connstring::connection_host_port(&me_conf); + Ok(format!( + "host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id} pageserver_connstr={pageserver_connstr}'", + )) +} + +#[cfg(test)] +mod tests { + use std::time::SystemTime; + + use crate::repository::{ + repo_harness::{RepoHarness, TIMELINE_ID}, + Repository, + }; + + use super::*; + + #[test] + fn no_connection_no_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("no_connection_no_candidate")?; + let mut data_manager_with_no_connection = dummy_wal_connection_manager(&harness); + data_manager_with_no_connection.wal_connection_data = None; + + let no_candidate = + data_manager_with_no_connection.select_connection_candidate(HashMap::from([ + ( + NodeId(0), + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(1)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: None, + pageserver_connstr: Some("no safekeeper_connstr".to_string()), + }, + ), + ( + NodeId(1), + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(1)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("no pageserver_connstr".to_string()), + pageserver_connstr: None, + }, + ), + ( + NodeId(2), + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: None, + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("no commit_lsn".to_string()), + pageserver_connstr: Some("no commit_lsn (p)".to_string()), + }, + ), + ( + NodeId(3), + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: None, + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("no commit_lsn".to_string()), + pageserver_connstr: Some("no commit_lsn (p)".to_string()), + }, + ), + ])); + + assert!( + no_candidate.is_none(), + "Expected no candidate selected out of non full data options, but got {no_candidate:?}" + ); + + Ok(()) + } + + #[tokio::test] + async fn connection_no_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("connection_no_candidate")?; + + let current_lsn = 100_000; + let connected_sk_id = NodeId(0); + let mut data_manager_with_connection = dummy_wal_connection_manager(&harness); + let mut dummy_connection_data = dummy_connection_data( + ZTenantTimelineId { + tenant_id: harness.tenant_id, + timeline_id: TIMELINE_ID, + }, + connected_sk_id, + ) + .await; + let now = Utc::now().naive_utc(); + dummy_connection_data.last_wal_receiver_data = Some(( + ZenithFeedback { + current_timeline_size: 1, + ps_writelsn: 1, + ps_applylsn: current_lsn, + ps_flushlsn: 1, + ps_replytime: SystemTime::now(), + }, + now, + )); + dummy_connection_data.connection_init_time = now; + data_manager_with_connection.wal_connection_data = Some(dummy_connection_data); + + let no_candidate = + 
data_manager_with_connection.select_connection_candidate(HashMap::from([ + ( + connected_sk_id, + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn( + current_lsn + data_manager_with_connection.max_lsn_wal_lag.get() * 2 + )), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()), + }, + ), + ( + NodeId(1), + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(current_lsn)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("not advanced Lsn".to_string()), + pageserver_connstr: Some("not advanced Lsn (p)".to_string()), + }, + ), + ( + NodeId(2), + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn( + current_lsn + data_manager_with_connection.max_lsn_wal_lag.get() / 2 + )), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("not enough advanced Lsn".to_string()), + pageserver_connstr: Some("not enough advanced Lsn (p)".to_string()), + }, + ), + ])); + + assert!( + no_candidate.is_none(), + "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}" + ); + + Ok(()) + } + + #[test] + fn no_connection_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("no_connection_candidate")?; + let mut data_manager_with_no_connection = dummy_wal_connection_manager(&harness); + data_manager_with_no_connection.wal_connection_data = None; + + let only_candidate = data_manager_with_no_connection + .select_connection_candidate(HashMap::from([( + NodeId(0), + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(1 + data_manager_with_no_connection + .max_lsn_wal_lag + .get())), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()), + }, + )])) + .expect("Expected one candidate selected out of the only data option, but got none"); + assert_eq!(only_candidate.safekeeper_id, NodeId(0)); + assert_eq!( + only_candidate.reason, + ReconnectReason::NoExistingConnection, + "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" + ); + assert!(only_candidate + .wal_producer_connstr + .contains(DUMMY_SAFEKEEPER_CONNSTR)); + assert!(only_candidate + .wal_producer_connstr + .contains(DUMMY_PAGESERVER_CONNSTR)); + + let selected_lsn = 100_000; + let biggest_wal_candidate = data_manager_with_no_connection + .select_connection_candidate(HashMap::from([ + ( + NodeId(0), + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(selected_lsn - 100)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("smaller commit_lsn".to_string()), + pageserver_connstr: Some("smaller commit_lsn (p)".to_string()), + }, + ), + ( + NodeId(1), + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(selected_lsn)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()), + }, + ), + ( + NodeId(2), + 
SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(Lsn(selected_lsn + 100)), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: None, + pageserver_connstr: Some( + "no safekeeper_connstr despite bigger commit_lsn".to_string(), + ), + }, + ), + ])) + .expect( + "Expected one candidate selected out of multiple valid data options, but got none", + ); + + assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1)); + assert_eq!( + biggest_wal_candidate.reason, + ReconnectReason::NoExistingConnection, + "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" + ); + assert!(biggest_wal_candidate + .wal_producer_connstr + .contains(DUMMY_SAFEKEEPER_CONNSTR)); + assert!(biggest_wal_candidate + .wal_producer_connstr + .contains(DUMMY_PAGESERVER_CONNSTR)); + + Ok(()) + } + + #[tokio::test] + async fn lsn_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("lsn_wal_over_threshcurrent_candidate")?; + let current_lsn = Lsn(100_000).align(); + + let id = ZTenantTimelineId { + tenant_id: harness.tenant_id, + timeline_id: TIMELINE_ID, + }; + + let mut data_manager_with_connection = dummy_wal_connection_manager(&harness); + let connected_sk_id = NodeId(0); + let mut dummy_connection_data = dummy_connection_data(id, NodeId(0)).await; + let lagging_wal_timeout = + chrono::Duration::from_std(data_manager_with_connection.lagging_wal_timeout)?; + let time_over_threshold = + Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; + dummy_connection_data.last_wal_receiver_data = Some(( + ZenithFeedback { + current_timeline_size: 1, + ps_writelsn: current_lsn.0, + ps_applylsn: 1, + ps_flushlsn: 1, + ps_replytime: SystemTime::now(), + }, + time_over_threshold, + )); + dummy_connection_data.connection_init_time = time_over_threshold; + data_manager_with_connection.wal_connection_data = Some(dummy_connection_data); + + let new_lsn = Lsn(current_lsn.0 + data_manager_with_connection.max_lsn_wal_lag.get() + 1); + let candidates = HashMap::from([ + ( + connected_sk_id, + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(current_lsn), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()), + }, + ), + ( + NodeId(1), + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(new_lsn), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()), + pageserver_connstr: Some("advanced by Lsn safekeeper (p)".to_string()), + }, + ), + ]); + + let over_threshcurrent_candidate = data_manager_with_connection + .select_connection_candidate(candidates) + .expect( + "Expected one candidate selected out of multiple valid data options, but got none", + ); + + assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1)); + assert_eq!( + over_threshcurrent_candidate.reason, + ReconnectReason::LaggingWal { + current_lsn, + new_lsn, + threshold: data_manager_with_connection.max_lsn_wal_lag + }, + "Should select bigger WAL safekeeper if it starts to lag enough" + ); + assert!(over_threshcurrent_candidate + .wal_producer_connstr + .contains("advanced by Lsn safekeeper")); + assert!(over_threshcurrent_candidate + .wal_producer_connstr + .contains("advanced by Lsn 
safekeeper (p)")); + + Ok(()) + } + + #[tokio::test] + async fn timeout_wal_over_threshcurrent_candidate() -> anyhow::Result<()> { + let harness = RepoHarness::create("timeout_wal_over_threshcurrent_candidate")?; + let current_lsn = Lsn(100_000).align(); + + let id = ZTenantTimelineId { + tenant_id: harness.tenant_id, + timeline_id: TIMELINE_ID, + }; + + let mut data_manager_with_connection = dummy_wal_connection_manager(&harness); + let mut dummy_connection_data = dummy_connection_data(id, NodeId(1)).await; + let lagging_wal_timeout = + chrono::Duration::from_std(data_manager_with_connection.lagging_wal_timeout)?; + let time_over_threshold = + Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; + dummy_connection_data.last_wal_receiver_data = None; + dummy_connection_data.connection_init_time = time_over_threshold; + data_manager_with_connection.wal_connection_data = Some(dummy_connection_data); + + let new_lsn = Lsn(current_lsn.0 + data_manager_with_connection.max_lsn_wal_lag.get() + 1); + let over_threshcurrent_candidate = data_manager_with_connection + .select_connection_candidate(HashMap::from([ + ( + NodeId(0), + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(new_lsn), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()), + }, + ), + ( + NodeId(1), + SkTimelineInfo { + last_log_term: None, + flush_lsn: None, + commit_lsn: Some(current_lsn), + backup_lsn: None, + remote_consistent_lsn: None, + peer_horizon_lsn: None, + safekeeper_connstr: Some("not advanced by Lsn safekeeper".to_string()), + pageserver_connstr: Some("not advanced by Lsn safekeeper".to_string()), + }, + ), + ])) + .expect( + "Expected one candidate selected out of multiple valid data options, but got none", + ); + + assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0)); + match over_threshcurrent_candidate.reason { + ReconnectReason::NoWalTimeout { + last_wal_interaction, + threshold, + .. 
+            } => {
+                assert_eq!(last_wal_interaction, time_over_threshold);
+                assert_eq!(threshold, data_manager_with_connection.lagging_wal_timeout);
+            }
+            unexpected => panic!("Unexpected reason: {unexpected:?}"),
+        }
+        assert!(over_threshcurrent_candidate
+            .wal_producer_connstr
+            .contains(DUMMY_SAFEKEEPER_CONNSTR));
+        assert!(over_threshcurrent_candidate
+            .wal_producer_connstr
+            .contains(DUMMY_PAGESERVER_CONNSTR));
+
+        Ok(())
+    }
+
+    fn dummy_wal_connection_manager(harness: &RepoHarness) -> WalConnectionManager {
+        WalConnectionManager {
+            id: ZTenantTimelineId {
+                tenant_id: harness.tenant_id,
+                timeline_id: TIMELINE_ID,
+            },
+            timeline: Arc::new(DatadirTimelineImpl::new(
+                harness
+                    .load()
+                    .create_empty_timeline(TIMELINE_ID, Lsn(0))
+                    .expect("Failed to create an empty timeline for dummy wal connection manager"),
+                10_000,
+            )),
+            wal_connect_timeout: Duration::from_secs(1),
+            lagging_wal_timeout: Duration::from_secs(10),
+            max_lsn_wal_lag: NonZeroU64::new(300_000).unwrap(),
+            wal_connection_attempt: 0,
+            wal_connection_data: None,
+        }
+    }
+
+    const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr";
+    const DUMMY_PAGESERVER_CONNSTR: &str = "pageserver_connstr";
+
+    // the function itself does not need async, but it spawns a tokio task underneath and hence needs
+    // a runtime to avoid panicking
+    async fn dummy_connection_data(
+        id: ZTenantTimelineId,
+        safekeeper_id: NodeId,
+    ) -> WalConnectionData {
+        let dummy_connstr =
+            wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR, DUMMY_PAGESERVER_CONNSTR)
+                .expect("Failed to construct dummy wal producer connstr");
+        WalConnectionData {
+            safekeeper_id,
+            connection: WalReceiverConnection::open(
+                id,
+                safekeeper_id,
+                dummy_connstr,
+                Duration::from_secs(1),
+            ),
+            connection_init_time: Utc::now().naive_utc(),
+            last_wal_receiver_data: None,
+        }
    }
}
diff --git a/pageserver/src/walreceiver/connection_handler.rs b/pageserver/src/walreceiver/connection_handler.rs
new file mode 100644
index 0000000000..aaccee9730
--- /dev/null
+++ b/pageserver/src/walreceiver/connection_handler.rs
@@ -0,0 +1,405 @@
+//! Actual Postgres connection handler to stream WAL to the server.
+//! Runs as a separate, cancellable Tokio task.
+use std::{
+    str::FromStr,
+    sync::Arc,
+    time::{Duration, SystemTime},
+};
+
+use anyhow::{bail, ensure, Context};
+use bytes::BytesMut;
+use fail::fail_point;
+use postgres::{SimpleQueryMessage, SimpleQueryRow};
+use postgres_ffi::waldecoder::WalStreamDecoder;
+use postgres_protocol::message::backend::ReplicationMessage;
+use postgres_types::PgLsn;
+use tokio::{pin, select, sync::watch, time};
+use tokio_postgres::{replication::ReplicationStream, Client};
+use tokio_stream::StreamExt;
+use tracing::{debug, error, info, info_span, trace, warn, Instrument};
+use utils::{
+    lsn::Lsn,
+    pq_proto::ZenithFeedback,
+    zid::{NodeId, ZTenantTimelineId},
+};
+
+use crate::{
+    http::models::WalReceiverEntry,
+    repository::{Repository, Timeline},
+    tenant_mgr,
+    walingest::WalIngest,
+};
+
+#[derive(Debug, Clone)]
+pub enum WalConnectionEvent {
+    Started,
+    NewWal(ZenithFeedback),
+    End(Result<(), String>),
+}
+
+/// A wrapper around a standalone Tokio task, used to poll its updates or cancel the task.
+#[derive(Debug)]
+pub struct WalReceiverConnection {
+    handle: tokio::task::JoinHandle<()>,
+    cancellation: watch::Sender<()>,
+    events_receiver: watch::Receiver<WalConnectionEvent>,
+}
+
+impl WalReceiverConnection {
+    /// Initializes the connection task, returning a set of handles on top of it.
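+    /// (A typical call site, e.g. `WalConnectionManager::change_connection`, looks like
+    /// `WalReceiverConnection::open(id, safekeeper_id, wal_producer_connstr, connect_timeout)`.)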
+    /// The task starts immediately after creation and fails if no connection is established within the given timeout.
+    pub fn open(
+        id: ZTenantTimelineId,
+        safekeeper_id: NodeId,
+        wal_producer_connstr: String,
+        connect_timeout: Duration,
+    ) -> Self {
+        let (cancellation, mut cancellation_receiver) = watch::channel(());
+        let (events_sender, events_receiver) = watch::channel(WalConnectionEvent::Started);
+
+        let handle = tokio::spawn(
+            async move {
+                let connection_result = handle_walreceiver_connection(
+                    id,
+                    &wal_producer_connstr,
+                    &events_sender,
+                    &mut cancellation_receiver,
+                    connect_timeout,
+                )
+                .await
+                .map_err(|e| {
+                    format!("Walreceiver connection for id {id} failed with error: {e:#}")
+                });
+
+                match &connection_result {
+                    Ok(()) => {
+                        debug!("Walreceiver connection for id {id} ended successfully")
+                    }
+                    Err(e) => warn!("{e}"),
+                }
+                events_sender
+                    .send(WalConnectionEvent::End(connection_result))
+                    .ok();
+            }
+            .instrument(info_span!("safekeeper_handle", sk = %safekeeper_id)),
+        );
+
+        Self {
+            handle,
+            cancellation,
+            events_receiver,
+        }
+    }
+
+    /// Polls for the next WAL receiver event, if there is any available since the last check.
+    /// Blocks if there is no new event available; returns `None` if no new events will ever occur.
+    /// Only the latest event is returned; any events received between observations are lost.
+    pub async fn next_event(&mut self) -> Option<WalConnectionEvent> {
+        match self.events_receiver.changed().await {
+            Ok(()) => Some(self.events_receiver.borrow().clone()),
+            Err(_cancellation_error) => None,
+        }
+    }
+
+    /// Gracefully aborts the current WAL streaming task, waiting for it to finish processing the WAL already streamed.
+    pub async fn shutdown(&mut self) -> anyhow::Result<()> {
+        self.cancellation.send(()).ok();
+        let handle = &mut self.handle;
+        handle
+            .await
+            .context("Failed to join on a walreceiver connection task")?;
+        Ok(())
+    }
+}
+
+async fn handle_walreceiver_connection(
+    id: ZTenantTimelineId,
+    wal_producer_connstr: &str,
+    events_sender: &watch::Sender<WalConnectionEvent>,
+    cancellation: &mut watch::Receiver<()>,
+    connect_timeout: Duration,
+) -> anyhow::Result<()> {
+    // Connect to the database in replication mode.
+    info!("connecting to {wal_producer_connstr}");
+    let connect_cfg =
+        format!("{wal_producer_connstr} application_name=pageserver replication=true");
+
+    let (mut replication_client, connection) = time::timeout(
+        connect_timeout,
+        tokio_postgres::connect(&connect_cfg, postgres::NoTls),
+    )
+    .await
+    .context("Timed out while waiting for walreceiver connection to open")?
+    .context("Failed to open walreceiver connection")?;
+    // The connection object performs the actual communication with the database,
+    // so spawn it off to run on its own.
+    let mut connection_cancellation = cancellation.clone();
+    tokio::spawn(
+        async move {
+            info!("connected!");
+            select! {
+                connection_result = connection => match connection_result {
+                    Ok(()) => info!("Walreceiver db connection closed"),
+                    Err(connection_error) => {
+                        if connection_error.is_closed() {
+                            info!("Connection closed regularly: {connection_error}")
+                        } else {
+                            warn!("Connection aborted: {connection_error}")
+                        }
+                    }
+                },
+
+                _ = connection_cancellation.changed() => info!("Connection cancelled"),
+            }
+        }
+        .instrument(info_span!("safekeeper_handle_db")),
+    );
+
+    // Immediately increment the gauge, then create a job to decrement it on task exit.
+    // One of the pros of `defer!` is that this will *most probably*
+    // get called, even in presence of panics.
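+    // (Conceptually this is a small RAII guard: `scopeguard::defer!` creates a guard whose `Drop`
+    // implementation runs `gauge.dec()` when the enclosing scope is left.)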
+ let gauge = crate::LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]); + gauge.inc(); + scopeguard::defer! { + gauge.dec(); + } + + let identify = identify_system(&mut replication_client).await?; + info!("{identify:?}"); + let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); + let mut caught_up = false; + let ZTenantTimelineId { + tenant_id, + timeline_id, + } = id; + + let (repo, timeline) = tokio::task::spawn_blocking(move || { + let repo = tenant_mgr::get_repository_for_tenant(tenant_id) + .with_context(|| format!("no repository found for tenant {tenant_id}"))?; + let timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id) + .with_context(|| { + format!("local timeline {timeline_id} not found for tenant {tenant_id}") + })?; + Ok::<_, anyhow::Error>((repo, timeline)) + }) + .await + .with_context(|| format!("Failed to spawn blocking task to get repository and timeline for tenant {tenant_id} timeline {timeline_id}"))??; + + // + // Start streaming the WAL, from where we left off previously. + // + // If we had previously received WAL up to some point in the middle of a WAL record, we + // better start from the end of last full WAL record, not in the middle of one. + let mut last_rec_lsn = timeline.get_last_record_lsn(); + let mut startpoint = last_rec_lsn; + + if startpoint == Lsn(0) { + bail!("No previous WAL position"); + } + + // There might be some padding after the last full record, skip it. + startpoint += startpoint.calc_padding(8u32); + + info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, server is at {end_of_wal}..."); + + let query = format!("START_REPLICATION PHYSICAL {startpoint}"); + + let copy_stream = replication_client.copy_both_simple(&query).await?; + let physical_stream = ReplicationStream::new(copy_stream); + pin!(physical_stream); + + let mut waldecoder = WalStreamDecoder::new(startpoint); + + let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?; + + while let Some(replication_message) = { + select! { + // check for shutdown first + biased; + _ = cancellation.changed() => { + info!("walreceiver interrupted"); + None + } + replication_message = physical_stream.next() => replication_message, + } + } { + let replication_message = replication_message?; + let status_update = match replication_message { + ReplicationMessage::XLogData(xlog_data) => { + // Pass the WAL data to the decoder, and see if we can decode + // more records as a result. + let data = xlog_data.data(); + let startlsn = Lsn::from(xlog_data.wal_start()); + let endlsn = startlsn + data.len() as u64; + + trace!("received XLogData between {startlsn} and {endlsn}"); + + waldecoder.feed_bytes(data); + + while let Some((lsn, recdata)) = waldecoder.poll_decode()? { + let _enter = info_span!("processing record", lsn = %lsn).entered(); + + // It is important to deal with the aligned records as lsn in getPage@LSN is + // aligned and can be several bytes bigger. Without this alignment we are + // at risk of hitting a deadlock. + ensure!(lsn.is_aligned()); + + walingest.ingest_record(&timeline, recdata, lsn)?; + + fail_point!("walreceiver-after-ingest"); + + last_rec_lsn = lsn; + } + + if !caught_up && endlsn >= end_of_wal { + info!("caught up at LSN {endlsn}"); + caught_up = true; + } + + let timeline_to_check = Arc::clone(&timeline.tline); + tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance()) + .await + .with_context(|| { + format!("Spawned checkpoint check task panicked for timeline {id}") + })? 
+ .with_context(|| { + format!("Failed to check checkpoint distance for timeline {id}") + })?; + + Some(endlsn) + } + + ReplicationMessage::PrimaryKeepAlive(keepalive) => { + let wal_end = keepalive.wal_end(); + let timestamp = keepalive.timestamp(); + let reply_requested = keepalive.reply() != 0; + + trace!("received PrimaryKeepAlive(wal_end: {wal_end}, timestamp: {timestamp:?} reply: {reply_requested})"); + + if reply_requested { + Some(last_rec_lsn) + } else { + None + } + } + + _ => None, + }; + + if let Some(last_lsn) = status_update { + let remote_index = repo.get_remote_index(); + let timeline_remote_consistent_lsn = remote_index + .read() + .await + // here we either do not have this timeline in remote index + // or there were no checkpoints for it yet + .timeline_entry(&ZTenantTimelineId { + tenant_id, + timeline_id, + }) + .map(|remote_timeline| remote_timeline.metadata.disk_consistent_lsn()) + // no checkpoint was uploaded + .unwrap_or(Lsn(0)); + + // The last LSN we processed. It is not guaranteed to survive pageserver crash. + let write_lsn = u64::from(last_lsn); + // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data + let flush_lsn = u64::from(timeline.tline.get_disk_consistent_lsn()); + // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash + // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`. + let apply_lsn = u64::from(timeline_remote_consistent_lsn); + let ts = SystemTime::now(); + + // Update the current WAL receiver's data stored inside the global hash table `WAL_RECEIVERS` + { + super::WAL_RECEIVER_ENTRIES.write().await.insert( + id, + WalReceiverEntry { + wal_producer_connstr: Some(wal_producer_connstr.to_owned()), + last_received_msg_lsn: Some(last_lsn), + last_received_msg_ts: Some( + ts.duration_since(SystemTime::UNIX_EPOCH) + .expect("Received message time should be before UNIX EPOCH!") + .as_micros(), + ), + }, + ); + } + + // Send zenith feedback message. + // Regular standby_status_update fields are put into this message. + let zenith_status_update = ZenithFeedback { + current_timeline_size: timeline.get_current_logical_size() as u64, + ps_writelsn: write_lsn, + ps_flushlsn: flush_lsn, + ps_applylsn: apply_lsn, + ps_replytime: ts, + }; + + debug!("zenith_status_update {zenith_status_update:?}"); + + let mut data = BytesMut::new(); + zenith_status_update.serialize(&mut data)?; + physical_stream + .as_mut() + .zenith_status_update(data.len() as u64, &data) + .await?; + if let Err(e) = events_sender.send(WalConnectionEvent::NewWal(zenith_status_update)) { + warn!("Wal connection event listener dropped, aborting the connection: {e}"); + return Ok(()); + } + } + } + + Ok(()) +} + +/// Data returned from the postgres `IDENTIFY_SYSTEM` command +/// +/// See the [postgres docs] for more details. +/// +/// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html +#[derive(Debug)] +// As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as +// unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900 +#[allow(dead_code)] +struct IdentifySystem { + systemid: u64, + timeline: u32, + xlogpos: PgLsn, + dbname: Option, +} + +/// There was a problem parsing the response to +/// a postgres IDENTIFY_SYSTEM command. 
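+/// (The expected reply is a single row whose columns map onto `IdentifySystem`:
+/// `systemid`, `timeline`, `xlogpos` and, optionally, `dbname`.)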
+#[derive(Debug, thiserror::Error)] +#[error("IDENTIFY_SYSTEM parse error")] +struct IdentifyError; + +/// Run the postgres `IDENTIFY_SYSTEM` command +async fn identify_system(client: &mut Client) -> anyhow::Result { + let query_str = "IDENTIFY_SYSTEM"; + let response = client.simple_query(query_str).await?; + + // get(N) from row, then parse it as some destination type. + fn get_parse(row: &SimpleQueryRow, idx: usize) -> Result + where + T: FromStr, + { + let val = row.get(idx).ok_or(IdentifyError)?; + val.parse::().or(Err(IdentifyError)) + } + + // extract the row contents into an IdentifySystem struct. + // written as a closure so I can use ? for Option here. + if let Some(SimpleQueryMessage::Row(first_row)) = response.get(0) { + Ok(IdentifySystem { + systemid: get_parse(first_row, 0)?, + timeline: get_parse(first_row, 1)?, + xlogpos: get_parse(first_row, 2)?, + dbname: get_parse(first_row, 3).ok(), + }) + } else { + Err(IdentifyError.into()) + } +} diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 9feb984c4f..5ce2591ff3 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -16,7 +16,8 @@ use toml_edit::Document; use tracing::*; use url::{ParseError, Url}; -use safekeeper::control_file::{self}; +use safekeeper::broker; +use safekeeper::control_file; use safekeeper::defaults::{ DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, }; @@ -26,7 +27,6 @@ use safekeeper::timeline::GlobalTimelines; use safekeeper::wal_backup; use safekeeper::wal_service; use safekeeper::SafeKeeperConf; -use safekeeper::{broker, callmemaybe}; use utils::{ http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener, zid::NodeId, @@ -272,9 +272,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo let signals = signals::install_shutdown_handlers()?; let mut threads = vec![]; - let (callmemaybe_tx, callmemaybe_rx) = mpsc::unbounded_channel(); let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100); - GlobalTimelines::init(callmemaybe_tx, wal_backup_launcher_tx); + GlobalTimelines::init(wal_backup_launcher_tx); let conf_ = conf.clone(); threads.push( @@ -296,29 +295,14 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo let safekeeper_thread = thread::Builder::new() .name("Safekeeper thread".into()) .spawn(|| { - // thread code - let thread_result = wal_service::thread_main(conf_cloned, pg_listener); - if let Err(e) = thread_result { - info!("safekeeper thread terminated: {}", e); + if let Err(e) = wal_service::thread_main(conf_cloned, pg_listener) { + info!("safekeeper thread terminated: {e}"); } }) .unwrap(); threads.push(safekeeper_thread); - let conf_cloned = conf.clone(); - let callmemaybe_thread = thread::Builder::new() - .name("callmemaybe thread".into()) - .spawn(|| { - // thread code - let thread_result = callmemaybe::thread_main(conf_cloned, callmemaybe_rx); - if let Err(e) = thread_result { - error!("callmemaybe thread terminated: {}", e); - } - }) - .unwrap(); - threads.push(callmemaybe_thread); - if !conf.broker_endpoints.is_empty() { let conf_ = conf.clone(); threads.push( diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 1fae9b00f8..f328d2e85a 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -8,7 +8,6 @@ use url::Url; use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId}; pub mod broker; -pub mod callmemaybe; pub mod control_file; pub mod control_file_upgrade; pub mod 
handler; diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index a89ed18071..7a6a8ca9b9 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -8,7 +8,6 @@ use anyhow::{bail, Context, Result}; use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE}; -use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::cmp::min; @@ -17,7 +16,6 @@ use std::sync::Arc; use std::thread::sleep; use std::time::Duration; use std::{str, thread}; -use tokio::sync::mpsc::UnboundedSender; use tracing::*; use utils::{ bin_ser::BeSer, @@ -25,7 +23,6 @@ use utils::{ postgres_backend::PostgresBackend, pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody, ZenithFeedback}, sock_split::ReadStream, - zid::{ZTenantId, ZTimelineId}, }; // See: https://www.postgresql.org/docs/13/protocol-replication.html @@ -83,40 +80,6 @@ impl Drop for ReplicationConnGuard { } } -// XXX: Naming is a bit messy here. -// This ReplicationStreamGuard lives as long as ReplicationConn -// and current ReplicationConnGuard is tied to the background thread -// that receives feedback. -struct ReplicationStreamGuard { - tx: UnboundedSender, - tenant_id: ZTenantId, - timeline_id: ZTimelineId, - pageserver_connstr: String, -} - -impl Drop for ReplicationStreamGuard { - fn drop(&mut self) { - // the connection with pageserver is lost, - // resume callback subscription - debug!( - "Connection to pageserver is gone. Resume callmemaybe subsciption if necessary. tenantid {} timelineid {}", - self.tenant_id, self.timeline_id, - ); - - let subscription_key = SubscriptionStateKey::new( - self.tenant_id, - self.timeline_id, - self.pageserver_connstr.to_owned(), - ); - - self.tx - .send(CallmeEvent::Resume(subscription_key)) - .unwrap_or_else(|e| { - error!("failed to send Resume request to callmemaybe thread {}", e); - }); - } -} - impl ReplicationConn { /// Create a new `ReplicationConn` pub fn new(pgb: &mut PostgresBackend) -> Self { @@ -256,36 +219,6 @@ impl ReplicationConn { }; info!("Start replication from {:?} till {:?}", start_pos, stop_pos); - // Don't spam pageserver with callmemaybe queries - // when replication connection with pageserver is already established. 
- let _guard = { - if spg.appname == Some("wal_proposer_recovery".to_string()) { - None - } else { - let pageserver_connstr = pageserver_connstr.expect("there should be a pageserver connection string since this is not a wal_proposer_recovery"); - let zttid = spg.timeline.get().zttid; - let tx_clone = spg.timeline.get().callmemaybe_tx.clone(); - let subscription_key = SubscriptionStateKey::new( - zttid.tenant_id, - zttid.timeline_id, - pageserver_connstr.clone(), - ); - tx_clone - .send(CallmeEvent::Pause(subscription_key)) - .unwrap_or_else(|e| { - error!("failed to send Pause request to callmemaybe thread {}", e); - }); - - // create a guard to subscribe callback again, when this connection will exit - Some(ReplicationStreamGuard { - tx: tx_clone, - tenant_id: zttid.tenant_id, - timeline_id: zttid.timeline_id, - pageserver_connstr, - }) - } - }; - // switch to copy pgb.write_message(&BeMessage::CopyBothResponse)?; diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 2fc5bcc1f6..b7a549fef8 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -16,7 +16,7 @@ use std::fs::{self}; use std::sync::{Arc, Condvar, Mutex, MutexGuard}; use std::time::Duration; -use tokio::sync::mpsc::{Sender, UnboundedSender}; +use tokio::sync::mpsc::Sender; use tracing::*; use utils::{ @@ -25,7 +25,6 @@ use utils::{ zid::{NodeId, ZTenantId, ZTenantTimelineId}, }; -use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; use crate::control_file; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, @@ -191,79 +190,33 @@ impl SharedState { self.wal_backup_active } - /// start/change walsender (via callmemaybe). - fn callmemaybe_sub( + /// Activate timeline's walsender: start/change timeline information propagated into etcd for further pageserver connections. + fn activate_walsender( &mut self, zttid: &ZTenantTimelineId, - pageserver_connstr: Option<&String>, - callmemaybe_tx: &UnboundedSender, - ) -> Result<()> { - if let Some(ref pageserver_connstr) = self.pageserver_connstr { - // unsub old sub. xxx: callmemaybe is going out - let old_subscription_key = SubscriptionStateKey::new( - zttid.tenant_id, - zttid.timeline_id, - pageserver_connstr.to_owned(), - ); - callmemaybe_tx - .send(CallmeEvent::Unsubscribe(old_subscription_key)) - .unwrap_or_else(|e| { - error!("failed to send Pause request to callmemaybe thread {}", e); - }); + new_pageserver_connstr: Option, + ) { + if self.pageserver_connstr != new_pageserver_connstr { + self.deactivate_walsender(zttid); + + if new_pageserver_connstr.is_some() { + info!( + "timeline {} has activated its walsender with connstr {new_pageserver_connstr:?}", + zttid.timeline_id, + ); + } + self.pageserver_connstr = new_pageserver_connstr; } - if let Some(pageserver_connstr) = pageserver_connstr { - let subscription_key = SubscriptionStateKey::new( - zttid.tenant_id, - zttid.timeline_id, - pageserver_connstr.to_owned(), - ); - // xx: sending to channel under lock is not very cool, but - // shouldn't be a problem here. If it is, we can grab a counter - // here and later augment channel messages with it. 
- callmemaybe_tx - .send(CallmeEvent::Subscribe(subscription_key)) - .unwrap_or_else(|e| { - error!( - "failed to send Subscribe request to callmemaybe thread {}", - e - ); - }); - info!( - "timeline {} is subscribed to callmemaybe to {}", - zttid.timeline_id, pageserver_connstr - ); - } - self.pageserver_connstr = pageserver_connstr.map(|c| c.to_owned()); - Ok(()) } - /// Deactivate the timeline: stop callmemaybe. - fn callmemaybe_unsub( - &mut self, - zttid: &ZTenantTimelineId, - callmemaybe_tx: &UnboundedSender, - ) -> Result<()> { - if let Some(ref pageserver_connstr) = self.pageserver_connstr { - let subscription_key = SubscriptionStateKey::new( - zttid.tenant_id, - zttid.timeline_id, - pageserver_connstr.to_owned(), - ); - callmemaybe_tx - .send(CallmeEvent::Unsubscribe(subscription_key)) - .unwrap_or_else(|e| { - error!( - "failed to send Unsubscribe request to callmemaybe thread {}", - e - ); - }); + /// Deactivate the timeline: stop sending the timeline data into etcd, so no pageserver can connect for WAL streaming. + fn deactivate_walsender(&mut self, zttid: &ZTenantTimelineId) { + if let Some(pageserver_connstr) = self.pageserver_connstr.take() { info!( - "timeline {} is unsubscribed from callmemaybe to {}", + "timeline {} had deactivated its wallsender with connstr {pageserver_connstr:?}", zttid.timeline_id, - self.pageserver_connstr.as_ref().unwrap() - ); + ) } - Ok(()) } fn get_wal_seg_size(&self) -> usize { @@ -332,7 +285,6 @@ impl SharedState { /// Database instance (tenant) pub struct Timeline { pub zttid: ZTenantTimelineId, - pub callmemaybe_tx: UnboundedSender, /// Sending here asks for wal backup launcher attention (start/stop /// offloading). Sending zttid instead of concrete command allows to do /// sending without timeline lock. @@ -348,7 +300,6 @@ pub struct Timeline { impl Timeline { fn new( zttid: ZTenantTimelineId, - callmemaybe_tx: UnboundedSender, wal_backup_launcher_tx: Sender, shared_state: SharedState, ) -> Timeline { @@ -356,7 +307,6 @@ impl Timeline { watch::channel(shared_state.sk.inmem.commit_lsn); Timeline { zttid, - callmemaybe_tx, wal_backup_launcher_tx, commit_lsn_watch_tx, commit_lsn_watch_rx, @@ -378,7 +328,7 @@ impl Timeline { // should have kind of generations assigned by compute to distinguish // the latest one or even pass it through consensus to reliably deliver // to all safekeepers. - shared_state.callmemaybe_sub(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?; + shared_state.activate_walsender(&self.zttid, pageserver_connstr.cloned()); } // Wake up wal backup launcher, if offloading not started yet. if is_wal_backup_action_pending { @@ -414,7 +364,7 @@ impl Timeline { (replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet. replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn); if stop { - shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?; + shared_state.deactivate_walsender(&self.zttid); return Ok(true); } } @@ -431,16 +381,14 @@ impl Timeline { /// Deactivates the timeline, assuming it is being deleted. /// Returns whether the timeline was already active. /// - /// The callmemaybe thread is stopped by the deactivation message. We assume all other threads - /// will stop by themselves eventually (possibly with errors, but no panics). There should be no - /// compute threads (as we're deleting the timeline), actually. 
Some WAL may be left unsent, but + /// We assume all threads will stop by themselves eventually (possibly with errors, but no panics). + /// There should be no compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but /// we're deleting the timeline anyway. pub async fn deactivate_for_delete(&self) -> Result { let was_active: bool; { - let mut shared_state = self.mutex.lock().unwrap(); + let shared_state = self.mutex.lock().unwrap(); was_active = shared_state.active; - shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?; } self.wal_backup_launcher_tx.send(self.zttid).await?; Ok(was_active) @@ -576,7 +524,8 @@ impl Timeline { shared_state.sk.inmem.remote_consistent_lsn, )), peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn), - safekeeper_connection_string: Some(conf.listen_pg_addr.clone()), + safekeeper_connstr: Some(conf.listen_pg_addr.clone()), + pageserver_connstr: shared_state.pageserver_connstr.clone(), backup_lsn: Some(shared_state.sk.inmem.backup_lsn), }) } @@ -675,14 +624,12 @@ impl TimelineTools for Option> { struct GlobalTimelinesState { timelines: HashMap>, - callmemaybe_tx: Option>, wal_backup_launcher_tx: Option>, } lazy_static! { static ref TIMELINES_STATE: Mutex = Mutex::new(GlobalTimelinesState { timelines: HashMap::new(), - callmemaybe_tx: None, wal_backup_launcher_tx: None, }); } @@ -697,13 +644,8 @@ pub struct TimelineDeleteForceResult { pub struct GlobalTimelines; impl GlobalTimelines { - pub fn init( - callmemaybe_tx: UnboundedSender, - wal_backup_launcher_tx: Sender, - ) { + pub fn init(wal_backup_launcher_tx: Sender) { let mut state = TIMELINES_STATE.lock().unwrap(); - assert!(state.callmemaybe_tx.is_none()); - state.callmemaybe_tx = Some(callmemaybe_tx); assert!(state.wal_backup_launcher_tx.is_none()); state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx); } @@ -726,7 +668,6 @@ impl GlobalTimelines { let new_tli = Arc::new(Timeline::new( zttid, - state.callmemaybe_tx.as_ref().unwrap().clone(), state.wal_backup_launcher_tx.as_ref().unwrap().clone(), shared_state, )); @@ -778,7 +719,6 @@ impl GlobalTimelines { let new_tli = Arc::new(Timeline::new( zttid, - state.callmemaybe_tx.as_ref().unwrap().clone(), state.wal_backup_launcher_tx.as_ref().unwrap().clone(), shared_state, )); diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index 2b0e5ae8bd..d22654ad3e 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -63,10 +63,11 @@ def test_pageserver_http_get_wal_receiver_not_found(zenith_simple_env: ZenithEnv tenant_id, timeline_id = env.zenith_cli.create_tenant() - # no PG compute node is running, so no WAL receiver is running - with pytest.raises(ZenithPageserverApiException) as e: - _ = client.wal_receiver_get(tenant_id, timeline_id) - assert "Not Found" in str(e.value) + empty_response = client.wal_receiver_get(tenant_id, timeline_id) + + assert empty_response.get('wal_producer_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running' + assert empty_response.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running' + assert empty_response.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running' def test_pageserver_http_get_wal_receiver_success(zenith_simple_env: ZenithEnv): @@ -81,7 +82,6 @@ def 
test_pageserver_http_get_wal_receiver_success(zenith_simple_env: ZenithEnv): # a successful `wal_receiver_get` response must contain the below fields assert list(res.keys()) == [ - "thread_id", "wal_producer_connstr", "last_received_msg_lsn", "last_received_msg_ts", diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index ff905efa53..37bc5fe541 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1600,9 +1600,7 @@ class Postgres(PgProtocol): for cfg_line in cfg_lines: # walproposer uses different application_name if ("synchronous_standby_names" in cfg_line or - # don't ask pageserver to fetch WAL from compute - "callmemaybe_connstring" in cfg_line or - # don't repeat safekeepers multiple times + # don't repeat safekeepers/wal_acceptors multiple times "safekeepers" in cfg_line): continue f.write(cfg_line) diff --git a/test_runner/performance/test_bulk_tenant_create.py b/test_runner/performance/test_bulk_tenant_create.py index 0e16d3e749..a8a1ff7687 100644 --- a/test_runner/performance/test_bulk_tenant_create.py +++ b/test_runner/performance/test_bulk_tenant_create.py @@ -13,16 +13,12 @@ from fixtures.zenith_fixtures import ZenithEnvBuilder @pytest.mark.parametrize('tenants_count', [1, 5, 10]) -@pytest.mark.parametrize('use_safekeepers', ['with_wa', 'without_wa']) def test_bulk_tenant_create( zenith_env_builder: ZenithEnvBuilder, - use_safekeepers: str, tenants_count: int, zenbenchmark, ): - """Measure tenant creation time (with and without wal acceptors)""" - if use_safekeepers == 'with_wa': - zenith_env_builder.num_safekeepers = 3 + zenith_env_builder.num_safekeepers = 3 env = zenith_env_builder.init_start() time_slices = [] @@ -31,15 +27,15 @@ def test_bulk_tenant_create( start = timeit.default_timer() tenant, _ = env.zenith_cli.create_tenant() - env.zenith_cli.create_timeline( - f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant) + env.zenith_cli.create_timeline(f'test_bulk_tenant_create_{tenants_count}_{i}', + tenant_id=tenant) # FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now? #if use_safekeepers == 'with_sa': # wa_factory.start_n_new(3) - pg_tenant = env.postgres.create_start( - f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant) + pg_tenant = env.postgres.create_start(f'test_bulk_tenant_create_{tenants_count}_{i}', + tenant_id=tenant) end = timeit.default_timer() time_slices.append(end - start)