Replace callmemaybe with etcd subscriptions on safekeeper timeline info

This commit is contained in:
Kirill Bulatov
2022-04-22 13:56:48 +03:00
committed by Kirill Bulatov
parent 6623c5b9d5
commit e5cb727572
25 changed files with 1968 additions and 693 deletions

View File

@@ -352,7 +352,6 @@ impl PostgresNode {
// This isn't really a supported configuration, but can be useful for // This isn't really a supported configuration, but can be useful for
// testing. // testing.
conf.append("synchronous_standby_names", "pageserver"); conf.append("synchronous_standby_names", "pageserver");
conf.append("neon.callmemaybe_connstring", &self.connstr());
} }
let mut file = File::create(self.pgdata().join("postgresql.conf"))?; let mut file = File::create(self.pgdata().join("postgresql.conf"))?;

View File

@@ -1,6 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Write; use std::io::Write;
use std::net::TcpStream; use std::net::TcpStream;
use std::num::NonZeroU64;
use std::path::PathBuf; use std::path::PathBuf;
use std::process::Command; use std::process::Command;
use std::time::Duration; use std::time::Duration;
@@ -11,6 +12,7 @@ use nix::errno::Errno;
use nix::sys::signal::{kill, Signal}; use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid; use nix::unistd::Pid;
use pageserver::http::models::{TenantConfigRequest, TenantCreateRequest, TimelineCreateRequest}; use pageserver::http::models::{TenantConfigRequest, TenantCreateRequest, TimelineCreateRequest};
use pageserver::tenant_mgr::TenantInfo;
use pageserver::timelines::TimelineInfo; use pageserver::timelines::TimelineInfo;
use postgres::{Config, NoTls}; use postgres::{Config, NoTls};
use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::blocking::{Client, RequestBuilder, Response};
@@ -26,7 +28,6 @@ use utils::{
use crate::local_env::LocalEnv; use crate::local_env::LocalEnv;
use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile}; use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
use pageserver::tenant_mgr::TenantInfo;
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum PageserverHttpError { pub enum PageserverHttpError {
@@ -37,6 +38,12 @@ pub enum PageserverHttpError {
Response(String), Response(String),
} }
impl From<anyhow::Error> for PageserverHttpError {
fn from(e: anyhow::Error) -> Self {
Self::Response(e.to_string())
}
}
type Result<T> = result::Result<T, PageserverHttpError>; type Result<T> = result::Result<T, PageserverHttpError>;
pub trait ResponseErrorMessageExt: Sized { pub trait ResponseErrorMessageExt: Sized {
@@ -410,6 +417,15 @@ impl PageServerNode {
.map(|x| x.parse::<usize>()) .map(|x| x.parse::<usize>())
.transpose()?, .transpose()?,
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()), pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.get("walreceiver_connect_timeout")
.map(|x| x.to_string()),
lagging_wal_timeout: settings.get("lagging_wal_timeout").map(|x| x.to_string()),
max_lsn_wal_lag: settings
.get("max_lsn_wal_lag")
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
}) })
.send()? .send()?
.error_from_body()? .error_from_body()?
@@ -433,22 +449,41 @@ impl PageServerNode {
tenant_id, tenant_id,
checkpoint_distance: settings checkpoint_distance: settings
.get("checkpoint_distance") .get("checkpoint_distance")
.map(|x| x.parse::<u64>().unwrap()), .map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'checkpoint_distance' as an integer")?,
compaction_target_size: settings compaction_target_size: settings
.get("compaction_target_size") .get("compaction_target_size")
.map(|x| x.parse::<u64>().unwrap()), .map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'compaction_target_size' as an integer")?,
compaction_period: settings.get("compaction_period").map(|x| x.to_string()), compaction_period: settings.get("compaction_period").map(|x| x.to_string()),
compaction_threshold: settings compaction_threshold: settings
.get("compaction_threshold") .get("compaction_threshold")
.map(|x| x.parse::<usize>().unwrap()), .map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'compaction_threshold' as an integer")?,
gc_horizon: settings gc_horizon: settings
.get("gc_horizon") .get("gc_horizon")
.map(|x| x.parse::<u64>().unwrap()), .map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'gc_horizon' as an integer")?,
gc_period: settings.get("gc_period").map(|x| x.to_string()), gc_period: settings.get("gc_period").map(|x| x.to_string()),
image_creation_threshold: settings image_creation_threshold: settings
.get("image_creation_threshold") .get("image_creation_threshold")
.map(|x| x.parse::<usize>().unwrap()), .map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()), pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.get("walreceiver_connect_timeout")
.map(|x| x.to_string()),
lagging_wal_timeout: settings.get("lagging_wal_timeout").map(|x| x.to_string()),
max_lsn_wal_lag: settings
.get("max_lsn_wal_lag")
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
}) })
.send()? .send()?
.error_from_body()?; .error_from_body()?;

View File

@@ -31,7 +31,7 @@ broker_endpoints = ['some://etcd']
# [remote_storage] # [remote_storage]
``` ```
The config above shows default values for all basic pageserver settings, besides `broker_endpoints`: that one has to be set by the user, The config above shows default values for all basic pageserver settings, besides `broker_endpoints`: that one has to be set by the user,
see the corresponding section below. see the corresponding section below.
Pageserver uses default values for all files that are missing in the config, so it's not a hard error to leave the config blank. Pageserver uses default values for all files that are missing in the config, so it's not a hard error to leave the config blank.
Yet, it validates the config values it can (e.g. postgres install dir) and errors if the validation fails, refusing to start. Yet, it validates the config values it can (e.g. postgres install dir) and errors if the validation fails, refusing to start.
@@ -54,7 +54,7 @@ Note that TOML distinguishes between strings and integers, the former require si
A list of endpoints (etcd currently) to connect and pull the information from. A list of endpoints (etcd currently) to connect and pull the information from.
Mandatory, does not have a default, since requires etcd to be started as a separate process, Mandatory, does not have a default, since requires etcd to be started as a separate process,
and its connection url should be specified separately. and its connection url should be specified separately.
#### broker_etcd_prefix #### broker_etcd_prefix
@@ -111,6 +111,20 @@ L0 delta layer threshold for L1 image layer creation. Default is 3.
WAL retention duration for PITR branching. Default is 30 days. WAL retention duration for PITR branching. Default is 30 days.
#### walreceiver_connect_timeout
Time to wait to establish the wal receiver connection before failing
#### lagging_wal_timeout
Time the pageserver did not get any WAL updates from safekeeper (if any).
Avoids lagging pageserver preemptively by forcing to switch it from stalled connections.
#### max_lsn_wal_lag
Difference between Lsn values of the latest available WAL on safekeepers: if currently connected safekeeper starts to lag too long and too much,
it gets swapped to the different one.
#### initial_superuser_name #### initial_superuser_name
Name of the initial superuser role, passed to initdb when a new tenant Name of the initial superuser role, passed to initdb when a new tenant

View File

@@ -31,7 +31,7 @@ struct SafekeeperTimeline {
/// Published data about safekeeper's timeline. Fields made optional for easy migrations. /// Published data about safekeeper's timeline. Fields made optional for easy migrations.
#[serde_as] #[serde_as]
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkTimelineInfo { pub struct SkTimelineInfo {
/// Term of the last entry. /// Term of the last entry.
pub last_log_term: Option<u64>, pub last_log_term: Option<u64>,
@@ -55,7 +55,9 @@ pub struct SkTimelineInfo {
#[serde(default)] #[serde(default)]
pub peer_horizon_lsn: Option<Lsn>, pub peer_horizon_lsn: Option<Lsn>,
#[serde(default)] #[serde(default)]
pub safekeeper_connection_string: Option<String>, pub safekeeper_connstr: Option<String>,
#[serde(default)]
pub pageserver_connstr: Option<String>,
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]

View File

@@ -336,11 +336,11 @@ impl PostgresBackend {
let have_tls = self.tls_config.is_some(); let have_tls = self.tls_config.is_some();
match msg { match msg {
FeMessage::StartupPacket(m) => { FeMessage::StartupPacket(m) => {
trace!("got startup message {:?}", m); trace!("got startup message {m:?}");
match m { match m {
FeStartupPacket::SslRequest => { FeStartupPacket::SslRequest => {
info!("SSL requested"); debug!("SSL requested");
self.write_message(&BeMessage::EncryptionResponse(have_tls))?; self.write_message(&BeMessage::EncryptionResponse(have_tls))?;
if have_tls { if have_tls {
@@ -349,7 +349,7 @@ impl PostgresBackend {
} }
} }
FeStartupPacket::GssEncRequest => { FeStartupPacket::GssEncRequest => {
info!("GSS requested"); debug!("GSS requested");
self.write_message(&BeMessage::EncryptionResponse(false))?; self.write_message(&BeMessage::EncryptionResponse(false))?;
} }
FeStartupPacket::StartupMessage { .. } => { FeStartupPacket::StartupMessage { .. } => {
@@ -433,12 +433,7 @@ impl PostgresBackend {
// full cause of the error, not just the top-level context + its trace. // full cause of the error, not just the top-level context + its trace.
// We don't want to send that in the ErrorResponse though, // We don't want to send that in the ErrorResponse though,
// because it's not relevant to the compute node logs. // because it's not relevant to the compute node logs.
if query_string.starts_with("callmemaybe") { error!("query handler for '{}' failed: {:?}", query_string, e);
// FIXME avoid printing a backtrace for tenant x not found errors until this is properly fixed
error!("query handler for '{}' failed: {}", query_string, e);
} else {
error!("query handler for '{}' failed: {:?}", query_string, e);
}
self.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?; self.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?;
// TODO: untangle convoluted control flow // TODO: untangle convoluted control flow
if e.to_string().contains("failed to run") { if e.to_string().contains("failed to run") {

View File

@@ -193,7 +193,7 @@ pub struct ZTenantId(ZId);
zid_newtype!(ZTenantId); zid_newtype!(ZTenantId);
// A pair uniquely identifying Zenith instance. // A pair uniquely identifying Zenith instance.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)] #[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ZTenantTimelineId { pub struct ZTenantTimelineId {
pub tenant_id: ZTenantId, pub tenant_id: ZTenantId,
pub timeline_id: ZTimelineId, pub timeline_id: ZTimelineId,

View File

@@ -5,7 +5,7 @@ edition = "2021"
[features] [features]
# It is simpler infra-wise to have failpoints enabled by default # It is simpler infra-wise to have failpoints enabled by default
# It shouldn't affect perf in any way because failpoints # It shouldn't affect performance in any way because failpoints
# are not placed in hot code paths # are not placed in hot code paths
default = ["failpoints"] default = ["failpoints"]
profiling = ["pprof"] profiling = ["pprof"]

View File

@@ -480,6 +480,21 @@ impl PageServerConf {
if let Some(pitr_interval) = item.get("pitr_interval") { if let Some(pitr_interval) = item.get("pitr_interval") {
t_conf.pitr_interval = Some(parse_toml_duration("pitr_interval", pitr_interval)?); t_conf.pitr_interval = Some(parse_toml_duration("pitr_interval", pitr_interval)?);
} }
if let Some(walreceiver_connect_timeout) = item.get("walreceiver_connect_timeout") {
t_conf.walreceiver_connect_timeout = Some(parse_toml_duration(
"walreceiver_connect_timeout",
walreceiver_connect_timeout,
)?);
}
if let Some(lagging_wal_timeout) = item.get("lagging_wal_timeout") {
t_conf.lagging_wal_timeout = Some(parse_toml_duration(
"lagging_wal_timeout",
lagging_wal_timeout,
)?);
}
if let Some(max_lsn_wal_lag) = item.get("max_lsn_wal_lag") {
t_conf.max_lsn_wal_lag = Some(parse_toml_from_str("max_lsn_wal_lag", max_lsn_wal_lag)?);
}
Ok(t_conf) Ok(t_conf)
} }

View File

@@ -1,3 +1,5 @@
use std::num::NonZeroU64;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr}; use serde_with::{serde_as, DisplayFromStr};
use utils::{ use utils::{
@@ -33,6 +35,9 @@ pub struct TenantCreateRequest {
pub gc_period: Option<String>, pub gc_period: Option<String>,
pub image_creation_threshold: Option<usize>, pub image_creation_threshold: Option<usize>,
pub pitr_interval: Option<String>, pub pitr_interval: Option<String>,
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
} }
#[serde_as] #[serde_as]
@@ -68,6 +73,9 @@ pub struct TenantConfigRequest {
pub gc_period: Option<String>, pub gc_period: Option<String>,
pub image_creation_threshold: Option<usize>, pub image_creation_threshold: Option<usize>,
pub pitr_interval: Option<String>, pub pitr_interval: Option<String>,
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
} }
impl TenantConfigRequest { impl TenantConfigRequest {
@@ -82,6 +90,21 @@ impl TenantConfigRequest {
gc_period: None, gc_period: None,
image_creation_threshold: None, image_creation_threshold: None,
pitr_interval: None, pitr_interval: None,
walreceiver_connect_timeout: None,
lagging_wal_timeout: None,
max_lsn_wal_lag: None,
} }
} }
} }
/// A WAL receiver's data stored inside the global `WAL_RECEIVERS`.
/// We keep one WAL receiver active per timeline.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct WalReceiverEntry {
pub wal_producer_connstr: Option<String>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub last_received_msg_lsn: Option<Lsn>,
/// the timestamp (in microseconds) of the last received message
pub last_received_msg_ts: Option<u128>,
}

View File

@@ -229,23 +229,16 @@ async fn wal_receiver_get_handler(request: Request<Body>) -> Result<Response<Bod
check_permission(&request, Some(tenant_id))?; check_permission(&request, Some(tenant_id))?;
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?; let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
let wal_receiver_entry = crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id)
.instrument(info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id))
.await
.ok_or_else(|| {
ApiError::NotFound(format!(
"WAL receiver data not found for tenant {tenant_id} and timeline {timeline_id}"
))
})?;
let wal_receiver = tokio::task::spawn_blocking(move || { json_response(StatusCode::OK, &wal_receiver_entry)
let _enter =
info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id).entered();
crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id)
})
.await
.map_err(ApiError::from_err)?
.ok_or_else(|| {
ApiError::NotFound(format!(
"WAL receiver not found for tenant {} and timeline {}",
tenant_id, timeline_id
))
})?;
json_response(StatusCode::OK, wal_receiver)
} }
async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> { async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -402,6 +395,19 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?); Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?);
} }
if let Some(walreceiver_connect_timeout) = request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(&walreceiver_connect_timeout).map_err(ApiError::from_err)?,
);
}
if let Some(lagging_wal_timeout) = request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout =
Some(humantime::parse_duration(&lagging_wal_timeout).map_err(ApiError::from_err)?);
}
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance; tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
tenant_conf.compaction_target_size = request_data.compaction_target_size; tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold; tenant_conf.compaction_threshold = request_data.compaction_threshold;
@@ -449,6 +455,18 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
tenant_conf.pitr_interval = tenant_conf.pitr_interval =
Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?); Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?);
} }
if let Some(walreceiver_connect_timeout) = request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(&walreceiver_connect_timeout).map_err(ApiError::from_err)?,
);
}
if let Some(lagging_wal_timeout) = request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout =
Some(humantime::parse_duration(&lagging_wal_timeout).map_err(ApiError::from_err)?);
}
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance; tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
tenant_conf.compaction_target_size = request_data.compaction_target_size; tenant_conf.compaction_target_size = request_data.compaction_target_size;

View File

@@ -25,6 +25,7 @@ use std::collections::{BTreeSet, HashSet};
use std::fs; use std::fs;
use std::fs::{File, OpenOptions}; use std::fs::{File, OpenOptions};
use std::io::Write; use std::io::Write;
use std::num::NonZeroU64;
use std::ops::{Bound::Included, Deref, Range}; use std::ops::{Bound::Included, Deref, Range};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool}; use std::sync::atomic::{self, AtomicBool};
@@ -557,6 +558,27 @@ impl LayeredRepository {
.unwrap_or(self.conf.default_tenant_conf.pitr_interval) .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
} }
pub fn get_wal_receiver_connect_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.walreceiver_connect_timeout
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout)
}
pub fn get_lagging_wal_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.lagging_wal_timeout
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout)
}
pub fn get_max_lsn_wal_lag(&self) -> NonZeroU64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) -> Result<()> { pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) -> Result<()> {
let mut tenant_conf = self.tenant_conf.write().unwrap(); let mut tenant_conf = self.tenant_conf.write().unwrap();

View File

@@ -7,7 +7,6 @@
// *status* -- show actual info about this pageserver, // *status* -- show actual info about this pageserver,
// *pagestream* -- enter mode where smgr and pageserver talk with their // *pagestream* -- enter mode where smgr and pageserver talk with their
// custom protocol. // custom protocol.
// *callmemaybe <zenith timelineid> $url* -- ask pageserver to start walreceiver on $url
// //
use anyhow::{bail, ensure, Context, Result}; use anyhow::{bail, ensure, Context, Result};
@@ -38,7 +37,6 @@ use crate::repository::Timeline;
use crate::tenant_mgr; use crate::tenant_mgr;
use crate::thread_mgr; use crate::thread_mgr;
use crate::thread_mgr::ThreadKind; use crate::thread_mgr::ThreadKind;
use crate::walreceiver;
use crate::CheckpointConfig; use crate::CheckpointConfig;
use metrics::{register_histogram_vec, HistogramVec}; use metrics::{register_histogram_vec, HistogramVec};
use postgres_ffi::xlog_utils::to_pg_timestamp; use postgres_ffi::xlog_utils::to_pg_timestamp;
@@ -716,30 +714,6 @@ impl postgres_backend::Handler for PageServerHandler {
// Check that the timeline exists // Check that the timeline exists
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?; self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("callmemaybe ") {
// callmemaybe <zenith tenantid as hex string> <zenith timelineid as hex string> <connstr>
// TODO lazy static
let re = Regex::new(r"^callmemaybe ([[:xdigit:]]+) ([[:xdigit:]]+) (.*)$").unwrap();
let caps = re
.captures(query_string)
.with_context(|| format!("invalid callmemaybe: '{}'", query_string))?;
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let connstr = caps.get(3).unwrap().as_str().to_owned();
self.check_permission(Some(tenantid))?;
let _enter =
info_span!("callmemaybe", timeline = %timelineid, tenant = %tenantid).entered();
// Check that the timeline exists
tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.to_ascii_lowercase().starts_with("set ") { } else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'" // important because psycopg2 executes "SET datestyle TO 'ISO'"

View File

@@ -469,6 +469,9 @@ pub mod repo_harness {
gc_period: Some(tenant_conf.gc_period), gc_period: Some(tenant_conf.gc_period),
image_creation_threshold: Some(tenant_conf.image_creation_threshold), image_creation_threshold: Some(tenant_conf.image_creation_threshold),
pitr_interval: Some(tenant_conf.pitr_interval), pitr_interval: Some(tenant_conf.pitr_interval),
walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
} }
} }
} }

View File

@@ -10,6 +10,7 @@
//! //!
use crate::config::PageServerConf; use crate::config::PageServerConf;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::num::NonZeroU64;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
use utils::zid::ZTenantId; use utils::zid::ZTenantId;
@@ -34,6 +35,9 @@ pub mod defaults {
pub const DEFAULT_GC_PERIOD: &str = "100 s"; pub const DEFAULT_GC_PERIOD: &str = "100 s";
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3; pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
pub const DEFAULT_PITR_INTERVAL: &str = "30 days"; pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1_000_000;
} }
/// Per-tenant configuration options /// Per-tenant configuration options
@@ -68,6 +72,17 @@ pub struct TenantConf {
// Page versions older than this are garbage collected away. // Page versions older than this are garbage collected away.
#[serde(with = "humantime_serde")] #[serde(with = "humantime_serde")]
pub pitr_interval: Duration, pub pitr_interval: Duration,
/// Maximum amount of time to wait while opening a connection to receive wal, before erroring.
#[serde(with = "humantime_serde")]
pub walreceiver_connect_timeout: Duration,
/// Considers safekeepers stalled after no WAL updates were received longer than this threshold.
/// A stalled safekeeper will be changed to a newer one when it appears.
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Duration,
/// Considers safekeepers lagging when their WAL is behind another safekeeper for more than this threshold.
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
} }
/// Same as TenantConf, but this struct preserves the information about /// Same as TenantConf, but this struct preserves the information about
@@ -85,6 +100,11 @@ pub struct TenantConfOpt {
pub image_creation_threshold: Option<usize>, pub image_creation_threshold: Option<usize>,
#[serde(with = "humantime_serde")] #[serde(with = "humantime_serde")]
pub pitr_interval: Option<Duration>, pub pitr_interval: Option<Duration>,
#[serde(with = "humantime_serde")]
pub walreceiver_connect_timeout: Option<Duration>,
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
} }
impl TenantConfOpt { impl TenantConfOpt {
@@ -108,6 +128,13 @@ impl TenantConfOpt {
.image_creation_threshold .image_creation_threshold
.unwrap_or(global_conf.image_creation_threshold), .unwrap_or(global_conf.image_creation_threshold),
pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval), pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval),
walreceiver_connect_timeout: self
.walreceiver_connect_timeout
.unwrap_or(global_conf.walreceiver_connect_timeout),
lagging_wal_timeout: self
.lagging_wal_timeout
.unwrap_or(global_conf.lagging_wal_timeout),
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
} }
} }
@@ -136,6 +163,15 @@ impl TenantConfOpt {
if let Some(pitr_interval) = other.pitr_interval { if let Some(pitr_interval) = other.pitr_interval {
self.pitr_interval = Some(pitr_interval); self.pitr_interval = Some(pitr_interval);
} }
if let Some(walreceiver_connect_timeout) = other.walreceiver_connect_timeout {
self.walreceiver_connect_timeout = Some(walreceiver_connect_timeout);
}
if let Some(lagging_wal_timeout) = other.lagging_wal_timeout {
self.lagging_wal_timeout = Some(lagging_wal_timeout);
}
if let Some(max_lsn_wal_lag) = other.max_lsn_wal_lag {
self.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
} }
} }
@@ -155,6 +191,14 @@ impl TenantConf {
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD, image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL) pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
.expect("cannot parse default PITR interval"), .expect("cannot parse default PITR interval"),
walreceiver_connect_timeout: humantime::parse_duration(
DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
)
.expect("cannot parse default walreceiver connect timeout"),
lagging_wal_timeout: humantime::parse_duration(DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT)
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
} }
} }
@@ -175,6 +219,16 @@ impl TenantConf {
gc_period: Duration::from_secs(10), gc_period: Duration::from_secs(10),
image_creation_threshold: defaults::DEFAULT_IMAGE_CREATION_THRESHOLD, image_creation_threshold: defaults::DEFAULT_IMAGE_CREATION_THRESHOLD,
pitr_interval: Duration::from_secs(60 * 60), pitr_interval: Duration::from_secs(60 * 60),
walreceiver_connect_timeout: humantime::parse_duration(
defaults::DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
)
.unwrap(),
lagging_wal_timeout: humantime::parse_duration(
defaults::DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT,
)
.unwrap(),
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.unwrap(),
} }
} }
} }

View File

@@ -8,11 +8,10 @@ use crate::repository::{Repository, TimelineSyncStatusUpdate};
use crate::storage_sync::index::RemoteIndex; use crate::storage_sync::index::RemoteIndex;
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData}; use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
use crate::tenant_config::TenantConfOpt; use crate::tenant_config::TenantConfOpt;
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind; use crate::thread_mgr::ThreadKind;
use crate::timelines;
use crate::timelines::CreateRepo; use crate::timelines::CreateRepo;
use crate::walredo::PostgresRedoManager; use crate::walredo::PostgresRedoManager;
use crate::{thread_mgr, timelines, walreceiver};
use crate::{DatadirTimelineImpl, RepositoryImpl}; use crate::{DatadirTimelineImpl, RepositoryImpl};
use anyhow::{bail, Context}; use anyhow::{bail, Context};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -21,23 +20,30 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::*; use tracing::*;
use utils::lsn::Lsn; use utils::lsn::Lsn;
use utils::zid::{ZTenantId, ZTimelineId}; use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
mod tenants_state { mod tenants_state {
use anyhow::ensure;
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
}; };
use tokio::sync::mpsc;
use tracing::{debug, error};
use utils::zid::ZTenantId; use utils::zid::ZTenantId;
use crate::tenant_mgr::Tenant; use crate::tenant_mgr::{LocalTimelineUpdate, Tenant};
lazy_static::lazy_static! { lazy_static::lazy_static! {
static ref TENANTS: RwLock<HashMap<ZTenantId, Tenant>> = RwLock::new(HashMap::new()); static ref TENANTS: RwLock<HashMap<ZTenantId, Tenant>> = RwLock::new(HashMap::new());
/// Sends updates to the local timelines (creation and deletion) to the WAL receiver,
/// so that it can enable/disable corresponding processes.
static ref TIMELINE_UPDATE_SENDER: RwLock<Option<mpsc::UnboundedSender<LocalTimelineUpdate>>> = RwLock::new(None);
} }
pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap<ZTenantId, Tenant>> { pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap<ZTenantId, Tenant>> {
@@ -51,6 +57,39 @@ mod tenants_state {
.write() .write()
.expect("Failed to write() tenants lock, it got poisoned") .expect("Failed to write() tenants lock, it got poisoned")
} }
pub(super) fn set_timeline_update_sender(
timeline_updates_sender: mpsc::UnboundedSender<LocalTimelineUpdate>,
) -> anyhow::Result<()> {
let mut sender_guard = TIMELINE_UPDATE_SENDER
.write()
.expect("Failed to write() timeline_update_sender lock, it got poisoned");
ensure!(sender_guard.is_none(), "Timeline update sender already set");
*sender_guard = Some(timeline_updates_sender);
Ok(())
}
pub(super) fn try_send_timeline_update(update: LocalTimelineUpdate) {
match TIMELINE_UPDATE_SENDER
.read()
.expect("Failed to read() timeline_update_sender lock, it got poisoned")
.as_ref()
{
Some(sender) => {
if let Err(e) = sender.send(update) {
error!("Failed to send timeline update: {}", e);
}
}
None => debug!("Timeline update sender is not enabled, cannot send update {update:?}"),
}
}
pub(super) fn stop_timeline_update_sender() {
TIMELINE_UPDATE_SENDER
.write()
.expect("Failed to write() timeline_update_sender lock, it got poisoned")
.take();
}
} }
struct Tenant { struct Tenant {
@@ -87,10 +126,10 @@ pub enum TenantState {
impl fmt::Display for TenantState { impl fmt::Display for TenantState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
TenantState::Active => f.write_str("Active"), Self::Active => f.write_str("Active"),
TenantState::Idle => f.write_str("Idle"), Self::Idle => f.write_str("Idle"),
TenantState::Stopping => f.write_str("Stopping"), Self::Stopping => f.write_str("Stopping"),
TenantState::Broken => f.write_str("Broken"), Self::Broken => f.write_str("Broken"),
} }
} }
} }
@@ -99,6 +138,11 @@ impl fmt::Display for TenantState {
/// Timelines that are only partially available locally (remote storage has more data than this pageserver) /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the repository once download is completed. /// are scheduled for download and added to the repository once download is completed.
pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIndex> { pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIndex> {
let (timeline_updates_sender, timeline_updates_receiver) =
mpsc::unbounded_channel::<LocalTimelineUpdate>();
tenants_state::set_timeline_update_sender(timeline_updates_sender)?;
walreceiver::init_wal_receiver_main_thread(conf, timeline_updates_receiver)?;
let SyncStartupData { let SyncStartupData {
remote_index, remote_index,
local_timeline_init_statuses, local_timeline_init_statuses,
@@ -113,16 +157,27 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIn
// loading a tenant is serious, but it's better to complete the startup and // loading a tenant is serious, but it's better to complete the startup and
// serve other tenants, than fail completely. // serve other tenants, than fail completely.
error!("Failed to initialize local tenant {tenant_id}: {:?}", err); error!("Failed to initialize local tenant {tenant_id}: {:?}", err);
let mut m = tenants_state::write_tenants(); set_tenant_state(tenant_id, TenantState::Broken)?;
if let Some(tenant) = m.get_mut(&tenant_id) {
tenant.state = TenantState::Broken;
}
} }
} }
Ok(remote_index) Ok(remote_index)
} }
pub enum LocalTimelineUpdate {
Detach(ZTenantTimelineId),
Attach(ZTenantTimelineId, Arc<DatadirTimelineImpl>),
}
impl std::fmt::Debug for LocalTimelineUpdate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Detach(ttid) => f.debug_tuple("Remove").field(ttid).finish(),
Self::Attach(ttid, _) => f.debug_tuple("Add").field(ttid).finish(),
}
}
}
/// Updates tenants' repositories, changing their timelines state in memory. /// Updates tenants' repositories, changing their timelines state in memory.
pub fn apply_timeline_sync_status_updates( pub fn apply_timeline_sync_status_updates(
conf: &'static PageServerConf, conf: &'static PageServerConf,
@@ -160,6 +215,7 @@ pub fn apply_timeline_sync_status_updates(
/// Shut down all tenants. This runs as part of pageserver shutdown. /// Shut down all tenants. This runs as part of pageserver shutdown.
/// ///
pub fn shutdown_all_tenants() { pub fn shutdown_all_tenants() {
tenants_state::stop_timeline_update_sender();
let mut m = tenants_state::write_tenants(); let mut m = tenants_state::write_tenants();
let mut tenantids = Vec::new(); let mut tenantids = Vec::new();
for (tenantid, tenant) in m.iter_mut() { for (tenantid, tenant) in m.iter_mut() {
@@ -173,7 +229,7 @@ pub fn shutdown_all_tenants() {
} }
drop(m); drop(m);
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiver), None, None); thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiverManager), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None); thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None); thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
@@ -247,32 +303,49 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
Some(tenants_state::read_tenants().get(&tenantid)?.state) Some(tenants_state::read_tenants().get(&tenantid)?.state)
} }
/// pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow::Result<()> {
/// Change the state of a tenant to Active and launch its compactor and GC
/// threads. If the tenant was already in Active state or Stopping, does nothing.
///
pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
let mut m = tenants_state::write_tenants(); let mut m = tenants_state::write_tenants();
let tenant = m let tenant = m
.get_mut(&tenant_id) .get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?; .with_context(|| format!("Tenant not found for id {tenant_id}"))?;
let old_state = tenant.state;
tenant.state = new_state;
drop(m);
info!("activating tenant {tenant_id}"); match (old_state, new_state) {
(TenantState::Broken, TenantState::Broken)
match tenant.state { | (TenantState::Active, TenantState::Active)
// If the tenant is already active, nothing to do. | (TenantState::Idle, TenantState::Idle)
TenantState::Active => {} | (TenantState::Stopping, TenantState::Stopping) => {
debug!("tenant {tenant_id} already in state {new_state}");
// If it's Idle, launch the compactor and GC threads }
TenantState::Idle => { (TenantState::Broken, ignored) => {
thread_mgr::spawn( debug!("Ignoring {ignored} since tenant {tenant_id} is in broken state");
}
(_, TenantState::Broken) => {
debug!("Setting tenant {tenant_id} status to broken");
}
(TenantState::Stopping, ignored) => {
debug!("Ignoring {ignored} since tenant {tenant_id} is in stopping state");
}
(TenantState::Idle, TenantState::Active) => {
info!("activating tenant {tenant_id}");
let compactor_spawn_result = thread_mgr::spawn(
ThreadKind::Compactor, ThreadKind::Compactor,
Some(tenant_id), Some(tenant_id),
None, None,
"Compactor thread", "Compactor thread",
false, false,
move || crate::tenant_threads::compact_loop(tenant_id), move || crate::tenant_threads::compact_loop(tenant_id),
)?; );
if compactor_spawn_result.is_err() {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
}
compactor_spawn_result?;
let gc_spawn_result = thread_mgr::spawn( let gc_spawn_result = thread_mgr::spawn(
ThreadKind::GarbageCollector, ThreadKind::GarbageCollector,
@@ -286,21 +359,31 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}")); .with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
if let Err(e) = &gc_spawn_result { if let Err(e) = &gc_spawn_result {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}"); error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None); thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result; return gc_spawn_result;
} }
tenant.state = TenantState::Active;
} }
(TenantState::Idle, TenantState::Stopping) => {
TenantState::Stopping => { info!("stopping idle tenant {tenant_id}");
// don't re-activate it if it's being stopped
} }
(TenantState::Active, TenantState::Stopping | TenantState::Idle) => {
TenantState::Broken => { info!("stopping tenant {tenant_id} threads due to new state {new_state}");
// cannot activate thread_mgr::shutdown_threads(
Some(ThreadKind::WalReceiverManager),
Some(tenant_id),
None,
);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), Some(tenant_id), None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
} }
} }
Ok(()) Ok(())
} }
@@ -325,15 +408,15 @@ pub fn get_local_timeline_with_load(
.with_context(|| format!("Tenant {tenant_id} not found"))?; .with_context(|| format!("Tenant {tenant_id} not found"))?;
if let Some(page_tline) = tenant.local_timelines.get(&timeline_id) { if let Some(page_tline) = tenant.local_timelines.get(&timeline_id) {
return Ok(Arc::clone(page_tline)); Ok(Arc::clone(page_tline))
} else {
let page_tline = load_local_timeline(&tenant.repo, timeline_id)
.with_context(|| format!("Failed to load local timeline for tenant {tenant_id}"))?;
tenant
.local_timelines
.insert(timeline_id, Arc::clone(&page_tline));
Ok(page_tline)
} }
let page_tline = load_local_timeline(&tenant.repo, timeline_id)
.with_context(|| format!("Failed to load local timeline for tenant {tenant_id}"))?;
tenant
.local_timelines
.insert(timeline_id, Arc::clone(&page_tline));
Ok(page_tline)
} }
pub fn detach_timeline( pub fn detach_timeline(
@@ -351,6 +434,9 @@ pub fn detach_timeline(
.detach_timeline(timeline_id) .detach_timeline(timeline_id)
.context("Failed to detach inmem tenant timeline")?; .context("Failed to detach inmem tenant timeline")?;
tenant.local_timelines.remove(&timeline_id); tenant.local_timelines.remove(&timeline_id);
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach(
ZTenantTimelineId::new(tenant_id, timeline_id),
));
} }
None => bail!("Tenant {tenant_id} not found in local tenant state"), None => bail!("Tenant {tenant_id} not found in local tenant state"),
} }
@@ -379,6 +465,12 @@ fn load_local_timeline(
repartition_distance, repartition_distance,
)); ));
page_tline.init_logical_size()?; page_tline.init_logical_size()?;
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach(
ZTenantTimelineId::new(repo.tenant_id(), timeline_id),
Arc::clone(&page_tline),
));
Ok(page_tline) Ok(page_tline)
} }

View File

@@ -91,8 +91,8 @@ pub enum ThreadKind {
// associated with one later, after receiving a command from the client. // associated with one later, after receiving a command from the client.
PageRequestHandler, PageRequestHandler,
// Thread that connects to a safekeeper to fetch WAL for one timeline. // Main walreceiver manager thread that ensures that every timeline spawns a connection to safekeeper, to fetch WAL.
WalReceiver, WalReceiverManager,
// Thread that handles compaction of all timelines for a tenant. // Thread that handles compaction of all timelines for a tenant.
Compactor, Compactor,

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,405 @@
//! Actual Postgres connection handler to stream WAL to the server.
//! Runs as a separate, cancellable Tokio task.
use std::{
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
};
use anyhow::{bail, ensure, Context};
use bytes::BytesMut;
use fail::fail_point;
use postgres::{SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use utils::{
lsn::Lsn,
pq_proto::ZenithFeedback,
zid::{NodeId, ZTenantTimelineId},
};
use crate::{
http::models::WalReceiverEntry,
repository::{Repository, Timeline},
tenant_mgr,
walingest::WalIngest,
};
#[derive(Debug, Clone)]
pub enum WalConnectionEvent {
Started,
NewWal(ZenithFeedback),
End(Result<(), String>),
}
/// A wrapper around standalone Tokio task, to poll its updates or cancel the task.
#[derive(Debug)]
pub struct WalReceiverConnection {
handle: tokio::task::JoinHandle<()>,
cancellation: watch::Sender<()>,
events_receiver: watch::Receiver<WalConnectionEvent>,
}
impl WalReceiverConnection {
/// Initializes the connection task, returning a set of handles on top of it.
/// The task is started immediately after the creation, fails if no connection is established during the timeout given.
pub fn open(
id: ZTenantTimelineId,
safekeeper_id: NodeId,
wal_producer_connstr: String,
connect_timeout: Duration,
) -> Self {
let (cancellation, mut cancellation_receiver) = watch::channel(());
let (events_sender, events_receiver) = watch::channel(WalConnectionEvent::Started);
let handle = tokio::spawn(
async move {
let connection_result = handle_walreceiver_connection(
id,
&wal_producer_connstr,
&events_sender,
&mut cancellation_receiver,
connect_timeout,
)
.await
.map_err(|e| {
format!("Walreceiver connection for id {id} failed with error: {e:#}")
});
match &connection_result {
Ok(()) => {
debug!("Walreceiver connection for id {id} ended successfully")
}
Err(e) => warn!("{e}"),
}
events_sender
.send(WalConnectionEvent::End(connection_result))
.ok();
}
.instrument(info_span!("safekeeper_handle", sk = %safekeeper_id)),
);
Self {
handle,
cancellation,
events_receiver,
}
}
/// Polls for the next WAL receiver event, if there's any available since the last check.
/// Blocks if there's no new event available, returns `None` if no new events will ever occur.
/// Only the last event is returned, all events received between observatins are lost.
pub async fn next_event(&mut self) -> Option<WalConnectionEvent> {
match self.events_receiver.changed().await {
Ok(()) => Some(self.events_receiver.borrow().clone()),
Err(_cancellation_error) => None,
}
}
/// Gracefully aborts current WAL streaming task, waiting for the current WAL streamed.
pub async fn shutdown(&mut self) -> anyhow::Result<()> {
self.cancellation.send(()).ok();
let handle = &mut self.handle;
handle
.await
.context("Failed to join on a walreceiver connection task")?;
Ok(())
}
}
async fn handle_walreceiver_connection(
id: ZTenantTimelineId,
wal_producer_connstr: &str,
events_sender: &watch::Sender<WalConnectionEvent>,
cancellation: &mut watch::Receiver<()>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
// Connect to the database in replication mode.
info!("connecting to {wal_producer_connstr}");
let connect_cfg =
format!("{wal_producer_connstr} application_name=pageserver replication=true");
let (mut replication_client, connection) = time::timeout(
connect_timeout,
tokio_postgres::connect(&connect_cfg, postgres::NoTls),
)
.await
.context("Timed out while waiting for walreceiver connection to open")?
.context("Failed to open walreceiver conection")?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
let mut connection_cancellation = cancellation.clone();
tokio::spawn(
async move {
info!("connected!");
select! {
connection_result = connection => match connection_result{
Ok(()) => info!("Walreceiver db connection closed"),
Err(connection_error) => {
if connection_error.is_closed() {
info!("Connection closed regularly: {connection_error}")
} else {
warn!("Connection aborted: {connection_error}")
}
}
},
_ = connection_cancellation.changed() => info!("Connection cancelled"),
}
}
.instrument(info_span!("safekeeper_handle_db")),
);
// Immediately increment the gauge, then create a job to decrement it on task exit.
// One of the pros of `defer!` is that this will *most probably*
// get called, even in presence of panics.
let gauge = crate::LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
gauge.inc();
scopeguard::defer! {
gauge.dec();
}
let identify = identify_system(&mut replication_client).await?;
info!("{identify:?}");
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
let mut caught_up = false;
let ZTenantTimelineId {
tenant_id,
timeline_id,
} = id;
let (repo, timeline) = tokio::task::spawn_blocking(move || {
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
.with_context(|| format!("no repository found for tenant {tenant_id}"))?;
let timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)
.with_context(|| {
format!("local timeline {timeline_id} not found for tenant {tenant_id}")
})?;
Ok::<_, anyhow::Error>((repo, timeline))
})
.await
.with_context(|| format!("Failed to spawn blocking task to get repository and timeline for tenant {tenant_id} timeline {timeline_id}"))??;
//
// Start streaming the WAL, from where we left off previously.
//
// If we had previously received WAL up to some point in the middle of a WAL record, we
// better start from the end of last full WAL record, not in the middle of one.
let mut last_rec_lsn = timeline.get_last_record_lsn();
let mut startpoint = last_rec_lsn;
if startpoint == Lsn(0) {
bail!("No previous WAL position");
}
// There might be some padding after the last full record, skip it.
startpoint += startpoint.calc_padding(8u32);
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, server is at {end_of_wal}...");
let query = format!("START_REPLICATION PHYSICAL {startpoint}");
let copy_stream = replication_client.copy_both_simple(&query).await?;
let physical_stream = ReplicationStream::new(copy_stream);
pin!(physical_stream);
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?;
while let Some(replication_message) = {
select! {
// check for shutdown first
biased;
_ = cancellation.changed() => {
info!("walreceiver interrupted");
None
}
replication_message = physical_stream.next() => replication_message,
}
} {
let replication_message = replication_message?;
let status_update = match replication_message {
ReplicationMessage::XLogData(xlog_data) => {
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
let data = xlog_data.data();
let startlsn = Lsn::from(xlog_data.wal_start());
let endlsn = startlsn + data.len() as u64;
trace!("received XLogData between {startlsn} and {endlsn}");
waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let _enter = info_span!("processing record", lsn = %lsn).entered();
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
walingest.ingest_record(&timeline, recdata, lsn)?;
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
}
if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {endlsn}");
caught_up = true;
}
let timeline_to_check = Arc::clone(&timeline.tline);
tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
.await
.with_context(|| {
format!("Spawned checkpoint check task panicked for timeline {id}")
})?
.with_context(|| {
format!("Failed to check checkpoint distance for timeline {id}")
})?;
Some(endlsn)
}
ReplicationMessage::PrimaryKeepAlive(keepalive) => {
let wal_end = keepalive.wal_end();
let timestamp = keepalive.timestamp();
let reply_requested = keepalive.reply() != 0;
trace!("received PrimaryKeepAlive(wal_end: {wal_end}, timestamp: {timestamp:?} reply: {reply_requested})");
if reply_requested {
Some(last_rec_lsn)
} else {
None
}
}
_ => None,
};
if let Some(last_lsn) = status_update {
let remote_index = repo.get_remote_index();
let timeline_remote_consistent_lsn = remote_index
.read()
.await
// here we either do not have this timeline in remote index
// or there were no checkpoints for it yet
.timeline_entry(&ZTenantTimelineId {
tenant_id,
timeline_id,
})
.map(|remote_timeline| remote_timeline.metadata.disk_consistent_lsn())
// no checkpoint was uploaded
.unwrap_or(Lsn(0));
// The last LSN we processed. It is not guaranteed to survive pageserver crash.
let write_lsn = u64::from(last_lsn);
// `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
let flush_lsn = u64::from(timeline.tline.get_disk_consistent_lsn());
// The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
// Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
let apply_lsn = u64::from(timeline_remote_consistent_lsn);
let ts = SystemTime::now();
// Update the current WAL receiver's data stored inside the global hash table `WAL_RECEIVERS`
{
super::WAL_RECEIVER_ENTRIES.write().await.insert(
id,
WalReceiverEntry {
wal_producer_connstr: Some(wal_producer_connstr.to_owned()),
last_received_msg_lsn: Some(last_lsn),
last_received_msg_ts: Some(
ts.duration_since(SystemTime::UNIX_EPOCH)
.expect("Received message time should be before UNIX EPOCH!")
.as_micros(),
),
},
);
}
// Send zenith feedback message.
// Regular standby_status_update fields are put into this message.
let zenith_status_update = ZenithFeedback {
current_timeline_size: timeline.get_current_logical_size() as u64,
ps_writelsn: write_lsn,
ps_flushlsn: flush_lsn,
ps_applylsn: apply_lsn,
ps_replytime: ts,
};
debug!("zenith_status_update {zenith_status_update:?}");
let mut data = BytesMut::new();
zenith_status_update.serialize(&mut data)?;
physical_stream
.as_mut()
.zenith_status_update(data.len() as u64, &data)
.await?;
if let Err(e) = events_sender.send(WalConnectionEvent::NewWal(zenith_status_update)) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}
}
}
Ok(())
}
/// Data returned from the postgres `IDENTIFY_SYSTEM` command
///
/// See the [postgres docs] for more details.
///
/// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
#[derive(Debug)]
// As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as
// unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900
#[allow(dead_code)]
struct IdentifySystem {
systemid: u64,
timeline: u32,
xlogpos: PgLsn,
dbname: Option<String>,
}
/// There was a problem parsing the response to
/// a postgres IDENTIFY_SYSTEM command.
#[derive(Debug, thiserror::Error)]
#[error("IDENTIFY_SYSTEM parse error")]
struct IdentifyError;
/// Run the postgres `IDENTIFY_SYSTEM` command
async fn identify_system(client: &mut Client) -> anyhow::Result<IdentifySystem> {
let query_str = "IDENTIFY_SYSTEM";
let response = client.simple_query(query_str).await?;
// get(N) from row, then parse it as some destination type.
fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
where
T: FromStr,
{
let val = row.get(idx).ok_or(IdentifyError)?;
val.parse::<T>().or(Err(IdentifyError))
}
// extract the row contents into an IdentifySystem struct.
// written as a closure so I can use ? for Option here.
if let Some(SimpleQueryMessage::Row(first_row)) = response.get(0) {
Ok(IdentifySystem {
systemid: get_parse(first_row, 0)?,
timeline: get_parse(first_row, 1)?,
xlogpos: get_parse(first_row, 2)?,
dbname: get_parse(first_row, 3).ok(),
})
} else {
Err(IdentifyError.into())
}
}

View File

@@ -16,7 +16,8 @@ use toml_edit::Document;
use tracing::*; use tracing::*;
use url::{ParseError, Url}; use url::{ParseError, Url};
use safekeeper::control_file::{self}; use safekeeper::broker;
use safekeeper::control_file;
use safekeeper::defaults::{ use safekeeper::defaults::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
}; };
@@ -26,7 +27,6 @@ use safekeeper::timeline::GlobalTimelines;
use safekeeper::wal_backup; use safekeeper::wal_backup;
use safekeeper::wal_service; use safekeeper::wal_service;
use safekeeper::SafeKeeperConf; use safekeeper::SafeKeeperConf;
use safekeeper::{broker, callmemaybe};
use utils::{ use utils::{
http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener, http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener,
zid::NodeId, zid::NodeId,
@@ -272,9 +272,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
let signals = signals::install_shutdown_handlers()?; let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![]; let mut threads = vec![];
let (callmemaybe_tx, callmemaybe_rx) = mpsc::unbounded_channel();
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100); let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
GlobalTimelines::init(callmemaybe_tx, wal_backup_launcher_tx); GlobalTimelines::init(wal_backup_launcher_tx);
let conf_ = conf.clone(); let conf_ = conf.clone();
threads.push( threads.push(
@@ -296,29 +295,14 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
let safekeeper_thread = thread::Builder::new() let safekeeper_thread = thread::Builder::new()
.name("Safekeeper thread".into()) .name("Safekeeper thread".into())
.spawn(|| { .spawn(|| {
// thread code if let Err(e) = wal_service::thread_main(conf_cloned, pg_listener) {
let thread_result = wal_service::thread_main(conf_cloned, pg_listener); info!("safekeeper thread terminated: {e}");
if let Err(e) = thread_result {
info!("safekeeper thread terminated: {}", e);
} }
}) })
.unwrap(); .unwrap();
threads.push(safekeeper_thread); threads.push(safekeeper_thread);
let conf_cloned = conf.clone();
let callmemaybe_thread = thread::Builder::new()
.name("callmemaybe thread".into())
.spawn(|| {
// thread code
let thread_result = callmemaybe::thread_main(conf_cloned, callmemaybe_rx);
if let Err(e) = thread_result {
error!("callmemaybe thread terminated: {}", e);
}
})
.unwrap();
threads.push(callmemaybe_thread);
if !conf.broker_endpoints.is_empty() { if !conf.broker_endpoints.is_empty() {
let conf_ = conf.clone(); let conf_ = conf.clone();
threads.push( threads.push(

View File

@@ -8,7 +8,6 @@ use url::Url;
use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId}; use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId};
pub mod broker; pub mod broker;
pub mod callmemaybe;
pub mod control_file; pub mod control_file;
pub mod control_file_upgrade; pub mod control_file_upgrade;
pub mod handler; pub mod handler;

View File

@@ -8,7 +8,6 @@ use anyhow::{bail, Context, Result};
use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE}; use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE};
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
use bytes::Bytes; use bytes::Bytes;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::cmp::min; use std::cmp::min;
@@ -17,7 +16,6 @@ use std::sync::Arc;
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
use std::{str, thread}; use std::{str, thread};
use tokio::sync::mpsc::UnboundedSender;
use tracing::*; use tracing::*;
use utils::{ use utils::{
bin_ser::BeSer, bin_ser::BeSer,
@@ -25,7 +23,6 @@ use utils::{
postgres_backend::PostgresBackend, postgres_backend::PostgresBackend,
pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody, ZenithFeedback}, pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody, ZenithFeedback},
sock_split::ReadStream, sock_split::ReadStream,
zid::{ZTenantId, ZTimelineId},
}; };
// See: https://www.postgresql.org/docs/13/protocol-replication.html // See: https://www.postgresql.org/docs/13/protocol-replication.html
@@ -83,40 +80,6 @@ impl Drop for ReplicationConnGuard {
} }
} }
// XXX: Naming is a bit messy here.
// This ReplicationStreamGuard lives as long as ReplicationConn
// and current ReplicationConnGuard is tied to the background thread
// that receives feedback.
struct ReplicationStreamGuard {
tx: UnboundedSender<CallmeEvent>,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
pageserver_connstr: String,
}
impl Drop for ReplicationStreamGuard {
fn drop(&mut self) {
// the connection with pageserver is lost,
// resume callback subscription
debug!(
"Connection to pageserver is gone. Resume callmemaybe subsciption if necessary. tenantid {} timelineid {}",
self.tenant_id, self.timeline_id,
);
let subscription_key = SubscriptionStateKey::new(
self.tenant_id,
self.timeline_id,
self.pageserver_connstr.to_owned(),
);
self.tx
.send(CallmeEvent::Resume(subscription_key))
.unwrap_or_else(|e| {
error!("failed to send Resume request to callmemaybe thread {}", e);
});
}
}
impl ReplicationConn { impl ReplicationConn {
/// Create a new `ReplicationConn` /// Create a new `ReplicationConn`
pub fn new(pgb: &mut PostgresBackend) -> Self { pub fn new(pgb: &mut PostgresBackend) -> Self {
@@ -256,36 +219,6 @@ impl ReplicationConn {
}; };
info!("Start replication from {:?} till {:?}", start_pos, stop_pos); info!("Start replication from {:?} till {:?}", start_pos, stop_pos);
// Don't spam pageserver with callmemaybe queries
// when replication connection with pageserver is already established.
let _guard = {
if spg.appname == Some("wal_proposer_recovery".to_string()) {
None
} else {
let pageserver_connstr = pageserver_connstr.expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
let zttid = spg.timeline.get().zttid;
let tx_clone = spg.timeline.get().callmemaybe_tx.clone();
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.clone(),
);
tx_clone
.send(CallmeEvent::Pause(subscription_key))
.unwrap_or_else(|e| {
error!("failed to send Pause request to callmemaybe thread {}", e);
});
// create a guard to subscribe callback again, when this connection will exit
Some(ReplicationStreamGuard {
tx: tx_clone,
tenant_id: zttid.tenant_id,
timeline_id: zttid.timeline_id,
pageserver_connstr,
})
}
};
// switch to copy // switch to copy
pgb.write_message(&BeMessage::CopyBothResponse)?; pgb.write_message(&BeMessage::CopyBothResponse)?;

View File

@@ -16,7 +16,7 @@ use std::fs::{self};
use std::sync::{Arc, Condvar, Mutex, MutexGuard}; use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc::{Sender, UnboundedSender}; use tokio::sync::mpsc::Sender;
use tracing::*; use tracing::*;
use utils::{ use utils::{
@@ -25,7 +25,6 @@ use utils::{
zid::{NodeId, ZTenantId, ZTenantTimelineId}, zid::{NodeId, ZTenantId, ZTenantTimelineId},
}; };
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
use crate::control_file; use crate::control_file;
use crate::safekeeper::{ use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
@@ -191,79 +190,33 @@ impl SharedState {
self.wal_backup_active self.wal_backup_active
} }
/// start/change walsender (via callmemaybe). /// Activate timeline's walsender: start/change timeline information propagated into etcd for further pageserver connections.
fn callmemaybe_sub( fn activate_walsender(
&mut self, &mut self,
zttid: &ZTenantTimelineId, zttid: &ZTenantTimelineId,
pageserver_connstr: Option<&String>, new_pageserver_connstr: Option<String>,
callmemaybe_tx: &UnboundedSender<CallmeEvent>, ) {
) -> Result<()> { if self.pageserver_connstr != new_pageserver_connstr {
if let Some(ref pageserver_connstr) = self.pageserver_connstr { self.deactivate_walsender(zttid);
// unsub old sub. xxx: callmemaybe is going out
let old_subscription_key = SubscriptionStateKey::new( if new_pageserver_connstr.is_some() {
zttid.tenant_id, info!(
zttid.timeline_id, "timeline {} has activated its walsender with connstr {new_pageserver_connstr:?}",
pageserver_connstr.to_owned(), zttid.timeline_id,
); );
callmemaybe_tx }
.send(CallmeEvent::Unsubscribe(old_subscription_key)) self.pageserver_connstr = new_pageserver_connstr;
.unwrap_or_else(|e| {
error!("failed to send Pause request to callmemaybe thread {}", e);
});
} }
if let Some(pageserver_connstr) = pageserver_connstr {
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.to_owned(),
);
// xx: sending to channel under lock is not very cool, but
// shouldn't be a problem here. If it is, we can grab a counter
// here and later augment channel messages with it.
callmemaybe_tx
.send(CallmeEvent::Subscribe(subscription_key))
.unwrap_or_else(|e| {
error!(
"failed to send Subscribe request to callmemaybe thread {}",
e
);
});
info!(
"timeline {} is subscribed to callmemaybe to {}",
zttid.timeline_id, pageserver_connstr
);
}
self.pageserver_connstr = pageserver_connstr.map(|c| c.to_owned());
Ok(())
} }
/// Deactivate the timeline: stop callmemaybe. /// Deactivate the timeline: stop sending the timeline data into etcd, so no pageserver can connect for WAL streaming.
fn callmemaybe_unsub( fn deactivate_walsender(&mut self, zttid: &ZTenantTimelineId) {
&mut self, if let Some(pageserver_connstr) = self.pageserver_connstr.take() {
zttid: &ZTenantTimelineId,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<()> {
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.to_owned(),
);
callmemaybe_tx
.send(CallmeEvent::Unsubscribe(subscription_key))
.unwrap_or_else(|e| {
error!(
"failed to send Unsubscribe request to callmemaybe thread {}",
e
);
});
info!( info!(
"timeline {} is unsubscribed from callmemaybe to {}", "timeline {} had deactivated its wallsender with connstr {pageserver_connstr:?}",
zttid.timeline_id, zttid.timeline_id,
self.pageserver_connstr.as_ref().unwrap() )
);
} }
Ok(())
} }
fn get_wal_seg_size(&self) -> usize { fn get_wal_seg_size(&self) -> usize {
@@ -332,7 +285,6 @@ impl SharedState {
/// Database instance (tenant) /// Database instance (tenant)
pub struct Timeline { pub struct Timeline {
pub zttid: ZTenantTimelineId, pub zttid: ZTenantTimelineId,
pub callmemaybe_tx: UnboundedSender<CallmeEvent>,
/// Sending here asks for wal backup launcher attention (start/stop /// Sending here asks for wal backup launcher attention (start/stop
/// offloading). Sending zttid instead of concrete command allows to do /// offloading). Sending zttid instead of concrete command allows to do
/// sending without timeline lock. /// sending without timeline lock.
@@ -348,7 +300,6 @@ pub struct Timeline {
impl Timeline { impl Timeline {
fn new( fn new(
zttid: ZTenantTimelineId, zttid: ZTenantTimelineId,
callmemaybe_tx: UnboundedSender<CallmeEvent>,
wal_backup_launcher_tx: Sender<ZTenantTimelineId>, wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
shared_state: SharedState, shared_state: SharedState,
) -> Timeline { ) -> Timeline {
@@ -356,7 +307,6 @@ impl Timeline {
watch::channel(shared_state.sk.inmem.commit_lsn); watch::channel(shared_state.sk.inmem.commit_lsn);
Timeline { Timeline {
zttid, zttid,
callmemaybe_tx,
wal_backup_launcher_tx, wal_backup_launcher_tx,
commit_lsn_watch_tx, commit_lsn_watch_tx,
commit_lsn_watch_rx, commit_lsn_watch_rx,
@@ -378,7 +328,7 @@ impl Timeline {
// should have kind of generations assigned by compute to distinguish // should have kind of generations assigned by compute to distinguish
// the latest one or even pass it through consensus to reliably deliver // the latest one or even pass it through consensus to reliably deliver
// to all safekeepers. // to all safekeepers.
shared_state.callmemaybe_sub(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?; shared_state.activate_walsender(&self.zttid, pageserver_connstr.cloned());
} }
// Wake up wal backup launcher, if offloading not started yet. // Wake up wal backup launcher, if offloading not started yet.
if is_wal_backup_action_pending { if is_wal_backup_action_pending {
@@ -414,7 +364,7 @@ impl Timeline {
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet. (replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn); replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop { if stop {
shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?; shared_state.deactivate_walsender(&self.zttid);
return Ok(true); return Ok(true);
} }
} }
@@ -431,16 +381,14 @@ impl Timeline {
/// Deactivates the timeline, assuming it is being deleted. /// Deactivates the timeline, assuming it is being deleted.
/// Returns whether the timeline was already active. /// Returns whether the timeline was already active.
/// ///
/// The callmemaybe thread is stopped by the deactivation message. We assume all other threads /// We assume all threads will stop by themselves eventually (possibly with errors, but no panics).
/// will stop by themselves eventually (possibly with errors, but no panics). There should be no /// There should be no compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but
/// compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but
/// we're deleting the timeline anyway. /// we're deleting the timeline anyway.
pub async fn deactivate_for_delete(&self) -> Result<bool> { pub async fn deactivate_for_delete(&self) -> Result<bool> {
let was_active: bool; let was_active: bool;
{ {
let mut shared_state = self.mutex.lock().unwrap(); let shared_state = self.mutex.lock().unwrap();
was_active = shared_state.active; was_active = shared_state.active;
shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?;
} }
self.wal_backup_launcher_tx.send(self.zttid).await?; self.wal_backup_launcher_tx.send(self.zttid).await?;
Ok(was_active) Ok(was_active)
@@ -576,7 +524,8 @@ impl Timeline {
shared_state.sk.inmem.remote_consistent_lsn, shared_state.sk.inmem.remote_consistent_lsn,
)), )),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn), peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
safekeeper_connection_string: Some(conf.listen_pg_addr.clone()), safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
pageserver_connstr: shared_state.pageserver_connstr.clone(),
backup_lsn: Some(shared_state.sk.inmem.backup_lsn), backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
}) })
} }
@@ -675,14 +624,12 @@ impl TimelineTools for Option<Arc<Timeline>> {
struct GlobalTimelinesState { struct GlobalTimelinesState {
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>, timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
callmemaybe_tx: Option<UnboundedSender<CallmeEvent>>,
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>, wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
} }
lazy_static! { lazy_static! {
static ref TIMELINES_STATE: Mutex<GlobalTimelinesState> = Mutex::new(GlobalTimelinesState { static ref TIMELINES_STATE: Mutex<GlobalTimelinesState> = Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(), timelines: HashMap::new(),
callmemaybe_tx: None,
wal_backup_launcher_tx: None, wal_backup_launcher_tx: None,
}); });
} }
@@ -697,13 +644,8 @@ pub struct TimelineDeleteForceResult {
pub struct GlobalTimelines; pub struct GlobalTimelines;
impl GlobalTimelines { impl GlobalTimelines {
pub fn init( pub fn init(wal_backup_launcher_tx: Sender<ZTenantTimelineId>) {
callmemaybe_tx: UnboundedSender<CallmeEvent>,
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
) {
let mut state = TIMELINES_STATE.lock().unwrap(); let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.callmemaybe_tx.is_none());
state.callmemaybe_tx = Some(callmemaybe_tx);
assert!(state.wal_backup_launcher_tx.is_none()); assert!(state.wal_backup_launcher_tx.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx); state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
} }
@@ -726,7 +668,6 @@ impl GlobalTimelines {
let new_tli = Arc::new(Timeline::new( let new_tli = Arc::new(Timeline::new(
zttid, zttid,
state.callmemaybe_tx.as_ref().unwrap().clone(),
state.wal_backup_launcher_tx.as_ref().unwrap().clone(), state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
shared_state, shared_state,
)); ));
@@ -778,7 +719,6 @@ impl GlobalTimelines {
let new_tli = Arc::new(Timeline::new( let new_tli = Arc::new(Timeline::new(
zttid, zttid,
state.callmemaybe_tx.as_ref().unwrap().clone(),
state.wal_backup_launcher_tx.as_ref().unwrap().clone(), state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
shared_state, shared_state,
)); ));

View File

@@ -63,10 +63,11 @@ def test_pageserver_http_get_wal_receiver_not_found(zenith_simple_env: ZenithEnv
tenant_id, timeline_id = env.zenith_cli.create_tenant() tenant_id, timeline_id = env.zenith_cli.create_tenant()
# no PG compute node is running, so no WAL receiver is running empty_response = client.wal_receiver_get(tenant_id, timeline_id)
with pytest.raises(ZenithPageserverApiException) as e:
_ = client.wal_receiver_get(tenant_id, timeline_id) assert empty_response.get('wal_producer_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert "Not Found" in str(e.value) assert empty_response.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert empty_response.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
def test_pageserver_http_get_wal_receiver_success(zenith_simple_env: ZenithEnv): def test_pageserver_http_get_wal_receiver_success(zenith_simple_env: ZenithEnv):
@@ -81,7 +82,6 @@ def test_pageserver_http_get_wal_receiver_success(zenith_simple_env: ZenithEnv):
# a successful `wal_receiver_get` response must contain the below fields # a successful `wal_receiver_get` response must contain the below fields
assert list(res.keys()) == [ assert list(res.keys()) == [
"thread_id",
"wal_producer_connstr", "wal_producer_connstr",
"last_received_msg_lsn", "last_received_msg_lsn",
"last_received_msg_ts", "last_received_msg_ts",

View File

@@ -1600,9 +1600,7 @@ class Postgres(PgProtocol):
for cfg_line in cfg_lines: for cfg_line in cfg_lines:
# walproposer uses different application_name # walproposer uses different application_name
if ("synchronous_standby_names" in cfg_line or if ("synchronous_standby_names" in cfg_line or
# don't ask pageserver to fetch WAL from compute # don't repeat safekeepers/wal_acceptors multiple times
"callmemaybe_connstring" in cfg_line or
# don't repeat safekeepers multiple times
"safekeepers" in cfg_line): "safekeepers" in cfg_line):
continue continue
f.write(cfg_line) f.write(cfg_line)

View File

@@ -13,16 +13,12 @@ from fixtures.zenith_fixtures import ZenithEnvBuilder
@pytest.mark.parametrize('tenants_count', [1, 5, 10]) @pytest.mark.parametrize('tenants_count', [1, 5, 10])
@pytest.mark.parametrize('use_safekeepers', ['with_wa', 'without_wa'])
def test_bulk_tenant_create( def test_bulk_tenant_create(
zenith_env_builder: ZenithEnvBuilder, zenith_env_builder: ZenithEnvBuilder,
use_safekeepers: str,
tenants_count: int, tenants_count: int,
zenbenchmark, zenbenchmark,
): ):
"""Measure tenant creation time (with and without wal acceptors)""" zenith_env_builder.num_safekeepers = 3
if use_safekeepers == 'with_wa':
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start() env = zenith_env_builder.init_start()
time_slices = [] time_slices = []
@@ -31,15 +27,15 @@ def test_bulk_tenant_create(
start = timeit.default_timer() start = timeit.default_timer()
tenant, _ = env.zenith_cli.create_tenant() tenant, _ = env.zenith_cli.create_tenant()
env.zenith_cli.create_timeline( env.zenith_cli.create_timeline(f'test_bulk_tenant_create_{tenants_count}_{i}',
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant) tenant_id=tenant)
# FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now? # FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now?
#if use_safekeepers == 'with_sa': #if use_safekeepers == 'with_sa':
# wa_factory.start_n_new(3) # wa_factory.start_n_new(3)
pg_tenant = env.postgres.create_start( pg_tenant = env.postgres.create_start(f'test_bulk_tenant_create_{tenants_count}_{i}',
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant) tenant_id=tenant)
end = timeit.default_timer() end = timeit.default_timer()
time_slices.append(end - start) time_slices.append(end - start)