Replace callmemaybe with etcd subscriptions on safekeeper timeline info

Kirill Bulatov
2022-04-22 13:56:48 +03:00
committed by Kirill Bulatov
parent 6623c5b9d5
commit e5cb727572
25 changed files with 1968 additions and 693 deletions

View File

@@ -352,7 +352,6 @@ impl PostgresNode {
// This isn't really a supported configuration, but can be useful for
// testing.
conf.append("synchronous_standby_names", "pageserver");
conf.append("neon.callmemaybe_connstring", &self.connstr());
}
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;

View File

@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::io::Write;
use std::net::TcpStream;
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::process::Command;
use std::time::Duration;
@@ -11,6 +12,7 @@ use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{TenantConfigRequest, TenantCreateRequest, TimelineCreateRequest};
use pageserver::tenant_mgr::TenantInfo;
use pageserver::timelines::TimelineInfo;
use postgres::{Config, NoTls};
use reqwest::blocking::{Client, RequestBuilder, Response};
@@ -26,7 +28,6 @@ use utils::{
use crate::local_env::LocalEnv;
use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
use pageserver::tenant_mgr::TenantInfo;
#[derive(Error, Debug)]
pub enum PageserverHttpError {
@@ -37,6 +38,12 @@ pub enum PageserverHttpError {
Response(String),
}
impl From<anyhow::Error> for PageserverHttpError {
fn from(e: anyhow::Error) -> Self {
Self::Response(e.to_string())
}
}
type Result<T> = result::Result<T, PageserverHttpError>;
pub trait ResponseErrorMessageExt: Sized {
@@ -410,6 +417,15 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()?,
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.get("walreceiver_connect_timeout")
.map(|x| x.to_string()),
lagging_wal_timeout: settings.get("lagging_wal_timeout").map(|x| x.to_string()),
max_lsn_wal_lag: settings
.get("max_lsn_wal_lag")
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
})
.send()?
.error_from_body()?
@@ -433,22 +449,41 @@ impl PageServerNode {
tenant_id,
checkpoint_distance: settings
.get("checkpoint_distance")
.map(|x| x.parse::<u64>().unwrap()),
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'checkpoint_distance' as an integer")?,
compaction_target_size: settings
.get("compaction_target_size")
.map(|x| x.parse::<u64>().unwrap()),
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'compaction_target_size' as an integer")?,
compaction_period: settings.get("compaction_period").map(|x| x.to_string()),
compaction_threshold: settings
.get("compaction_threshold")
.map(|x| x.parse::<usize>().unwrap()),
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'compaction_threshold' as an integer")?,
gc_horizon: settings
.get("gc_horizon")
.map(|x| x.parse::<u64>().unwrap()),
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'gc_horizon' as an integer")?,
gc_period: settings.get("gc_period").map(|x| x.to_string()),
image_creation_threshold: settings
.get("image_creation_threshold")
.map(|x| x.parse::<usize>().unwrap()),
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.get("walreceiver_connect_timeout")
.map(|x| x.to_string()),
lagging_wal_timeout: settings.get("lagging_wal_timeout").map(|x| x.to_string()),
max_lsn_wal_lag: settings
.get("max_lsn_wal_lag")
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
})
.send()?
.error_from_body()?;
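For reference, the `.map(..).transpose().context(..)` chain that replaces the `unwrap()` calls above turns an optional string setting into an optional parsed value while reporting parse failures as errors instead of panicking. A minimal standalone sketch of the same pattern (names are illustrative):

```rust
use anyhow::Context;

// Parse an optional, string-valued setting into an optional integer.
fn parse_optional_setting(raw: Option<&str>) -> anyhow::Result<Option<u64>> {
    raw.map(|s| s.parse::<u64>()) // Option<Result<u64, ParseIntError>>
        .transpose() // Result<Option<u64>, ParseIntError>
        .context("Failed to parse setting as an integer")
}

fn main() -> anyhow::Result<()> {
    assert_eq!(parse_optional_setting(None)?, None);
    assert_eq!(parse_optional_setting(Some("42"))?, Some(42));
    assert!(parse_optional_setting(Some("not-a-number")).is_err());
    Ok(())
}
```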

View File

@@ -31,7 +31,7 @@ broker_endpoints = ['some://etcd']
# [remote_storage]
```
The config above shows default values for all basic pageserver settings, besides `broker_endpoints`: that one has to be set by the user,
see the corresponding section below.
Pageserver uses default values for all fields that are missing in the config, so it's not a hard error to leave the config blank.
Yet, it validates the config values it can (e.g. the postgres install dir) and refuses to start if that validation fails.
@@ -54,7 +54,7 @@ Note that TOML distinguishes between strings and integers, the former require si
A list of endpoints (currently etcd) to connect to and pull the timeline information from.
Mandatory, with no default, since etcd has to be started as a separate process
and its connection url has to be specified explicitly.
#### broker_etcd_prefix
@@ -111,6 +111,20 @@ L0 delta layer threshold for L1 image layer creation. Default is 3.
WAL retention duration for PITR branching. Default is 30 days.
#### walreceiver_connect_timeout
Time to wait while establishing the WAL receiver connection, before failing.
#### lagging_wal_timeout
Time the pageserver may go without receiving any WAL updates from a safekeeper before that connection is considered stalled.
Keeps the pageserver from lagging behind by forcing it to switch away from stalled connections.
#### max_lsn_wal_lag
Maximum allowed difference between the LSNs of the latest WAL available on safekeepers: if the currently connected safekeeper lags too far behind for too long,
it gets swapped for a different one.
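To make the interplay concrete, here is a hedged sketch (illustrative names only, not pageserver code) of how a WAL receiver could combine `lagging_wal_timeout` and `max_lsn_wal_lag` when deciding whether to switch safekeepers:

```rust
use std::num::NonZeroU64;
use std::time::{Duration, Instant};

/// Illustrative candidate state, not the pageserver's real data model.
struct SafekeeperCandidate {
    commit_lsn: u64,          // latest WAL LSN this safekeeper advertises
    last_wal_update: Instant, // when we last received WAL from it
}

/// Switch away from the current safekeeper only if it has been silent longer
/// than `lagging_wal_timeout` *and* another safekeeper is ahead of it by more
/// than `max_lsn_wal_lag` bytes of WAL.
fn should_switch(
    current: &SafekeeperCandidate,
    best_alternative: &SafekeeperCandidate,
    lagging_wal_timeout: Duration,
    max_lsn_wal_lag: NonZeroU64,
) -> bool {
    let stalled = current.last_wal_update.elapsed() > lagging_wal_timeout;
    let lag = best_alternative.commit_lsn.saturating_sub(current.commit_lsn);
    stalled && lag > max_lsn_wal_lag.get()
}
```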
#### initial_superuser_name
Name of the initial superuser role, passed to initdb when a new tenant

View File

@@ -31,7 +31,7 @@ struct SafekeeperTimeline {
/// Published data about safekeeper's timeline. Fields made optional for easy migrations.
#[serde_as]
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkTimelineInfo {
/// Term of the last entry.
pub last_log_term: Option<u64>,
@@ -55,7 +55,9 @@ pub struct SkTimelineInfo {
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
#[serde(default)]
pub safekeeper_connection_string: Option<String>,
pub safekeeper_connstr: Option<String>,
#[serde(default)]
pub pageserver_connstr: Option<String>,
}
#[derive(Debug, thiserror::Error)]
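With `Serialize` now derived on `SkTimelineInfo` (and on `ZTenantTimelineId` elsewhere in this commit), the struct can be published to etcd for pageservers to subscribe to. A hedged sketch of building such a key/value pair, assuming a JSON value encoding via `serde_json` and a hypothetical key layout (the real layout lives in the broker code and may differ):

```rust
// Assumes `SkTimelineInfo` (above) and `utils::zid::ZTenantTimelineId` are in scope.
fn timeline_broker_kv(
    broker_etcd_prefix: &str,
    zttid: &ZTenantTimelineId,
    safekeeper_id: u64,
    info: &SkTimelineInfo,
) -> anyhow::Result<(String, String)> {
    // Hypothetical key layout: <prefix>/<tenant>/<timeline>/safekeeper/<node id>.
    let key = format!(
        "{broker_etcd_prefix}/{}/{}/safekeeper/{safekeeper_id}",
        zttid.tenant_id, zttid.timeline_id
    );
    // JSON works here because `SkTimelineInfo` derives `Serialize`.
    let value = serde_json::to_string(info)?;
    Ok((key, value))
}
```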

View File

@@ -336,11 +336,11 @@ impl PostgresBackend {
let have_tls = self.tls_config.is_some();
match msg {
FeMessage::StartupPacket(m) => {
trace!("got startup message {:?}", m);
trace!("got startup message {m:?}");
match m {
FeStartupPacket::SslRequest => {
info!("SSL requested");
debug!("SSL requested");
self.write_message(&BeMessage::EncryptionResponse(have_tls))?;
if have_tls {
@@ -349,7 +349,7 @@ impl PostgresBackend {
}
}
FeStartupPacket::GssEncRequest => {
info!("GSS requested");
debug!("GSS requested");
self.write_message(&BeMessage::EncryptionResponse(false))?;
}
FeStartupPacket::StartupMessage { .. } => {
@@ -433,12 +433,7 @@ impl PostgresBackend {
// full cause of the error, not just the top-level context + its trace.
// We don't want to send that in the ErrorResponse though,
// because it's not relevant to the compute node logs.
if query_string.starts_with("callmemaybe") {
// FIXME avoid printing a backtrace for tenant x not found errors until this is properly fixed
error!("query handler for '{}' failed: {}", query_string, e);
} else {
error!("query handler for '{}' failed: {:?}", query_string, e);
}
error!("query handler for '{}' failed: {:?}", query_string, e);
self.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?;
// TODO: untangle convoluted control flow
if e.to_string().contains("failed to run") {

View File

@@ -193,7 +193,7 @@ pub struct ZTenantId(ZId);
zid_newtype!(ZTenantId);
// A pair uniquely identifying Zenith instance.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ZTenantTimelineId {
pub tenant_id: ZTenantId,
pub timeline_id: ZTimelineId,

View File

@@ -5,7 +5,7 @@ edition = "2021"
[features]
# It is simpler infra-wise to have failpoints enabled by default
# It shouldn't affect perf in any way because failpoints
# It shouldn't affect performance in any way because failpoints
# are not placed in hot code paths
default = ["failpoints"]
profiling = ["pprof"]

View File

@@ -480,6 +480,21 @@ impl PageServerConf {
if let Some(pitr_interval) = item.get("pitr_interval") {
t_conf.pitr_interval = Some(parse_toml_duration("pitr_interval", pitr_interval)?);
}
if let Some(walreceiver_connect_timeout) = item.get("walreceiver_connect_timeout") {
t_conf.walreceiver_connect_timeout = Some(parse_toml_duration(
"walreceiver_connect_timeout",
walreceiver_connect_timeout,
)?);
}
if let Some(lagging_wal_timeout) = item.get("lagging_wal_timeout") {
t_conf.lagging_wal_timeout = Some(parse_toml_duration(
"lagging_wal_timeout",
lagging_wal_timeout,
)?);
}
if let Some(max_lsn_wal_lag) = item.get("max_lsn_wal_lag") {
t_conf.max_lsn_wal_lag = Some(parse_toml_from_str("max_lsn_wal_lag", max_lsn_wal_lag)?);
}
Ok(t_conf)
}

View File

@@ -1,3 +1,5 @@
use std::num::NonZeroU64;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::{
@@ -33,6 +35,9 @@ pub struct TenantCreateRequest {
pub gc_period: Option<String>,
pub image_creation_threshold: Option<usize>,
pub pitr_interval: Option<String>,
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
}
#[serde_as]
@@ -68,6 +73,9 @@ pub struct TenantConfigRequest {
pub gc_period: Option<String>,
pub image_creation_threshold: Option<usize>,
pub pitr_interval: Option<String>,
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
}
impl TenantConfigRequest {
@@ -82,6 +90,21 @@ impl TenantConfigRequest {
gc_period: None,
image_creation_threshold: None,
pitr_interval: None,
walreceiver_connect_timeout: None,
lagging_wal_timeout: None,
max_lsn_wal_lag: None,
}
}
}
/// A WAL receiver's data stored inside the global `WAL_RECEIVERS`.
/// We keep one WAL receiver active per timeline.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct WalReceiverEntry {
pub wal_producer_connstr: Option<String>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub last_received_msg_lsn: Option<Lsn>,
/// the timestamp (in microseconds) of the last received message
pub last_received_msg_ts: Option<u128>,
}
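A plausible shape for the registry mentioned in the comment above, together with the `get_wal_receiver_entry` lookup used by the HTTP handler; the actual definitions live in the (suppressed) walreceiver module and may differ:

```rust
use std::collections::HashMap;

use lazy_static::lazy_static;
use tokio::sync::RwLock;
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};

lazy_static! {
    /// Hypothetical registry shape: one entry per active WAL receiver.
    static ref WAL_RECEIVER_ENTRIES: RwLock<HashMap<ZTenantTimelineId, WalReceiverEntry>> =
        RwLock::new(HashMap::new());
}

/// Async lookup returning an owned copy (`WalReceiverEntry` derives `Clone`).
pub async fn get_wal_receiver_entry(
    tenant_id: ZTenantId,
    timeline_id: ZTimelineId,
) -> Option<WalReceiverEntry> {
    WAL_RECEIVER_ENTRIES
        .read()
        .await
        .get(&ZTenantTimelineId::new(tenant_id, timeline_id))
        .cloned()
}
```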

View File

@@ -229,23 +229,16 @@ async fn wal_receiver_get_handler(request: Request<Body>) -> Result<Response<Bod
check_permission(&request, Some(tenant_id))?;
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
let wal_receiver_entry = crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id)
.instrument(info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id))
.await
.ok_or_else(|| {
ApiError::NotFound(format!(
"WAL receiver data not found for tenant {tenant_id} and timeline {timeline_id}"
))
})?;
let wal_receiver = tokio::task::spawn_blocking(move || {
let _enter =
info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id).entered();
crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id)
})
.await
.map_err(ApiError::from_err)?
.ok_or_else(|| {
ApiError::NotFound(format!(
"WAL receiver not found for tenant {} and timeline {}",
tenant_id, timeline_id
))
})?;
json_response(StatusCode::OK, wal_receiver)
json_response(StatusCode::OK, &wal_receiver_entry)
}
async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -402,6 +395,19 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?);
}
if let Some(walreceiver_connect_timeout) = request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(&walreceiver_connect_timeout).map_err(ApiError::from_err)?,
);
}
if let Some(lagging_wal_timeout) = request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout =
Some(humantime::parse_duration(&lagging_wal_timeout).map_err(ApiError::from_err)?);
}
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
@@ -449,6 +455,18 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
tenant_conf.pitr_interval =
Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?);
}
if let Some(walreceiver_connect_timeout) = request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(&walreceiver_connect_timeout).map_err(ApiError::from_err)?,
);
}
if let Some(lagging_wal_timeout) = request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout =
Some(humantime::parse_duration(&lagging_wal_timeout).map_err(ApiError::from_err)?);
}
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
tenant_conf.compaction_target_size = request_data.compaction_target_size;
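The duration-valued settings above all go through `humantime::parse_duration`, which accepts the human-readable strings used elsewhere in this commit (e.g. the `"2 seconds"` and `"30 days"` defaults); a quick standalone check:

```rust
use std::time::Duration;

fn main() {
    // Same parser the handlers above use for duration-valued settings.
    assert_eq!(
        humantime::parse_duration("2 seconds").unwrap(),
        Duration::from_secs(2)
    );
    assert_eq!(
        humantime::parse_duration("30 days").unwrap(),
        Duration::from_secs(30 * 24 * 60 * 60)
    );
    // Invalid strings surface as errors rather than panics in the handlers.
    assert!(humantime::parse_duration("not a duration").is_err());
}
```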

View File

@@ -25,6 +25,7 @@ use std::collections::{BTreeSet, HashSet};
use std::fs;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::num::NonZeroU64;
use std::ops::{Bound::Included, Deref, Range};
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool};
@@ -557,6 +558,27 @@ impl LayeredRepository {
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
}
pub fn get_wal_receiver_connect_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.walreceiver_connect_timeout
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout)
}
pub fn get_lagging_wal_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.lagging_wal_timeout
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout)
}
pub fn get_max_lsn_wal_lag(&self) -> NonZeroU64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) -> Result<()> {
let mut tenant_conf = self.tenant_conf.write().unwrap();

View File

@@ -7,7 +7,6 @@
// *status* -- show actual info about this pageserver,
// *pagestream* -- enter mode where smgr and pageserver talk with their
// custom protocol.
// *callmemaybe <zenith timelineid> $url* -- ask pageserver to start walreceiver on $url
//
use anyhow::{bail, ensure, Context, Result};
@@ -38,7 +37,6 @@ use crate::repository::Timeline;
use crate::tenant_mgr;
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::walreceiver;
use crate::CheckpointConfig;
use metrics::{register_histogram_vec, HistogramVec};
use postgres_ffi::xlog_utils::to_pg_timestamp;
@@ -716,30 +714,6 @@ impl postgres_backend::Handler for PageServerHandler {
// Check that the timeline exists
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("callmemaybe ") {
// callmemaybe <zenith tenantid as hex string> <zenith timelineid as hex string> <connstr>
// TODO lazy static
let re = Regex::new(r"^callmemaybe ([[:xdigit:]]+) ([[:xdigit:]]+) (.*)$").unwrap();
let caps = re
.captures(query_string)
.with_context(|| format!("invalid callmemaybe: '{}'", query_string))?;
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let connstr = caps.get(3).unwrap().as_str().to_owned();
self.check_permission(Some(tenantid))?;
let _enter =
info_span!("callmemaybe", timeline = %timelineid, tenant = %tenantid).entered();
// Check that the timeline exists
tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'"

View File

@@ -469,6 +469,9 @@ pub mod repo_harness {
gc_period: Some(tenant_conf.gc_period),
image_creation_threshold: Some(tenant_conf.image_creation_threshold),
pitr_interval: Some(tenant_conf.pitr_interval),
walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
}
}
}

View File

@@ -10,6 +10,7 @@
//!
use crate::config::PageServerConf;
use serde::{Deserialize, Serialize};
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::time::Duration;
use utils::zid::ZTenantId;
@@ -34,6 +35,9 @@ pub mod defaults {
pub const DEFAULT_GC_PERIOD: &str = "100 s";
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1_000_000;
}
/// Per-tenant configuration options
@@ -68,6 +72,17 @@ pub struct TenantConf {
// Page versions older than this are garbage collected away.
#[serde(with = "humantime_serde")]
pub pitr_interval: Duration,
/// Maximum amount of time to wait while opening a connection to receive wal, before erroring.
#[serde(with = "humantime_serde")]
pub walreceiver_connect_timeout: Duration,
/// A safekeeper is considered stalled once no WAL updates have been received from it for longer than this threshold.
/// A stalled safekeeper connection will be switched to a fresher one when such appears.
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Duration,
/// A safekeeper is considered lagging when its WAL is behind another safekeeper's by more than this threshold.
/// A lagging safekeeper is only switched after `lagging_wal_timeout` has elapsed since its last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -85,6 +100,11 @@ pub struct TenantConfOpt {
pub image_creation_threshold: Option<usize>,
#[serde(with = "humantime_serde")]
pub pitr_interval: Option<Duration>,
#[serde(with = "humantime_serde")]
pub walreceiver_connect_timeout: Option<Duration>,
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
}
impl TenantConfOpt {
@@ -108,6 +128,13 @@ impl TenantConfOpt {
.image_creation_threshold
.unwrap_or(global_conf.image_creation_threshold),
pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval),
walreceiver_connect_timeout: self
.walreceiver_connect_timeout
.unwrap_or(global_conf.walreceiver_connect_timeout),
lagging_wal_timeout: self
.lagging_wal_timeout
.unwrap_or(global_conf.lagging_wal_timeout),
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
}
}
@@ -136,6 +163,15 @@ impl TenantConfOpt {
if let Some(pitr_interval) = other.pitr_interval {
self.pitr_interval = Some(pitr_interval);
}
if let Some(walreceiver_connect_timeout) = other.walreceiver_connect_timeout {
self.walreceiver_connect_timeout = Some(walreceiver_connect_timeout);
}
if let Some(lagging_wal_timeout) = other.lagging_wal_timeout {
self.lagging_wal_timeout = Some(lagging_wal_timeout);
}
if let Some(max_lsn_wal_lag) = other.max_lsn_wal_lag {
self.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
}
}
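The pattern above (a per-tenant `Option` that is merged field by field and falls back to the global default when unset) can be summarized with a small standalone sketch; the types here are illustrative stand-ins, not the real `TenantConf`/`TenantConfOpt`:

```rust
use std::time::Duration;

/// Stand-in for the global `TenantConf` defaults.
#[derive(Clone, Copy)]
struct GlobalDefaults {
    lagging_wal_timeout: Duration,
}

/// Stand-in for a per-tenant `TenantConfOpt`-style override.
#[derive(Default)]
struct TenantOverrides {
    lagging_wal_timeout: Option<Duration>,
}

impl TenantOverrides {
    /// Later overrides win, field by field, mirroring the `if let Some(..)` updates above.
    fn update(&mut self, other: &TenantOverrides) {
        if let Some(v) = other.lagging_wal_timeout {
            self.lagging_wal_timeout = Some(v);
        }
    }

    /// A missing per-tenant value falls back to the global default.
    fn effective_lagging_wal_timeout(&self, global: GlobalDefaults) -> Duration {
        self.lagging_wal_timeout.unwrap_or(global.lagging_wal_timeout)
    }
}

fn main() {
    let global = GlobalDefaults { lagging_wal_timeout: Duration::from_secs(10) };
    let mut per_tenant = TenantOverrides::default();
    assert_eq!(per_tenant.effective_lagging_wal_timeout(global), Duration::from_secs(10));

    per_tenant.update(&TenantOverrides { lagging_wal_timeout: Some(Duration::from_secs(30)) });
    assert_eq!(per_tenant.effective_lagging_wal_timeout(global), Duration::from_secs(30));
}
```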
@@ -155,6 +191,14 @@ impl TenantConf {
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
.expect("cannot parse default PITR interval"),
walreceiver_connect_timeout: humantime::parse_duration(
DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
)
.expect("cannot parse default walreceiver connect timeout"),
lagging_wal_timeout: humantime::parse_duration(DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT)
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
}
}
@@ -175,6 +219,16 @@ impl TenantConf {
gc_period: Duration::from_secs(10),
image_creation_threshold: defaults::DEFAULT_IMAGE_CREATION_THRESHOLD,
pitr_interval: Duration::from_secs(60 * 60),
walreceiver_connect_timeout: humantime::parse_duration(
defaults::DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
)
.unwrap(),
lagging_wal_timeout: humantime::parse_duration(
defaults::DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT,
)
.unwrap(),
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.unwrap(),
}
}
}

View File

@@ -8,11 +8,10 @@ use crate::repository::{Repository, TimelineSyncStatusUpdate};
use crate::storage_sync::index::RemoteIndex;
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
use crate::tenant_config::TenantConfOpt;
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::timelines;
use crate::timelines::CreateRepo;
use crate::walredo::PostgresRedoManager;
use crate::{thread_mgr, timelines, walreceiver};
use crate::{DatadirTimelineImpl, RepositoryImpl};
use anyhow::{bail, Context};
use serde::{Deserialize, Serialize};
@@ -21,23 +20,30 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::*;
use utils::lsn::Lsn;
use utils::zid::{ZTenantId, ZTimelineId};
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
mod tenants_state {
use anyhow::ensure;
use std::{
collections::HashMap,
sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
};
use tokio::sync::mpsc;
use tracing::{debug, error};
use utils::zid::ZTenantId;
use crate::tenant_mgr::Tenant;
use crate::tenant_mgr::{LocalTimelineUpdate, Tenant};
lazy_static::lazy_static! {
static ref TENANTS: RwLock<HashMap<ZTenantId, Tenant>> = RwLock::new(HashMap::new());
/// Sends updates about local timelines (creation and deletion) to the WAL receiver,
/// so that it can enable or disable the corresponding processes.
static ref TIMELINE_UPDATE_SENDER: RwLock<Option<mpsc::UnboundedSender<LocalTimelineUpdate>>> = RwLock::new(None);
}
pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap<ZTenantId, Tenant>> {
@@ -51,6 +57,39 @@ mod tenants_state {
.write()
.expect("Failed to write() tenants lock, it got poisoned")
}
pub(super) fn set_timeline_update_sender(
timeline_updates_sender: mpsc::UnboundedSender<LocalTimelineUpdate>,
) -> anyhow::Result<()> {
let mut sender_guard = TIMELINE_UPDATE_SENDER
.write()
.expect("Failed to write() timeline_update_sender lock, it got poisoned");
ensure!(sender_guard.is_none(), "Timeline update sender already set");
*sender_guard = Some(timeline_updates_sender);
Ok(())
}
pub(super) fn try_send_timeline_update(update: LocalTimelineUpdate) {
match TIMELINE_UPDATE_SENDER
.read()
.expect("Failed to read() timeline_update_sender lock, it got poisoned")
.as_ref()
{
Some(sender) => {
if let Err(e) = sender.send(update) {
error!("Failed to send timeline update: {}", e);
}
}
None => debug!("Timeline update sender is not enabled, cannot send update {update:?}"),
}
}
pub(super) fn stop_timeline_update_sender() {
TIMELINE_UPDATE_SENDER
.write()
.expect("Failed to write() timeline_update_sender lock, it got poisoned")
.take();
}
}
struct Tenant {
@@ -87,10 +126,10 @@ pub enum TenantState {
impl fmt::Display for TenantState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TenantState::Active => f.write_str("Active"),
TenantState::Idle => f.write_str("Idle"),
TenantState::Stopping => f.write_str("Stopping"),
TenantState::Broken => f.write_str("Broken"),
Self::Active => f.write_str("Active"),
Self::Idle => f.write_str("Idle"),
Self::Stopping => f.write_str("Stopping"),
Self::Broken => f.write_str("Broken"),
}
}
}
@@ -99,6 +138,11 @@ impl fmt::Display for TenantState {
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the repository once download is completed.
pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIndex> {
let (timeline_updates_sender, timeline_updates_receiver) =
mpsc::unbounded_channel::<LocalTimelineUpdate>();
tenants_state::set_timeline_update_sender(timeline_updates_sender)?;
walreceiver::init_wal_receiver_main_thread(conf, timeline_updates_receiver)?;
let SyncStartupData {
remote_index,
local_timeline_init_statuses,
@@ -113,16 +157,27 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIn
// loading a tenant is serious, but it's better to complete the startup and
// serve other tenants, than fail completely.
error!("Failed to initialize local tenant {tenant_id}: {:?}", err);
let mut m = tenants_state::write_tenants();
if let Some(tenant) = m.get_mut(&tenant_id) {
tenant.state = TenantState::Broken;
}
set_tenant_state(tenant_id, TenantState::Broken)?;
}
}
Ok(remote_index)
}
pub enum LocalTimelineUpdate {
Detach(ZTenantTimelineId),
Attach(ZTenantTimelineId, Arc<DatadirTimelineImpl>),
}
impl std::fmt::Debug for LocalTimelineUpdate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Detach(ttid) => f.debug_tuple("Remove").field(ttid).finish(),
Self::Attach(ttid, _) => f.debug_tuple("Add").field(ttid).finish(),
}
}
}
/// Updates tenants' repositories, changing their timelines state in memory.
pub fn apply_timeline_sync_status_updates(
conf: &'static PageServerConf,
@@ -160,6 +215,7 @@ pub fn apply_timeline_sync_status_updates(
/// Shut down all tenants. This runs as part of pageserver shutdown.
///
pub fn shutdown_all_tenants() {
tenants_state::stop_timeline_update_sender();
let mut m = tenants_state::write_tenants();
let mut tenantids = Vec::new();
for (tenantid, tenant) in m.iter_mut() {
@@ -173,7 +229,7 @@ pub fn shutdown_all_tenants() {
}
drop(m);
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiver), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiverManager), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
@@ -247,32 +303,49 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
Some(tenants_state::read_tenants().get(&tenantid)?.state)
}
///
/// Change the state of a tenant to Active and launch its compactor and GC
/// threads. If the tenant was already in Active state or Stopping, does nothing.
///
pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow::Result<()> {
let mut m = tenants_state::write_tenants();
let tenant = m
.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?;
let old_state = tenant.state;
tenant.state = new_state;
drop(m);
info!("activating tenant {tenant_id}");
match tenant.state {
// If the tenant is already active, nothing to do.
TenantState::Active => {}
// If it's Idle, launch the compactor and GC threads
TenantState::Idle => {
thread_mgr::spawn(
match (old_state, new_state) {
(TenantState::Broken, TenantState::Broken)
| (TenantState::Active, TenantState::Active)
| (TenantState::Idle, TenantState::Idle)
| (TenantState::Stopping, TenantState::Stopping) => {
debug!("tenant {tenant_id} already in state {new_state}");
}
(TenantState::Broken, ignored) => {
debug!("Ignoring {ignored} since tenant {tenant_id} is in broken state");
}
(_, TenantState::Broken) => {
debug!("Setting tenant {tenant_id} status to broken");
}
(TenantState::Stopping, ignored) => {
debug!("Ignoring {ignored} since tenant {tenant_id} is in stopping state");
}
(TenantState::Idle, TenantState::Active) => {
info!("activating tenant {tenant_id}");
let compactor_spawn_result = thread_mgr::spawn(
ThreadKind::Compactor,
Some(tenant_id),
None,
"Compactor thread",
false,
move || crate::tenant_threads::compact_loop(tenant_id),
)?;
);
if compactor_spawn_result.is_err() {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
}
compactor_spawn_result?;
let gc_spawn_result = thread_mgr::spawn(
ThreadKind::GarbageCollector,
@@ -286,21 +359,31 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
if let Err(e) = &gc_spawn_result {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
tenant.state = TenantState::Active;
}
TenantState::Stopping => {
// don't re-activate it if it's being stopped
(TenantState::Idle, TenantState::Stopping) => {
info!("stopping idle tenant {tenant_id}");
}
TenantState::Broken => {
// cannot activate
(TenantState::Active, TenantState::Stopping | TenantState::Idle) => {
info!("stopping tenant {tenant_id} threads due to new state {new_state}");
thread_mgr::shutdown_threads(
Some(ThreadKind::WalReceiverManager),
Some(tenant_id),
None,
);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), Some(tenant_id), None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
}
}
Ok(())
}
@@ -325,15 +408,15 @@ pub fn get_local_timeline_with_load(
.with_context(|| format!("Tenant {tenant_id} not found"))?;
if let Some(page_tline) = tenant.local_timelines.get(&timeline_id) {
return Ok(Arc::clone(page_tline));
Ok(Arc::clone(page_tline))
} else {
let page_tline = load_local_timeline(&tenant.repo, timeline_id)
.with_context(|| format!("Failed to load local timeline for tenant {tenant_id}"))?;
tenant
.local_timelines
.insert(timeline_id, Arc::clone(&page_tline));
Ok(page_tline)
}
let page_tline = load_local_timeline(&tenant.repo, timeline_id)
.with_context(|| format!("Failed to load local timeline for tenant {tenant_id}"))?;
tenant
.local_timelines
.insert(timeline_id, Arc::clone(&page_tline));
Ok(page_tline)
}
pub fn detach_timeline(
@@ -351,6 +434,9 @@ pub fn detach_timeline(
.detach_timeline(timeline_id)
.context("Failed to detach inmem tenant timeline")?;
tenant.local_timelines.remove(&timeline_id);
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach(
ZTenantTimelineId::new(tenant_id, timeline_id),
));
}
None => bail!("Tenant {tenant_id} not found in local tenant state"),
}
@@ -379,6 +465,12 @@ fn load_local_timeline(
repartition_distance,
));
page_tline.init_logical_size()?;
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach(
ZTenantTimelineId::new(repo.tenant_id(), timeline_id),
Arc::clone(&page_tline),
));
Ok(page_tline)
}

View File

@@ -91,8 +91,8 @@ pub enum ThreadKind {
// associated with one later, after receiving a command from the client.
PageRequestHandler,
// Thread that connects to a safekeeper to fetch WAL for one timeline.
WalReceiver,
// Main walreceiver manager thread that ensures that every timeline spawns a connection to safekeeper, to fetch WAL.
WalReceiverManager,
// Thread that handles compaction of all timelines for a tenant.
Compactor,

File diff suppressed because it is too large
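The suppressed file is the new walreceiver module that replaces callmemaybe on the pageserver side. As a rough orientation only, here is a hypothetical sketch of the control flow suggested by the surrounding changes: a manager task consumes `LocalTimelineUpdate` events from `tenant_mgr` and keeps at most one connection handle per attached timeline. All names below are simplified stand-ins for the real (and considerably more involved) code, which also watches etcd for `SkTimelineInfo` updates and picks the best safekeeper per timeline.

```rust
use std::collections::HashMap;

use tokio::sync::mpsc;
use utils::zid::ZTenantTimelineId;

// Simplified: the real `LocalTimelineUpdate::Attach` also carries an Arc to the timeline.
enum LocalTimelineUpdate {
    Attach(ZTenantTimelineId),
    Detach(ZTenantTimelineId),
}

// Stand-in for `WalReceiverConnection` from the connection handler below.
struct ConnectionHandle;

async fn walreceiver_manager(mut updates: mpsc::UnboundedReceiver<LocalTimelineUpdate>) {
    let mut connections: HashMap<ZTenantTimelineId, ConnectionHandle> = HashMap::new();
    while let Some(update) = updates.recv().await {
        match update {
            LocalTimelineUpdate::Attach(id) => {
                // In the real code: subscribe to etcd for this timeline's SkTimelineInfo,
                // choose a safekeeper, and open a connection to its `safekeeper_connstr`.
                connections.entry(id).or_insert(ConnectionHandle);
            }
            LocalTimelineUpdate::Detach(id) => {
                // In the real code: cancel the connection task gracefully.
                connections.remove(&id);
            }
        }
    }
}
```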

View File

@@ -0,0 +1,405 @@
//! Actual Postgres connection handler that streams WAL from a safekeeper to this pageserver.
//! Runs as a separate, cancellable Tokio task.
use std::{
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
};
use anyhow::{bail, ensure, Context};
use bytes::BytesMut;
use fail::fail_point;
use postgres::{SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use utils::{
lsn::Lsn,
pq_proto::ZenithFeedback,
zid::{NodeId, ZTenantTimelineId},
};
use crate::{
http::models::WalReceiverEntry,
repository::{Repository, Timeline},
tenant_mgr,
walingest::WalIngest,
};
#[derive(Debug, Clone)]
pub enum WalConnectionEvent {
Started,
NewWal(ZenithFeedback),
End(Result<(), String>),
}
/// A wrapper around a standalone Tokio task, used to poll its updates or cancel it.
#[derive(Debug)]
pub struct WalReceiverConnection {
handle: tokio::task::JoinHandle<()>,
cancellation: watch::Sender<()>,
events_receiver: watch::Receiver<WalConnectionEvent>,
}
impl WalReceiverConnection {
/// Initializes the connection task, returning a set of handles on top of it.
/// The task starts immediately after creation and fails if no connection is established within the given timeout.
pub fn open(
id: ZTenantTimelineId,
safekeeper_id: NodeId,
wal_producer_connstr: String,
connect_timeout: Duration,
) -> Self {
let (cancellation, mut cancellation_receiver) = watch::channel(());
let (events_sender, events_receiver) = watch::channel(WalConnectionEvent::Started);
let handle = tokio::spawn(
async move {
let connection_result = handle_walreceiver_connection(
id,
&wal_producer_connstr,
&events_sender,
&mut cancellation_receiver,
connect_timeout,
)
.await
.map_err(|e| {
format!("Walreceiver connection for id {id} failed with error: {e:#}")
});
match &connection_result {
Ok(()) => {
debug!("Walreceiver connection for id {id} ended successfully")
}
Err(e) => warn!("{e}"),
}
events_sender
.send(WalConnectionEvent::End(connection_result))
.ok();
}
.instrument(info_span!("safekeeper_handle", sk = %safekeeper_id)),
);
Self {
handle,
cancellation,
events_receiver,
}
}
/// Polls for the next WAL receiver event, if there's any available since the last check.
/// Blocks if there's no new event available; returns `None` if no new events will ever occur.
/// Only the last event is returned; any events received between observations are lost.
pub async fn next_event(&mut self) -> Option<WalConnectionEvent> {
match self.events_receiver.changed().await {
Ok(()) => Some(self.events_receiver.borrow().clone()),
Err(_cancellation_error) => None,
}
}
/// Gracefully aborts the current WAL streaming task, waiting for the WAL currently being streamed to finish.
pub async fn shutdown(&mut self) -> anyhow::Result<()> {
self.cancellation.send(()).ok();
let handle = &mut self.handle;
handle
.await
.context("Failed to join on a walreceiver connection task")?;
Ok(())
}
}
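A hedged usage sketch of the handle defined above, assuming the surrounding types (`WalReceiverConnection`, `WalConnectionEvent`) are in scope; the timeout is a placeholder for the tenant's `walreceiver_connect_timeout`:

```rust
use std::time::Duration;

use utils::zid::{NodeId, ZTenantTimelineId};

async fn stream_until_done(
    id: ZTenantTimelineId,
    safekeeper_id: NodeId,
    wal_producer_connstr: String,
) -> anyhow::Result<()> {
    // Opening the handle spawns the connection task immediately.
    let mut connection = WalReceiverConnection::open(
        id,
        safekeeper_id,
        wal_producer_connstr,
        Duration::from_secs(2), // placeholder for walreceiver_connect_timeout
    );

    // Poll events until the task reports that it ended.
    while let Some(event) = connection.next_event().await {
        match event {
            WalConnectionEvent::Started => {}
            WalConnectionEvent::NewWal(feedback) => tracing::debug!("new WAL: {feedback:?}"),
            WalConnectionEvent::End(result) => {
                result.map_err(anyhow::Error::msg)?;
                break;
            }
        }
    }

    connection.shutdown().await
}
```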
async fn handle_walreceiver_connection(
id: ZTenantTimelineId,
wal_producer_connstr: &str,
events_sender: &watch::Sender<WalConnectionEvent>,
cancellation: &mut watch::Receiver<()>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
// Connect to the database in replication mode.
info!("connecting to {wal_producer_connstr}");
let connect_cfg =
format!("{wal_producer_connstr} application_name=pageserver replication=true");
let (mut replication_client, connection) = time::timeout(
connect_timeout,
tokio_postgres::connect(&connect_cfg, postgres::NoTls),
)
.await
.context("Timed out while waiting for walreceiver connection to open")?
.context("Failed to open walreceiver conection")?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
let mut connection_cancellation = cancellation.clone();
tokio::spawn(
async move {
info!("connected!");
select! {
connection_result = connection => match connection_result{
Ok(()) => info!("Walreceiver db connection closed"),
Err(connection_error) => {
if connection_error.is_closed() {
info!("Connection closed regularly: {connection_error}")
} else {
warn!("Connection aborted: {connection_error}")
}
}
},
_ = connection_cancellation.changed() => info!("Connection cancelled"),
}
}
.instrument(info_span!("safekeeper_handle_db")),
);
// Immediately increment the gauge, then create a job to decrement it on task exit.
// One of the pros of `defer!` is that this will *most probably*
// get called, even in presence of panics.
let gauge = crate::LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
gauge.inc();
scopeguard::defer! {
gauge.dec();
}
let identify = identify_system(&mut replication_client).await?;
info!("{identify:?}");
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
let mut caught_up = false;
let ZTenantTimelineId {
tenant_id,
timeline_id,
} = id;
let (repo, timeline) = tokio::task::spawn_blocking(move || {
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
.with_context(|| format!("no repository found for tenant {tenant_id}"))?;
let timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)
.with_context(|| {
format!("local timeline {timeline_id} not found for tenant {tenant_id}")
})?;
Ok::<_, anyhow::Error>((repo, timeline))
})
.await
.with_context(|| format!("Failed to spawn blocking task to get repository and timeline for tenant {tenant_id} timeline {timeline_id}"))??;
//
// Start streaming the WAL, from where we left off previously.
//
// If we had previously received WAL up to some point in the middle of a WAL record, we
// better start from the end of last full WAL record, not in the middle of one.
let mut last_rec_lsn = timeline.get_last_record_lsn();
let mut startpoint = last_rec_lsn;
if startpoint == Lsn(0) {
bail!("No previous WAL position");
}
// There might be some padding after the last full record, skip it.
startpoint += startpoint.calc_padding(8u32);
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, server is at {end_of_wal}...");
let query = format!("START_REPLICATION PHYSICAL {startpoint}");
let copy_stream = replication_client.copy_both_simple(&query).await?;
let physical_stream = ReplicationStream::new(copy_stream);
pin!(physical_stream);
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?;
while let Some(replication_message) = {
select! {
// check for shutdown first
biased;
_ = cancellation.changed() => {
info!("walreceiver interrupted");
None
}
replication_message = physical_stream.next() => replication_message,
}
} {
let replication_message = replication_message?;
let status_update = match replication_message {
ReplicationMessage::XLogData(xlog_data) => {
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
let data = xlog_data.data();
let startlsn = Lsn::from(xlog_data.wal_start());
let endlsn = startlsn + data.len() as u64;
trace!("received XLogData between {startlsn} and {endlsn}");
waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let _enter = info_span!("processing record", lsn = %lsn).entered();
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
walingest.ingest_record(&timeline, recdata, lsn)?;
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
}
if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {endlsn}");
caught_up = true;
}
let timeline_to_check = Arc::clone(&timeline.tline);
tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
.await
.with_context(|| {
format!("Spawned checkpoint check task panicked for timeline {id}")
})?
.with_context(|| {
format!("Failed to check checkpoint distance for timeline {id}")
})?;
Some(endlsn)
}
ReplicationMessage::PrimaryKeepAlive(keepalive) => {
let wal_end = keepalive.wal_end();
let timestamp = keepalive.timestamp();
let reply_requested = keepalive.reply() != 0;
trace!("received PrimaryKeepAlive(wal_end: {wal_end}, timestamp: {timestamp:?} reply: {reply_requested})");
if reply_requested {
Some(last_rec_lsn)
} else {
None
}
}
_ => None,
};
if let Some(last_lsn) = status_update {
let remote_index = repo.get_remote_index();
let timeline_remote_consistent_lsn = remote_index
.read()
.await
// here we either do not have this timeline in remote index
// or there were no checkpoints for it yet
.timeline_entry(&ZTenantTimelineId {
tenant_id,
timeline_id,
})
.map(|remote_timeline| remote_timeline.metadata.disk_consistent_lsn())
// no checkpoint was uploaded
.unwrap_or(Lsn(0));
// The last LSN we processed. It is not guaranteed to survive pageserver crash.
let write_lsn = u64::from(last_lsn);
// `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
let flush_lsn = u64::from(timeline.tline.get_disk_consistent_lsn());
// The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
// Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
let apply_lsn = u64::from(timeline_remote_consistent_lsn);
let ts = SystemTime::now();
// Update the current WAL receiver's data stored inside the global hash table `WAL_RECEIVERS`
{
super::WAL_RECEIVER_ENTRIES.write().await.insert(
id,
WalReceiverEntry {
wal_producer_connstr: Some(wal_producer_connstr.to_owned()),
last_received_msg_lsn: Some(last_lsn),
last_received_msg_ts: Some(
ts.duration_since(SystemTime::UNIX_EPOCH)
.expect("Received message time should be before UNIX EPOCH!")
.as_micros(),
),
},
);
}
// Send zenith feedback message.
// Regular standby_status_update fields are put into this message.
let zenith_status_update = ZenithFeedback {
current_timeline_size: timeline.get_current_logical_size() as u64,
ps_writelsn: write_lsn,
ps_flushlsn: flush_lsn,
ps_applylsn: apply_lsn,
ps_replytime: ts,
};
debug!("zenith_status_update {zenith_status_update:?}");
let mut data = BytesMut::new();
zenith_status_update.serialize(&mut data)?;
physical_stream
.as_mut()
.zenith_status_update(data.len() as u64, &data)
.await?;
if let Err(e) = events_sender.send(WalConnectionEvent::NewWal(zenith_status_update)) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}
}
}
Ok(())
}
/// Data returned from the postgres `IDENTIFY_SYSTEM` command
///
/// See the [postgres docs] for more details.
///
/// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
#[derive(Debug)]
// As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as
// unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900
#[allow(dead_code)]
struct IdentifySystem {
systemid: u64,
timeline: u32,
xlogpos: PgLsn,
dbname: Option<String>,
}
/// There was a problem parsing the response to
/// a postgres IDENTIFY_SYSTEM command.
#[derive(Debug, thiserror::Error)]
#[error("IDENTIFY_SYSTEM parse error")]
struct IdentifyError;
/// Run the postgres `IDENTIFY_SYSTEM` command
async fn identify_system(client: &mut Client) -> anyhow::Result<IdentifySystem> {
let query_str = "IDENTIFY_SYSTEM";
let response = client.simple_query(query_str).await?;
// get(N) from row, then parse it as some destination type.
fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
where
T: FromStr,
{
let val = row.get(idx).ok_or(IdentifyError)?;
val.parse::<T>().or(Err(IdentifyError))
}
// extract the row contents into an IdentifySystem struct.
// written as a closure so I can use ? for Option here.
if let Some(SimpleQueryMessage::Row(first_row)) = response.get(0) {
Ok(IdentifySystem {
systemid: get_parse(first_row, 0)?,
timeline: get_parse(first_row, 1)?,
xlogpos: get_parse(first_row, 2)?,
dbname: get_parse(first_row, 3).ok(),
})
} else {
Err(IdentifyError.into())
}
}

View File

@@ -16,7 +16,8 @@ use toml_edit::Document;
use tracing::*;
use url::{ParseError, Url};
use safekeeper::control_file::{self};
use safekeeper::broker;
use safekeeper::control_file;
use safekeeper::defaults::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
@@ -26,7 +27,6 @@ use safekeeper::timeline::GlobalTimelines;
use safekeeper::wal_backup;
use safekeeper::wal_service;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, callmemaybe};
use utils::{
http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener,
zid::NodeId,
@@ -272,9 +272,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
let (callmemaybe_tx, callmemaybe_rx) = mpsc::unbounded_channel();
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
GlobalTimelines::init(callmemaybe_tx, wal_backup_launcher_tx);
GlobalTimelines::init(wal_backup_launcher_tx);
let conf_ = conf.clone();
threads.push(
@@ -296,29 +295,14 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
let safekeeper_thread = thread::Builder::new()
.name("Safekeeper thread".into())
.spawn(|| {
// thread code
let thread_result = wal_service::thread_main(conf_cloned, pg_listener);
if let Err(e) = thread_result {
info!("safekeeper thread terminated: {}", e);
if let Err(e) = wal_service::thread_main(conf_cloned, pg_listener) {
info!("safekeeper thread terminated: {e}");
}
})
.unwrap();
threads.push(safekeeper_thread);
let conf_cloned = conf.clone();
let callmemaybe_thread = thread::Builder::new()
.name("callmemaybe thread".into())
.spawn(|| {
// thread code
let thread_result = callmemaybe::thread_main(conf_cloned, callmemaybe_rx);
if let Err(e) = thread_result {
error!("callmemaybe thread terminated: {}", e);
}
})
.unwrap();
threads.push(callmemaybe_thread);
if !conf.broker_endpoints.is_empty() {
let conf_ = conf.clone();
threads.push(

View File

@@ -8,7 +8,6 @@ use url::Url;
use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId};
pub mod broker;
pub mod callmemaybe;
pub mod control_file;
pub mod control_file_upgrade;
pub mod handler;

View File

@@ -8,7 +8,6 @@ use anyhow::{bail, Context, Result};
use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE};
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::cmp::min;
@@ -17,7 +16,6 @@ use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use std::{str, thread};
use tokio::sync::mpsc::UnboundedSender;
use tracing::*;
use utils::{
bin_ser::BeSer,
@@ -25,7 +23,6 @@ use utils::{
postgres_backend::PostgresBackend,
pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody, ZenithFeedback},
sock_split::ReadStream,
zid::{ZTenantId, ZTimelineId},
};
// See: https://www.postgresql.org/docs/13/protocol-replication.html
@@ -83,40 +80,6 @@ impl Drop for ReplicationConnGuard {
}
}
// XXX: Naming is a bit messy here.
// This ReplicationStreamGuard lives as long as ReplicationConn
// and current ReplicationConnGuard is tied to the background thread
// that receives feedback.
struct ReplicationStreamGuard {
tx: UnboundedSender<CallmeEvent>,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
pageserver_connstr: String,
}
impl Drop for ReplicationStreamGuard {
fn drop(&mut self) {
// the connection with pageserver is lost,
// resume callback subscription
debug!(
"Connection to pageserver is gone. Resume callmemaybe subsciption if necessary. tenantid {} timelineid {}",
self.tenant_id, self.timeline_id,
);
let subscription_key = SubscriptionStateKey::new(
self.tenant_id,
self.timeline_id,
self.pageserver_connstr.to_owned(),
);
self.tx
.send(CallmeEvent::Resume(subscription_key))
.unwrap_or_else(|e| {
error!("failed to send Resume request to callmemaybe thread {}", e);
});
}
}
impl ReplicationConn {
/// Create a new `ReplicationConn`
pub fn new(pgb: &mut PostgresBackend) -> Self {
@@ -256,36 +219,6 @@ impl ReplicationConn {
};
info!("Start replication from {:?} till {:?}", start_pos, stop_pos);
// Don't spam pageserver with callmemaybe queries
// when replication connection with pageserver is already established.
let _guard = {
if spg.appname == Some("wal_proposer_recovery".to_string()) {
None
} else {
let pageserver_connstr = pageserver_connstr.expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
let zttid = spg.timeline.get().zttid;
let tx_clone = spg.timeline.get().callmemaybe_tx.clone();
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.clone(),
);
tx_clone
.send(CallmeEvent::Pause(subscription_key))
.unwrap_or_else(|e| {
error!("failed to send Pause request to callmemaybe thread {}", e);
});
// create a guard to subscribe callback again, when this connection will exit
Some(ReplicationStreamGuard {
tx: tx_clone,
tenant_id: zttid.tenant_id,
timeline_id: zttid.timeline_id,
pageserver_connstr,
})
}
};
// switch to copy
pgb.write_message(&BeMessage::CopyBothResponse)?;

View File

@@ -16,7 +16,7 @@ use std::fs::{self};
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::time::Duration;
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tokio::sync::mpsc::Sender;
use tracing::*;
use utils::{
@@ -25,7 +25,6 @@ use utils::{
zid::{NodeId, ZTenantId, ZTenantTimelineId},
};
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
use crate::control_file;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
@@ -191,79 +190,33 @@ impl SharedState {
self.wal_backup_active
}
/// start/change walsender (via callmemaybe).
fn callmemaybe_sub(
/// Activate the timeline's walsender: start or update the timeline information propagated into etcd, so that pageservers can connect.
fn activate_walsender(
&mut self,
zttid: &ZTenantTimelineId,
pageserver_connstr: Option<&String>,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<()> {
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
// unsub old sub. xxx: callmemaybe is going out
let old_subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.to_owned(),
);
callmemaybe_tx
.send(CallmeEvent::Unsubscribe(old_subscription_key))
.unwrap_or_else(|e| {
error!("failed to send Pause request to callmemaybe thread {}", e);
});
new_pageserver_connstr: Option<String>,
) {
if self.pageserver_connstr != new_pageserver_connstr {
self.deactivate_walsender(zttid);
if new_pageserver_connstr.is_some() {
info!(
"timeline {} has activated its walsender with connstr {new_pageserver_connstr:?}",
zttid.timeline_id,
);
}
self.pageserver_connstr = new_pageserver_connstr;
}
if let Some(pageserver_connstr) = pageserver_connstr {
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.to_owned(),
);
// xx: sending to channel under lock is not very cool, but
// shouldn't be a problem here. If it is, we can grab a counter
// here and later augment channel messages with it.
callmemaybe_tx
.send(CallmeEvent::Subscribe(subscription_key))
.unwrap_or_else(|e| {
error!(
"failed to send Subscribe request to callmemaybe thread {}",
e
);
});
info!(
"timeline {} is subscribed to callmemaybe to {}",
zttid.timeline_id, pageserver_connstr
);
}
self.pageserver_connstr = pageserver_connstr.map(|c| c.to_owned());
Ok(())
}
/// Deactivate the timeline: stop callmemaybe.
fn callmemaybe_unsub(
&mut self,
zttid: &ZTenantTimelineId,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<()> {
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.to_owned(),
);
callmemaybe_tx
.send(CallmeEvent::Unsubscribe(subscription_key))
.unwrap_or_else(|e| {
error!(
"failed to send Unsubscribe request to callmemaybe thread {}",
e
);
});
/// Deactivate the timeline: stop sending the timeline data into etcd, so no pageserver can connect for WAL streaming.
fn deactivate_walsender(&mut self, zttid: &ZTenantTimelineId) {
if let Some(pageserver_connstr) = self.pageserver_connstr.take() {
info!(
"timeline {} is unsubscribed from callmemaybe to {}",
"timeline {} had deactivated its wallsender with connstr {pageserver_connstr:?}",
zttid.timeline_id,
self.pageserver_connstr.as_ref().unwrap()
);
)
}
Ok(())
}
fn get_wal_seg_size(&self) -> usize {
@@ -332,7 +285,6 @@ impl SharedState {
/// Database instance (tenant)
pub struct Timeline {
pub zttid: ZTenantTimelineId,
pub callmemaybe_tx: UnboundedSender<CallmeEvent>,
/// Sending here asks for wal backup launcher attention (start/stop
/// offloading). Sending zttid instead of concrete command allows to do
/// sending without timeline lock.
@@ -348,7 +300,6 @@ pub struct Timeline {
impl Timeline {
fn new(
zttid: ZTenantTimelineId,
callmemaybe_tx: UnboundedSender<CallmeEvent>,
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
shared_state: SharedState,
) -> Timeline {
@@ -356,7 +307,6 @@ impl Timeline {
watch::channel(shared_state.sk.inmem.commit_lsn);
Timeline {
zttid,
callmemaybe_tx,
wal_backup_launcher_tx,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
@@ -378,7 +328,7 @@ impl Timeline {
// should have kind of generations assigned by compute to distinguish
// the latest one or even pass it through consensus to reliably deliver
// to all safekeepers.
shared_state.callmemaybe_sub(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?;
shared_state.activate_walsender(&self.zttid, pageserver_connstr.cloned());
}
// Wake up wal backup launcher, if offloading not started yet.
if is_wal_backup_action_pending {
@@ -414,7 +364,7 @@ impl Timeline {
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop {
shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?;
shared_state.deactivate_walsender(&self.zttid);
return Ok(true);
}
}
@@ -431,16 +381,14 @@ impl Timeline {
/// Deactivates the timeline, assuming it is being deleted.
/// Returns whether the timeline was already active.
///
/// The callmemaybe thread is stopped by the deactivation message. We assume all other threads
/// will stop by themselves eventually (possibly with errors, but no panics). There should be no
/// compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but
/// We assume all threads will stop by themselves eventually (possibly with errors, but no panics).
/// There should be no compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but
/// we're deleting the timeline anyway.
pub async fn deactivate_for_delete(&self) -> Result<bool> {
let was_active: bool;
{
let mut shared_state = self.mutex.lock().unwrap();
let shared_state = self.mutex.lock().unwrap();
was_active = shared_state.active;
shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?;
}
self.wal_backup_launcher_tx.send(self.zttid).await?;
Ok(was_active)
@@ -576,7 +524,8 @@ impl Timeline {
shared_state.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
safekeeper_connection_string: Some(conf.listen_pg_addr.clone()),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
pageserver_connstr: shared_state.pageserver_connstr.clone(),
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
})
}
@@ -675,14 +624,12 @@ impl TimelineTools for Option<Arc<Timeline>> {
struct GlobalTimelinesState {
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
callmemaybe_tx: Option<UnboundedSender<CallmeEvent>>,
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
}
lazy_static! {
static ref TIMELINES_STATE: Mutex<GlobalTimelinesState> = Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
callmemaybe_tx: None,
wal_backup_launcher_tx: None,
});
}
@@ -697,13 +644,8 @@ pub struct TimelineDeleteForceResult {
pub struct GlobalTimelines;
impl GlobalTimelines {
pub fn init(
callmemaybe_tx: UnboundedSender<CallmeEvent>,
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
) {
pub fn init(wal_backup_launcher_tx: Sender<ZTenantTimelineId>) {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.callmemaybe_tx.is_none());
state.callmemaybe_tx = Some(callmemaybe_tx);
assert!(state.wal_backup_launcher_tx.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
}
@@ -726,7 +668,6 @@ impl GlobalTimelines {
let new_tli = Arc::new(Timeline::new(
zttid,
state.callmemaybe_tx.as_ref().unwrap().clone(),
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
shared_state,
));
@@ -778,7 +719,6 @@ impl GlobalTimelines {
let new_tli = Arc::new(Timeline::new(
zttid,
state.callmemaybe_tx.as_ref().unwrap().clone(),
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
shared_state,
));

View File

@@ -63,10 +63,11 @@ def test_pageserver_http_get_wal_receiver_not_found(zenith_simple_env: ZenithEnv
tenant_id, timeline_id = env.zenith_cli.create_tenant()
# no PG compute node is running, so no WAL receiver is running
with pytest.raises(ZenithPageserverApiException) as e:
_ = client.wal_receiver_get(tenant_id, timeline_id)
assert "Not Found" in str(e.value)
empty_response = client.wal_receiver_get(tenant_id, timeline_id)
assert empty_response.get('wal_producer_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert empty_response.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert empty_response.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
def test_pageserver_http_get_wal_receiver_success(zenith_simple_env: ZenithEnv):
@@ -81,7 +82,6 @@ def test_pageserver_http_get_wal_receiver_success(zenith_simple_env: ZenithEnv):
# a successful `wal_receiver_get` response must contain the below fields
assert list(res.keys()) == [
"thread_id",
"wal_producer_connstr",
"last_received_msg_lsn",
"last_received_msg_ts",

View File

@@ -1600,9 +1600,7 @@ class Postgres(PgProtocol):
for cfg_line in cfg_lines:
# walproposer uses different application_name
if ("synchronous_standby_names" in cfg_line or
# don't ask pageserver to fetch WAL from compute
"callmemaybe_connstring" in cfg_line or
# don't repeat safekeepers multiple times
# don't repeat safekeepers/wal_acceptors multiple times
"safekeepers" in cfg_line):
continue
f.write(cfg_line)

View File

@@ -13,16 +13,12 @@ from fixtures.zenith_fixtures import ZenithEnvBuilder
@pytest.mark.parametrize('tenants_count', [1, 5, 10])
@pytest.mark.parametrize('use_safekeepers', ['with_wa', 'without_wa'])
def test_bulk_tenant_create(
zenith_env_builder: ZenithEnvBuilder,
use_safekeepers: str,
tenants_count: int,
zenbenchmark,
):
"""Measure tenant creation time (with and without wal acceptors)"""
if use_safekeepers == 'with_wa':
zenith_env_builder.num_safekeepers = 3
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
time_slices = []
@@ -31,15 +27,15 @@ def test_bulk_tenant_create(
start = timeit.default_timer()
tenant, _ = env.zenith_cli.create_tenant()
env.zenith_cli.create_timeline(
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant)
env.zenith_cli.create_timeline(f'test_bulk_tenant_create_{tenants_count}_{i}',
tenant_id=tenant)
# FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now?
#if use_safekeepers == 'with_sa':
# wa_factory.start_n_new(3)
pg_tenant = env.postgres.create_start(
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant)
pg_tenant = env.postgres.create_start(f'test_bulk_tenant_create_{tenants_count}_{i}',
tenant_id=tenant)
end = timeit.default_timer()
time_slices.append(end - start)