Merge pull request #2692 from neondatabase/main-rc

Release 2022-10-25
This commit is contained in:
Anastasia Lubennikova
2022-10-25 18:18:58 +03:00
committed by GitHub
40 changed files with 968 additions and 534 deletions

View File

@@ -3,7 +3,6 @@ storage:
bucket_name: neon-storage-ireland
bucket_region: eu-west-1
console_mgmt_base_url: http://neon-stress-console.local
env_name: neon-stress
etcd_endpoints: neon-stress-etcd.local:2379
safekeeper_enable_s3_offload: 'false'
pageserver_config_stub:
@@ -12,6 +11,7 @@ storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "{{ inventory_hostname }}"
safekeeper_s3_prefix: neon-stress/wal
hostname_suffix: ".local"
remote_user: admin
children:

View File

@@ -1,7 +1,6 @@
---
storage:
vars:
env_name: prod-1
console_mgmt_base_url: http://console-release.local
bucket_name: zenith-storage-oregon
bucket_region: us-west-2
@@ -12,6 +11,7 @@ storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "{{ inventory_hostname }}"
safekeeper_s3_prefix: prod-1/wal
hostname_suffix: ".local"
remote_user: admin

View File

@@ -3,7 +3,6 @@ storage:
bucket_name: zenith-staging-storage-us-east-1
bucket_region: us-east-1
console_mgmt_base_url: http://console-staging.local
env_name: us-stage
etcd_endpoints: zenith-us-stage-etcd.local:2379
pageserver_config_stub:
pg_distrib_dir: /usr/local
@@ -11,6 +10,7 @@ storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "{{ inventory_hostname }}"
safekeeper_s3_prefix: us-stage/wal
hostname_suffix: ".local"
remote_user: admin

View File

@@ -3,7 +3,6 @@ storage:
bucket_name: neon-staging-storage-us-east-2
bucket_region: us-east-2
console_mgmt_base_url: http://console-staging.local
env_name: us-stage
etcd_endpoints: etcd-0.us-east-2.aws.neon.build:2379
pageserver_config_stub:
pg_distrib_dir: /usr/local
@@ -11,6 +10,7 @@ storage:
bucket_name: "{{ bucket_name }}"
bucket_region: "{{ bucket_region }}"
prefix_in_bucket: "pageserver/v1"
safekeeper_s3_prefix: safekeeper/v1/wal
hostname_suffix: ""
remote_user: ssm-user
ansible_aws_ssm_region: us-east-2

View File

@@ -6,7 +6,7 @@ After=network.target auditd.service
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ env_name }}/wal"}'
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}'
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT

15
Cargo.lock generated
View File

@@ -3932,6 +3932,16 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.16"
@@ -3942,12 +3952,15 @@ dependencies = [
"nu-ansi-term",
"once_cell",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
@@ -4042,6 +4055,8 @@ dependencies = [
"serde_json",
"serde_with",
"signal-hook",
"strum",
"strum_macros",
"tempfile",
"thiserror",
"tokio",

View File

@@ -9,7 +9,7 @@ ARG TAG=pinned
#
FROM debian:bullseye-slim AS build-deps
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
echo "Package: *\nPin: release n=bullseye\nPin-Priority: 50" > /etc/apt/preferences && \
apt update
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
@@ -191,7 +191,7 @@ RUN apt update && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
echo "Installing GLIBC 2.34" && \
echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
echo "Package: *\nPin: release n=bullseye\nPin-Priority: 50" > /etc/apt/preferences && \
apt update && \
apt install -y --no-install-recommends -t testing libc6 && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \

View File

@@ -14,7 +14,7 @@ ARG TAG=pinned
#
FROM debian:bullseye-slim AS build-deps
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
echo "Package: *\nPin: release n=bullseye\nPin-Priority: 50" > /etc/apt/preferences && \
apt update
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
@@ -196,7 +196,7 @@ RUN apt update && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
echo "Installing GLIBC 2.34" && \
echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
echo "Package: *\nPin: release n=bullseye\nPin-Priority: 50" > /etc/apt/preferences && \
apt update && \
apt install -y --no-install-recommends -t testing libc6 && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \

View File

@@ -423,11 +423,32 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
);
db_client.simple_query(&alter_query)?;
// // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
// // This is needed since postgres 15, where this privilege is removed by default.
// let grant_query: String = "GRANT CREATE ON SCHEMA public TO web_access".to_string();
// info!("grant query for db {} : {}", &db.name, &grant_query);
// db_client.simple_query(&grant_query)?;
// Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
// This is needed because since postgres 15 this privilege is removed by default.
let grant_query = "DO $$\n\
BEGIN\n\
IF EXISTS(\n\
SELECT nspname\n\
FROM pg_catalog.pg_namespace\n\
WHERE nspname = 'public'\n\
) AND\n\
current_setting('server_version_num')::int/10000 >= 15\n\
THEN\n\
IF EXISTS(\n\
SELECT rolname\n\
FROM pg_catalog.pg_roles\n\
WHERE rolname = 'web_access'\n\
)\n\
THEN\n\
GRANT CREATE ON SCHEMA public TO web_access;\n\
END IF;\n\
END IF;\n\
END\n\
$$;"
.to_string();
info!("grant query for db {} : {}", &db.name, &grant_query);
db_client.simple_query(&grant_query)?;
}
Ok(())

View File

@@ -282,9 +282,7 @@ impl PostgresNode {
fn setup_pg_conf(&self, auth_type: AuthType) -> Result<()> {
let mut conf = PostgresConf::new();
conf.append("max_wal_senders", "10");
// wal_log_hints is mandatory when running against pageserver (see gh issue#192)
// TODO: is it possible to check wal_log_hints at pageserver side via XLOG_PARAMETER_CHANGE?
conf.append("wal_log_hints", "on");
conf.append("wal_log_hints", "off");
conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB");

View File

@@ -123,7 +123,6 @@ impl SafekeeperNode {
.args(&["--id", self.id.to_string().as_ref()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.arg("--daemonize"),
);
if !self.conf.sync {

View File

@@ -29,6 +29,9 @@ pub struct SkTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub local_start_lsn: Option<Lsn>,
/// A connection string to use for WAL receiving.
#[serde(default)]
pub safekeeper_connstr: Option<String>,

View File

@@ -19,6 +19,22 @@ pub enum TenantState {
Broken,
}
/// A state of a timeline in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TimelineState {
/// Timeline is fully operational, its background jobs are running.
Active,
/// A timeline is recognized by pageserver, but not yet ready to operate.
/// The status indicates, that the timeline could eventually go back to Active automatically:
/// for example, if the owning tenant goes back to Active again.
Suspended,
/// A timeline is recognized by pageserver, but not yet ready to operate and not allowed to
/// automatically become Active after certain events: only a management call can change this status.
Paused,
/// A timeline is recognized by the pageserver, but no longer used for any operations, as failed to get activated.
Broken,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
@@ -160,6 +176,8 @@ pub struct TimelineInfo {
pub remote_consistent_lsn: Option<Lsn>,
pub awaits_download: bool,
pub state: TimelineState,
// Some of the above fields are duplicated in 'local' and 'remote', for backwards-
// compatility with older clients.
pub local: LocalTimelineInfo,

View File

@@ -19,7 +19,7 @@ thiserror = "1.0"
tokio = { version = "1.17", features = ["macros"]}
tokio-rustls = "0.23"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
nix = "0.25"
signal-hook = "0.3.10"
rand = "0.8.3"
@@ -30,6 +30,8 @@ rustls-split = "0.3.0"
git-version = "0.3.5"
serde_with = "2.0"
once_cell = "1.13.0"
strum = "0.24"
strum_macros = "0.24"
metrics = { path = "../metrics" }

View File

@@ -75,6 +75,12 @@ impl From<[u8; 16]> for Id {
}
}
impl From<Id> for u128 {
fn from(id: Id) -> Self {
u128::from_le_bytes(id.0)
}
}
impl fmt::Display for Id {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.hex_encode())
@@ -136,6 +142,12 @@ macro_rules! id_newtype {
}
}
impl From<$t> for u128 {
fn from(id: $t) -> Self {
u128::from(id.0)
}
}
impl fmt::Display for $t {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)

View File

@@ -1,11 +1,35 @@
use std::{
fs::{File, OpenOptions},
path::Path,
str::FromStr,
};
use anyhow::{Context, Result};
use strum_macros::{EnumString, EnumVariantNames};
pub fn init(log_filename: impl AsRef<Path>, daemonize: bool) -> Result<File> {
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
pub enum LogFormat {
Plain,
Json,
}
impl LogFormat {
pub fn from_config(s: &str) -> anyhow::Result<LogFormat> {
use strum::VariantNames;
LogFormat::from_str(s).with_context(|| {
format!(
"Unrecognized log format. Please specify one of: {:?}",
LogFormat::VARIANTS
)
})
}
}
pub fn init(
log_filename: impl AsRef<Path>,
daemonize: bool,
log_format: LogFormat,
) -> Result<File> {
// Don't open the same file for output multiple times;
// the different fds could overwrite each other's output.
let log_file = OpenOptions::new()
@@ -21,22 +45,50 @@ pub fn init(log_filename: impl AsRef<Path>, daemonize: bool) -> Result<File> {
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));
let x: File = log_file.try_clone().unwrap();
let base_logger = tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false) // don't include event targets
.with_ansi(false); // don't use colors in log file;
.with_target(false)
.with_ansi(false)
.with_writer(move || -> Box<dyn std::io::Write> {
// we are cloning and returning log file in order to allow redirecting daemonized stdout and stderr to it
// if we do not use daemonization (e.g. in docker) it is better to log to stdout directly
// for example to be in line with docker log command which expects logs comimg from stdout
if daemonize {
Box::new(x.try_clone().unwrap())
} else {
Box::new(std::io::stdout())
}
});
// we are cloning and returning log file in order to allow redirecting daemonized stdout and stderr to it
// if we do not use daemonization (e.g. in docker) it is better to log to stdout directly
// for example to be in line with docker log command which expects logs comimg from stdout
if daemonize {
let x = log_file.try_clone().unwrap();
base_logger
.with_writer(move || x.try_clone().unwrap())
.init();
} else {
base_logger.init();
match log_format {
LogFormat::Json => base_logger.json().init(),
LogFormat::Plain => base_logger.init(),
}
Ok(log_file)
}
// #[cfg(test)]
// Due to global logger, can't run tests in same process.
// So until there's a non-global one, the tests are in ../tests/ as separate files.
#[macro_export(local_inner_macros)]
macro_rules! test_init_file_logger {
($log_level:expr, $log_format:expr) => {{
use std::str::FromStr;
std::env::set_var("RUST_LOG", $log_level);
let tmp_dir = tempfile::TempDir::new().unwrap();
let log_file_path = tmp_dir.path().join("logfile");
let log_format = $crate::logging::LogFormat::from_str($log_format).unwrap();
let _log_file = $crate::logging::init(&log_file_path, true, log_format).unwrap();
let log_file = std::fs::OpenOptions::new()
.read(true)
.open(&log_file_path)
.unwrap();
log_file
}};
}

View File

@@ -0,0 +1,36 @@
// This could be in ../src/logging.rs but since the logger is global, these
// can't be run in threads of the same process
use std::fs::File;
use std::io::{BufRead, BufReader, Lines};
use tracing::*;
use utils::test_init_file_logger;
fn read_lines(file: File) -> Lines<BufReader<File>> {
BufReader::new(file).lines()
}
#[test]
fn test_json_format_has_message_and_custom_field() {
std::env::set_var("RUST_LOG", "info");
let log_file = test_init_file_logger!("info", "json");
let custom_field: &str = "hi";
trace!(custom = %custom_field, "test log message");
debug!(custom = %custom_field, "test log message");
info!(custom = %custom_field, "test log message");
warn!(custom = %custom_field, "test log message");
error!(custom = %custom_field, "test log message");
let lines = read_lines(log_file);
for line in lines {
let content = line.unwrap();
let json_object = serde_json::from_str::<serde_json::Value>(&content).unwrap();
assert_eq!(json_object["fields"]["custom"], "hi");
assert_eq!(json_object["fields"]["message"], "test log message");
assert_ne!(json_object["level"], "TRACE");
assert_ne!(json_object["level"], "DEBUG");
}
}

View File

@@ -0,0 +1,36 @@
// This could be in ../src/logging.rs but since the logger is global, these
// can't be run in threads of the same process
use std::fs::File;
use std::io::{BufRead, BufReader, Lines};
use tracing::*;
use utils::test_init_file_logger;
fn read_lines(file: File) -> Lines<BufReader<File>> {
BufReader::new(file).lines()
}
#[test]
fn test_plain_format_has_message_and_custom_field() {
std::env::set_var("RUST_LOG", "warn");
let log_file = test_init_file_logger!("warn", "plain");
let custom_field: &str = "hi";
trace!(custom = %custom_field, "test log message");
debug!(custom = %custom_field, "test log message");
info!(custom = %custom_field, "test log message");
warn!(custom = %custom_field, "test log message");
error!(custom = %custom_field, "test log message");
let lines = read_lines(log_file);
for line in lines {
let content = line.unwrap();
serde_json::from_str::<serde_json::Value>(&content).unwrap_err();
assert!(content.contains("custom=hi"));
assert!(content.contains("test log message"));
assert!(!content.contains("TRACE"));
assert!(!content.contains("DEBUG"));
assert!(!content.contains("INFO"));
}
}

View File

@@ -199,7 +199,7 @@ fn initialize_config(
fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()> {
// Initialize logger
let log_file = logging::init(LOG_FILE_NAME, daemonize)?;
let log_file = logging::init(LOG_FILE_NAME, daemonize, conf.log_format)?;
info!("version: {}", version());

View File

@@ -17,6 +17,7 @@ use toml_edit::{Document, Item};
use url::Url;
use utils::{
id::{NodeId, TenantId, TimelineId},
logging::LogFormat,
postgres_backend::AuthType,
};
@@ -45,6 +46,8 @@ pub mod defaults {
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
pub const DEFAULT_LOG_FORMAT: &str = "plain";
///
/// Default built-in configuration file.
///
@@ -63,6 +66,7 @@ pub mod defaults {
# initial superuser role name to use when creating a new tenant
#initial_superuser_name = '{DEFAULT_SUPERUSER}'
#log_format = '{DEFAULT_LOG_FORMAT}'
# [tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -126,6 +130,8 @@ pub struct PageServerConf {
/// Etcd broker endpoints to connect to.
pub broker_endpoints: Vec<Url>,
pub log_format: LogFormat,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -192,6 +198,8 @@ struct PageServerConfigBuilder {
profiling: BuilderValue<ProfilingConfig>,
broker_etcd_prefix: BuilderValue<String>,
broker_endpoints: BuilderValue<Vec<Url>>,
log_format: BuilderValue<LogFormat>,
}
impl Default for PageServerConfigBuilder {
@@ -219,6 +227,7 @@ impl Default for PageServerConfigBuilder {
profiling: Set(ProfilingConfig::Disabled),
broker_etcd_prefix: Set(etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string()),
broker_endpoints: Set(Vec::new()),
log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
}
}
}
@@ -291,6 +300,10 @@ impl PageServerConfigBuilder {
self.profiling = BuilderValue::Set(profiling)
}
pub fn log_format(&mut self, log_format: LogFormat) {
self.log_format = BuilderValue::Set(log_format)
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let broker_endpoints = self
.broker_endpoints
@@ -335,6 +348,7 @@ impl PageServerConfigBuilder {
broker_etcd_prefix: self
.broker_etcd_prefix
.ok_or(anyhow!("missing broker_etcd_prefix"))?,
log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
})
}
}
@@ -459,6 +473,9 @@ impl PageServerConf {
})
.collect::<anyhow::Result<_>>()?,
),
"log_format" => builder.log_format(
LogFormat::from_config(&parse_toml_string(key, item)?)?
),
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -571,6 +588,7 @@ impl PageServerConf {
default_tenant_conf: TenantConf::dummy_conf(),
broker_endpoints: Vec::new(),
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
}
}
}
@@ -665,6 +683,8 @@ max_file_descriptors = 333
initial_superuser_name = 'zzzz'
id = 10
log_format = 'json'
"#;
#[test]
@@ -704,6 +724,7 @@ id = 10
.parse()
.expect("Failed to parse a valid broker endpoint URL")],
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -748,6 +769,7 @@ id = 10
.parse()
.expect("Failed to parse a valid broker endpoint URL")],
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
log_format: LogFormat::Json,
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -618,6 +618,7 @@ components:
- last_record_lsn
- disk_consistent_lsn
- awaits_download
- state
properties:
timeline_id:
type: string
@@ -660,6 +661,8 @@ components:
type: integer
awaits_download:
type: boolean
state:
type: string
# These 'local' and 'remote' fields just duplicate some of the fields
# above. They are kept for backwards-compatibility. They can be removed,

View File

@@ -129,6 +129,7 @@ async fn build_timeline_info(
}
};
let current_physical_size = Some(timeline.get_physical_size());
let state = timeline.current_state();
let info = TimelineInfo {
tenant_id: timeline.tenant_id,
@@ -158,6 +159,7 @@ async fn build_timeline_info(
remote_consistent_lsn,
awaits_download,
state,
// Duplicate some fields in 'local' and 'remote' fields, for backwards-compatility
// with the control plane.
@@ -294,7 +296,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
let timeline_info = async {
let timeline = tokio::task::spawn_blocking(move || {
tenant_mgr::get_tenant(tenant_id, true)?.get_timeline(timeline_id)
tenant_mgr::get_tenant(tenant_id, true)?.get_timeline(timeline_id, false)
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
@@ -331,14 +333,13 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
let timeline = tenant_mgr::get_tenant(tenant_id, true)
.and_then(|tenant| tenant.get_timeline(timeline_id))
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
.map_err(ApiError::NotFound)?;
let result = match timeline
.find_lsn_for_timestamp(timestamp_pg)
.map_err(ApiError::InternalServerError)?
{
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
LsnForTimestamp::Present(lsn) => format!("{lsn}"),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
@@ -788,16 +789,16 @@ async fn timeline_gc_handler(mut request: Request<Body>) -> Result<Response<Body
check_permission(&request, Some(tenant_id))?;
// FIXME: currently this will return a 500 error on bad tenant id; it should be 4XX
let repo = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::NotFound)?;
let tenant = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::NotFound)?;
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let _span_guard =
info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id).entered();
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| repo.get_gc_horizon());
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = repo.get_pitr_interval();
let result = repo
let pitr = tenant.get_pitr_interval();
let result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
@@ -812,10 +813,9 @@ async fn timeline_compact_handler(request: Request<Body>) -> Result<Response<Bod
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let repo = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
let timeline = repo
.get_timeline(timeline_id)
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)?;
timeline.compact().map_err(ApiError::InternalServerError)?;
@@ -829,10 +829,9 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let repo = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
let timeline = repo
.get_timeline(timeline_id)
.with_context(|| format!("No timeline {timeline_id} in repository for tenant {tenant_id}"))
let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)?;
timeline
.checkpoint(CheckpointConfig::Forced)

View File

@@ -1060,7 +1060,8 @@ impl postgres_backend_async::Handler for PageServerHandler {
}
fn get_local_timeline(tenant_id: TenantId, timeline_id: TimelineId) -> Result<Arc<Timeline>> {
tenant_mgr::get_tenant(tenant_id, true).and_then(|tenant| tenant.get_timeline(timeline_id))
tenant_mgr::get_tenant(tenant_id, true)
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
}
///

View File

@@ -11,7 +11,8 @@
//! parent timeline, and the last LSN that has been written to disk.
//!
use anyhow::{bail, ensure, Context};
use anyhow::{bail, Context};
use pageserver_api::models::TimelineState;
use tokio::sync::watch;
use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
@@ -189,6 +190,7 @@ impl UninitializedTimeline<'_> {
"Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}"
)
})?;
new_timeline.set_state(TimelineState::Active);
v.insert(Arc::clone(&new_timeline));
new_timeline.launch_wal_receiver();
}
@@ -338,18 +340,26 @@ impl Tenant {
/// Get Timeline handle for given Neon timeline ID.
/// This function is idempotent. It doesn't change internal state in any way.
pub fn get_timeline(&self, timeline_id: TimelineId) -> anyhow::Result<Arc<Timeline>> {
self.timelines
.lock()
.unwrap()
.get(&timeline_id)
.with_context(|| {
format!(
"Timeline {} was not found for tenant {}",
timeline_id, self.tenant_id
)
})
.map(Arc::clone)
pub fn get_timeline(
&self,
timeline_id: TimelineId,
active_only: bool,
) -> anyhow::Result<Arc<Timeline>> {
let timelines_accessor = self.timelines.lock().unwrap();
let timeline = timelines_accessor.get(&timeline_id).with_context(|| {
format!("Timeline {}/{} was not found", self.tenant_id, timeline_id)
})?;
if active_only && !timeline.is_active() {
anyhow::bail!(
"Timeline {}/{} is not active, state: {:?}",
self.tenant_id,
timeline_id,
timeline.current_state()
)
} else {
Ok(Arc::clone(timeline))
}
}
/// Lists timelines the tenant contains.
@@ -372,6 +382,11 @@ impl Tenant {
initdb_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<UninitializedTimeline> {
anyhow::ensure!(
self.is_active(),
"Cannot create empty timelines on inactive tenant"
);
let timelines = self.timelines.lock().unwrap();
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?;
drop(timelines);
@@ -408,9 +423,14 @@ impl Tenant {
mut ancestor_start_lsn: Option<Lsn>,
pg_version: u32,
) -> anyhow::Result<Option<Arc<Timeline>>> {
anyhow::ensure!(
self.is_active(),
"Cannot create timelines on inactive tenant"
);
let new_timeline_id = new_timeline_id.unwrap_or_else(TimelineId::generate);
if self.get_timeline(new_timeline_id).is_ok() {
if self.get_timeline(new_timeline_id, false).is_ok() {
debug!("timeline {new_timeline_id} already exists");
return Ok(None);
}
@@ -418,7 +438,7 @@ impl Tenant {
let loaded_timeline = match ancestor_timeline_id {
Some(ancestor_timeline_id) => {
let ancestor_timeline = self
.get_timeline(ancestor_timeline_id)
.get_timeline(ancestor_timeline_id, false)
.context("Cannot branch off the timeline that's not present in pageserver")?;
if let Some(lsn) = ancestor_start_lsn.as_mut() {
@@ -470,6 +490,11 @@ impl Tenant {
pitr: Duration,
checkpoint_before_gc: bool,
) -> anyhow::Result<GcResult> {
anyhow::ensure!(
self.is_active(),
"Cannot run GC iteration on inactive tenant"
);
let timeline_str = target_timeline_id
.map(|x| x.to_string())
.unwrap_or_else(|| "-".to_string());
@@ -486,6 +511,11 @@ impl Tenant {
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
pub fn compaction_iteration(&self) -> anyhow::Result<()> {
anyhow::ensure!(
self.is_active(),
"Cannot run compaction iteration on inactive tenant"
);
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
@@ -493,6 +523,7 @@ impl Tenant {
let timelines = self.timelines.lock().unwrap();
let timelines_to_compact = timelines
.iter()
.filter(|(_, timeline)| timeline.is_active())
.map(|(timeline_id, timeline)| (*timeline_id, timeline.clone()))
.collect::<Vec<_>>();
drop(timelines);
@@ -515,13 +546,13 @@ impl Tenant {
// checkpoints. We don't want to block everything else while the
// checkpoint runs.
let timelines = self.timelines.lock().unwrap();
let timelines_to_compact = timelines
let timelines_to_checkpoint = timelines
.iter()
.map(|(timeline_id, timeline)| (*timeline_id, Arc::clone(timeline)))
.collect::<Vec<_>>();
drop(timelines);
for (timeline_id, timeline) in &timelines_to_compact {
for (timeline_id, timeline) in &timelines_to_checkpoint {
let _entered =
info_span!("checkpoint", timeline = %timeline_id, tenant = %self.tenant_id)
.entered();
@@ -543,7 +574,7 @@ impl Tenant {
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
ensure!(
anyhow::ensure!(
!children_exist,
"Cannot delete timeline which has child timelines"
);
@@ -552,7 +583,10 @@ impl Tenant {
Entry::Vacant(_) => bail!("timeline not found"),
};
let layer_removal_guard = timeline_entry.get().layer_removal_guard()?;
let timeline = timeline_entry.get();
timeline.set_state(TimelineState::Paused);
let layer_removal_guard = timeline.layer_removal_guard()?;
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id);
std::fs::remove_dir_all(&local_timeline_directory).with_context(|| {
@@ -569,58 +603,6 @@ impl Tenant {
Ok(())
}
pub fn init_attach_timelines(
&self,
timelines: HashMap<TimelineId, TimelineMetadata>,
) -> anyhow::Result<()> {
let sorted_timelines = if timelines.len() == 1 {
timelines.into_iter().collect()
} else if !timelines.is_empty() {
tree_sort_timelines(timelines)?
} else {
warn!("No timelines to attach received");
return Ok(());
};
let mut timelines_accessor = self.timelines.lock().unwrap();
for (timeline_id, metadata) in sorted_timelines {
info!(
"Attaching timeline {} pg_version {}",
timeline_id,
metadata.pg_version()
);
if timelines_accessor.contains_key(&timeline_id) {
warn!(
"Timeline {}/{} already exists in the tenant map, skipping its initialization",
self.tenant_id, timeline_id
);
continue;
} else {
let ancestor = metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
.cloned();
let timeline = UninitializedTimeline {
owning_tenant: self,
timeline_id,
raw_timeline: Some((
self.create_timeline_data(timeline_id, metadata, ancestor)
.with_context(|| {
format!("Failed to initialize timeline {timeline_id}")
})?,
TimelineUninitMark::dummy(),
)),
};
let initialized_timeline =
timeline.initialize_with_lock(&mut timelines_accessor, true)?;
timelines_accessor.insert(timeline_id, initialized_timeline);
}
}
Ok(())
}
/// Allows to retrieve remote timeline index from the tenant. Used in walreceiver to grab remote consistent lsn.
pub fn get_remote_index(&self) -> &RemoteIndex {
&self.remote_index
@@ -661,10 +643,30 @@ impl Tenant {
}
(_, new_state) => {
self.state.send_replace(new_state);
if self.should_run_tasks() {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
crate::tenant_tasks::start_background_loops(self.tenant_id);
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
match new_state {
TenantState::Active {
background_jobs_running,
} => {
if background_jobs_running {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
crate::tenant_tasks::start_background_loops(self.tenant_id);
}
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Active);
}
}
TenantState::Paused | TenantState::Broken => {
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Suspended);
}
}
}
}
}
@@ -993,6 +995,7 @@ impl Tenant {
timelines
.iter()
.filter(|(_, timeline)| timeline.is_active())
.map(|(timeline_id, timeline_entry)| {
// This is unresolved question for now, how to do gc in presence of remote timelines
// especially when this is combined with branching.
@@ -1026,7 +1029,7 @@ impl Tenant {
for timeline_id in timeline_ids {
// Timeline is known to be local and loaded.
let timeline = self
.get_timeline(timeline_id)
.get_timeline(timeline_id, false)
.with_context(|| format!("Timeline {timeline_id} was not found"))?;
// If target_timeline is specified, ignore all other timelines
@@ -1111,7 +1114,7 @@ impl Tenant {
// Step 2 is to avoid initializing the new branch using data removed by past GC iterations
// or in-queue GC iterations.
let src_timeline = self.get_timeline(src).with_context(|| {
let src_timeline = self.get_timeline(src, false).with_context(|| {
format!(
"No ancestor {} found for timeline {}/{}",
src, self.tenant_id, dst
@@ -1381,6 +1384,68 @@ impl Tenant {
Ok(uninit_mark)
}
pub(super) fn init_attach_timelines(
&self,
timelines: HashMap<TimelineId, TimelineMetadata>,
) -> anyhow::Result<()> {
let sorted_timelines = if timelines.len() == 1 {
timelines.into_iter().collect()
} else if !timelines.is_empty() {
tree_sort_timelines(timelines)?
} else {
warn!("No timelines to attach received");
return Ok(());
};
let tenant_id = self.tenant_id;
let mut timelines_accessor = self.timelines.lock().unwrap();
for (timeline_id, metadata) in sorted_timelines {
info!(
"Attaching timeline {}/{} pg_version {}",
tenant_id,
timeline_id,
metadata.pg_version()
);
if timelines_accessor.contains_key(&timeline_id) {
warn!("Timeline {tenant_id}/{timeline_id} already exists in the tenant map, skipping its initialization");
continue;
}
let ancestor = metadata
.ancestor_timeline()
.and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id))
.cloned();
let dummy_timeline = self
.create_timeline_data(timeline_id, metadata.clone(), ancestor.clone())
.with_context(|| {
format!("Failed to crate dummy timeline data for {tenant_id}/{timeline_id}")
})?;
let timeline = UninitializedTimeline {
owning_tenant: self,
timeline_id,
raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())),
};
match timeline.initialize_with_lock(&mut timelines_accessor, true) {
Ok(initialized_timeline) => {
timelines_accessor.insert(timeline_id, initialized_timeline);
}
Err(e) => {
error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
let broken_timeline = self
.create_timeline_data(timeline_id, metadata, ancestor)
.with_context(|| {
format!("Failed to crate broken timeline data for {tenant_id}/{timeline_id}")
})?;
broken_timeline.set_state(TimelineState::Broken);
timelines_accessor.insert(timeline_id, Arc::new(broken_timeline));
}
}
}
Ok(())
}
}
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
@@ -1608,6 +1673,9 @@ pub mod harness {
timelines_to_load.insert(timeline_id, timeline_metadata);
}
tenant.init_attach_timelines(timelines_to_load)?;
tenant.set_state(TenantState::Active {
background_jobs_running: false,
});
Ok(tenant)
}
@@ -1767,7 +1835,7 @@ mod tests {
// Branch the history, modify relation differently on the new timeline
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID)
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
let new_writer = newtline.writer();
new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?;
@@ -1923,7 +1991,7 @@ mod tests {
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID)
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
@@ -1942,7 +2010,7 @@ mod tests {
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID)
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
@@ -1974,7 +2042,7 @@ mod tests {
let tenant = harness.load();
tenant
.get_timeline(TIMELINE_ID)
.get_timeline(TIMELINE_ID, true)
.expect("cannot load timeline");
Ok(())
@@ -1997,7 +2065,7 @@ mod tests {
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID)
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
@@ -2009,11 +2077,11 @@ mod tests {
// check that both, child and ancestor are loaded
let _child_tline = tenant
.get_timeline(NEW_TIMELINE_ID)
.get_timeline(NEW_TIMELINE_ID, true)
.expect("cannot get child timeline loaded");
let _ancestor_tline = tenant
.get_timeline(TIMELINE_ID)
.get_timeline(TIMELINE_ID, true)
.expect("cannot get ancestor timeline loaded");
Ok(())
@@ -2267,7 +2335,7 @@ mod tests {
let new_tline_id = TimelineId::generate();
tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?;
tline = tenant
.get_timeline(new_tline_id)
.get_timeline(new_tline_id, true)
.expect("Should have the branched timeline");
tline_id = new_tline_id;
@@ -2330,7 +2398,7 @@ mod tests {
let new_tline_id = TimelineId::generate();
tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?;
tline = tenant
.get_timeline(new_tline_id)
.get_timeline(new_tline_id, true)
.expect("Should have the branched timeline");
tline_id = new_tline_id;

View File

@@ -5,6 +5,8 @@ use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::TimelineState;
use tokio::sync::watch;
use tokio::task::spawn_blocking;
use tracing::*;
@@ -160,6 +162,8 @@ pub struct Timeline {
/// Relation size cache
pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
state: watch::Sender<TimelineState>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -416,9 +420,11 @@ impl Timeline {
/// those functions with an LSN that has been processed yet is an error.
///
pub async fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<()> {
anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline");
// This should never be called from the WAL receiver, because that could lead
// to a deadlock.
ensure!(
anyhow::ensure!(
task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnection),
"wait_lsn cannot be called in WAL receiver"
);
@@ -635,6 +641,35 @@ impl Timeline {
}
Ok(())
}
pub fn set_state(&self, new_state: TimelineState) {
match (self.current_state(), new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
}
(TimelineState::Broken, _) => {
error!("Ignoring state update {new_state:?} for broken tenant");
}
(TimelineState::Paused, TimelineState::Active) => {
debug!("Not activating a paused timeline");
}
(_, new_state) => {
self.state.send_replace(new_state);
}
}
}
pub fn current_state(&self) -> TimelineState {
*self.state.borrow()
}
pub fn is_active(&self) -> bool {
self.current_state() == TimelineState::Active
}
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
self.state.subscribe()
}
}
// Private functions
@@ -688,8 +723,9 @@ impl Timeline {
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
upload_layers: bool,
pg_version: u32,
) -> Timeline {
) -> Self {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let (state, _) = watch::channel(TimelineState::Suspended);
let mut result = Timeline {
conf,
@@ -746,6 +782,7 @@ impl Timeline {
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(HashMap::new()),
state,
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -883,8 +920,6 @@ impl Timeline {
}
fn try_spawn_size_init_task(self: &Arc<Self>, init_lsn: Lsn) {
let timeline_id = self.timeline_id;
// Atomically check if the timeline size calculation had already started.
// If the flag was not already set, this sets it.
if !self
@@ -901,17 +936,42 @@ impl Timeline {
"initial size calculation",
false,
async move {
let calculated_size = self_clone.calculate_logical_size(init_lsn)?;
let result = spawn_blocking(move || {
self_clone.current_logical_size.initial_logical_size.set(calculated_size)
}).await?;
match result {
Ok(()) => info!("Successfully calculated initial logical size"),
Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"),
let mut timeline_state_updates = self_clone.subscribe_for_state_updates();
let self_calculation = Arc::clone(&self_clone);
tokio::select! {
calculation_result = spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn)) => {
let calculated_size = calculation_result
.context("Failed to spawn calculation result task")?
.context("Failed to calculate logical size")?;
match self_clone.current_logical_size.initial_logical_size.set(calculated_size) {
Ok(()) => info!("Successfully calculated initial logical size"),
Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"),
}
Ok(())
},
new_event = async {
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = *timeline_state_updates.borrow();
match new_state {
// we're running this job for active timelines only
TimelineState::Active => continue,
TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return Some(new_state),
}
}
Err(_sender_dropped_error) => return None,
}
}
} => {
match new_event {
Some(new_state) => info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates"),
None => info!("Timeline dropped state updates sender, stopping init size calculation"),
}
Ok(())
},
}
Ok(())
}
.instrument(info_span!("initial_logical_size_calculation", timeline = %timeline_id))
}.instrument(info_span!("initial_logical_size_calculation", tenant = %self.tenant_id, timeline = %self.timeline_id)),
);
}
}
@@ -1356,7 +1416,7 @@ impl Timeline {
false,
)?;
if self.upload_layers.load(atomic::Ordering::Relaxed) {
if self.can_upload_layers() {
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
@@ -1826,7 +1886,7 @@ impl Timeline {
}
drop(layers);
if self.upload_layers.load(atomic::Ordering::Relaxed) {
if self.can_upload_layers() {
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
@@ -1930,7 +1990,7 @@ impl Timeline {
/// obsolete.
///
pub(super) fn gc(&self) -> anyhow::Result<GcResult> {
let mut result: GcResult = Default::default();
let mut result: GcResult = GcResult::default();
let now = SystemTime::now();
fail_point!("before-timeline-gc");
@@ -2110,7 +2170,7 @@ impl Timeline {
fail_point!("after-timeline-gc-removed-layers");
}
if self.upload_layers.load(atomic::Ordering::Relaxed) {
if self.can_upload_layers() {
storage_sync::schedule_layer_delete(
self.tenant_id,
self.timeline_id,
@@ -2199,6 +2259,11 @@ impl Timeline {
}
}
}
fn can_upload_layers(&self) -> bool {
self.upload_layers.load(atomic::Ordering::Relaxed)
&& self.current_state() != TimelineState::Broken
}
}
/// Helper function for get_reconstruct_data() to add the path of layers traversed

View File

@@ -175,7 +175,7 @@ async fn wait_for_active_tenant(
}
state => {
debug!("Not running the task loop, tenant is not active with background jobs enabled: {state:?}");
tokio::time::sleep(wait).await;
continue;
}
}
}

View File

@@ -12,6 +12,7 @@
use std::{
collections::{hash_map, HashMap},
num::NonZeroU64,
ops::ControlFlow,
sync::Arc,
time::Duration,
};
@@ -26,7 +27,8 @@ use etcd_broker::{
subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription,
BrokerUpdate, Client,
};
use tokio::select;
use pageserver_api::models::TimelineState;
use tokio::{select, sync::watch};
use tracing::*;
use crate::{
@@ -58,10 +60,7 @@ pub fn spawn_connection_manager_task(
TaskKind::WalReceiverManager,
Some(tenant_id),
Some(timeline_id),
&format!(
"walreceiver for tenant {} timeline {}",
timeline.tenant_id, timeline.timeline_id
),
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
false,
async move {
info!("WAL receiver broker started, connecting to etcd");
@@ -75,19 +74,21 @@ pub fn spawn_connection_manager_task(
select! {
_ = task_mgr::shutdown_watcher() => {
info!("WAL receiver shutdown requested, shutting down");
// Kill current connection, if any
if let Some(wal_connection) = walreceiver_state.wal_connection.take()
{
wal_connection.connection_task.shutdown().await;
}
walreceiver_state.shutdown().await;
return Ok(());
},
_ = connection_manager_loop_step(
loop_step_result = connection_manager_loop_step(
&broker_loop_prefix,
&mut etcd_client,
&mut walreceiver_state,
) => {},
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
walreceiver_state.shutdown().await;
return Ok(());
}
},
}
}
}
@@ -104,7 +105,17 @@ async fn connection_manager_loop_step(
broker_prefix: &str,
etcd_client: &mut Client,
walreceiver_state: &mut WalreceiverState,
) {
) -> ControlFlow<(), ()> {
let mut timeline_state_updates = walreceiver_state.timeline.subscribe_for_state_updates();
match wait_for_active_timeline(&mut timeline_state_updates).await {
ControlFlow::Continue(()) => {}
ControlFlow::Break(()) => {
info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
return ControlFlow::Break(());
}
}
let id = TenantTimelineId {
tenant_id: walreceiver_state.timeline.tenant_id,
timeline_id: walreceiver_state.timeline.timeline_id,
@@ -129,10 +140,12 @@ async fn connection_manager_loop_step(
// - change connection if the rules decide so, or if the current connection dies
// - receive updates from broker
// - this might change the current desired connection
// - timeline state changes to something that does not allow walreceiver to run concurrently
select! {
broker_connection_result = &mut broker_subscription.watcher_handle => {
info!("Broker connection was closed from the other side, ending current broker loop step");
cleanup_broker_connection(broker_connection_result, walreceiver_state);
return;
return ControlFlow::Continue(());
},
Some(wal_connection_update) = async {
@@ -185,11 +198,36 @@ async fn connection_manager_loop_step(
(&mut broker_subscription.watcher_handle).await,
walreceiver_state,
);
return;
return ControlFlow::Continue(());
}
}
},
new_event = async {
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = walreceiver_state.timeline.current_state();
match new_state {
// we're already active as walreceiver, no need to reactivate
TimelineState::Active => continue,
TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return ControlFlow::Continue(new_state),
}
}
Err(_sender_dropped_error) => return ControlFlow::Break(()),
}
}
} => match new_event {
ControlFlow::Continue(new_state) => {
info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates");
return ControlFlow::Continue(());
}
ControlFlow::Break(()) => {
info!("Timeline dropped state updates sender, stopping wal connection manager loop");
return ControlFlow::Break(());
}
},
_ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await }, if time_until_next_retry.is_some() => {}
}
@@ -216,6 +254,34 @@ async fn connection_manager_loop_step(
}
}
async fn wait_for_active_timeline(
timeline_state_updates: &mut watch::Receiver<TimelineState>,
) -> ControlFlow<(), ()> {
let current_state = *timeline_state_updates.borrow();
if current_state == TimelineState::Active {
return ControlFlow::Continue(());
}
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = *timeline_state_updates.borrow();
match new_state {
TimelineState::Active => {
debug!("Timeline state changed to active, continuing the walreceiver connection manager");
return ControlFlow::Continue(());
}
state => {
debug!("Not running the walreceiver connection manager, timeline is not active: {state:?}");
continue;
}
}
}
Err(_sender_dropped_error) => return ControlFlow::Break(()),
}
}
}
fn cleanup_broker_connection(
broker_connection_result: Result<Result<(), etcd_broker::BrokerError>, tokio::task::JoinError>,
walreceiver_state: &mut WalreceiverState,
@@ -723,6 +789,12 @@ impl WalreceiverState {
self.wal_connection_retries.remove(&node_id);
}
}
async fn shutdown(mut self) {
if let Some(wal_connection) = self.wal_connection.take() {
wal_connection.connection_task.shutdown().await;
}
}
}
#[derive(Debug, PartialEq, Eq)]
@@ -801,6 +873,7 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
@@ -817,6 +890,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
},
etcd_version: 0,
@@ -833,6 +908,7 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
},
etcd_version: 0,
@@ -849,6 +925,7 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
@@ -908,6 +985,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -924,6 +1003,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("not advanced Lsn".to_string()),
},
etcd_version: 0,
@@ -940,6 +1021,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("not enough advanced Lsn".to_string()),
},
etcd_version: 0,
@@ -974,6 +1057,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1006,6 +1091,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("smaller commit_lsn".to_string()),
},
etcd_version: 0,
@@ -1022,6 +1109,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1038,6 +1127,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
@@ -1083,6 +1174,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1099,6 +1192,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1168,6 +1263,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1184,6 +1281,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()),
},
etcd_version: 0,
@@ -1255,6 +1354,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
@@ -1326,6 +1427,8 @@ mod tests {
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,

12
poetry.lock generated
View File

@@ -514,14 +514,6 @@ python-versions = ">=3.7"
[package.dependencies]
typing-extensions = ">=4.1.0"
[[package]]
name = "cached-property"
version = "1.5.2"
description = "A decorator for caching properties in classes."
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "certifi"
version = "2022.6.15"
@@ -1647,10 +1639,6 @@ botocore-stubs = [
{file = "botocore-stubs-1.27.38.tar.gz", hash = "sha256:408e8b86b5d171b58f81c74ca9d3b5317a5a8e2d3bc2073aa841ac13b8939e56"},
{file = "botocore_stubs-1.27.38-py3-none-any.whl", hash = "sha256:7add7641e9a479a9c8366893bb522fd9ca3d58714201e43662a200a148a1bc38"},
]
cached-property = [
{file = "cached-property-1.5.2.tar.gz", hash = "sha256:9fa5755838eecbb2d234c3aa390bd80fbd3ac6b6869109bfc1b499f7bd89a130"},
{file = "cached_property-1.5.2-py2.py3-none-any.whl", hash = "sha256:df4f613cf7ad9a588cc381aaf4a512d26265ecebd5eb9e1ba12f1319eb85a6a0"},
]
certifi = [
{file = "certifi-2022.6.15-py3-none-any.whl", hash = "sha256:fe86415d55e84719d75f8b69414f6438ac3547d2078ab91b67e779ef69378412"},
{file = "certifi-2022.6.15.tar.gz", hash = "sha256:84c85a9078b11105f04f3036a9482ae10e4621616db313fe045dd24743a0820d"},

View File

@@ -14,7 +14,6 @@ requests = "^2.26.0"
pytest-xdist = "^2.3.0"
asyncpg = "^0.24.0"
aiopg = "^1.3.1"
cached-property = "^1.5.2"
Jinja2 = "^3.0.2"
types-requests = "^2.28.5"
types-psycopg2 = "^2.9.18"
@@ -74,7 +73,6 @@ strict = true
[[tool.mypy.overrides]]
module = [
"asyncpg.*",
"cached_property.*",
"pg8000.*",
]
ignore_missing_imports = true

View File

@@ -21,7 +21,8 @@ use metrics::set_build_info_metric;
use safekeeper::broker;
use safekeeper::control_file;
use safekeeper::defaults::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
use safekeeper::http;
use safekeeper::remove_wal;
@@ -31,8 +32,12 @@ use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use utils::auth::JwtAuth;
use utils::{
http::endpoint, id::NodeId, logging, project_git_version, shutdown::exit_now, signals,
tcp_listener,
http::endpoint,
id::NodeId,
logging::{self, LogFormat},
project_git_version,
shutdown::exit_now,
signals, tcp_listener,
};
const LOCK_FILE_NAME: &str = "safekeeper.lock";
@@ -72,10 +77,6 @@ fn main() -> anyhow::Result<()> {
conf.listen_http_addr = addr.to_string();
}
if let Some(recall) = arg_matches.get_one::<String>("recall") {
conf.recall_period = humantime::parse_duration(recall)?;
}
let mut given_id = None;
if let Some(given_id_str) = arg_matches.get_one::<String>("id") {
given_id = Some(NodeId(
@@ -93,6 +94,16 @@ fn main() -> anyhow::Result<()> {
conf.broker_etcd_prefix = prefix.to_string();
}
if let Some(heartbeat_timeout_str) = arg_matches.get_one::<String>("heartbeat-timeout") {
conf.heartbeat_timeout =
humantime::parse_duration(heartbeat_timeout_str).with_context(|| {
format!(
"failed to parse heartbeat-timeout {}",
heartbeat_timeout_str
)
})?;
}
if let Some(backup_threads) = arg_matches.get_one::<String>("wal-backup-threads") {
conf.backup_runtime_threads = backup_threads
.parse()
@@ -105,6 +116,14 @@ fn main() -> anyhow::Result<()> {
let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
conf.remote_storage = Some(RemoteStorageConfig::from_toml(storage_conf_parsed_toml)?);
}
if let Some(max_offloader_lag_str) = arg_matches.get_one::<String>("max-offloader-lag") {
conf.max_offloader_lag_bytes = max_offloader_lag_str.parse().with_context(|| {
format!(
"failed to parse max offloader lag {}",
max_offloader_lag_str
)
})?;
}
// Seems like there is no better way to accept bool values explicitly in clap.
conf.wal_backup_enabled = arg_matches
.get_one::<String>("enable-wal-backup")
@@ -116,11 +135,15 @@ fn main() -> anyhow::Result<()> {
.get_one::<String>("auth-validation-public-key-path")
.map(PathBuf::from);
if let Some(log_format) = arg_matches.get_one::<String>("log-format") {
conf.log_format = LogFormat::from_config(log_format)?;
}
start_safekeeper(conf, given_id, arg_matches.get_flag("init"))
}
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bool) -> Result<()> {
let log_file = logging::init("safekeeper.log", conf.daemonize)?;
let log_file = logging::init("safekeeper.log", conf.daemonize, conf.log_format)?;
info!("version: {GIT_VERSION}");
@@ -361,11 +384,6 @@ fn cli() -> Command {
.short('p')
.long("pageserver"),
)
.arg(
Arg::new("recall")
.long("recall")
.help("Period for requestion pageserver to call for replication"),
)
.arg(
Arg::new("daemonize")
.short('d')
@@ -397,6 +415,11 @@ fn cli() -> Command {
.long("broker-etcd-prefix")
.help("a prefix to always use when polling/pusing data in etcd from this safekeeper"),
)
.arg(
Arg::new("heartbeat-timeout")
.long("heartbeat-timeout")
.help(formatcp!("Peer is considered dead after not receiving heartbeats from it during this period (default {}s), passed as a human readable duration.", DEFAULT_HEARTBEAT_TIMEOUT.as_secs()))
)
.arg(
Arg::new("wal-backup-threads").long("backup-threads").help(formatcp!("number of threads for wal backup (default {DEFAULT_WAL_BACKUP_RUNTIME_THREADS}")),
).arg(
@@ -404,6 +427,11 @@ fn cli() -> Command {
.long("remote-storage")
.help("Remote storage configuration for WAL backup (offloading to s3) as TOML inline table, e.g. {\"max_concurrent_syncs\" = 17, \"max_sync_errors\": 13, \"bucket_name\": \"<BUCKETNAME>\", \"bucket_region\":\"<REGION>\", \"concurrency_limit\": 119}.\nSafekeeper offloads WAL to [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring structure on the file system.")
)
.arg(
Arg::new("max-offloader-lag")
.long("max-offloader-lag")
.help(formatcp!("Safekeeper won't be elected for WAL offloading if it is lagging for more than this value (default {}MB) in bytes", DEFAULT_MAX_OFFLOADER_LAG_BYTES / (1 << 20)))
)
.arg(
Arg::new("enable-wal-backup")
.long("enable-wal-backup")
@@ -416,6 +444,11 @@ fn cli() -> Command {
.long("auth-validation-public-key-path")
.help("Path to an RSA .pem public key which is used to check JWT tokens")
)
.arg(
Arg::new("log-format")
.long("log-format")
.help("Format for logging, either 'plain' or 'json'")
)
}
#[test]

View File

@@ -1,6 +1,5 @@
//! Communication with etcd, providing safekeeper peers and pageserver coordination.
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
@@ -12,11 +11,9 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use url::Url;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
@@ -56,113 +53,6 @@ fn timeline_safekeeper_path(
)
}
pub struct Election {
pub election_name: String,
pub candidate_name: String,
pub broker_endpoints: Vec<Url>,
}
impl Election {
pub fn new(election_name: String, candidate_name: String, broker_endpoints: Vec<Url>) -> Self {
Self {
election_name,
candidate_name,
broker_endpoints,
}
}
}
pub struct ElectionLeader {
client: Client,
keep_alive: JoinHandle<Result<()>>,
}
impl ElectionLeader {
pub async fn check_am_i(
&mut self,
election_name: String,
candidate_name: String,
) -> Result<bool> {
let resp = self.client.leader(election_name).await?;
let kv = resp
.kv()
.ok_or_else(|| anyhow!("failed to get leader response"))?;
let leader = kv.value_str()?;
Ok(leader == candidate_name)
}
pub async fn give_up(self) {
self.keep_alive.abort();
// TODO: it'll be wise to resign here but it'll happen after lease expiration anyway
// should we await for keep alive termination?
let _ = self.keep_alive.await;
}
}
pub async fn get_leader(req: &Election, leader: &mut Option<ElectionLeader>) -> Result<()> {
let mut client = Client::connect(req.broker_endpoints.clone(), None)
.await
.context("Could not connect to etcd")?;
let lease = client
.lease_grant(LEASE_TTL_SEC, None)
.await
.context("Could not acquire a lease");
let lease_id = lease.map(|l| l.id()).unwrap();
// kill previous keepalive, if any
if let Some(l) = leader.take() {
l.give_up().await;
}
let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id));
// immediately save handle to kill task if we get canceled below
*leader = Some(ElectionLeader {
client: client.clone(),
keep_alive,
});
client
.campaign(
req.election_name.clone(),
req.candidate_name.clone(),
lease_id,
)
.await?;
Ok(())
}
async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> {
let (mut keeper, mut ka_stream) = client
.lease_keep_alive(lease_id)
.await
.context("failed to create keepalive stream")?;
loop {
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
keeper
.keep_alive()
.await
.context("failed to send LeaseKeepAliveRequest")?;
ka_stream
.message()
.await
.context("failed to receive LeaseKeepAliveResponse")?;
sleep(push_interval).await;
}
}
pub fn get_candiate_name(system_id: NodeId) -> String {
format!("id_{system_id}")
}
async fn push_sk_info(
ttid: TenantTimelineId,
mut client: Client,
@@ -236,7 +126,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let handles = active_tlis
.iter()
.map(|tli| {
let sk_info = tli.get_public_info(&conf);
let sk_info = tli.get_safekeeper_info(&conf);
let key =
timeline_safekeeper_path(conf.broker_etcd_prefix.clone(), tli.ttid, conf.my_id);
let lease = leases.remove(&tli.ttid).unwrap();
@@ -282,6 +172,9 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
Some(new_info) => {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(new_info.key.id) {
// Note that we also receive *our own* info. That's
// important, as it is used as an indication of live
// connection to the broker.
tli.record_safekeeper_info(&new_info.value, new_info.key.node_id)
.await?
}

View File

@@ -1,6 +1,7 @@
//! Code to deal with safekeeper control file upgrades
use crate::safekeeper::{
AcceptorState, Peers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry,
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory,
TermSwitchEntry,
};
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
@@ -134,7 +135,7 @@ pub struct SafeKeeperStateV4 {
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
pub peers: Peers,
pub peers: PersistedPeers,
}
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
@@ -165,7 +166,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
peers: PersistedPeers(vec![]),
});
// migrate to hexing some ids
} else if version == 2 {
@@ -188,7 +189,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
peers: PersistedPeers(vec![]),
});
// migrate to moving tenant_id/timeline_id to the top and adding some lsns
} else if version == 3 {
@@ -211,7 +212,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
peers: PersistedPeers(vec![]),
});
// migrate to having timeline_start_lsn
} else if version == 4 {
@@ -234,7 +235,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
backup_lsn: Lsn::INVALID,
peer_horizon_lsn: oldstate.peer_horizon_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
peers: PersistedPeers(vec![]),
});
} else if version == 5 {
info!("reading safekeeper control file version {}", version);

View File

@@ -1,11 +1,16 @@
use defaults::DEFAULT_WAL_BACKUP_RUNTIME_THREADS;
use defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
//
use remote_storage::RemoteStorageConfig;
use std::path::PathBuf;
use std::time::Duration;
use url::Url;
use utils::id::{NodeId, TenantId, TenantTimelineId};
use utils::{
id::{NodeId, TenantId, TenantTimelineId},
logging::LogFormat,
};
pub mod broker;
pub mod control_file;
@@ -34,8 +39,9 @@ pub mod defaults {
DEFAULT_PG_LISTEN_PORT,
};
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
pub const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
}
#[derive(Debug, Clone)]
@@ -52,7 +58,6 @@ pub struct SafeKeeperConf {
pub no_sync: bool,
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub recall_period: Duration,
pub remote_storage: Option<RemoteStorageConfig>,
pub backup_runtime_threads: usize,
pub wal_backup_enabled: bool,
@@ -60,6 +65,9 @@ pub struct SafeKeeperConf {
pub broker_endpoints: Vec<Url>,
pub broker_etcd_prefix: String,
pub auth_validation_public_key_path: Option<PathBuf>,
pub heartbeat_timeout: Duration,
pub max_offloader_lag_bytes: u64,
pub log_format: LogFormat,
}
impl SafeKeeperConf {
@@ -85,13 +93,15 @@ impl Default for SafeKeeperConf {
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
remote_storage: None,
recall_period: defaults::DEFAULT_RECALL_PERIOD,
my_id: NodeId(0),
broker_endpoints: Vec::new(),
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
wal_backup_enabled: true,
auth_validation_public_key_path: None,
heartbeat_timeout: DEFAULT_HEARTBEAT_TIMEOUT,
max_offloader_lag_bytes: DEFAULT_MAX_OFFLOADER_LAG_BYTES,
log_format: LogFormat::Plain,
}
}
}

View File

@@ -11,6 +11,7 @@ use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
use tracing::*;
use crate::control_file;
@@ -132,9 +133,8 @@ pub struct ServerInfo {
pub wal_seg_size: u32,
}
/// Data published by safekeeper to the peers
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub struct PersistedPeerInfo {
/// LSN up to which safekeeper offloaded WAL to s3.
backup_lsn: Lsn,
/// Term of the last entry.
@@ -145,7 +145,7 @@ pub struct PeerInfo {
commit_lsn: Lsn,
}
impl PeerInfo {
impl PersistedPeerInfo {
fn new() -> Self {
Self {
backup_lsn: Lsn::INVALID,
@@ -156,10 +156,8 @@ impl PeerInfo {
}
}
// vector-based node id -> peer state map with very limited functionality we
// need/
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Peers(pub Vec<(NodeId, PeerInfo)>);
pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
/// Persistent information stored on safekeeper node
/// On disk data is prefixed by magic and format version and followed by checksum.
@@ -203,7 +201,7 @@ pub struct SafeKeeperState {
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
pub peers: Peers,
pub peers: PersistedPeers,
}
#[derive(Debug, Clone)]
@@ -240,7 +238,12 @@ impl SafeKeeperState {
backup_lsn: local_start_lsn,
peer_horizon_lsn: local_start_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()),
peers: PersistedPeers(
peers
.iter()
.map(|p| (*p, PersistedPeerInfo::new()))
.collect(),
),
}
}

View File

@@ -7,7 +7,7 @@ use etcd_broker::subscription_value::SkTimelineInfo;
use postgres_ffi::XLogSegNo;
use tokio::sync::watch;
use tokio::{sync::watch, time::Instant};
use std::cmp::{max, min};
@@ -26,7 +26,7 @@ use utils::{
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo,
SafekeeperMemState, ServerInfo, Term,
};
use crate::send_wal::HotStandbyFeedback;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
@@ -36,6 +36,53 @@ use crate::wal_storage;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::SafeKeeperConf;
/// Things safekeeper should know about timeline state on peers.
#[derive(Debug, Clone)]
pub struct PeerInfo {
pub sk_id: NodeId,
/// Term of the last entry.
_last_log_term: Term,
/// LSN of the last record.
_flush_lsn: Lsn,
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
/// sk since backup_lsn.
pub local_start_lsn: Lsn,
/// When info was received.
ts: Instant,
}
impl PeerInfo {
fn from_sk_info(sk_id: NodeId, sk_info: &SkTimelineInfo, ts: Instant) -> PeerInfo {
PeerInfo {
sk_id,
_last_log_term: sk_info.last_log_term.unwrap_or(0),
_flush_lsn: sk_info.flush_lsn.unwrap_or(Lsn::INVALID),
commit_lsn: sk_info.commit_lsn.unwrap_or(Lsn::INVALID),
local_start_lsn: sk_info.local_start_lsn.unwrap_or(Lsn::INVALID),
ts,
}
}
}
// vector-based node id -> peer state map with very limited functionality we
// need.
#[derive(Debug, Clone, Default)]
pub struct PeersInfo(pub Vec<PeerInfo>);
impl PeersInfo {
fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
self.0.iter_mut().find(|p| p.sk_id == id)
}
fn upsert(&mut self, p: &PeerInfo) {
match self.get(p.sk_id) {
Some(rp) => *rp = p.clone(),
None => self.0.push(p.clone()),
}
}
}
/// Replica status update + hot standby feedback
#[derive(Debug, Clone, Copy)]
pub struct ReplicaState {
@@ -74,6 +121,8 @@ impl ReplicaState {
pub struct SharedState {
/// Safekeeper object
sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
/// In memory list containing state of peers sent in latest messages from them.
peers_info: PeersInfo,
/// State of replicas
replicas: Vec<Option<ReplicaState>>,
/// True when WAL backup launcher oversees the timeline, making sure WAL is
@@ -123,7 +172,8 @@ impl SharedState {
Ok(Self {
sk,
replicas: Vec::new(),
peers_info: PeersInfo(vec![]),
replicas: vec![],
wal_backup_active: false,
active: false,
num_computes: 0,
@@ -142,6 +192,7 @@ impl SharedState {
Ok(Self {
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
peers_info: PeersInfo(vec![]),
replicas: Vec::new(),
wal_backup_active: false,
active: false,
@@ -201,12 +252,6 @@ impl SharedState {
self.wal_backup_active
}
// Can this safekeeper offload to s3? Recently joined safekeepers might not
// have necessary WAL.
fn can_wal_backup(&self) -> bool {
self.sk.state.local_start_lsn <= self.sk.inmem.backup_lsn
}
fn get_wal_seg_size(&self) -> usize {
self.sk.state.server.wal_seg_size as usize
}
@@ -268,6 +313,24 @@ impl SharedState {
self.replicas.push(Some(state));
pos
}
fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
SkTimelineInfo {
last_log_term: Some(self.sk.get_epoch()),
flush_lsn: Some(self.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
commit_lsn: Some(self.sk.inmem.commit_lsn),
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: Some(max(
self.get_replicas_state().remote_consistent_lsn,
self.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(self.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(self.sk.inmem.backup_lsn),
local_start_lsn: Some(self.sk.state.local_start_lsn),
}
}
}
#[derive(Debug, thiserror::Error)]
@@ -517,17 +580,6 @@ impl Timeline {
self.write_shared_state().wal_backup_attend()
}
/// Can this safekeeper offload to s3? Recently joined safekeepers might not
/// have necessary WAL.
pub fn can_wal_backup(&self) -> bool {
if self.is_cancelled() {
return false;
}
let shared_state = self.write_shared_state();
shared_state.can_wal_backup()
}
/// Returns full timeline info, required for the metrics. If the timeline is
/// not active, returns None instead.
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
@@ -632,36 +684,25 @@ impl Timeline {
Ok(())
}
/// Return public safekeeper info for broadcasting to broker and other peers.
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
/// Get safekeeper info for broadcasting to broker and other peers.
pub fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
let shared_state = self.write_shared_state();
SkTimelineInfo {
last_log_term: Some(shared_state.sk.get_epoch()),
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
commit_lsn: Some(shared_state.sk.inmem.commit_lsn),
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: Some(max(
shared_state.get_replicas_state().remote_consistent_lsn,
shared_state.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
}
shared_state.get_safekeeper_info(conf)
}
/// Update timeline state with peer safekeeper data.
pub async fn record_safekeeper_info(
&self,
sk_info: &SkTimelineInfo,
_sk_id: NodeId,
sk_id: NodeId,
) -> Result<()> {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
let mut shared_state = self.write_shared_state();
shared_state.sk.record_safekeeper_info(sk_info)?;
let peer_info = PeerInfo::from_sk_info(sk_id, sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
is_wal_backup_action_pending = shared_state.update_status(self.ttid);
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
@@ -673,6 +714,22 @@ impl Timeline {
Ok(())
}
/// Get our latest view of alive peers status on the timeline.
/// We pass our own info through the broker as well, so when we don't have connection
/// to the broker returned vec is empty.
pub fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.write_shared_state();
let now = Instant::now();
shared_state
.peers_info
.0
.iter()
// Regard peer as absent if we haven't heard from it within heartbeat_timeout.
.filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout)
.cloned()
.collect()
}
/// Add send_wal replica to the in-memory vector of replicas.
pub fn add_replica(&self, state: ReplicaState) -> usize {
self.write_shared_state().add_replica(state)

View File

@@ -1,8 +1,7 @@
use anyhow::{Context, Result};
use etcd_broker::subscription_key::{
NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
};
use tokio::task::JoinHandle;
use utils::id::NodeId;
use std::cmp::min;
use std::collections::HashMap;
@@ -26,14 +25,11 @@ use tracing::*;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::broker::{Election, ElectionLeader};
use crate::timeline::Timeline;
use crate::{broker, GlobalTimelines, SafeKeeperConf};
use crate::timeline::{PeerInfo, Timeline};
use crate::{GlobalTimelines, SafeKeeperConf};
use once_cell::sync::OnceCell;
const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000;
const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
@@ -70,47 +66,104 @@ struct WalBackupTimelineEntry {
handle: Option<WalBackupTaskHandle>,
}
/// Start per timeline task, if it makes sense for this safekeeper to offload.
fn consider_start_task(
async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
if let Some(wb_handle) = entry.handle.take() {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", ttid, e);
}
}
}
/// The goal is to ensure that normally only one safekeepers offloads. However,
/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
/// time we have several ones as they PUT the same files. Also,
/// - frequently changing the offloader would be bad;
/// - electing seriously lagging safekeeper is undesirable;
/// So we deterministically choose among the reasonably caught up candidates.
/// TODO: take into account failed attempts to deal with hypothetical situation
/// where s3 is unreachable only for some sks.
fn determine_offloader(
alive_peers: &[PeerInfo],
wal_backup_lsn: Lsn,
ttid: TenantTimelineId,
conf: &SafeKeeperConf,
) -> (Option<NodeId>, String) {
// TODO: remove this once we fill newly joined safekeepers since backup_lsn.
let capable_peers = alive_peers
.iter()
.filter(|p| p.local_start_lsn <= wal_backup_lsn);
match capable_peers.clone().map(|p| p.commit_lsn).max() {
None => (None, "no connected peers to elect from".to_string()),
Some(max_commit_lsn) => {
let threshold = max_commit_lsn
.checked_sub(conf.max_offloader_lag_bytes)
.unwrap_or(Lsn(0));
let mut caughtup_peers = capable_peers
.clone()
.filter(|p| p.commit_lsn >= threshold)
.collect::<Vec<_>>();
caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
// To distribute the load, shift by timeline_id.
let offloader = caughtup_peers
[(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
.sk_id;
let mut capable_peers_dbg = capable_peers
.map(|p| (p.sk_id, p.commit_lsn))
.collect::<Vec<_>>();
capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
(
Some(offloader),
format!(
"elected {} among {:?} peers, with {} of them being caughtup",
offloader,
capable_peers_dbg,
caughtup_peers.len()
),
)
}
}
}
/// Based on peer information determine which safekeeper should offload; if it
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
/// is running, kill it.
async fn update_task(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
task: &mut WalBackupTimelineEntry,
entry: &mut WalBackupTimelineEntry,
) {
if !task.timeline.can_wal_backup() {
return;
let alive_peers = entry.timeline.get_peers(conf);
let wal_backup_lsn = entry.timeline.get_wal_backup_lsn();
let (offloader, election_dbg_str) =
determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
let elected_me = Some(conf.my_id) == offloader;
if elected_me != (entry.handle.is_some()) {
if elected_me {
info!("elected for backup {}: {}", ttid, election_dbg_str);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let handle = tokio::spawn(
backup_task_main(ttid, timeline_dir, shutdown_rx)
.instrument(info_span!("WAL backup task", ttid = %ttid)),
);
entry.handle = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
} else {
info!("stepping down from backup {}: {}", ttid, election_dbg_str);
shut_down_task(ttid, entry).await;
}
}
info!("starting WAL backup task for {}", ttid);
// TODO: decide who should offload right here by simply checking current
// state instead of running elections in offloading task.
let election_name = SubscriptionKey {
cluster_prefix: conf.broker_etcd_prefix.clone(),
kind: SubscriptionKind::Operation(
ttid,
NodeKind::Safekeeper,
OperationKind::Safekeeper(SkOperationKind::WalBackup),
),
}
.watch_key();
let my_candidate_name = broker::get_candiate_name(conf.my_id);
let election = broker::Election::new(
election_name,
my_candidate_name,
conf.broker_endpoints.clone(),
);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let handle = tokio::spawn(
backup_task_main(ttid, timeline_dir, shutdown_rx, election)
.instrument(info_span!("WAL backup task", ttid = %ttid)),
);
task.handle = Some(WalBackupTaskHandle {
shutdown_tx,
handle,
});
}
const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
@@ -158,27 +211,20 @@ async fn wal_backup_launcher_main_loop(
timeline,
handle: None,
});
consider_start_task(&conf, ttid, entry);
update_task(&conf, ttid, entry).await;
} else {
// need to stop the task
info!("stopping WAL backup task for {}", ttid);
let entry = tasks.remove(&ttid).unwrap();
if let Some(wb_handle) = entry.handle {
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", ttid, e);
}
}
let mut entry = tasks.remove(&ttid).unwrap();
shut_down_task(ttid, &mut entry).await;
}
}
}
// Start known tasks, if needed and possible.
// For each timeline needing offloading, check if this safekeeper
// should do the job and start/stop the task accordingly.
_ = ticker.tick() => {
for (ttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) {
consider_start_task(&conf, *ttid, entry);
for (ttid, entry) in tasks.iter_mut() {
update_task(&conf, *ttid, entry).await;
}
}
}
@@ -190,17 +236,13 @@ struct WalBackupTask {
timeline_dir: PathBuf,
wal_seg_size: usize,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
leader: Option<ElectionLeader>,
election: Election,
}
/// Offload single timeline. Called only after we checked that backup
/// is required (wal_backup_attend) and possible (can_wal_backup).
/// Offload single timeline.
async fn backup_task_main(
ttid: TenantTimelineId,
timeline_dir: PathBuf,
mut shutdown_rx: Receiver<()>,
election: Election,
) {
info!("started");
let res = GlobalTimelines::get(ttid);
@@ -215,8 +257,6 @@ async fn backup_task_main(
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
timeline: tli,
timeline_dir,
leader: None,
election,
};
// task is spinned up only when wal_seg_size already initialized
@@ -229,9 +269,6 @@ async fn backup_task_main(
canceled = true;
}
}
if let Some(l) = wb.leader {
l.give_up().await;
}
info!("task {}", if canceled { "canceled" } else { "terminated" });
}
@@ -239,106 +276,71 @@ impl WalBackupTask {
async fn run(&mut self) {
let mut backup_lsn = Lsn(0);
// election loop
let mut retry_attempt = 0u32;
// offload loop
loop {
let mut retry_attempt = 0u32;
if retry_attempt == 0 {
// wait for new WAL to arrive
if let Err(e) = self.commit_lsn_watch_rx.changed().await {
// should never happen, as we hold Arc to timeline.
error!("commit_lsn watch shut down: {:?}", e);
return;
}
} else {
// or just sleep if we errored previously
let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
{
retry_delay = min(retry_delay, backoff_delay);
}
sleep(Duration::from_millis(retry_delay)).await;
}
info!("acquiring leadership");
if let Err(e) = broker::get_leader(&self.election, &mut self.leader).await {
error!("error during leader election {:?}", e);
sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await;
let commit_lsn = *self.commit_lsn_watch_rx.borrow();
// Note that backup_lsn can be higher than commit_lsn if we
// don't have much local WAL and others already uploaded
// segments we don't even have.
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
retry_attempt = 0;
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
}
// Perhaps peers advanced the position, check shmem value.
backup_lsn = self.timeline.get_wal_backup_lsn();
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
retry_attempt = 0;
continue;
}
info!("acquired leadership");
// offload loop
loop {
if retry_attempt == 0 {
// wait for new WAL to arrive
if let Err(e) = self.commit_lsn_watch_rx.changed().await {
// should never happen, as we hold Arc to timeline.
error!("commit_lsn watch shut down: {:?}", e);
match backup_lsn_range(
backup_lsn,
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
)
.await
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
let res = self.timeline.set_wal_backup_lsn(backup_lsn_result);
if let Err(e) = res {
error!("failed to set wal_backup_lsn: {}", e);
return;
}
} else {
// or just sleep if we errored previously
let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
if let Some(backoff_delay) =
UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
{
retry_delay = min(retry_delay, backoff_delay);
}
sleep(Duration::from_millis(retry_delay)).await;
retry_attempt = 0;
}
Err(e) => {
error!(
"failed while offloading range {}-{}: {:?}",
backup_lsn, commit_lsn, e
);
let commit_lsn = *self.commit_lsn_watch_rx.borrow();
// Note that backup_lsn can be higher than commit_lsn if we
// don't have much local WAL and others already uploaded
// segments we don't even have.
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
}
// Perhaps peers advanced the position, check shmem value.
backup_lsn = self.timeline.get_wal_backup_lsn();
if backup_lsn.segment_number(self.wal_seg_size)
>= commit_lsn.segment_number(self.wal_seg_size)
{
continue;
}
if let Some(l) = self.leader.as_mut() {
// Optimization idea for later:
// Avoid checking election leader every time by returning current lease grant expiration time
// Re-check leadership only after expiration time,
// such approach would reduce overhead on write-intensive workloads
match l
.check_am_i(
self.election.election_name.clone(),
self.election.candidate_name.clone(),
)
.await
{
Ok(leader) => {
if !leader {
info!("lost leadership");
break;
}
}
Err(e) => {
warn!("error validating leader, {:?}", e);
break;
}
}
}
match backup_lsn_range(
backup_lsn,
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
)
.await
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
let res = self.timeline.set_wal_backup_lsn(backup_lsn_result);
if let Err(e) = res {
error!("backup error: {}", e);
return;
}
retry_attempt = 0;
}
Err(e) => {
error!(
"failed while offloading range {}-{}: {:?}",
backup_lsn, commit_lsn, e
);
retry_attempt = min(retry_attempt + 1, u32::MAX);
if retry_attempt < u32::MAX {
retry_attempt += 1;
}
}
}

View File

@@ -17,6 +17,7 @@ import uuid
from contextlib import closing, contextmanager
from dataclasses import dataclass, field
from enum import Flag, auto
from functools import cached_property
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, TypeVar, Union, cast
@@ -27,7 +28,6 @@ import jwt
import psycopg2
import pytest
import requests
from cached_property import cached_property
from fixtures.log_helper import log
from fixtures.types import Lsn, TenantId, TimelineId

View File

@@ -70,18 +70,14 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
# But all others are broken
# First timeline would not get loaded into pageserver due to corrupt metadata file
with pytest.raises(
Exception, match=f"Timeline {timeline1} was not found for tenant {tenant1}"
) as err:
with pytest.raises(Exception, match=f"Timeline {tenant1}/{timeline1} was not found") as err:
pg1.start()
log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}")
# Second timeline has no ancestors, only the metadata file and no layer files
# We don't have the remote storage enabled, which means timeline is in an incorrect state,
# it's not loaded at all
with pytest.raises(
Exception, match=f"Timeline {timeline2} was not found for tenant {tenant2}"
) as err:
with pytest.raises(Exception, match=f"Timeline {tenant2}/{timeline2} was not found") as err:
pg2.start()
log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}")

View File

@@ -1,10 +1,10 @@
import os.path
import shutil
import subprocess
import threading
import time
from contextlib import closing
from cached_property import threading
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv

View File

@@ -65,7 +65,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
# check 404
with pytest.raises(
NeonPageserverApiException,
match=f"Timeline {leaf_timeline_id} was not found for tenant {env.initial_tenant}",
match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found",
):
ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)