mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 06:30:43 +00:00
Replace callmemaybe with etcd subscriptions on safekeeper timeline info
This commit is contained in:
committed by
Kirill Bulatov
parent
6623c5b9d5
commit
e5cb727572
@@ -352,7 +352,6 @@ impl PostgresNode {
|
|||||||
// This isn't really a supported configuration, but can be useful for
|
// This isn't really a supported configuration, but can be useful for
|
||||||
// testing.
|
// testing.
|
||||||
conf.append("synchronous_standby_names", "pageserver");
|
conf.append("synchronous_standby_names", "pageserver");
|
||||||
conf.append("neon.callmemaybe_connstring", &self.connstr());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
|
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::net::TcpStream;
|
use std::net::TcpStream;
|
||||||
|
use std::num::NonZeroU64;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::process::Command;
|
use std::process::Command;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -11,6 +12,7 @@ use nix::errno::Errno;
|
|||||||
use nix::sys::signal::{kill, Signal};
|
use nix::sys::signal::{kill, Signal};
|
||||||
use nix::unistd::Pid;
|
use nix::unistd::Pid;
|
||||||
use pageserver::http::models::{TenantConfigRequest, TenantCreateRequest, TimelineCreateRequest};
|
use pageserver::http::models::{TenantConfigRequest, TenantCreateRequest, TimelineCreateRequest};
|
||||||
|
use pageserver::tenant_mgr::TenantInfo;
|
||||||
use pageserver::timelines::TimelineInfo;
|
use pageserver::timelines::TimelineInfo;
|
||||||
use postgres::{Config, NoTls};
|
use postgres::{Config, NoTls};
|
||||||
use reqwest::blocking::{Client, RequestBuilder, Response};
|
use reqwest::blocking::{Client, RequestBuilder, Response};
|
||||||
@@ -26,7 +28,6 @@ use utils::{
|
|||||||
|
|
||||||
use crate::local_env::LocalEnv;
|
use crate::local_env::LocalEnv;
|
||||||
use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
|
use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
|
||||||
use pageserver::tenant_mgr::TenantInfo;
|
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum PageserverHttpError {
|
pub enum PageserverHttpError {
|
||||||
@@ -37,6 +38,12 @@ pub enum PageserverHttpError {
|
|||||||
Response(String),
|
Response(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<anyhow::Error> for PageserverHttpError {
|
||||||
|
fn from(e: anyhow::Error) -> Self {
|
||||||
|
Self::Response(e.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type Result<T> = result::Result<T, PageserverHttpError>;
|
type Result<T> = result::Result<T, PageserverHttpError>;
|
||||||
|
|
||||||
pub trait ResponseErrorMessageExt: Sized {
|
pub trait ResponseErrorMessageExt: Sized {
|
||||||
@@ -410,6 +417,15 @@ impl PageServerNode {
|
|||||||
.map(|x| x.parse::<usize>())
|
.map(|x| x.parse::<usize>())
|
||||||
.transpose()?,
|
.transpose()?,
|
||||||
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
|
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
|
||||||
|
walreceiver_connect_timeout: settings
|
||||||
|
.get("walreceiver_connect_timeout")
|
||||||
|
.map(|x| x.to_string()),
|
||||||
|
lagging_wal_timeout: settings.get("lagging_wal_timeout").map(|x| x.to_string()),
|
||||||
|
max_lsn_wal_lag: settings
|
||||||
|
.get("max_lsn_wal_lag")
|
||||||
|
.map(|x| x.parse::<NonZeroU64>())
|
||||||
|
.transpose()
|
||||||
|
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||||
})
|
})
|
||||||
.send()?
|
.send()?
|
||||||
.error_from_body()?
|
.error_from_body()?
|
||||||
@@ -433,22 +449,41 @@ impl PageServerNode {
|
|||||||
tenant_id,
|
tenant_id,
|
||||||
checkpoint_distance: settings
|
checkpoint_distance: settings
|
||||||
.get("checkpoint_distance")
|
.get("checkpoint_distance")
|
||||||
.map(|x| x.parse::<u64>().unwrap()),
|
.map(|x| x.parse::<u64>())
|
||||||
|
.transpose()
|
||||||
|
.context("Failed to parse 'checkpoint_distance' as an integer")?,
|
||||||
compaction_target_size: settings
|
compaction_target_size: settings
|
||||||
.get("compaction_target_size")
|
.get("compaction_target_size")
|
||||||
.map(|x| x.parse::<u64>().unwrap()),
|
.map(|x| x.parse::<u64>())
|
||||||
|
.transpose()
|
||||||
|
.context("Failed to parse 'compaction_target_size' as an integer")?,
|
||||||
compaction_period: settings.get("compaction_period").map(|x| x.to_string()),
|
compaction_period: settings.get("compaction_period").map(|x| x.to_string()),
|
||||||
compaction_threshold: settings
|
compaction_threshold: settings
|
||||||
.get("compaction_threshold")
|
.get("compaction_threshold")
|
||||||
.map(|x| x.parse::<usize>().unwrap()),
|
.map(|x| x.parse::<usize>())
|
||||||
|
.transpose()
|
||||||
|
.context("Failed to parse 'compaction_threshold' as an integer")?,
|
||||||
gc_horizon: settings
|
gc_horizon: settings
|
||||||
.get("gc_horizon")
|
.get("gc_horizon")
|
||||||
.map(|x| x.parse::<u64>().unwrap()),
|
.map(|x| x.parse::<u64>())
|
||||||
|
.transpose()
|
||||||
|
.context("Failed to parse 'gc_horizon' as an integer")?,
|
||||||
gc_period: settings.get("gc_period").map(|x| x.to_string()),
|
gc_period: settings.get("gc_period").map(|x| x.to_string()),
|
||||||
image_creation_threshold: settings
|
image_creation_threshold: settings
|
||||||
.get("image_creation_threshold")
|
.get("image_creation_threshold")
|
||||||
.map(|x| x.parse::<usize>().unwrap()),
|
.map(|x| x.parse::<usize>())
|
||||||
|
.transpose()
|
||||||
|
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
|
||||||
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
|
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
|
||||||
|
walreceiver_connect_timeout: settings
|
||||||
|
.get("walreceiver_connect_timeout")
|
||||||
|
.map(|x| x.to_string()),
|
||||||
|
lagging_wal_timeout: settings.get("lagging_wal_timeout").map(|x| x.to_string()),
|
||||||
|
max_lsn_wal_lag: settings
|
||||||
|
.get("max_lsn_wal_lag")
|
||||||
|
.map(|x| x.parse::<NonZeroU64>())
|
||||||
|
.transpose()
|
||||||
|
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||||
})
|
})
|
||||||
.send()?
|
.send()?
|
||||||
.error_from_body()?;
|
.error_from_body()?;
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ broker_endpoints = ['some://etcd']
|
|||||||
# [remote_storage]
|
# [remote_storage]
|
||||||
```
|
```
|
||||||
|
|
||||||
The config above shows default values for all basic pageserver settings, besides `broker_endpoints`: that one has to be set by the user,
|
The config above shows default values for all basic pageserver settings, besides `broker_endpoints`: that one has to be set by the user,
|
||||||
see the corresponding section below.
|
see the corresponding section below.
|
||||||
Pageserver uses default values for all fields that are missing in the config, so it's not a hard error to leave the config blank.
|
Pageserver uses default values for all fields that are missing in the config, so it's not a hard error to leave the config blank.
|
||||||
Yet, it validates the config values it can (e.g. postgres install dir) and errors if the validation fails, refusing to start.
|
Yet, it validates the config values it can (e.g. postgres install dir) and errors if the validation fails, refusing to start.
|
||||||
@@ -54,7 +54,7 @@ Note that TOML distinguishes between strings and integers, the former require si
|
|||||||
|
|
||||||
A list of endpoints (etcd currently) to connect and pull the information from.
|
A list of endpoints (etcd currently) to connect and pull the information from.
|
||||||
Mandatory, does not have a default, since requires etcd to be started as a separate process,
|
Mandatory, does not have a default, since requires etcd to be started as a separate process,
|
||||||
and its connection url should be specified separately.
|
and its connection url should be specified separately.
|
||||||
|
|
||||||
#### broker_etcd_prefix
|
#### broker_etcd_prefix
|
||||||
|
|
||||||
@@ -111,6 +111,20 @@ L0 delta layer threshold for L1 image layer creation. Default is 3.
|
|||||||
|
|
||||||
WAL retention duration for PITR branching. Default is 30 days.
|
WAL retention duration for PITR branching. Default is 30 days.
|
||||||
|
|
||||||
|
#### walreceiver_connect_timeout
|
||||||
|
|
||||||
|
Time to wait for the WAL receiver connection to be established before failing. Default is 2 seconds.
|
||||||
|
|
||||||
|
#### lagging_wal_timeout
|
||||||
|
|
||||||
|
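Maximum time the pageserver tolerates receiving no WAL updates from the currently connected safekeeper (if any). Default is 10 seconds.
|
||||||
|
Once it is exceeded, the safekeeper is considered stalled and the pageserver switches to another one when available, which keeps the pageserver from lagging on stalled connections.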
|
||||||
|
|
||||||
|
#### max_lsn_wal_lag
|
||||||
|
|
||||||
|
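Maximum allowed difference between the LSN of the latest WAL available on the currently connected safekeeper and on the other safekeepers: if the connected safekeeper falls behind by more than this and no WAL update arrives within `lagging_wal_timeout`,
|
||||||
|
the pageserver switches to a different safekeeper. Default is 1000000.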
|
||||||
|
|
||||||
#### initial_superuser_name
|
#### initial_superuser_name
|
||||||
|
|
||||||
Name of the initial superuser role, passed to initdb when a new tenant
|
Name of the initial superuser role, passed to initdb when a new tenant
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ struct SafekeeperTimeline {
|
|||||||
|
|
||||||
/// Published data about safekeeper's timeline. Fields made optional for easy migrations.
|
/// Published data about safekeeper's timeline. Fields made optional for easy migrations.
|
||||||
#[serde_as]
|
#[serde_as]
|
||||||
#[derive(Debug, Deserialize, Serialize)]
|
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||||
pub struct SkTimelineInfo {
|
pub struct SkTimelineInfo {
|
||||||
/// Term of the last entry.
|
/// Term of the last entry.
|
||||||
pub last_log_term: Option<u64>,
|
pub last_log_term: Option<u64>,
|
||||||
@@ -55,7 +55,9 @@ pub struct SkTimelineInfo {
|
|||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub peer_horizon_lsn: Option<Lsn>,
|
pub peer_horizon_lsn: Option<Lsn>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub safekeeper_connection_string: Option<String>,
|
pub safekeeper_connstr: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub pageserver_connstr: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
|||||||
@@ -336,11 +336,11 @@ impl PostgresBackend {
|
|||||||
let have_tls = self.tls_config.is_some();
|
let have_tls = self.tls_config.is_some();
|
||||||
match msg {
|
match msg {
|
||||||
FeMessage::StartupPacket(m) => {
|
FeMessage::StartupPacket(m) => {
|
||||||
trace!("got startup message {:?}", m);
|
trace!("got startup message {m:?}");
|
||||||
|
|
||||||
match m {
|
match m {
|
||||||
FeStartupPacket::SslRequest => {
|
FeStartupPacket::SslRequest => {
|
||||||
info!("SSL requested");
|
debug!("SSL requested");
|
||||||
|
|
||||||
self.write_message(&BeMessage::EncryptionResponse(have_tls))?;
|
self.write_message(&BeMessage::EncryptionResponse(have_tls))?;
|
||||||
if have_tls {
|
if have_tls {
|
||||||
@@ -349,7 +349,7 @@ impl PostgresBackend {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
FeStartupPacket::GssEncRequest => {
|
FeStartupPacket::GssEncRequest => {
|
||||||
info!("GSS requested");
|
debug!("GSS requested");
|
||||||
self.write_message(&BeMessage::EncryptionResponse(false))?;
|
self.write_message(&BeMessage::EncryptionResponse(false))?;
|
||||||
}
|
}
|
||||||
FeStartupPacket::StartupMessage { .. } => {
|
FeStartupPacket::StartupMessage { .. } => {
|
||||||
@@ -433,12 +433,7 @@ impl PostgresBackend {
|
|||||||
// full cause of the error, not just the top-level context + its trace.
|
// full cause of the error, not just the top-level context + its trace.
|
||||||
// We don't want to send that in the ErrorResponse though,
|
// We don't want to send that in the ErrorResponse though,
|
||||||
// because it's not relevant to the compute node logs.
|
// because it's not relevant to the compute node logs.
|
||||||
if query_string.starts_with("callmemaybe") {
|
error!("query handler for '{}' failed: {:?}", query_string, e);
|
||||||
// FIXME avoid printing a backtrace for tenant x not found errors until this is properly fixed
|
|
||||||
error!("query handler for '{}' failed: {}", query_string, e);
|
|
||||||
} else {
|
|
||||||
error!("query handler for '{}' failed: {:?}", query_string, e);
|
|
||||||
}
|
|
||||||
self.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?;
|
self.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?;
|
||||||
// TODO: untangle convoluted control flow
|
// TODO: untangle convoluted control flow
|
||||||
if e.to_string().contains("failed to run") {
|
if e.to_string().contains("failed to run") {
|
||||||
|
|||||||
@@ -193,7 +193,7 @@ pub struct ZTenantId(ZId);
|
|||||||
zid_newtype!(ZTenantId);
|
zid_newtype!(ZTenantId);
|
||||||
|
|
||||||
// A pair uniquely identifying Zenith instance.
|
// A pair uniquely identifying Zenith instance.
|
||||||
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
|
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||||
pub struct ZTenantTimelineId {
|
pub struct ZTenantTimelineId {
|
||||||
pub tenant_id: ZTenantId,
|
pub tenant_id: ZTenantId,
|
||||||
pub timeline_id: ZTimelineId,
|
pub timeline_id: ZTimelineId,
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ edition = "2021"
|
|||||||
|
|
||||||
[features]
|
[features]
|
||||||
# It is simpler infra-wise to have failpoints enabled by default
|
# It is simpler infra-wise to have failpoints enabled by default
|
||||||
# It shouldn't affect perf in any way because failpoints
|
# It shouldn't affect performance in any way because failpoints
|
||||||
# are not placed in hot code paths
|
# are not placed in hot code paths
|
||||||
default = ["failpoints"]
|
default = ["failpoints"]
|
||||||
profiling = ["pprof"]
|
profiling = ["pprof"]
|
||||||
|
|||||||
@@ -480,6 +480,21 @@ impl PageServerConf {
|
|||||||
if let Some(pitr_interval) = item.get("pitr_interval") {
|
if let Some(pitr_interval) = item.get("pitr_interval") {
|
||||||
t_conf.pitr_interval = Some(parse_toml_duration("pitr_interval", pitr_interval)?);
|
t_conf.pitr_interval = Some(parse_toml_duration("pitr_interval", pitr_interval)?);
|
||||||
}
|
}
|
||||||
|
if let Some(walreceiver_connect_timeout) = item.get("walreceiver_connect_timeout") {
|
||||||
|
t_conf.walreceiver_connect_timeout = Some(parse_toml_duration(
|
||||||
|
"walreceiver_connect_timeout",
|
||||||
|
walreceiver_connect_timeout,
|
||||||
|
)?);
|
||||||
|
}
|
||||||
|
if let Some(lagging_wal_timeout) = item.get("lagging_wal_timeout") {
|
||||||
|
t_conf.lagging_wal_timeout = Some(parse_toml_duration(
|
||||||
|
"lagging_wal_timeout",
|
||||||
|
lagging_wal_timeout,
|
||||||
|
)?);
|
||||||
|
}
|
||||||
|
if let Some(max_lsn_wal_lag) = item.get("max_lsn_wal_lag") {
|
||||||
|
t_conf.max_lsn_wal_lag = Some(parse_toml_from_str("max_lsn_wal_lag", max_lsn_wal_lag)?);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(t_conf)
|
Ok(t_conf)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
use std::num::NonZeroU64;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_with::{serde_as, DisplayFromStr};
|
use serde_with::{serde_as, DisplayFromStr};
|
||||||
use utils::{
|
use utils::{
|
||||||
@@ -33,6 +35,9 @@ pub struct TenantCreateRequest {
|
|||||||
pub gc_period: Option<String>,
|
pub gc_period: Option<String>,
|
||||||
pub image_creation_threshold: Option<usize>,
|
pub image_creation_threshold: Option<usize>,
|
||||||
pub pitr_interval: Option<String>,
|
pub pitr_interval: Option<String>,
|
||||||
|
pub walreceiver_connect_timeout: Option<String>,
|
||||||
|
pub lagging_wal_timeout: Option<String>,
|
||||||
|
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[serde_as]
|
#[serde_as]
|
||||||
@@ -68,6 +73,9 @@ pub struct TenantConfigRequest {
|
|||||||
pub gc_period: Option<String>,
|
pub gc_period: Option<String>,
|
||||||
pub image_creation_threshold: Option<usize>,
|
pub image_creation_threshold: Option<usize>,
|
||||||
pub pitr_interval: Option<String>,
|
pub pitr_interval: Option<String>,
|
||||||
|
pub walreceiver_connect_timeout: Option<String>,
|
||||||
|
pub lagging_wal_timeout: Option<String>,
|
||||||
|
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TenantConfigRequest {
|
impl TenantConfigRequest {
|
||||||
@@ -82,6 +90,21 @@ impl TenantConfigRequest {
|
|||||||
gc_period: None,
|
gc_period: None,
|
||||||
image_creation_threshold: None,
|
image_creation_threshold: None,
|
||||||
pitr_interval: None,
|
pitr_interval: None,
|
||||||
|
walreceiver_connect_timeout: None,
|
||||||
|
lagging_wal_timeout: None,
|
||||||
|
max_lsn_wal_lag: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A WAL receiver's data stored inside the global `WAL_RECEIVERS`.
|
||||||
|
/// We keep one WAL receiver active per timeline.
|
||||||
|
#[serde_as]
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct WalReceiverEntry {
|
||||||
|
pub wal_producer_connstr: Option<String>,
|
||||||
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||||
|
pub last_received_msg_lsn: Option<Lsn>,
|
||||||
|
/// the timestamp (in microseconds) of the last received message
|
||||||
|
pub last_received_msg_ts: Option<u128>,
|
||||||
|
}
|
||||||
|
|||||||
@@ -229,23 +229,16 @@ async fn wal_receiver_get_handler(request: Request<Body>) -> Result<Response<Bod
|
|||||||
check_permission(&request, Some(tenant_id))?;
|
check_permission(&request, Some(tenant_id))?;
|
||||||
|
|
||||||
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
|
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
|
||||||
|
let wal_receiver_entry = crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id)
|
||||||
|
.instrument(info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id))
|
||||||
|
.await
|
||||||
|
.ok_or_else(|| {
|
||||||
|
ApiError::NotFound(format!(
|
||||||
|
"WAL receiver data not found for tenant {tenant_id} and timeline {timeline_id}"
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
let wal_receiver = tokio::task::spawn_blocking(move || {
|
json_response(StatusCode::OK, &wal_receiver_entry)
|
||||||
let _enter =
|
|
||||||
info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id).entered();
|
|
||||||
|
|
||||||
crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id)
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.map_err(ApiError::from_err)?
|
|
||||||
.ok_or_else(|| {
|
|
||||||
ApiError::NotFound(format!(
|
|
||||||
"WAL receiver not found for tenant {} and timeline {}",
|
|
||||||
tenant_id, timeline_id
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
json_response(StatusCode::OK, wal_receiver)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||||
@@ -402,6 +395,19 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
|||||||
Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?);
|
Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(walreceiver_connect_timeout) = request_data.walreceiver_connect_timeout {
|
||||||
|
tenant_conf.walreceiver_connect_timeout = Some(
|
||||||
|
humantime::parse_duration(&walreceiver_connect_timeout).map_err(ApiError::from_err)?,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Some(lagging_wal_timeout) = request_data.lagging_wal_timeout {
|
||||||
|
tenant_conf.lagging_wal_timeout =
|
||||||
|
Some(humantime::parse_duration(&lagging_wal_timeout).map_err(ApiError::from_err)?);
|
||||||
|
}
|
||||||
|
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
|
||||||
|
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
||||||
|
}
|
||||||
|
|
||||||
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
||||||
tenant_conf.compaction_target_size = request_data.compaction_target_size;
|
tenant_conf.compaction_target_size = request_data.compaction_target_size;
|
||||||
tenant_conf.compaction_threshold = request_data.compaction_threshold;
|
tenant_conf.compaction_threshold = request_data.compaction_threshold;
|
||||||
@@ -449,6 +455,18 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
|
|||||||
tenant_conf.pitr_interval =
|
tenant_conf.pitr_interval =
|
||||||
Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?);
|
Some(humantime::parse_duration(&pitr_interval).map_err(ApiError::from_err)?);
|
||||||
}
|
}
|
||||||
|
if let Some(walreceiver_connect_timeout) = request_data.walreceiver_connect_timeout {
|
||||||
|
tenant_conf.walreceiver_connect_timeout = Some(
|
||||||
|
humantime::parse_duration(&walreceiver_connect_timeout).map_err(ApiError::from_err)?,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Some(lagging_wal_timeout) = request_data.lagging_wal_timeout {
|
||||||
|
tenant_conf.lagging_wal_timeout =
|
||||||
|
Some(humantime::parse_duration(&lagging_wal_timeout).map_err(ApiError::from_err)?);
|
||||||
|
}
|
||||||
|
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
|
||||||
|
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
||||||
|
}
|
||||||
|
|
||||||
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
||||||
tenant_conf.compaction_target_size = request_data.compaction_target_size;
|
tenant_conf.compaction_target_size = request_data.compaction_target_size;
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ use std::collections::{BTreeSet, HashSet};
|
|||||||
use std::fs;
|
use std::fs;
|
||||||
use std::fs::{File, OpenOptions};
|
use std::fs::{File, OpenOptions};
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
use std::num::NonZeroU64;
|
||||||
use std::ops::{Bound::Included, Deref, Range};
|
use std::ops::{Bound::Included, Deref, Range};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::atomic::{self, AtomicBool};
|
use std::sync::atomic::{self, AtomicBool};
|
||||||
@@ -557,6 +558,27 @@ impl LayeredRepository {
|
|||||||
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
|
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_wal_receiver_connect_timeout(&self) -> Duration {
|
||||||
|
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||||
|
tenant_conf
|
||||||
|
.walreceiver_connect_timeout
|
||||||
|
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_lagging_wal_timeout(&self) -> Duration {
|
||||||
|
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||||
|
tenant_conf
|
||||||
|
.lagging_wal_timeout
|
||||||
|
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_max_lsn_wal_lag(&self) -> NonZeroU64 {
|
||||||
|
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||||
|
tenant_conf
|
||||||
|
.max_lsn_wal_lag
|
||||||
|
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) -> Result<()> {
|
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) -> Result<()> {
|
||||||
let mut tenant_conf = self.tenant_conf.write().unwrap();
|
let mut tenant_conf = self.tenant_conf.write().unwrap();
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,6 @@
|
|||||||
// *status* -- show actual info about this pageserver,
|
// *status* -- show actual info about this pageserver,
|
||||||
// *pagestream* -- enter mode where smgr and pageserver talk with their
|
// *pagestream* -- enter mode where smgr and pageserver talk with their
|
||||||
// custom protocol.
|
// custom protocol.
|
||||||
// *callmemaybe <zenith timelineid> $url* -- ask pageserver to start walreceiver on $url
|
|
||||||
//
|
//
|
||||||
|
|
||||||
use anyhow::{bail, ensure, Context, Result};
|
use anyhow::{bail, ensure, Context, Result};
|
||||||
@@ -38,7 +37,6 @@ use crate::repository::Timeline;
|
|||||||
use crate::tenant_mgr;
|
use crate::tenant_mgr;
|
||||||
use crate::thread_mgr;
|
use crate::thread_mgr;
|
||||||
use crate::thread_mgr::ThreadKind;
|
use crate::thread_mgr::ThreadKind;
|
||||||
use crate::walreceiver;
|
|
||||||
use crate::CheckpointConfig;
|
use crate::CheckpointConfig;
|
||||||
use metrics::{register_histogram_vec, HistogramVec};
|
use metrics::{register_histogram_vec, HistogramVec};
|
||||||
use postgres_ffi::xlog_utils::to_pg_timestamp;
|
use postgres_ffi::xlog_utils::to_pg_timestamp;
|
||||||
@@ -716,30 +714,6 @@ impl postgres_backend::Handler for PageServerHandler {
|
|||||||
|
|
||||||
// Check that the timeline exists
|
// Check that the timeline exists
|
||||||
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?;
|
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?;
|
||||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
|
||||||
} else if query_string.starts_with("callmemaybe ") {
|
|
||||||
// callmemaybe <zenith tenantid as hex string> <zenith timelineid as hex string> <connstr>
|
|
||||||
// TODO lazy static
|
|
||||||
let re = Regex::new(r"^callmemaybe ([[:xdigit:]]+) ([[:xdigit:]]+) (.*)$").unwrap();
|
|
||||||
let caps = re
|
|
||||||
.captures(query_string)
|
|
||||||
.with_context(|| format!("invalid callmemaybe: '{}'", query_string))?;
|
|
||||||
|
|
||||||
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
|
|
||||||
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
|
|
||||||
let connstr = caps.get(3).unwrap().as_str().to_owned();
|
|
||||||
|
|
||||||
self.check_permission(Some(tenantid))?;
|
|
||||||
|
|
||||||
let _enter =
|
|
||||||
info_span!("callmemaybe", timeline = %timelineid, tenant = %tenantid).entered();
|
|
||||||
|
|
||||||
// Check that the timeline exists
|
|
||||||
tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
|
|
||||||
.context("Cannot load local timeline")?;
|
|
||||||
|
|
||||||
walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr)?;
|
|
||||||
|
|
||||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||||
} else if query_string.to_ascii_lowercase().starts_with("set ") {
|
} else if query_string.to_ascii_lowercase().starts_with("set ") {
|
||||||
// important because psycopg2 executes "SET datestyle TO 'ISO'"
|
// important because psycopg2 executes "SET datestyle TO 'ISO'"
|
||||||
|
|||||||
@@ -469,6 +469,9 @@ pub mod repo_harness {
|
|||||||
gc_period: Some(tenant_conf.gc_period),
|
gc_period: Some(tenant_conf.gc_period),
|
||||||
image_creation_threshold: Some(tenant_conf.image_creation_threshold),
|
image_creation_threshold: Some(tenant_conf.image_creation_threshold),
|
||||||
pitr_interval: Some(tenant_conf.pitr_interval),
|
pitr_interval: Some(tenant_conf.pitr_interval),
|
||||||
|
walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
|
||||||
|
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
|
||||||
|
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,6 +10,7 @@
|
|||||||
//!
|
//!
|
||||||
use crate::config::PageServerConf;
|
use crate::config::PageServerConf;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::num::NonZeroU64;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use utils::zid::ZTenantId;
|
use utils::zid::ZTenantId;
|
||||||
@@ -34,6 +35,9 @@ pub mod defaults {
|
|||||||
pub const DEFAULT_GC_PERIOD: &str = "100 s";
|
pub const DEFAULT_GC_PERIOD: &str = "100 s";
|
||||||
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
|
pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3;
|
||||||
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
|
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
|
||||||
|
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
|
||||||
|
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
|
||||||
|
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1_000_000;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Per-tenant configuration options
|
/// Per-tenant configuration options
|
||||||
@@ -68,6 +72,17 @@ pub struct TenantConf {
|
|||||||
// Page versions older than this are garbage collected away.
|
// Page versions older than this are garbage collected away.
|
||||||
#[serde(with = "humantime_serde")]
|
#[serde(with = "humantime_serde")]
|
||||||
pub pitr_interval: Duration,
|
pub pitr_interval: Duration,
|
||||||
|
/// Maximum amount of time to wait while opening a connection to receive wal, before erroring.
|
||||||
|
#[serde(with = "humantime_serde")]
|
||||||
|
pub walreceiver_connect_timeout: Duration,
|
||||||
|
/// Considers safekeepers stalled after no WAL updates were received longer than this threshold.
|
||||||
|
/// A stalled safekeeper will be changed to a newer one when it appears.
|
||||||
|
#[serde(with = "humantime_serde")]
|
||||||
|
pub lagging_wal_timeout: Duration,
|
||||||
|
/// Considers safekeepers lagging when their WAL is behind another safekeeper for more than this threshold.
|
||||||
|
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
|
||||||
|
/// to avoid eager reconnects.
|
||||||
|
pub max_lsn_wal_lag: NonZeroU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Same as TenantConf, but this struct preserves the information about
|
/// Same as TenantConf, but this struct preserves the information about
|
||||||
@@ -85,6 +100,11 @@ pub struct TenantConfOpt {
|
|||||||
pub image_creation_threshold: Option<usize>,
|
pub image_creation_threshold: Option<usize>,
|
||||||
#[serde(with = "humantime_serde")]
|
#[serde(with = "humantime_serde")]
|
||||||
pub pitr_interval: Option<Duration>,
|
pub pitr_interval: Option<Duration>,
|
||||||
|
#[serde(with = "humantime_serde")]
|
||||||
|
pub walreceiver_connect_timeout: Option<Duration>,
|
||||||
|
#[serde(with = "humantime_serde")]
|
||||||
|
pub lagging_wal_timeout: Option<Duration>,
|
||||||
|
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TenantConfOpt {
|
impl TenantConfOpt {
|
||||||
@@ -108,6 +128,13 @@ impl TenantConfOpt {
|
|||||||
.image_creation_threshold
|
.image_creation_threshold
|
||||||
.unwrap_or(global_conf.image_creation_threshold),
|
.unwrap_or(global_conf.image_creation_threshold),
|
||||||
pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval),
|
pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval),
|
||||||
|
walreceiver_connect_timeout: self
|
||||||
|
.walreceiver_connect_timeout
|
||||||
|
.unwrap_or(global_conf.walreceiver_connect_timeout),
|
||||||
|
lagging_wal_timeout: self
|
||||||
|
.lagging_wal_timeout
|
||||||
|
.unwrap_or(global_conf.lagging_wal_timeout),
|
||||||
|
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -136,6 +163,15 @@ impl TenantConfOpt {
|
|||||||
if let Some(pitr_interval) = other.pitr_interval {
|
if let Some(pitr_interval) = other.pitr_interval {
|
||||||
self.pitr_interval = Some(pitr_interval);
|
self.pitr_interval = Some(pitr_interval);
|
||||||
}
|
}
|
||||||
|
if let Some(walreceiver_connect_timeout) = other.walreceiver_connect_timeout {
|
||||||
|
self.walreceiver_connect_timeout = Some(walreceiver_connect_timeout);
|
||||||
|
}
|
||||||
|
if let Some(lagging_wal_timeout) = other.lagging_wal_timeout {
|
||||||
|
self.lagging_wal_timeout = Some(lagging_wal_timeout);
|
||||||
|
}
|
||||||
|
if let Some(max_lsn_wal_lag) = other.max_lsn_wal_lag {
|
||||||
|
self.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -155,6 +191,14 @@ impl TenantConf {
|
|||||||
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
|
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
|
||||||
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
|
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
|
||||||
.expect("cannot parse default PITR interval"),
|
.expect("cannot parse default PITR interval"),
|
||||||
|
walreceiver_connect_timeout: humantime::parse_duration(
|
||||||
|
DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
|
||||||
|
)
|
||||||
|
.expect("cannot parse default walreceiver connect timeout"),
|
||||||
|
lagging_wal_timeout: humantime::parse_duration(DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT)
|
||||||
|
.expect("cannot parse default walreceiver lagging wal timeout"),
|
||||||
|
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||||
|
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -175,6 +219,16 @@ impl TenantConf {
|
|||||||
gc_period: Duration::from_secs(10),
|
gc_period: Duration::from_secs(10),
|
||||||
image_creation_threshold: defaults::DEFAULT_IMAGE_CREATION_THRESHOLD,
|
image_creation_threshold: defaults::DEFAULT_IMAGE_CREATION_THRESHOLD,
|
||||||
pitr_interval: Duration::from_secs(60 * 60),
|
pitr_interval: Duration::from_secs(60 * 60),
|
||||||
|
walreceiver_connect_timeout: humantime::parse_duration(
|
||||||
|
defaults::DEFAULT_WALRECEIVER_CONNECT_TIMEOUT,
|
||||||
|
)
|
||||||
|
.unwrap(),
|
||||||
|
lagging_wal_timeout: humantime::parse_duration(
|
||||||
|
defaults::DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT,
|
||||||
|
)
|
||||||
|
.unwrap(),
|
||||||
|
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||||
|
.unwrap(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,11 +8,10 @@ use crate::repository::{Repository, TimelineSyncStatusUpdate};
|
|||||||
use crate::storage_sync::index::RemoteIndex;
|
use crate::storage_sync::index::RemoteIndex;
|
||||||
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
|
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
|
||||||
use crate::tenant_config::TenantConfOpt;
|
use crate::tenant_config::TenantConfOpt;
|
||||||
use crate::thread_mgr;
|
|
||||||
use crate::thread_mgr::ThreadKind;
|
use crate::thread_mgr::ThreadKind;
|
||||||
use crate::timelines;
|
|
||||||
use crate::timelines::CreateRepo;
|
use crate::timelines::CreateRepo;
|
||||||
use crate::walredo::PostgresRedoManager;
|
use crate::walredo::PostgresRedoManager;
|
||||||
|
use crate::{thread_mgr, timelines, walreceiver};
|
||||||
use crate::{DatadirTimelineImpl, RepositoryImpl};
|
use crate::{DatadirTimelineImpl, RepositoryImpl};
|
||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@@ -21,23 +20,30 @@ use std::collections::hash_map::Entry;
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
use utils::lsn::Lsn;
|
use utils::lsn::Lsn;
|
||||||
|
|
||||||
use utils::zid::{ZTenantId, ZTimelineId};
|
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
|
||||||
|
|
||||||
mod tenants_state {
|
mod tenants_state {
|
||||||
|
use anyhow::ensure;
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
|
sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
|
||||||
};
|
};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tracing::{debug, error};
|
||||||
|
|
||||||
use utils::zid::ZTenantId;
|
use utils::zid::ZTenantId;
|
||||||
|
|
||||||
use crate::tenant_mgr::Tenant;
|
use crate::tenant_mgr::{LocalTimelineUpdate, Tenant};
|
||||||
|
|
||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
static ref TENANTS: RwLock<HashMap<ZTenantId, Tenant>> = RwLock::new(HashMap::new());
|
static ref TENANTS: RwLock<HashMap<ZTenantId, Tenant>> = RwLock::new(HashMap::new());
|
||||||
|
/// Sends updates to the local timelines (creation and deletion) to the WAL receiver,
|
||||||
|
/// so that it can enable/disable corresponding processes.
|
||||||
|
static ref TIMELINE_UPDATE_SENDER: RwLock<Option<mpsc::UnboundedSender<LocalTimelineUpdate>>> = RwLock::new(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap<ZTenantId, Tenant>> {
|
pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap<ZTenantId, Tenant>> {
|
||||||
@@ -51,6 +57,39 @@ mod tenants_state {
|
|||||||
.write()
|
.write()
|
||||||
.expect("Failed to write() tenants lock, it got poisoned")
|
.expect("Failed to write() tenants lock, it got poisoned")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn set_timeline_update_sender(
|
||||||
|
timeline_updates_sender: mpsc::UnboundedSender<LocalTimelineUpdate>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let mut sender_guard = TIMELINE_UPDATE_SENDER
|
||||||
|
.write()
|
||||||
|
.expect("Failed to write() timeline_update_sender lock, it got poisoned");
|
||||||
|
ensure!(sender_guard.is_none(), "Timeline update sender already set");
|
||||||
|
*sender_guard = Some(timeline_updates_sender);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn try_send_timeline_update(update: LocalTimelineUpdate) {
|
||||||
|
match TIMELINE_UPDATE_SENDER
|
||||||
|
.read()
|
||||||
|
.expect("Failed to read() timeline_update_sender lock, it got poisoned")
|
||||||
|
.as_ref()
|
||||||
|
{
|
||||||
|
Some(sender) => {
|
||||||
|
if let Err(e) = sender.send(update) {
|
||||||
|
error!("Failed to send timeline update: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => debug!("Timeline update sender is not enabled, cannot send update {update:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn stop_timeline_update_sender() {
|
||||||
|
TIMELINE_UPDATE_SENDER
|
||||||
|
.write()
|
||||||
|
.expect("Failed to write() timeline_update_sender lock, it got poisoned")
|
||||||
|
.take();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Tenant {
|
struct Tenant {
|
||||||
@@ -87,10 +126,10 @@ pub enum TenantState {
|
|||||||
impl fmt::Display for TenantState {
|
impl fmt::Display for TenantState {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
TenantState::Active => f.write_str("Active"),
|
Self::Active => f.write_str("Active"),
|
||||||
TenantState::Idle => f.write_str("Idle"),
|
Self::Idle => f.write_str("Idle"),
|
||||||
TenantState::Stopping => f.write_str("Stopping"),
|
Self::Stopping => f.write_str("Stopping"),
|
||||||
TenantState::Broken => f.write_str("Broken"),
|
Self::Broken => f.write_str("Broken"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -99,6 +138,11 @@ impl fmt::Display for TenantState {
|
|||||||
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
|
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
|
||||||
/// are scheduled for download and added to the repository once download is completed.
|
/// are scheduled for download and added to the repository once download is completed.
|
||||||
pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIndex> {
|
pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIndex> {
|
||||||
|
let (timeline_updates_sender, timeline_updates_receiver) =
|
||||||
|
mpsc::unbounded_channel::<LocalTimelineUpdate>();
|
||||||
|
tenants_state::set_timeline_update_sender(timeline_updates_sender)?;
|
||||||
|
walreceiver::init_wal_receiver_main_thread(conf, timeline_updates_receiver)?;
|
||||||
|
|
||||||
let SyncStartupData {
|
let SyncStartupData {
|
||||||
remote_index,
|
remote_index,
|
||||||
local_timeline_init_statuses,
|
local_timeline_init_statuses,
|
||||||
@@ -113,16 +157,27 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIn
|
|||||||
// loading a tenant is serious, but it's better to complete the startup and
|
// loading a tenant is serious, but it's better to complete the startup and
|
||||||
// serve other tenants, than fail completely.
|
// serve other tenants, than fail completely.
|
||||||
error!("Failed to initialize local tenant {tenant_id}: {:?}", err);
|
error!("Failed to initialize local tenant {tenant_id}: {:?}", err);
|
||||||
let mut m = tenants_state::write_tenants();
|
set_tenant_state(tenant_id, TenantState::Broken)?;
|
||||||
if let Some(tenant) = m.get_mut(&tenant_id) {
|
|
||||||
tenant.state = TenantState::Broken;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(remote_index)
|
Ok(remote_index)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A change in the set of locally available timelines, sent over the
/// timeline update channel created in `init_tenant_mgr`.
pub enum LocalTimelineUpdate {
|
||||||
|
/// The timeline was removed from the tenant's local timelines
/// (emitted by `detach_timeline`).
Detach(ZTenantTimelineId),
|
||||||
|
/// The timeline was loaded locally (emitted by `load_local_timeline`);
/// carries a shared handle to the loaded timeline.
Attach(ZTenantTimelineId, Arc<DatadirTimelineImpl>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for LocalTimelineUpdate {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Detach(ttid) => f.debug_tuple("Remove").field(ttid).finish(),
|
||||||
|
Self::Attach(ttid, _) => f.debug_tuple("Add").field(ttid).finish(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Updates tenants' repositories, changing their timelines state in memory.
|
/// Updates tenants' repositories, changing their timelines state in memory.
|
||||||
pub fn apply_timeline_sync_status_updates(
|
pub fn apply_timeline_sync_status_updates(
|
||||||
conf: &'static PageServerConf,
|
conf: &'static PageServerConf,
|
||||||
@@ -160,6 +215,7 @@ pub fn apply_timeline_sync_status_updates(
|
|||||||
/// Shut down all tenants. This runs as part of pageserver shutdown.
|
/// Shut down all tenants. This runs as part of pageserver shutdown.
|
||||||
///
|
///
|
||||||
pub fn shutdown_all_tenants() {
|
pub fn shutdown_all_tenants() {
|
||||||
|
tenants_state::stop_timeline_update_sender();
|
||||||
let mut m = tenants_state::write_tenants();
|
let mut m = tenants_state::write_tenants();
|
||||||
let mut tenantids = Vec::new();
|
let mut tenantids = Vec::new();
|
||||||
for (tenantid, tenant) in m.iter_mut() {
|
for (tenantid, tenant) in m.iter_mut() {
|
||||||
@@ -173,7 +229,7 @@ pub fn shutdown_all_tenants() {
|
|||||||
}
|
}
|
||||||
drop(m);
|
drop(m);
|
||||||
|
|
||||||
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiver), None, None);
|
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiverManager), None, None);
|
||||||
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
|
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
|
||||||
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
|
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
|
||||||
|
|
||||||
@@ -247,32 +303,49 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
|
|||||||
Some(tenants_state::read_tenants().get(&tenantid)?.state)
|
Some(tenants_state::read_tenants().get(&tenantid)?.state)
|
||||||
}
|
}
|
||||||
|
|
||||||
///
|
pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow::Result<()> {
|
||||||
/// Change the state of a tenant to Active and launch its compactor and GC
|
|
||||||
/// threads. If the tenant was already in Active state or Stopping, does nothing.
|
|
||||||
///
|
|
||||||
pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
|
|
||||||
let mut m = tenants_state::write_tenants();
|
let mut m = tenants_state::write_tenants();
|
||||||
let tenant = m
|
let tenant = m
|
||||||
.get_mut(&tenant_id)
|
.get_mut(&tenant_id)
|
||||||
.with_context(|| format!("Tenant not found for id {tenant_id}"))?;
|
.with_context(|| format!("Tenant not found for id {tenant_id}"))?;
|
||||||
|
let old_state = tenant.state;
|
||||||
|
tenant.state = new_state;
|
||||||
|
drop(m);
|
||||||
|
|
||||||
info!("activating tenant {tenant_id}");
|
match (old_state, new_state) {
|
||||||
|
(TenantState::Broken, TenantState::Broken)
|
||||||
match tenant.state {
|
| (TenantState::Active, TenantState::Active)
|
||||||
// If the tenant is already active, nothing to do.
|
| (TenantState::Idle, TenantState::Idle)
|
||||||
TenantState::Active => {}
|
| (TenantState::Stopping, TenantState::Stopping) => {
|
||||||
|
debug!("tenant {tenant_id} already in state {new_state}");
|
||||||
// If it's Idle, launch the compactor and GC threads
|
}
|
||||||
TenantState::Idle => {
|
(TenantState::Broken, ignored) => {
|
||||||
thread_mgr::spawn(
|
debug!("Ignoring {ignored} since tenant {tenant_id} is in broken state");
|
||||||
|
}
|
||||||
|
(_, TenantState::Broken) => {
|
||||||
|
debug!("Setting tenant {tenant_id} status to broken");
|
||||||
|
}
|
||||||
|
(TenantState::Stopping, ignored) => {
|
||||||
|
debug!("Ignoring {ignored} since tenant {tenant_id} is in stopping state");
|
||||||
|
}
|
||||||
|
(TenantState::Idle, TenantState::Active) => {
|
||||||
|
info!("activating tenant {tenant_id}");
|
||||||
|
let compactor_spawn_result = thread_mgr::spawn(
|
||||||
ThreadKind::Compactor,
|
ThreadKind::Compactor,
|
||||||
Some(tenant_id),
|
Some(tenant_id),
|
||||||
None,
|
None,
|
||||||
"Compactor thread",
|
"Compactor thread",
|
||||||
false,
|
false,
|
||||||
move || crate::tenant_threads::compact_loop(tenant_id),
|
move || crate::tenant_threads::compact_loop(tenant_id),
|
||||||
)?;
|
);
|
||||||
|
if compactor_spawn_result.is_err() {
|
||||||
|
let mut m = tenants_state::write_tenants();
|
||||||
|
m.get_mut(&tenant_id)
|
||||||
|
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
|
||||||
|
.state = old_state;
|
||||||
|
drop(m);
|
||||||
|
}
|
||||||
|
compactor_spawn_result?;
|
||||||
|
|
||||||
let gc_spawn_result = thread_mgr::spawn(
|
let gc_spawn_result = thread_mgr::spawn(
|
||||||
ThreadKind::GarbageCollector,
|
ThreadKind::GarbageCollector,
|
||||||
@@ -286,21 +359,31 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
|
|||||||
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
|
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
|
||||||
|
|
||||||
if let Err(e) = &gc_spawn_result {
|
if let Err(e) = &gc_spawn_result {
|
||||||
|
let mut m = tenants_state::write_tenants();
|
||||||
|
m.get_mut(&tenant_id)
|
||||||
|
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
|
||||||
|
.state = old_state;
|
||||||
|
drop(m);
|
||||||
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
|
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
|
||||||
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
|
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
|
||||||
return gc_spawn_result;
|
return gc_spawn_result;
|
||||||
}
|
}
|
||||||
tenant.state = TenantState::Active;
|
|
||||||
}
|
}
|
||||||
|
(TenantState::Idle, TenantState::Stopping) => {
|
||||||
TenantState::Stopping => {
|
info!("stopping idle tenant {tenant_id}");
|
||||||
// don't re-activate it if it's being stopped
|
|
||||||
}
|
}
|
||||||
|
(TenantState::Active, TenantState::Stopping | TenantState::Idle) => {
|
||||||
TenantState::Broken => {
|
info!("stopping tenant {tenant_id} threads due to new state {new_state}");
|
||||||
// cannot activate
|
thread_mgr::shutdown_threads(
|
||||||
|
Some(ThreadKind::WalReceiverManager),
|
||||||
|
Some(tenant_id),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), Some(tenant_id), None);
|
||||||
|
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -325,15 +408,15 @@ pub fn get_local_timeline_with_load(
|
|||||||
.with_context(|| format!("Tenant {tenant_id} not found"))?;
|
.with_context(|| format!("Tenant {tenant_id} not found"))?;
|
||||||
|
|
||||||
if let Some(page_tline) = tenant.local_timelines.get(&timeline_id) {
|
if let Some(page_tline) = tenant.local_timelines.get(&timeline_id) {
|
||||||
return Ok(Arc::clone(page_tline));
|
Ok(Arc::clone(page_tline))
|
||||||
|
} else {
|
||||||
|
let page_tline = load_local_timeline(&tenant.repo, timeline_id)
|
||||||
|
.with_context(|| format!("Failed to load local timeline for tenant {tenant_id}"))?;
|
||||||
|
tenant
|
||||||
|
.local_timelines
|
||||||
|
.insert(timeline_id, Arc::clone(&page_tline));
|
||||||
|
Ok(page_tline)
|
||||||
}
|
}
|
||||||
|
|
||||||
let page_tline = load_local_timeline(&tenant.repo, timeline_id)
|
|
||||||
.with_context(|| format!("Failed to load local timeline for tenant {tenant_id}"))?;
|
|
||||||
tenant
|
|
||||||
.local_timelines
|
|
||||||
.insert(timeline_id, Arc::clone(&page_tline));
|
|
||||||
Ok(page_tline)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn detach_timeline(
|
pub fn detach_timeline(
|
||||||
@@ -351,6 +434,9 @@ pub fn detach_timeline(
|
|||||||
.detach_timeline(timeline_id)
|
.detach_timeline(timeline_id)
|
||||||
.context("Failed to detach inmem tenant timeline")?;
|
.context("Failed to detach inmem tenant timeline")?;
|
||||||
tenant.local_timelines.remove(&timeline_id);
|
tenant.local_timelines.remove(&timeline_id);
|
||||||
|
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach(
|
||||||
|
ZTenantTimelineId::new(tenant_id, timeline_id),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
None => bail!("Tenant {tenant_id} not found in local tenant state"),
|
None => bail!("Tenant {tenant_id} not found in local tenant state"),
|
||||||
}
|
}
|
||||||
@@ -379,6 +465,12 @@ fn load_local_timeline(
|
|||||||
repartition_distance,
|
repartition_distance,
|
||||||
));
|
));
|
||||||
page_tline.init_logical_size()?;
|
page_tline.init_logical_size()?;
|
||||||
|
|
||||||
|
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach(
|
||||||
|
ZTenantTimelineId::new(repo.tenant_id(), timeline_id),
|
||||||
|
Arc::clone(&page_tline),
|
||||||
|
));
|
||||||
|
|
||||||
Ok(page_tline)
|
Ok(page_tline)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -91,8 +91,8 @@ pub enum ThreadKind {
|
|||||||
// associated with one later, after receiving a command from the client.
|
// associated with one later, after receiving a command from the client.
|
||||||
PageRequestHandler,
|
PageRequestHandler,
|
||||||
|
|
||||||
// Thread that connects to a safekeeper to fetch WAL for one timeline.
|
// Main walreceiver manager thread that ensures that every timeline spawns a connection to safekeeper, to fetch WAL.
|
||||||
WalReceiver,
|
WalReceiverManager,
|
||||||
|
|
||||||
// Thread that handles compaction of all timelines for a tenant.
|
// Thread that handles compaction of all timelines for a tenant.
|
||||||
Compactor,
|
Compactor,
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
405
pageserver/src/walreceiver/connection_handler.rs
Normal file
405
pageserver/src/walreceiver/connection_handler.rs
Normal file
@@ -0,0 +1,405 @@
|
|||||||
|
//! Actual Postgres connection handler to stream WAL to the server.
|
||||||
|
//! Runs as a separate, cancellable Tokio task.
|
||||||
|
use std::{
|
||||||
|
str::FromStr,
|
||||||
|
sync::Arc,
|
||||||
|
time::{Duration, SystemTime},
|
||||||
|
};
|
||||||
|
|
||||||
|
use anyhow::{bail, ensure, Context};
|
||||||
|
use bytes::BytesMut;
|
||||||
|
use fail::fail_point;
|
||||||
|
use postgres::{SimpleQueryMessage, SimpleQueryRow};
|
||||||
|
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||||
|
use postgres_protocol::message::backend::ReplicationMessage;
|
||||||
|
use postgres_types::PgLsn;
|
||||||
|
use tokio::{pin, select, sync::watch, time};
|
||||||
|
use tokio_postgres::{replication::ReplicationStream, Client};
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
|
||||||
|
use utils::{
|
||||||
|
lsn::Lsn,
|
||||||
|
pq_proto::ZenithFeedback,
|
||||||
|
zid::{NodeId, ZTenantTimelineId},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
http::models::WalReceiverEntry,
|
||||||
|
repository::{Repository, Timeline},
|
||||||
|
tenant_mgr,
|
||||||
|
walingest::WalIngest,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Events published by a walreceiver connection task through its `watch` channel.
#[derive(Debug, Clone)]
|
||||||
|
pub enum WalConnectionEvent {
|
||||||
|
/// Initial value of the events channel, set when the connection task is created.
Started,
|
||||||
|
/// Carries a `ZenithFeedback` value.
/// NOTE(review): presumably emitted after new WAL has been processed — confirm against the sender.
NewWal(ZenithFeedback),
|
||||||
|
/// Final event sent when the connection task finishes: `Ok(())` on a clean end,
/// or the formatted error message on failure.
End(Result<(), String>),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A wrapper around standalone Tokio task, to poll its updates or cancel the task.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct WalReceiverConnection {
|
||||||
|
// Join handle of the spawned connection task; awaited in `shutdown`.
handle: tokio::task::JoinHandle<()>,
|
||||||
|
// Sending `()` here signals the connection task to stop.
cancellation: watch::Sender<()>,
|
||||||
|
// Receives the latest `WalConnectionEvent` published by the task.
events_receiver: watch::Receiver<WalConnectionEvent>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WalReceiverConnection {
|
||||||
|
/// Initializes the connection task, returning a set of handles on top of it.
|
||||||
|
/// The task is started immediately after the creation, fails if no connection is established during the timeout given.
|
||||||
|
pub fn open(
|
||||||
|
id: ZTenantTimelineId,
|
||||||
|
safekeeper_id: NodeId,
|
||||||
|
wal_producer_connstr: String,
|
||||||
|
connect_timeout: Duration,
|
||||||
|
) -> Self {
|
||||||
|
let (cancellation, mut cancellation_receiver) = watch::channel(());
|
||||||
|
let (events_sender, events_receiver) = watch::channel(WalConnectionEvent::Started);
|
||||||
|
|
||||||
|
let handle = tokio::spawn(
|
||||||
|
async move {
|
||||||
|
let connection_result = handle_walreceiver_connection(
|
||||||
|
id,
|
||||||
|
&wal_producer_connstr,
|
||||||
|
&events_sender,
|
||||||
|
&mut cancellation_receiver,
|
||||||
|
connect_timeout,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
format!("Walreceiver connection for id {id} failed with error: {e:#}")
|
||||||
|
});
|
||||||
|
|
||||||
|
match &connection_result {
|
||||||
|
Ok(()) => {
|
||||||
|
debug!("Walreceiver connection for id {id} ended successfully")
|
||||||
|
}
|
||||||
|
Err(e) => warn!("{e}"),
|
||||||
|
}
|
||||||
|
events_sender
|
||||||
|
.send(WalConnectionEvent::End(connection_result))
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
|
.instrument(info_span!("safekeeper_handle", sk = %safekeeper_id)),
|
||||||
|
);
|
||||||
|
|
||||||
|
Self {
|
||||||
|
handle,
|
||||||
|
cancellation,
|
||||||
|
events_receiver,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Polls for the next WAL receiver event, if there's any available since the last check.
|
||||||
|
/// Blocks if there's no new event available, returns `None` if no new events will ever occur.
|
||||||
|
/// Only the last event is returned, all events received between observatins are lost.
|
||||||
|
pub async fn next_event(&mut self) -> Option<WalConnectionEvent> {
|
||||||
|
match self.events_receiver.changed().await {
|
||||||
|
Ok(()) => Some(self.events_receiver.borrow().clone()),
|
||||||
|
Err(_cancellation_error) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gracefully aborts current WAL streaming task, waiting for the current WAL streamed.
|
||||||
|
pub async fn shutdown(&mut self) -> anyhow::Result<()> {
|
||||||
|
self.cancellation.send(()).ok();
|
||||||
|
let handle = &mut self.handle;
|
||||||
|
handle
|
||||||
|
.await
|
||||||
|
.context("Failed to join on a walreceiver connection task")?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_walreceiver_connection(
|
||||||
|
id: ZTenantTimelineId,
|
||||||
|
wal_producer_connstr: &str,
|
||||||
|
events_sender: &watch::Sender<WalConnectionEvent>,
|
||||||
|
cancellation: &mut watch::Receiver<()>,
|
||||||
|
connect_timeout: Duration,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
// Connect to the database in replication mode.
|
||||||
|
info!("connecting to {wal_producer_connstr}");
|
||||||
|
let connect_cfg =
|
||||||
|
format!("{wal_producer_connstr} application_name=pageserver replication=true");
|
||||||
|
|
||||||
|
let (mut replication_client, connection) = time::timeout(
|
||||||
|
connect_timeout,
|
||||||
|
tokio_postgres::connect(&connect_cfg, postgres::NoTls),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.context("Timed out while waiting for walreceiver connection to open")?
|
||||||
|
.context("Failed to open walreceiver conection")?;
|
||||||
|
// The connection object performs the actual communication with the database,
|
||||||
|
// so spawn it off to run on its own.
|
||||||
|
let mut connection_cancellation = cancellation.clone();
|
||||||
|
tokio::spawn(
|
||||||
|
async move {
|
||||||
|
info!("connected!");
|
||||||
|
select! {
|
||||||
|
connection_result = connection => match connection_result{
|
||||||
|
Ok(()) => info!("Walreceiver db connection closed"),
|
||||||
|
Err(connection_error) => {
|
||||||
|
if connection_error.is_closed() {
|
||||||
|
info!("Connection closed regularly: {connection_error}")
|
||||||
|
} else {
|
||||||
|
warn!("Connection aborted: {connection_error}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
_ = connection_cancellation.changed() => info!("Connection cancelled"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.instrument(info_span!("safekeeper_handle_db")),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Immediately increment the gauge, then create a job to decrement it on task exit.
|
||||||
|
// One of the pros of `defer!` is that this will *most probably*
|
||||||
|
// get called, even in presence of panics.
|
||||||
|
let gauge = crate::LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
|
||||||
|
gauge.inc();
|
||||||
|
scopeguard::defer! {
|
||||||
|
gauge.dec();
|
||||||
|
}
|
||||||
|
|
||||||
|
let identify = identify_system(&mut replication_client).await?;
|
||||||
|
info!("{identify:?}");
|
||||||
|
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
|
||||||
|
let mut caught_up = false;
|
||||||
|
let ZTenantTimelineId {
|
||||||
|
tenant_id,
|
||||||
|
timeline_id,
|
||||||
|
} = id;
|
||||||
|
|
||||||
|
let (repo, timeline) = tokio::task::spawn_blocking(move || {
|
||||||
|
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
|
||||||
|
.with_context(|| format!("no repository found for tenant {tenant_id}"))?;
|
||||||
|
let timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)
|
||||||
|
.with_context(|| {
|
||||||
|
format!("local timeline {timeline_id} not found for tenant {tenant_id}")
|
||||||
|
})?;
|
||||||
|
Ok::<_, anyhow::Error>((repo, timeline))
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("Failed to spawn blocking task to get repository and timeline for tenant {tenant_id} timeline {timeline_id}"))??;
|
||||||
|
|
||||||
|
//
|
||||||
|
// Start streaming the WAL, from where we left off previously.
|
||||||
|
//
|
||||||
|
// If we had previously received WAL up to some point in the middle of a WAL record, we
|
||||||
|
// better start from the end of last full WAL record, not in the middle of one.
|
||||||
|
let mut last_rec_lsn = timeline.get_last_record_lsn();
|
||||||
|
let mut startpoint = last_rec_lsn;
|
||||||
|
|
||||||
|
if startpoint == Lsn(0) {
|
||||||
|
bail!("No previous WAL position");
|
||||||
|
}
|
||||||
|
|
||||||
|
// There might be some padding after the last full record, skip it.
|
||||||
|
startpoint += startpoint.calc_padding(8u32);
|
||||||
|
|
||||||
|
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, server is at {end_of_wal}...");
|
||||||
|
|
||||||
|
let query = format!("START_REPLICATION PHYSICAL {startpoint}");
|
||||||
|
|
||||||
|
let copy_stream = replication_client.copy_both_simple(&query).await?;
|
||||||
|
let physical_stream = ReplicationStream::new(copy_stream);
|
||||||
|
pin!(physical_stream);
|
||||||
|
|
||||||
|
let mut waldecoder = WalStreamDecoder::new(startpoint);
|
||||||
|
|
||||||
|
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?;
|
||||||
|
|
||||||
|
while let Some(replication_message) = {
|
||||||
|
select! {
|
||||||
|
// check for shutdown first
|
||||||
|
biased;
|
||||||
|
_ = cancellation.changed() => {
|
||||||
|
info!("walreceiver interrupted");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
replication_message = physical_stream.next() => replication_message,
|
||||||
|
}
|
||||||
|
} {
|
||||||
|
let replication_message = replication_message?;
|
||||||
|
let status_update = match replication_message {
|
||||||
|
ReplicationMessage::XLogData(xlog_data) => {
|
||||||
|
// Pass the WAL data to the decoder, and see if we can decode
|
||||||
|
// more records as a result.
|
||||||
|
let data = xlog_data.data();
|
||||||
|
let startlsn = Lsn::from(xlog_data.wal_start());
|
||||||
|
let endlsn = startlsn + data.len() as u64;
|
||||||
|
|
||||||
|
trace!("received XLogData between {startlsn} and {endlsn}");
|
||||||
|
|
||||||
|
waldecoder.feed_bytes(data);
|
||||||
|
|
||||||
|
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||||
|
let _enter = info_span!("processing record", lsn = %lsn).entered();
|
||||||
|
|
||||||
|
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||||
|
// aligned and can be several bytes bigger. Without this alignment we are
|
||||||
|
// at risk of hitting a deadlock.
|
||||||
|
ensure!(lsn.is_aligned());
|
||||||
|
|
||||||
|
walingest.ingest_record(&timeline, recdata, lsn)?;
|
||||||
|
|
||||||
|
fail_point!("walreceiver-after-ingest");
|
||||||
|
|
||||||
|
last_rec_lsn = lsn;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !caught_up && endlsn >= end_of_wal {
|
||||||
|
info!("caught up at LSN {endlsn}");
|
||||||
|
caught_up = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
let timeline_to_check = Arc::clone(&timeline.tline);
|
||||||
|
tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
|
||||||
|
.await
|
||||||
|
.with_context(|| {
|
||||||
|
format!("Spawned checkpoint check task panicked for timeline {id}")
|
||||||
|
})?
|
||||||
|
.with_context(|| {
|
||||||
|
format!("Failed to check checkpoint distance for timeline {id}")
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Some(endlsn)
|
||||||
|
}
|
||||||
|
|
||||||
|
ReplicationMessage::PrimaryKeepAlive(keepalive) => {
|
||||||
|
let wal_end = keepalive.wal_end();
|
||||||
|
let timestamp = keepalive.timestamp();
|
||||||
|
let reply_requested = keepalive.reply() != 0;
|
||||||
|
|
||||||
|
trace!("received PrimaryKeepAlive(wal_end: {wal_end}, timestamp: {timestamp:?} reply: {reply_requested})");
|
||||||
|
|
||||||
|
if reply_requested {
|
||||||
|
Some(last_rec_lsn)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(last_lsn) = status_update {
|
||||||
|
let remote_index = repo.get_remote_index();
|
||||||
|
let timeline_remote_consistent_lsn = remote_index
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
// here we either do not have this timeline in remote index
|
||||||
|
// or there were no checkpoints for it yet
|
||||||
|
.timeline_entry(&ZTenantTimelineId {
|
||||||
|
tenant_id,
|
||||||
|
timeline_id,
|
||||||
|
})
|
||||||
|
.map(|remote_timeline| remote_timeline.metadata.disk_consistent_lsn())
|
||||||
|
// no checkpoint was uploaded
|
||||||
|
.unwrap_or(Lsn(0));
|
||||||
|
|
||||||
|
// The last LSN we processed. It is not guaranteed to survive pageserver crash.
|
||||||
|
let write_lsn = u64::from(last_lsn);
|
||||||
|
// `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
|
||||||
|
let flush_lsn = u64::from(timeline.tline.get_disk_consistent_lsn());
|
||||||
|
// The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
|
||||||
|
// Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
|
||||||
|
let apply_lsn = u64::from(timeline_remote_consistent_lsn);
|
||||||
|
let ts = SystemTime::now();
|
||||||
|
|
||||||
|
// Update the current WAL receiver's data stored inside the global hash table `WAL_RECEIVERS`
|
||||||
|
{
|
||||||
|
super::WAL_RECEIVER_ENTRIES.write().await.insert(
|
||||||
|
id,
|
||||||
|
WalReceiverEntry {
|
||||||
|
wal_producer_connstr: Some(wal_producer_connstr.to_owned()),
|
||||||
|
last_received_msg_lsn: Some(last_lsn),
|
||||||
|
last_received_msg_ts: Some(
|
||||||
|
ts.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
|
.expect("Received message time should be before UNIX EPOCH!")
|
||||||
|
.as_micros(),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send zenith feedback message.
|
||||||
|
// Regular standby_status_update fields are put into this message.
|
||||||
|
let zenith_status_update = ZenithFeedback {
|
||||||
|
current_timeline_size: timeline.get_current_logical_size() as u64,
|
||||||
|
ps_writelsn: write_lsn,
|
||||||
|
ps_flushlsn: flush_lsn,
|
||||||
|
ps_applylsn: apply_lsn,
|
||||||
|
ps_replytime: ts,
|
||||||
|
};
|
||||||
|
|
||||||
|
debug!("zenith_status_update {zenith_status_update:?}");
|
||||||
|
|
||||||
|
let mut data = BytesMut::new();
|
||||||
|
zenith_status_update.serialize(&mut data)?;
|
||||||
|
physical_stream
|
||||||
|
.as_mut()
|
||||||
|
.zenith_status_update(data.len() as u64, &data)
|
||||||
|
.await?;
|
||||||
|
if let Err(e) = events_sender.send(WalConnectionEvent::NewWal(zenith_status_update)) {
|
||||||
|
warn!("Wal connection event listener dropped, aborting the connection: {e}");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Data returned from the postgres `IDENTIFY_SYSTEM` command
|
||||||
|
///
|
||||||
|
/// Built by `identify_system` from the first row of the response; see the [postgres docs] for more details.
|
||||||
|
///
|
||||||
|
/// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
|
||||||
|
#[derive(Debug)]
|
||||||
|
// As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as
|
||||||
|
// unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900
|
||||||
|
#[allow(dead_code)]
|
||||||
|
struct IdentifySystem {
|
||||||
|
/// Parsed from column 0 of the response row.
systemid: u64,
|
||||||
|
/// Parsed from column 1 of the response row.
timeline: u32,
|
||||||
|
/// Parsed from column 2 of the response row.
xlogpos: PgLsn,
|
||||||
|
/// Taken from column 3 of the response row; `None` when that column cannot be read.
dbname: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// There was a problem parsing the response to
|
||||||
|
/// a postgres IDENTIFY_SYSTEM command.
///
/// Produced by `identify_system` when the first response message is not a row, or when a
/// column value is absent or fails to parse into its target type. This is a unit struct, so it
/// carries no information about which column was at fault.
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
#[error("IDENTIFY_SYSTEM parse error")]
|
||||||
|
struct IdentifyError;
|
||||||
|
|
||||||
|
/// Run the postgres `IDENTIFY_SYSTEM` command
///
/// Sends `IDENTIFY_SYSTEM` through `Client::simple_query` and builds an `IdentifySystem` from the
/// first message of the response. Any further messages are not inspected.
///
/// # Errors
///
/// Propagates any error returned by `simple_query`. Returns `IdentifyError` (converted into
/// `anyhow::Error`) when the first message is not a row, or when column 0, 1 or 2 is absent or
/// cannot be parsed. A failure on column 3 is not an error: `dbname` is set to `None` instead.
|
||||||
|
async fn identify_system(client: &mut Client) -> anyhow::Result<IdentifySystem> {
|
||||||
|
let query_str = "IDENTIFY_SYSTEM";
|
||||||
|
let response = client.simple_query(query_str).await?;
|
||||||
|
|
||||||
|
// Fetch column `idx` from the row and parse it into `T`. Both failure modes (no value at
// `idx`, or a `FromStr` error) are collapsed into `IdentifyError`.
|
||||||
|
fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
|
||||||
|
where
|
||||||
|
T: FromStr,
|
||||||
|
{
|
||||||
|
let val = row.get(idx).ok_or(IdentifyError)?;
|
||||||
|
val.parse::<T>().or(Err(IdentifyError))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract the row contents into an `IdentifySystem` struct.
|
||||||
|
// `get_parse` returns a `Result`, so `?` can be used directly on the required columns.
|
||||||
|
if let Some(SimpleQueryMessage::Row(first_row)) = response.get(0) {
|
||||||
|
Ok(IdentifySystem {
|
||||||
|
systemid: get_parse(first_row, 0)?,
|
||||||
|
timeline: get_parse(first_row, 1)?,
|
||||||
|
xlogpos: get_parse(first_row, 2)?,
|
||||||
|
// Optional field: a failure here becomes `None` rather than an error.
dbname: get_parse(first_row, 3).ok(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
// Either the response was empty or its first message was not a data row.
Err(IdentifyError.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -16,7 +16,8 @@ use toml_edit::Document;
|
|||||||
use tracing::*;
|
use tracing::*;
|
||||||
use url::{ParseError, Url};
|
use url::{ParseError, Url};
|
||||||
|
|
||||||
use safekeeper::control_file::{self};
|
use safekeeper::broker;
|
||||||
|
use safekeeper::control_file;
|
||||||
use safekeeper::defaults::{
|
use safekeeper::defaults::{
|
||||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
|
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
|
||||||
};
|
};
|
||||||
@@ -26,7 +27,6 @@ use safekeeper::timeline::GlobalTimelines;
|
|||||||
use safekeeper::wal_backup;
|
use safekeeper::wal_backup;
|
||||||
use safekeeper::wal_service;
|
use safekeeper::wal_service;
|
||||||
use safekeeper::SafeKeeperConf;
|
use safekeeper::SafeKeeperConf;
|
||||||
use safekeeper::{broker, callmemaybe};
|
|
||||||
use utils::{
|
use utils::{
|
||||||
http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener,
|
http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener,
|
||||||
zid::NodeId,
|
zid::NodeId,
|
||||||
@@ -272,9 +272,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
|
|||||||
|
|
||||||
let signals = signals::install_shutdown_handlers()?;
|
let signals = signals::install_shutdown_handlers()?;
|
||||||
let mut threads = vec![];
|
let mut threads = vec![];
|
||||||
let (callmemaybe_tx, callmemaybe_rx) = mpsc::unbounded_channel();
|
|
||||||
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
|
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
|
||||||
GlobalTimelines::init(callmemaybe_tx, wal_backup_launcher_tx);
|
GlobalTimelines::init(wal_backup_launcher_tx);
|
||||||
|
|
||||||
let conf_ = conf.clone();
|
let conf_ = conf.clone();
|
||||||
threads.push(
|
threads.push(
|
||||||
@@ -296,29 +295,14 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
|
|||||||
let safekeeper_thread = thread::Builder::new()
|
let safekeeper_thread = thread::Builder::new()
|
||||||
.name("Safekeeper thread".into())
|
.name("Safekeeper thread".into())
|
||||||
.spawn(|| {
|
.spawn(|| {
|
||||||
// thread code
|
if let Err(e) = wal_service::thread_main(conf_cloned, pg_listener) {
|
||||||
let thread_result = wal_service::thread_main(conf_cloned, pg_listener);
|
info!("safekeeper thread terminated: {e}");
|
||||||
if let Err(e) = thread_result {
|
|
||||||
info!("safekeeper thread terminated: {}", e);
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
threads.push(safekeeper_thread);
|
threads.push(safekeeper_thread);
|
||||||
|
|
||||||
let conf_cloned = conf.clone();
|
|
||||||
let callmemaybe_thread = thread::Builder::new()
|
|
||||||
.name("callmemaybe thread".into())
|
|
||||||
.spawn(|| {
|
|
||||||
// thread code
|
|
||||||
let thread_result = callmemaybe::thread_main(conf_cloned, callmemaybe_rx);
|
|
||||||
if let Err(e) = thread_result {
|
|
||||||
error!("callmemaybe thread terminated: {}", e);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
threads.push(callmemaybe_thread);
|
|
||||||
|
|
||||||
if !conf.broker_endpoints.is_empty() {
|
if !conf.broker_endpoints.is_empty() {
|
||||||
let conf_ = conf.clone();
|
let conf_ = conf.clone();
|
||||||
threads.push(
|
threads.push(
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ use url::Url;
|
|||||||
use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId};
|
use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId};
|
||||||
|
|
||||||
pub mod broker;
|
pub mod broker;
|
||||||
pub mod callmemaybe;
|
|
||||||
pub mod control_file;
|
pub mod control_file;
|
||||||
pub mod control_file_upgrade;
|
pub mod control_file_upgrade;
|
||||||
pub mod handler;
|
pub mod handler;
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ use anyhow::{bail, Context, Result};
|
|||||||
|
|
||||||
use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE};
|
use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE};
|
||||||
|
|
||||||
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
@@ -17,7 +16,6 @@ use std::sync::Arc;
|
|||||||
use std::thread::sleep;
|
use std::thread::sleep;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::{str, thread};
|
use std::{str, thread};
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
use utils::{
|
use utils::{
|
||||||
bin_ser::BeSer,
|
bin_ser::BeSer,
|
||||||
@@ -25,7 +23,6 @@ use utils::{
|
|||||||
postgres_backend::PostgresBackend,
|
postgres_backend::PostgresBackend,
|
||||||
pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody, ZenithFeedback},
|
pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody, ZenithFeedback},
|
||||||
sock_split::ReadStream,
|
sock_split::ReadStream,
|
||||||
zid::{ZTenantId, ZTimelineId},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// See: https://www.postgresql.org/docs/13/protocol-replication.html
|
// See: https://www.postgresql.org/docs/13/protocol-replication.html
|
||||||
@@ -83,40 +80,6 @@ impl Drop for ReplicationConnGuard {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// XXX: Naming is a bit messy here.
|
|
||||||
// This ReplicationStreamGuard lives as long as ReplicationConn
|
|
||||||
// and current ReplicationConnGuard is tied to the background thread
|
|
||||||
// that receives feedback.
|
|
||||||
struct ReplicationStreamGuard {
|
|
||||||
tx: UnboundedSender<CallmeEvent>,
|
|
||||||
tenant_id: ZTenantId,
|
|
||||||
timeline_id: ZTimelineId,
|
|
||||||
pageserver_connstr: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for ReplicationStreamGuard {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
// the connection with pageserver is lost,
|
|
||||||
// resume callback subscription
|
|
||||||
debug!(
|
|
||||||
"Connection to pageserver is gone. Resume callmemaybe subsciption if necessary. tenantid {} timelineid {}",
|
|
||||||
self.tenant_id, self.timeline_id,
|
|
||||||
);
|
|
||||||
|
|
||||||
let subscription_key = SubscriptionStateKey::new(
|
|
||||||
self.tenant_id,
|
|
||||||
self.timeline_id,
|
|
||||||
self.pageserver_connstr.to_owned(),
|
|
||||||
);
|
|
||||||
|
|
||||||
self.tx
|
|
||||||
.send(CallmeEvent::Resume(subscription_key))
|
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
error!("failed to send Resume request to callmemaybe thread {}", e);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ReplicationConn {
|
impl ReplicationConn {
|
||||||
/// Create a new `ReplicationConn`
|
/// Create a new `ReplicationConn`
|
||||||
pub fn new(pgb: &mut PostgresBackend) -> Self {
|
pub fn new(pgb: &mut PostgresBackend) -> Self {
|
||||||
@@ -256,36 +219,6 @@ impl ReplicationConn {
|
|||||||
};
|
};
|
||||||
info!("Start replication from {:?} till {:?}", start_pos, stop_pos);
|
info!("Start replication from {:?} till {:?}", start_pos, stop_pos);
|
||||||
|
|
||||||
// Don't spam pageserver with callmemaybe queries
|
|
||||||
// when replication connection with pageserver is already established.
|
|
||||||
let _guard = {
|
|
||||||
if spg.appname == Some("wal_proposer_recovery".to_string()) {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
let pageserver_connstr = pageserver_connstr.expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
|
|
||||||
let zttid = spg.timeline.get().zttid;
|
|
||||||
let tx_clone = spg.timeline.get().callmemaybe_tx.clone();
|
|
||||||
let subscription_key = SubscriptionStateKey::new(
|
|
||||||
zttid.tenant_id,
|
|
||||||
zttid.timeline_id,
|
|
||||||
pageserver_connstr.clone(),
|
|
||||||
);
|
|
||||||
tx_clone
|
|
||||||
.send(CallmeEvent::Pause(subscription_key))
|
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
error!("failed to send Pause request to callmemaybe thread {}", e);
|
|
||||||
});
|
|
||||||
|
|
||||||
// create a guard to subscribe callback again, when this connection will exit
|
|
||||||
Some(ReplicationStreamGuard {
|
|
||||||
tx: tx_clone,
|
|
||||||
tenant_id: zttid.tenant_id,
|
|
||||||
timeline_id: zttid.timeline_id,
|
|
||||||
pageserver_connstr,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// switch to copy
|
// switch to copy
|
||||||
pgb.write_message(&BeMessage::CopyBothResponse)?;
|
pgb.write_message(&BeMessage::CopyBothResponse)?;
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ use std::fs::{self};
|
|||||||
|
|
||||||
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
|
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::mpsc::{Sender, UnboundedSender};
|
use tokio::sync::mpsc::Sender;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
|
|
||||||
use utils::{
|
use utils::{
|
||||||
@@ -25,7 +25,6 @@ use utils::{
|
|||||||
zid::{NodeId, ZTenantId, ZTenantTimelineId},
|
zid::{NodeId, ZTenantId, ZTenantTimelineId},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
|
|
||||||
use crate::control_file;
|
use crate::control_file;
|
||||||
use crate::safekeeper::{
|
use crate::safekeeper::{
|
||||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
|
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
|
||||||
@@ -191,79 +190,33 @@ impl SharedState {
|
|||||||
self.wal_backup_active
|
self.wal_backup_active
|
||||||
}
|
}
|
||||||
|
|
||||||
/// start/change walsender (via callmemaybe).
|
/// Activate timeline's walsender: start/change timeline information propagated into etcd for further pageserver connections.
|
||||||
fn callmemaybe_sub(
|
fn activate_walsender(
|
||||||
&mut self,
|
&mut self,
|
||||||
zttid: &ZTenantTimelineId,
|
zttid: &ZTenantTimelineId,
|
||||||
pageserver_connstr: Option<&String>,
|
new_pageserver_connstr: Option<String>,
|
||||||
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
|
) {
|
||||||
) -> Result<()> {
|
if self.pageserver_connstr != new_pageserver_connstr {
|
||||||
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
|
self.deactivate_walsender(zttid);
|
||||||
// unsub old sub. xxx: callmemaybe is going out
|
|
||||||
let old_subscription_key = SubscriptionStateKey::new(
|
if new_pageserver_connstr.is_some() {
|
||||||
zttid.tenant_id,
|
info!(
|
||||||
zttid.timeline_id,
|
"timeline {} has activated its walsender with connstr {new_pageserver_connstr:?}",
|
||||||
pageserver_connstr.to_owned(),
|
zttid.timeline_id,
|
||||||
);
|
);
|
||||||
callmemaybe_tx
|
}
|
||||||
.send(CallmeEvent::Unsubscribe(old_subscription_key))
|
self.pageserver_connstr = new_pageserver_connstr;
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
error!("failed to send Pause request to callmemaybe thread {}", e);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
if let Some(pageserver_connstr) = pageserver_connstr {
|
|
||||||
let subscription_key = SubscriptionStateKey::new(
|
|
||||||
zttid.tenant_id,
|
|
||||||
zttid.timeline_id,
|
|
||||||
pageserver_connstr.to_owned(),
|
|
||||||
);
|
|
||||||
// xx: sending to channel under lock is not very cool, but
|
|
||||||
// shouldn't be a problem here. If it is, we can grab a counter
|
|
||||||
// here and later augment channel messages with it.
|
|
||||||
callmemaybe_tx
|
|
||||||
.send(CallmeEvent::Subscribe(subscription_key))
|
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
error!(
|
|
||||||
"failed to send Subscribe request to callmemaybe thread {}",
|
|
||||||
e
|
|
||||||
);
|
|
||||||
});
|
|
||||||
info!(
|
|
||||||
"timeline {} is subscribed to callmemaybe to {}",
|
|
||||||
zttid.timeline_id, pageserver_connstr
|
|
||||||
);
|
|
||||||
}
|
|
||||||
self.pageserver_connstr = pageserver_connstr.map(|c| c.to_owned());
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Deactivate the timeline: stop callmemaybe.
|
/// Deactivate the timeline: stop sending the timeline data into etcd, so no pageserver can connect for WAL streaming.
|
||||||
fn callmemaybe_unsub(
|
fn deactivate_walsender(&mut self, zttid: &ZTenantTimelineId) {
|
||||||
&mut self,
|
if let Some(pageserver_connstr) = self.pageserver_connstr.take() {
|
||||||
zttid: &ZTenantTimelineId,
|
|
||||||
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
|
|
||||||
) -> Result<()> {
|
|
||||||
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
|
|
||||||
let subscription_key = SubscriptionStateKey::new(
|
|
||||||
zttid.tenant_id,
|
|
||||||
zttid.timeline_id,
|
|
||||||
pageserver_connstr.to_owned(),
|
|
||||||
);
|
|
||||||
callmemaybe_tx
|
|
||||||
.send(CallmeEvent::Unsubscribe(subscription_key))
|
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
error!(
|
|
||||||
"failed to send Unsubscribe request to callmemaybe thread {}",
|
|
||||||
e
|
|
||||||
);
|
|
||||||
});
|
|
||||||
info!(
|
info!(
|
||||||
"timeline {} is unsubscribed from callmemaybe to {}",
|
"timeline {} had deactivated its wallsender with connstr {pageserver_connstr:?}",
|
||||||
zttid.timeline_id,
|
zttid.timeline_id,
|
||||||
self.pageserver_connstr.as_ref().unwrap()
|
)
|
||||||
);
|
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_wal_seg_size(&self) -> usize {
|
fn get_wal_seg_size(&self) -> usize {
|
||||||
@@ -332,7 +285,6 @@ impl SharedState {
|
|||||||
/// Database instance (tenant)
|
/// Database instance (tenant)
|
||||||
pub struct Timeline {
|
pub struct Timeline {
|
||||||
pub zttid: ZTenantTimelineId,
|
pub zttid: ZTenantTimelineId,
|
||||||
pub callmemaybe_tx: UnboundedSender<CallmeEvent>,
|
|
||||||
/// Sending here asks for wal backup launcher attention (start/stop
|
/// Sending here asks for wal backup launcher attention (start/stop
|
||||||
/// offloading). Sending zttid instead of concrete command allows to do
|
/// offloading). Sending zttid instead of concrete command allows to do
|
||||||
/// sending without timeline lock.
|
/// sending without timeline lock.
|
||||||
@@ -348,7 +300,6 @@ pub struct Timeline {
|
|||||||
impl Timeline {
|
impl Timeline {
|
||||||
fn new(
|
fn new(
|
||||||
zttid: ZTenantTimelineId,
|
zttid: ZTenantTimelineId,
|
||||||
callmemaybe_tx: UnboundedSender<CallmeEvent>,
|
|
||||||
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
|
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
|
||||||
shared_state: SharedState,
|
shared_state: SharedState,
|
||||||
) -> Timeline {
|
) -> Timeline {
|
||||||
@@ -356,7 +307,6 @@ impl Timeline {
|
|||||||
watch::channel(shared_state.sk.inmem.commit_lsn);
|
watch::channel(shared_state.sk.inmem.commit_lsn);
|
||||||
Timeline {
|
Timeline {
|
||||||
zttid,
|
zttid,
|
||||||
callmemaybe_tx,
|
|
||||||
wal_backup_launcher_tx,
|
wal_backup_launcher_tx,
|
||||||
commit_lsn_watch_tx,
|
commit_lsn_watch_tx,
|
||||||
commit_lsn_watch_rx,
|
commit_lsn_watch_rx,
|
||||||
@@ -378,7 +328,7 @@ impl Timeline {
|
|||||||
// should have kind of generations assigned by compute to distinguish
|
// should have kind of generations assigned by compute to distinguish
|
||||||
// the latest one or even pass it through consensus to reliably deliver
|
// the latest one or even pass it through consensus to reliably deliver
|
||||||
// to all safekeepers.
|
// to all safekeepers.
|
||||||
shared_state.callmemaybe_sub(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?;
|
shared_state.activate_walsender(&self.zttid, pageserver_connstr.cloned());
|
||||||
}
|
}
|
||||||
// Wake up wal backup launcher, if offloading not started yet.
|
// Wake up wal backup launcher, if offloading not started yet.
|
||||||
if is_wal_backup_action_pending {
|
if is_wal_backup_action_pending {
|
||||||
@@ -414,7 +364,7 @@ impl Timeline {
|
|||||||
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
|
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
|
||||||
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
|
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
|
||||||
if stop {
|
if stop {
|
||||||
shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?;
|
shared_state.deactivate_walsender(&self.zttid);
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -431,16 +381,14 @@ impl Timeline {
|
|||||||
/// Deactivates the timeline, assuming it is being deleted.
|
/// Deactivates the timeline, assuming it is being deleted.
|
||||||
/// Returns whether the timeline was already active.
|
/// Returns whether the timeline was already active.
|
||||||
///
|
///
|
||||||
/// The callmemaybe thread is stopped by the deactivation message. We assume all other threads
|
/// We assume all threads will stop by themselves eventually (possibly with errors, but no panics).
|
||||||
/// will stop by themselves eventually (possibly with errors, but no panics). There should be no
|
/// There should be no compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but
|
||||||
/// compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but
|
|
||||||
/// we're deleting the timeline anyway.
|
/// we're deleting the timeline anyway.
|
||||||
pub async fn deactivate_for_delete(&self) -> Result<bool> {
|
pub async fn deactivate_for_delete(&self) -> Result<bool> {
|
||||||
let was_active: bool;
|
let was_active: bool;
|
||||||
{
|
{
|
||||||
let mut shared_state = self.mutex.lock().unwrap();
|
let shared_state = self.mutex.lock().unwrap();
|
||||||
was_active = shared_state.active;
|
was_active = shared_state.active;
|
||||||
shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?;
|
|
||||||
}
|
}
|
||||||
self.wal_backup_launcher_tx.send(self.zttid).await?;
|
self.wal_backup_launcher_tx.send(self.zttid).await?;
|
||||||
Ok(was_active)
|
Ok(was_active)
|
||||||
@@ -576,7 +524,8 @@ impl Timeline {
|
|||||||
shared_state.sk.inmem.remote_consistent_lsn,
|
shared_state.sk.inmem.remote_consistent_lsn,
|
||||||
)),
|
)),
|
||||||
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
|
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
|
||||||
safekeeper_connection_string: Some(conf.listen_pg_addr.clone()),
|
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
|
||||||
|
pageserver_connstr: shared_state.pageserver_connstr.clone(),
|
||||||
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
|
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -675,14 +624,12 @@ impl TimelineTools for Option<Arc<Timeline>> {
|
|||||||
|
|
||||||
struct GlobalTimelinesState {
|
struct GlobalTimelinesState {
|
||||||
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
|
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
|
||||||
callmemaybe_tx: Option<UnboundedSender<CallmeEvent>>,
|
|
||||||
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
|
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref TIMELINES_STATE: Mutex<GlobalTimelinesState> = Mutex::new(GlobalTimelinesState {
|
static ref TIMELINES_STATE: Mutex<GlobalTimelinesState> = Mutex::new(GlobalTimelinesState {
|
||||||
timelines: HashMap::new(),
|
timelines: HashMap::new(),
|
||||||
callmemaybe_tx: None,
|
|
||||||
wal_backup_launcher_tx: None,
|
wal_backup_launcher_tx: None,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -697,13 +644,8 @@ pub struct TimelineDeleteForceResult {
|
|||||||
pub struct GlobalTimelines;
|
pub struct GlobalTimelines;
|
||||||
|
|
||||||
impl GlobalTimelines {
|
impl GlobalTimelines {
|
||||||
pub fn init(
|
pub fn init(wal_backup_launcher_tx: Sender<ZTenantTimelineId>) {
|
||||||
callmemaybe_tx: UnboundedSender<CallmeEvent>,
|
|
||||||
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
|
|
||||||
) {
|
|
||||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||||
assert!(state.callmemaybe_tx.is_none());
|
|
||||||
state.callmemaybe_tx = Some(callmemaybe_tx);
|
|
||||||
assert!(state.wal_backup_launcher_tx.is_none());
|
assert!(state.wal_backup_launcher_tx.is_none());
|
||||||
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
|
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
|
||||||
}
|
}
|
||||||
@@ -726,7 +668,6 @@ impl GlobalTimelines {
|
|||||||
|
|
||||||
let new_tli = Arc::new(Timeline::new(
|
let new_tli = Arc::new(Timeline::new(
|
||||||
zttid,
|
zttid,
|
||||||
state.callmemaybe_tx.as_ref().unwrap().clone(),
|
|
||||||
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
|
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
|
||||||
shared_state,
|
shared_state,
|
||||||
));
|
));
|
||||||
@@ -778,7 +719,6 @@ impl GlobalTimelines {
|
|||||||
|
|
||||||
let new_tli = Arc::new(Timeline::new(
|
let new_tli = Arc::new(Timeline::new(
|
||||||
zttid,
|
zttid,
|
||||||
state.callmemaybe_tx.as_ref().unwrap().clone(),
|
|
||||||
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
|
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
|
||||||
shared_state,
|
shared_state,
|
||||||
));
|
));
|
||||||
|
|||||||
@@ -63,10 +63,11 @@ def test_pageserver_http_get_wal_receiver_not_found(zenith_simple_env: ZenithEnv
|
|||||||
|
|
||||||
tenant_id, timeline_id = env.zenith_cli.create_tenant()
|
tenant_id, timeline_id = env.zenith_cli.create_tenant()
|
||||||
|
|
||||||
# no PG compute node is running, so no WAL receiver is running
|
empty_response = client.wal_receiver_get(tenant_id, timeline_id)
|
||||||
with pytest.raises(ZenithPageserverApiException) as e:
|
|
||||||
_ = client.wal_receiver_get(tenant_id, timeline_id)
|
assert empty_response.get('wal_producer_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||||
assert "Not Found" in str(e.value)
|
assert empty_response.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||||
|
assert empty_response.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
|
||||||
|
|
||||||
|
|
||||||
def test_pageserver_http_get_wal_receiver_success(zenith_simple_env: ZenithEnv):
|
def test_pageserver_http_get_wal_receiver_success(zenith_simple_env: ZenithEnv):
|
||||||
@@ -81,7 +82,6 @@ def test_pageserver_http_get_wal_receiver_success(zenith_simple_env: ZenithEnv):
|
|||||||
|
|
||||||
# a successful `wal_receiver_get` response must contain the below fields
|
# a successful `wal_receiver_get` response must contain the below fields
|
||||||
assert list(res.keys()) == [
|
assert list(res.keys()) == [
|
||||||
"thread_id",
|
|
||||||
"wal_producer_connstr",
|
"wal_producer_connstr",
|
||||||
"last_received_msg_lsn",
|
"last_received_msg_lsn",
|
||||||
"last_received_msg_ts",
|
"last_received_msg_ts",
|
||||||
|
|||||||
@@ -1600,9 +1600,7 @@ class Postgres(PgProtocol):
|
|||||||
for cfg_line in cfg_lines:
|
for cfg_line in cfg_lines:
|
||||||
# walproposer uses different application_name
|
# walproposer uses different application_name
|
||||||
if ("synchronous_standby_names" in cfg_line or
|
if ("synchronous_standby_names" in cfg_line or
|
||||||
# don't ask pageserver to fetch WAL from compute
|
# don't repeat safekeepers/wal_acceptors multiple times
|
||||||
"callmemaybe_connstring" in cfg_line or
|
|
||||||
# don't repeat safekeepers multiple times
|
|
||||||
"safekeepers" in cfg_line):
|
"safekeepers" in cfg_line):
|
||||||
continue
|
continue
|
||||||
f.write(cfg_line)
|
f.write(cfg_line)
|
||||||
|
|||||||
@@ -13,16 +13,12 @@ from fixtures.zenith_fixtures import ZenithEnvBuilder
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('tenants_count', [1, 5, 10])
|
@pytest.mark.parametrize('tenants_count', [1, 5, 10])
|
||||||
@pytest.mark.parametrize('use_safekeepers', ['with_wa', 'without_wa'])
|
|
||||||
def test_bulk_tenant_create(
|
def test_bulk_tenant_create(
|
||||||
zenith_env_builder: ZenithEnvBuilder,
|
zenith_env_builder: ZenithEnvBuilder,
|
||||||
use_safekeepers: str,
|
|
||||||
tenants_count: int,
|
tenants_count: int,
|
||||||
zenbenchmark,
|
zenbenchmark,
|
||||||
):
|
):
|
||||||
"""Measure tenant creation time (with and without wal acceptors)"""
|
zenith_env_builder.num_safekeepers = 3
|
||||||
if use_safekeepers == 'with_wa':
|
|
||||||
zenith_env_builder.num_safekeepers = 3
|
|
||||||
env = zenith_env_builder.init_start()
|
env = zenith_env_builder.init_start()
|
||||||
|
|
||||||
time_slices = []
|
time_slices = []
|
||||||
@@ -31,15 +27,15 @@ def test_bulk_tenant_create(
|
|||||||
start = timeit.default_timer()
|
start = timeit.default_timer()
|
||||||
|
|
||||||
tenant, _ = env.zenith_cli.create_tenant()
|
tenant, _ = env.zenith_cli.create_tenant()
|
||||||
env.zenith_cli.create_timeline(
|
env.zenith_cli.create_timeline(f'test_bulk_tenant_create_{tenants_count}_{i}',
|
||||||
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant)
|
tenant_id=tenant)
|
||||||
|
|
||||||
# FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now?
|
# FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now?
|
||||||
#if use_safekeepers == 'with_sa':
|
#if use_safekeepers == 'with_sa':
|
||||||
# wa_factory.start_n_new(3)
|
# wa_factory.start_n_new(3)
|
||||||
|
|
||||||
pg_tenant = env.postgres.create_start(
|
pg_tenant = env.postgres.create_start(f'test_bulk_tenant_create_{tenants_count}_{i}',
|
||||||
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant)
|
tenant_id=tenant)
|
||||||
|
|
||||||
end = timeit.default_timer()
|
end = timeit.default_timer()
|
||||||
time_slices.append(end - start)
|
time_slices.append(end - start)
|
||||||
|
|||||||
Reference in New Issue
Block a user