diff --git a/.github/ansible/neon-stress.hosts.yaml b/.github/ansible/neon-stress.hosts.yaml index 8afc9a5be8..dd61ac5a5e 100644 --- a/.github/ansible/neon-stress.hosts.yaml +++ b/.github/ansible/neon-stress.hosts.yaml @@ -3,7 +3,6 @@ storage: bucket_name: neon-storage-ireland bucket_region: eu-west-1 console_mgmt_base_url: http://neon-stress-console.local - env_name: neon-stress etcd_endpoints: neon-stress-etcd.local:2379 safekeeper_enable_s3_offload: 'false' pageserver_config_stub: @@ -12,6 +11,7 @@ storage: bucket_name: "{{ bucket_name }}" bucket_region: "{{ bucket_region }}" prefix_in_bucket: "{{ inventory_hostname }}" + safekeeper_s3_prefix: neon-stress/wal hostname_suffix: ".local" remote_user: admin children: diff --git a/.github/ansible/production.hosts.yaml b/.github/ansible/production.hosts.yaml index 9f9b12d25d..bca2614399 100644 --- a/.github/ansible/production.hosts.yaml +++ b/.github/ansible/production.hosts.yaml @@ -1,7 +1,6 @@ --- storage: vars: - env_name: prod-1 console_mgmt_base_url: http://console-release.local bucket_name: zenith-storage-oregon bucket_region: us-west-2 @@ -12,6 +11,7 @@ storage: bucket_name: "{{ bucket_name }}" bucket_region: "{{ bucket_region }}" prefix_in_bucket: "{{ inventory_hostname }}" + safekeeper_s3_prefix: prod-1/wal hostname_suffix: ".local" remote_user: admin diff --git a/.github/ansible/staging.hosts.yaml b/.github/ansible/staging.hosts.yaml index 7e91e8e728..44d971455d 100644 --- a/.github/ansible/staging.hosts.yaml +++ b/.github/ansible/staging.hosts.yaml @@ -3,7 +3,6 @@ storage: bucket_name: zenith-staging-storage-us-east-1 bucket_region: us-east-1 console_mgmt_base_url: http://console-staging.local - env_name: us-stage etcd_endpoints: zenith-us-stage-etcd.local:2379 pageserver_config_stub: pg_distrib_dir: /usr/local @@ -11,6 +10,7 @@ storage: bucket_name: "{{ bucket_name }}" bucket_region: "{{ bucket_region }}" prefix_in_bucket: "{{ inventory_hostname }}" + safekeeper_s3_prefix: us-stage/wal hostname_suffix: ".local" remote_user: admin diff --git a/.github/ansible/staging.us-east-2.hosts.yaml b/.github/ansible/staging.us-east-2.hosts.yaml index 5da0cce973..db3ed87c45 100644 --- a/.github/ansible/staging.us-east-2.hosts.yaml +++ b/.github/ansible/staging.us-east-2.hosts.yaml @@ -3,7 +3,6 @@ storage: bucket_name: neon-staging-storage-us-east-2 bucket_region: us-east-2 console_mgmt_base_url: http://console-staging.local - env_name: us-stage etcd_endpoints: etcd-0.us-east-2.aws.neon.build:2379 pageserver_config_stub: pg_distrib_dir: /usr/local @@ -11,6 +10,7 @@ storage: bucket_name: "{{ bucket_name }}" bucket_region: "{{ bucket_region }}" prefix_in_bucket: "pageserver/v1" + safekeeper_s3_prefix: safekeeper/v1/wal hostname_suffix: "" remote_user: ssm-user ansible_aws_ssm_region: us-east-2 diff --git a/.github/ansible/systemd/safekeeper.service b/.github/ansible/systemd/safekeeper.service index 877579fbfa..69827e36ac 100644 --- a/.github/ansible/systemd/safekeeper.service +++ b/.github/ansible/systemd/safekeeper.service @@ -6,7 +6,7 @@ After=network.target auditd.service Type=simple User=safekeeper Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib -ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ env_name }}/wal"}' +ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}' ExecReload=/bin/kill -HUP $MAINPID KillMode=mixed KillSignal=SIGINT diff --git a/Cargo.lock b/Cargo.lock index 657baf5d80..13774f7fe6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3932,6 +3932,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.16" @@ -3942,12 +3952,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] @@ -4042,6 +4055,8 @@ dependencies = [ "serde_json", "serde_with", "signal-hook", + "strum", + "strum_macros", "tempfile", "thiserror", "tokio", diff --git a/Dockerfile.compute-node-v14 b/Dockerfile.compute-node-v14 index f5ccdf7e99..6d2b285fa3 100644 --- a/Dockerfile.compute-node-v14 +++ b/Dockerfile.compute-node-v14 @@ -9,7 +9,7 @@ ARG TAG=pinned # FROM debian:bullseye-slim AS build-deps RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \ - echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \ + echo "Package: *\nPin: release n=bullseye\nPin-Priority: 50" > /etc/apt/preferences && \ apt update RUN apt update && \ apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \ @@ -191,7 +191,7 @@ RUN apt update && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ echo "Installing GLIBC 2.34" && \ echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \ - echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \ + echo "Package: *\nPin: release n=bullseye\nPin-Priority: 50" > /etc/apt/preferences && \ apt update && \ apt install -y --no-install-recommends -t testing libc6 && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ diff --git a/Dockerfile.compute-node-v15 b/Dockerfile.compute-node-v15 index ec555ad932..b7b1f25103 100644 --- a/Dockerfile.compute-node-v15 +++ b/Dockerfile.compute-node-v15 @@ -14,7 +14,7 @@ ARG TAG=pinned # FROM debian:bullseye-slim AS build-deps RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \ - echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \ + echo "Package: *\nPin: release n=bullseye\nPin-Priority: 50" > /etc/apt/preferences && \ apt update RUN apt update && \ apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \ @@ -196,7 +196,7 @@ RUN apt update && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ echo "Installing GLIBC 2.34" && \ echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \ - echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \ + echo "Package: *\nPin: release n=bullseye\nPin-Priority: 50" > /etc/apt/preferences && \ apt update && \ apt install -y --no-install-recommends -t testing libc6 && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 1e7cd51b6e..58c94d74ae 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -423,11 +423,32 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> { ); db_client.simple_query(&alter_query)?; - // // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user. - // // This is needed since postgres 15, where this privilege is removed by default. - // let grant_query: String = "GRANT CREATE ON SCHEMA public TO web_access".to_string(); - // info!("grant query for db {} : {}", &db.name, &grant_query); - // db_client.simple_query(&grant_query)?; + // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user. + // This is needed because since postgres 15 this privilege is removed by default. + let grant_query = "DO $$\n\ + BEGIN\n\ + IF EXISTS(\n\ + SELECT nspname\n\ + FROM pg_catalog.pg_namespace\n\ + WHERE nspname = 'public'\n\ + ) AND\n\ + current_setting('server_version_num')::int/10000 >= 15\n\ + THEN\n\ + IF EXISTS(\n\ + SELECT rolname\n\ + FROM pg_catalog.pg_roles\n\ + WHERE rolname = 'web_access'\n\ + )\n\ + THEN\n\ + GRANT CREATE ON SCHEMA public TO web_access;\n\ + END IF;\n\ + END IF;\n\ + END\n\ + $$;" + .to_string(); + + info!("grant query for db {} : {}", &db.name, &grant_query); + db_client.simple_query(&grant_query)?; } Ok(()) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 9f32ad31c1..b3f90b5922 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -282,9 +282,7 @@ impl PostgresNode { fn setup_pg_conf(&self, auth_type: AuthType) -> Result<()> { let mut conf = PostgresConf::new(); conf.append("max_wal_senders", "10"); - // wal_log_hints is mandatory when running against pageserver (see gh issue#192) - // TODO: is it possible to check wal_log_hints at pageserver side via XLOG_PARAMETER_CHANGE? - conf.append("wal_log_hints", "on"); + conf.append("wal_log_hints", "off"); conf.append("max_replication_slots", "10"); conf.append("hot_standby", "on"); conf.append("shared_buffers", "1MB"); diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 64a89124d2..17f5d0c109 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -123,7 +123,6 @@ impl SafekeeperNode { .args(&["--id", self.id.to_string().as_ref()]) .args(&["--listen-pg", &listen_pg]) .args(&["--listen-http", &listen_http]) - .args(&["--recall", "1 second"]) .arg("--daemonize"), ); if !self.conf.sync { diff --git a/libs/etcd_broker/src/subscription_value.rs b/libs/etcd_broker/src/subscription_value.rs index d3e2011761..60a5411926 100644 --- a/libs/etcd_broker/src/subscription_value.rs +++ b/libs/etcd_broker/src/subscription_value.rs @@ -29,6 +29,9 @@ pub struct SkTimelineInfo { #[serde_as(as = "Option")] #[serde(default)] pub peer_horizon_lsn: Option, + #[serde_as(as = "Option")] + #[serde(default)] + pub local_start_lsn: Option, /// A connection string to use for WAL receiving. #[serde(default)] pub safekeeper_connstr: Option, diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index a153f1a01e..dd40ba9e1c 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -19,6 +19,22 @@ pub enum TenantState { Broken, } +/// A state of a timeline in pageserver's memory. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum TimelineState { + /// Timeline is fully operational, its background jobs are running. + Active, + /// A timeline is recognized by pageserver, but not yet ready to operate. + /// The status indicates, that the timeline could eventually go back to Active automatically: + /// for example, if the owning tenant goes back to Active again. + Suspended, + /// A timeline is recognized by pageserver, but not yet ready to operate and not allowed to + /// automatically become Active after certain events: only a management call can change this status. + Paused, + /// A timeline is recognized by the pageserver, but no longer used for any operations, as failed to get activated. + Broken, +} + #[serde_as] #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { @@ -160,6 +176,8 @@ pub struct TimelineInfo { pub remote_consistent_lsn: Option, pub awaits_download: bool, + pub state: TimelineState, + // Some of the above fields are duplicated in 'local' and 'remote', for backwards- // compatility with older clients. pub local: LocalTimelineInfo, diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index a7baddada4..1753ee81b9 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -19,7 +19,7 @@ thiserror = "1.0" tokio = { version = "1.17", features = ["macros"]} tokio-rustls = "0.23" tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } nix = "0.25" signal-hook = "0.3.10" rand = "0.8.3" @@ -30,6 +30,8 @@ rustls-split = "0.3.0" git-version = "0.3.5" serde_with = "2.0" once_cell = "1.13.0" +strum = "0.24" +strum_macros = "0.24" metrics = { path = "../metrics" } diff --git a/libs/utils/src/id.rs b/libs/utils/src/id.rs index 059ce69ca4..f245f7c3d4 100644 --- a/libs/utils/src/id.rs +++ b/libs/utils/src/id.rs @@ -75,6 +75,12 @@ impl From<[u8; 16]> for Id { } } +impl From for u128 { + fn from(id: Id) -> Self { + u128::from_le_bytes(id.0) + } +} + impl fmt::Display for Id { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(&self.hex_encode()) @@ -136,6 +142,12 @@ macro_rules! id_newtype { } } + impl From<$t> for u128 { + fn from(id: $t) -> Self { + u128::from(id.0) + } + } + impl fmt::Display for $t { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 1576a54c8e..31c0e02f98 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -1,11 +1,35 @@ use std::{ fs::{File, OpenOptions}, path::Path, + str::FromStr, }; use anyhow::{Context, Result}; +use strum_macros::{EnumString, EnumVariantNames}; -pub fn init(log_filename: impl AsRef, daemonize: bool) -> Result { +#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)] +#[strum(serialize_all = "snake_case")] +pub enum LogFormat { + Plain, + Json, +} + +impl LogFormat { + pub fn from_config(s: &str) -> anyhow::Result { + use strum::VariantNames; + LogFormat::from_str(s).with_context(|| { + format!( + "Unrecognized log format. Please specify one of: {:?}", + LogFormat::VARIANTS + ) + }) + } +} +pub fn init( + log_filename: impl AsRef, + daemonize: bool, + log_format: LogFormat, +) -> Result { // Don't open the same file for output multiple times; // the different fds could overwrite each other's output. let log_file = OpenOptions::new() @@ -21,22 +45,50 @@ pub fn init(log_filename: impl AsRef, daemonize: bool) -> Result { let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str)); + let x: File = log_file.try_clone().unwrap(); let base_logger = tracing_subscriber::fmt() .with_env_filter(env_filter) - .with_target(false) // don't include event targets - .with_ansi(false); // don't use colors in log file; + .with_target(false) + .with_ansi(false) + .with_writer(move || -> Box { + // we are cloning and returning log file in order to allow redirecting daemonized stdout and stderr to it + // if we do not use daemonization (e.g. in docker) it is better to log to stdout directly + // for example to be in line with docker log command which expects logs comimg from stdout + if daemonize { + Box::new(x.try_clone().unwrap()) + } else { + Box::new(std::io::stdout()) + } + }); - // we are cloning and returning log file in order to allow redirecting daemonized stdout and stderr to it - // if we do not use daemonization (e.g. in docker) it is better to log to stdout directly - // for example to be in line with docker log command which expects logs comimg from stdout - if daemonize { - let x = log_file.try_clone().unwrap(); - base_logger - .with_writer(move || x.try_clone().unwrap()) - .init(); - } else { - base_logger.init(); + match log_format { + LogFormat::Json => base_logger.json().init(), + LogFormat::Plain => base_logger.init(), } Ok(log_file) } + +// #[cfg(test)] +// Due to global logger, can't run tests in same process. +// So until there's a non-global one, the tests are in ../tests/ as separate files. +#[macro_export(local_inner_macros)] +macro_rules! test_init_file_logger { + ($log_level:expr, $log_format:expr) => {{ + use std::str::FromStr; + std::env::set_var("RUST_LOG", $log_level); + + let tmp_dir = tempfile::TempDir::new().unwrap(); + let log_file_path = tmp_dir.path().join("logfile"); + + let log_format = $crate::logging::LogFormat::from_str($log_format).unwrap(); + let _log_file = $crate::logging::init(&log_file_path, true, log_format).unwrap(); + + let log_file = std::fs::OpenOptions::new() + .read(true) + .open(&log_file_path) + .unwrap(); + + log_file + }}; +} diff --git a/libs/utils/tests/logger_json_test.rs b/libs/utils/tests/logger_json_test.rs new file mode 100644 index 0000000000..5d63b9b004 --- /dev/null +++ b/libs/utils/tests/logger_json_test.rs @@ -0,0 +1,36 @@ +// This could be in ../src/logging.rs but since the logger is global, these +// can't be run in threads of the same process +use std::fs::File; +use std::io::{BufRead, BufReader, Lines}; +use tracing::*; +use utils::test_init_file_logger; + +fn read_lines(file: File) -> Lines> { + BufReader::new(file).lines() +} + +#[test] +fn test_json_format_has_message_and_custom_field() { + std::env::set_var("RUST_LOG", "info"); + + let log_file = test_init_file_logger!("info", "json"); + + let custom_field: &str = "hi"; + trace!(custom = %custom_field, "test log message"); + debug!(custom = %custom_field, "test log message"); + info!(custom = %custom_field, "test log message"); + warn!(custom = %custom_field, "test log message"); + error!(custom = %custom_field, "test log message"); + + let lines = read_lines(log_file); + for line in lines { + let content = line.unwrap(); + let json_object = serde_json::from_str::(&content).unwrap(); + + assert_eq!(json_object["fields"]["custom"], "hi"); + assert_eq!(json_object["fields"]["message"], "test log message"); + + assert_ne!(json_object["level"], "TRACE"); + assert_ne!(json_object["level"], "DEBUG"); + } +} diff --git a/libs/utils/tests/logger_plain_test.rs b/libs/utils/tests/logger_plain_test.rs new file mode 100644 index 0000000000..bc5abf45dd --- /dev/null +++ b/libs/utils/tests/logger_plain_test.rs @@ -0,0 +1,36 @@ +// This could be in ../src/logging.rs but since the logger is global, these +// can't be run in threads of the same process +use std::fs::File; +use std::io::{BufRead, BufReader, Lines}; +use tracing::*; +use utils::test_init_file_logger; + +fn read_lines(file: File) -> Lines> { + BufReader::new(file).lines() +} + +#[test] +fn test_plain_format_has_message_and_custom_field() { + std::env::set_var("RUST_LOG", "warn"); + + let log_file = test_init_file_logger!("warn", "plain"); + + let custom_field: &str = "hi"; + trace!(custom = %custom_field, "test log message"); + debug!(custom = %custom_field, "test log message"); + info!(custom = %custom_field, "test log message"); + warn!(custom = %custom_field, "test log message"); + error!(custom = %custom_field, "test log message"); + + let lines = read_lines(log_file); + for line in lines { + let content = line.unwrap(); + serde_json::from_str::(&content).unwrap_err(); + assert!(content.contains("custom=hi")); + assert!(content.contains("test log message")); + + assert!(!content.contains("TRACE")); + assert!(!content.contains("DEBUG")); + assert!(!content.contains("INFO")); + } +} diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 9317dd5dd7..802352be90 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -199,7 +199,7 @@ fn initialize_config( fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()> { // Initialize logger - let log_file = logging::init(LOG_FILE_NAME, daemonize)?; + let log_file = logging::init(LOG_FILE_NAME, daemonize, conf.log_format)?; info!("version: {}", version()); diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 4f80fc96b5..6a372fb081 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -17,6 +17,7 @@ use toml_edit::{Document, Item}; use url::Url; use utils::{ id::{NodeId, TenantId, TimelineId}, + logging::LogFormat, postgres_backend::AuthType, }; @@ -45,6 +46,8 @@ pub mod defaults { pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192; pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100; + pub const DEFAULT_LOG_FORMAT: &str = "plain"; + /// /// Default built-in configuration file. /// @@ -63,6 +66,7 @@ pub mod defaults { # initial superuser role name to use when creating a new tenant #initial_superuser_name = '{DEFAULT_SUPERUSER}' +#log_format = '{DEFAULT_LOG_FORMAT}' # [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -126,6 +130,8 @@ pub struct PageServerConf { /// Etcd broker endpoints to connect to. pub broker_endpoints: Vec, + + pub log_format: LogFormat, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -192,6 +198,8 @@ struct PageServerConfigBuilder { profiling: BuilderValue, broker_etcd_prefix: BuilderValue, broker_endpoints: BuilderValue>, + + log_format: BuilderValue, } impl Default for PageServerConfigBuilder { @@ -219,6 +227,7 @@ impl Default for PageServerConfigBuilder { profiling: Set(ProfilingConfig::Disabled), broker_etcd_prefix: Set(etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string()), broker_endpoints: Set(Vec::new()), + log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()), } } } @@ -291,6 +300,10 @@ impl PageServerConfigBuilder { self.profiling = BuilderValue::Set(profiling) } + pub fn log_format(&mut self, log_format: LogFormat) { + self.log_format = BuilderValue::Set(log_format) + } + pub fn build(self) -> anyhow::Result { let broker_endpoints = self .broker_endpoints @@ -335,6 +348,7 @@ impl PageServerConfigBuilder { broker_etcd_prefix: self .broker_etcd_prefix .ok_or(anyhow!("missing broker_etcd_prefix"))?, + log_format: self.log_format.ok_or(anyhow!("missing log_format"))?, }) } } @@ -459,6 +473,9 @@ impl PageServerConf { }) .collect::>()?, ), + "log_format" => builder.log_format( + LogFormat::from_config(&parse_toml_string(key, item)?)? + ), _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -571,6 +588,7 @@ impl PageServerConf { default_tenant_conf: TenantConf::dummy_conf(), broker_endpoints: Vec::new(), broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), + log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), } } } @@ -665,6 +683,8 @@ max_file_descriptors = 333 initial_superuser_name = 'zzzz' id = 10 +log_format = 'json' + "#; #[test] @@ -704,6 +724,7 @@ id = 10 .parse() .expect("Failed to parse a valid broker endpoint URL")], broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), + log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), }, "Correct defaults should be used when no config values are provided" ); @@ -748,6 +769,7 @@ id = 10 .parse() .expect("Failed to parse a valid broker endpoint URL")], broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), + log_format: LogFormat::Json, }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 626cc07429..89609f5674 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -618,6 +618,7 @@ components: - last_record_lsn - disk_consistent_lsn - awaits_download + - state properties: timeline_id: type: string @@ -660,6 +661,8 @@ components: type: integer awaits_download: type: boolean + state: + type: string # These 'local' and 'remote' fields just duplicate some of the fields # above. They are kept for backwards-compatibility. They can be removed, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 489adbb2cf..8ec7604b8a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -129,6 +129,7 @@ async fn build_timeline_info( } }; let current_physical_size = Some(timeline.get_physical_size()); + let state = timeline.current_state(); let info = TimelineInfo { tenant_id: timeline.tenant_id, @@ -158,6 +159,7 @@ async fn build_timeline_info( remote_consistent_lsn, awaits_download, + state, // Duplicate some fields in 'local' and 'remote' fields, for backwards-compatility // with the control plane. @@ -294,7 +296,7 @@ async fn timeline_detail_handler(request: Request) -> Result) -> Result format!("{}", lsn), + LsnForTimestamp::Present(lsn) => format!("{lsn}"), LsnForTimestamp::Future(_lsn) => "future".into(), LsnForTimestamp::Past(_lsn) => "past".into(), LsnForTimestamp::NoData(_lsn) => "nodata".into(), @@ -788,16 +789,16 @@ async fn timeline_gc_handler(mut request: Request) -> Result) -> Result) -> Result Result> { - tenant_mgr::get_tenant(tenant_id, true).and_then(|tenant| tenant.get_timeline(timeline_id)) + tenant_mgr::get_tenant(tenant_id, true) + .and_then(|tenant| tenant.get_timeline(timeline_id, true)) } /// diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 69c89a80b4..84833e9c40 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -11,7 +11,8 @@ //! parent timeline, and the last LSN that has been written to disk. //! -use anyhow::{bail, ensure, Context}; +use anyhow::{bail, Context}; +use pageserver_api::models::TimelineState; use tokio::sync::watch; use tracing::*; use utils::crashsafe::path_with_suffix_extension; @@ -189,6 +190,7 @@ impl UninitializedTimeline<'_> { "Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}" ) })?; + new_timeline.set_state(TimelineState::Active); v.insert(Arc::clone(&new_timeline)); new_timeline.launch_wal_receiver(); } @@ -338,18 +340,26 @@ impl Tenant { /// Get Timeline handle for given Neon timeline ID. /// This function is idempotent. It doesn't change internal state in any way. - pub fn get_timeline(&self, timeline_id: TimelineId) -> anyhow::Result> { - self.timelines - .lock() - .unwrap() - .get(&timeline_id) - .with_context(|| { - format!( - "Timeline {} was not found for tenant {}", - timeline_id, self.tenant_id - ) - }) - .map(Arc::clone) + pub fn get_timeline( + &self, + timeline_id: TimelineId, + active_only: bool, + ) -> anyhow::Result> { + let timelines_accessor = self.timelines.lock().unwrap(); + let timeline = timelines_accessor.get(&timeline_id).with_context(|| { + format!("Timeline {}/{} was not found", self.tenant_id, timeline_id) + })?; + + if active_only && !timeline.is_active() { + anyhow::bail!( + "Timeline {}/{} is not active, state: {:?}", + self.tenant_id, + timeline_id, + timeline.current_state() + ) + } else { + Ok(Arc::clone(timeline)) + } } /// Lists timelines the tenant contains. @@ -372,6 +382,11 @@ impl Tenant { initdb_lsn: Lsn, pg_version: u32, ) -> anyhow::Result { + anyhow::ensure!( + self.is_active(), + "Cannot create empty timelines on inactive tenant" + ); + let timelines = self.timelines.lock().unwrap(); let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?; drop(timelines); @@ -408,9 +423,14 @@ impl Tenant { mut ancestor_start_lsn: Option, pg_version: u32, ) -> anyhow::Result>> { + anyhow::ensure!( + self.is_active(), + "Cannot create timelines on inactive tenant" + ); + let new_timeline_id = new_timeline_id.unwrap_or_else(TimelineId::generate); - if self.get_timeline(new_timeline_id).is_ok() { + if self.get_timeline(new_timeline_id, false).is_ok() { debug!("timeline {new_timeline_id} already exists"); return Ok(None); } @@ -418,7 +438,7 @@ impl Tenant { let loaded_timeline = match ancestor_timeline_id { Some(ancestor_timeline_id) => { let ancestor_timeline = self - .get_timeline(ancestor_timeline_id) + .get_timeline(ancestor_timeline_id, false) .context("Cannot branch off the timeline that's not present in pageserver")?; if let Some(lsn) = ancestor_start_lsn.as_mut() { @@ -470,6 +490,11 @@ impl Tenant { pitr: Duration, checkpoint_before_gc: bool, ) -> anyhow::Result { + anyhow::ensure!( + self.is_active(), + "Cannot run GC iteration on inactive tenant" + ); + let timeline_str = target_timeline_id .map(|x| x.to_string()) .unwrap_or_else(|| "-".to_string()); @@ -486,6 +511,11 @@ impl Tenant { /// Also it can be explicitly requested per timeline through page server /// api's 'compact' command. pub fn compaction_iteration(&self) -> anyhow::Result<()> { + anyhow::ensure!( + self.is_active(), + "Cannot run compaction iteration on inactive tenant" + ); + // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the // compactions. We don't want to block everything else while the @@ -493,6 +523,7 @@ impl Tenant { let timelines = self.timelines.lock().unwrap(); let timelines_to_compact = timelines .iter() + .filter(|(_, timeline)| timeline.is_active()) .map(|(timeline_id, timeline)| (*timeline_id, timeline.clone())) .collect::>(); drop(timelines); @@ -515,13 +546,13 @@ impl Tenant { // checkpoints. We don't want to block everything else while the // checkpoint runs. let timelines = self.timelines.lock().unwrap(); - let timelines_to_compact = timelines + let timelines_to_checkpoint = timelines .iter() .map(|(timeline_id, timeline)| (*timeline_id, Arc::clone(timeline))) .collect::>(); drop(timelines); - for (timeline_id, timeline) in &timelines_to_compact { + for (timeline_id, timeline) in &timelines_to_checkpoint { let _entered = info_span!("checkpoint", timeline = %timeline_id, tenant = %self.tenant_id) .entered(); @@ -543,7 +574,7 @@ impl Tenant { .iter() .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id)); - ensure!( + anyhow::ensure!( !children_exist, "Cannot delete timeline which has child timelines" ); @@ -552,7 +583,10 @@ impl Tenant { Entry::Vacant(_) => bail!("timeline not found"), }; - let layer_removal_guard = timeline_entry.get().layer_removal_guard()?; + let timeline = timeline_entry.get(); + timeline.set_state(TimelineState::Paused); + + let layer_removal_guard = timeline.layer_removal_guard()?; let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id); std::fs::remove_dir_all(&local_timeline_directory).with_context(|| { @@ -569,58 +603,6 @@ impl Tenant { Ok(()) } - pub fn init_attach_timelines( - &self, - timelines: HashMap, - ) -> anyhow::Result<()> { - let sorted_timelines = if timelines.len() == 1 { - timelines.into_iter().collect() - } else if !timelines.is_empty() { - tree_sort_timelines(timelines)? - } else { - warn!("No timelines to attach received"); - return Ok(()); - }; - - let mut timelines_accessor = self.timelines.lock().unwrap(); - for (timeline_id, metadata) in sorted_timelines { - info!( - "Attaching timeline {} pg_version {}", - timeline_id, - metadata.pg_version() - ); - - if timelines_accessor.contains_key(&timeline_id) { - warn!( - "Timeline {}/{} already exists in the tenant map, skipping its initialization", - self.tenant_id, timeline_id - ); - continue; - } else { - let ancestor = metadata - .ancestor_timeline() - .and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id)) - .cloned(); - let timeline = UninitializedTimeline { - owning_tenant: self, - timeline_id, - raw_timeline: Some(( - self.create_timeline_data(timeline_id, metadata, ancestor) - .with_context(|| { - format!("Failed to initialize timeline {timeline_id}") - })?, - TimelineUninitMark::dummy(), - )), - }; - let initialized_timeline = - timeline.initialize_with_lock(&mut timelines_accessor, true)?; - timelines_accessor.insert(timeline_id, initialized_timeline); - } - } - - Ok(()) - } - /// Allows to retrieve remote timeline index from the tenant. Used in walreceiver to grab remote consistent lsn. pub fn get_remote_index(&self) -> &RemoteIndex { &self.remote_index @@ -661,10 +643,30 @@ impl Tenant { } (_, new_state) => { self.state.send_replace(new_state); - if self.should_run_tasks() { - // Spawn gc and compaction loops. The loops will shut themselves - // down when they notice that the tenant is inactive. - crate::tenant_tasks::start_background_loops(self.tenant_id); + + let timelines_accessor = self.timelines.lock().unwrap(); + let not_broken_timelines = timelines_accessor + .values() + .filter(|timeline| timeline.current_state() != TimelineState::Broken); + match new_state { + TenantState::Active { + background_jobs_running, + } => { + if background_jobs_running { + // Spawn gc and compaction loops. The loops will shut themselves + // down when they notice that the tenant is inactive. + crate::tenant_tasks::start_background_loops(self.tenant_id); + } + + for timeline in not_broken_timelines { + timeline.set_state(TimelineState::Active); + } + } + TenantState::Paused | TenantState::Broken => { + for timeline in not_broken_timelines { + timeline.set_state(TimelineState::Suspended); + } + } } } } @@ -993,6 +995,7 @@ impl Tenant { timelines .iter() + .filter(|(_, timeline)| timeline.is_active()) .map(|(timeline_id, timeline_entry)| { // This is unresolved question for now, how to do gc in presence of remote timelines // especially when this is combined with branching. @@ -1026,7 +1029,7 @@ impl Tenant { for timeline_id in timeline_ids { // Timeline is known to be local and loaded. let timeline = self - .get_timeline(timeline_id) + .get_timeline(timeline_id, false) .with_context(|| format!("Timeline {timeline_id} was not found"))?; // If target_timeline is specified, ignore all other timelines @@ -1111,7 +1114,7 @@ impl Tenant { // Step 2 is to avoid initializing the new branch using data removed by past GC iterations // or in-queue GC iterations. - let src_timeline = self.get_timeline(src).with_context(|| { + let src_timeline = self.get_timeline(src, false).with_context(|| { format!( "No ancestor {} found for timeline {}/{}", src, self.tenant_id, dst @@ -1381,6 +1384,68 @@ impl Tenant { Ok(uninit_mark) } + + pub(super) fn init_attach_timelines( + &self, + timelines: HashMap, + ) -> anyhow::Result<()> { + let sorted_timelines = if timelines.len() == 1 { + timelines.into_iter().collect() + } else if !timelines.is_empty() { + tree_sort_timelines(timelines)? + } else { + warn!("No timelines to attach received"); + return Ok(()); + }; + + let tenant_id = self.tenant_id; + let mut timelines_accessor = self.timelines.lock().unwrap(); + for (timeline_id, metadata) in sorted_timelines { + info!( + "Attaching timeline {}/{} pg_version {}", + tenant_id, + timeline_id, + metadata.pg_version() + ); + + if timelines_accessor.contains_key(&timeline_id) { + warn!("Timeline {tenant_id}/{timeline_id} already exists in the tenant map, skipping its initialization"); + continue; + } + + let ancestor = metadata + .ancestor_timeline() + .and_then(|ancestor_timeline_id| timelines_accessor.get(&ancestor_timeline_id)) + .cloned(); + let dummy_timeline = self + .create_timeline_data(timeline_id, metadata.clone(), ancestor.clone()) + .with_context(|| { + format!("Failed to crate dummy timeline data for {tenant_id}/{timeline_id}") + })?; + let timeline = UninitializedTimeline { + owning_tenant: self, + timeline_id, + raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())), + }; + match timeline.initialize_with_lock(&mut timelines_accessor, true) { + Ok(initialized_timeline) => { + timelines_accessor.insert(timeline_id, initialized_timeline); + } + Err(e) => { + error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}"); + let broken_timeline = self + .create_timeline_data(timeline_id, metadata, ancestor) + .with_context(|| { + format!("Failed to crate broken timeline data for {tenant_id}/{timeline_id}") + })?; + broken_timeline.set_state(TimelineState::Broken); + timelines_accessor.insert(timeline_id, Arc::new(broken_timeline)); + } + } + } + + Ok(()) + } } /// Create the cluster temporarily in 'initdbpath' directory inside the repository @@ -1608,6 +1673,9 @@ pub mod harness { timelines_to_load.insert(timeline_id, timeline_metadata); } tenant.init_attach_timelines(timelines_to_load)?; + tenant.set_state(TenantState::Active { + background_jobs_running: false, + }); Ok(tenant) } @@ -1767,7 +1835,7 @@ mod tests { // Branch the history, modify relation differently on the new timeline tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); let new_writer = newtline.writer(); new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; @@ -1923,7 +1991,7 @@ mod tests { tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; @@ -1942,7 +2010,7 @@ mod tests { tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60))?; @@ -1974,7 +2042,7 @@ mod tests { let tenant = harness.load(); tenant - .get_timeline(TIMELINE_ID) + .get_timeline(TIMELINE_ID, true) .expect("cannot load timeline"); Ok(()) @@ -1997,7 +2065,7 @@ mod tests { tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60))?; @@ -2009,11 +2077,11 @@ mod tests { // check that both, child and ancestor are loaded let _child_tline = tenant - .get_timeline(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID, true) .expect("cannot get child timeline loaded"); let _ancestor_tline = tenant - .get_timeline(TIMELINE_ID) + .get_timeline(TIMELINE_ID, true) .expect("cannot get ancestor timeline loaded"); Ok(()) @@ -2267,7 +2335,7 @@ mod tests { let new_tline_id = TimelineId::generate(); tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?; tline = tenant - .get_timeline(new_tline_id) + .get_timeline(new_tline_id, true) .expect("Should have the branched timeline"); tline_id = new_tline_id; @@ -2330,7 +2398,7 @@ mod tests { let new_tline_id = TimelineId::generate(); tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?; tline = tenant - .get_timeline(new_tline_id) + .get_timeline(new_tline_id, true) .expect("Should have the branched timeline"); tline_id = new_tline_id; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ccd094b65a..194ca0d857 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5,6 +5,8 @@ use bytes::Bytes; use fail::fail_point; use itertools::Itertools; use once_cell::sync::OnceCell; +use pageserver_api::models::TimelineState; +use tokio::sync::watch; use tokio::task::spawn_blocking; use tracing::*; @@ -160,6 +162,8 @@ pub struct Timeline { /// Relation size cache pub rel_size_cache: RwLock>, + + state: watch::Sender, } /// Internal structure to hold all data needed for logical size calculation. @@ -416,9 +420,11 @@ impl Timeline { /// those functions with an LSN that has been processed yet is an error. /// pub async fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<()> { + anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline"); + // This should never be called from the WAL receiver, because that could lead // to a deadlock. - ensure!( + anyhow::ensure!( task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnection), "wait_lsn cannot be called in WAL receiver" ); @@ -635,6 +641,35 @@ impl Timeline { } Ok(()) } + + pub fn set_state(&self, new_state: TimelineState) { + match (self.current_state(), new_state) { + (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => { + debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}"); + } + (TimelineState::Broken, _) => { + error!("Ignoring state update {new_state:?} for broken tenant"); + } + (TimelineState::Paused, TimelineState::Active) => { + debug!("Not activating a paused timeline"); + } + (_, new_state) => { + self.state.send_replace(new_state); + } + } + } + + pub fn current_state(&self) -> TimelineState { + *self.state.borrow() + } + + pub fn is_active(&self) -> bool { + self.current_state() == TimelineState::Active + } + + pub fn subscribe_for_state_updates(&self) -> watch::Receiver { + self.state.subscribe() + } } // Private functions @@ -688,8 +723,9 @@ impl Timeline { walredo_mgr: Arc, upload_layers: bool, pg_version: u32, - ) -> Timeline { + ) -> Self { let disk_consistent_lsn = metadata.disk_consistent_lsn(); + let (state, _) = watch::channel(TimelineState::Suspended); let mut result = Timeline { conf, @@ -746,6 +782,7 @@ impl Timeline { last_received_wal: Mutex::new(None), rel_size_cache: RwLock::new(HashMap::new()), + state, }; result.repartition_threshold = result.get_checkpoint_distance() / 10; result @@ -883,8 +920,6 @@ impl Timeline { } fn try_spawn_size_init_task(self: &Arc, init_lsn: Lsn) { - let timeline_id = self.timeline_id; - // Atomically check if the timeline size calculation had already started. // If the flag was not already set, this sets it. if !self @@ -901,17 +936,42 @@ impl Timeline { "initial size calculation", false, async move { - let calculated_size = self_clone.calculate_logical_size(init_lsn)?; - let result = spawn_blocking(move || { - self_clone.current_logical_size.initial_logical_size.set(calculated_size) - }).await?; - match result { - Ok(()) => info!("Successfully calculated initial logical size"), - Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"), + let mut timeline_state_updates = self_clone.subscribe_for_state_updates(); + let self_calculation = Arc::clone(&self_clone); + tokio::select! { + calculation_result = spawn_blocking(move || self_calculation.calculate_logical_size(init_lsn)) => { + let calculated_size = calculation_result + .context("Failed to spawn calculation result task")? + .context("Failed to calculate logical size")?; + match self_clone.current_logical_size.initial_logical_size.set(calculated_size) { + Ok(()) => info!("Successfully calculated initial logical size"), + Err(existing_size) => error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing"), + } + Ok(()) + }, + new_event = async { + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = *timeline_state_updates.borrow(); + match new_state { + // we're running this job for active timelines only + TimelineState::Active => continue, + TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return Some(new_state), + } + } + Err(_sender_dropped_error) => return None, + } + } + } => { + match new_event { + Some(new_state) => info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates"), + None => info!("Timeline dropped state updates sender, stopping init size calculation"), + } + Ok(()) + }, } - Ok(()) - } - .instrument(info_span!("initial_logical_size_calculation", timeline = %timeline_id)) + }.instrument(info_span!("initial_logical_size_calculation", tenant = %self.tenant_id, timeline = %self.timeline_id)), ); } } @@ -1356,7 +1416,7 @@ impl Timeline { false, )?; - if self.upload_layers.load(atomic::Ordering::Relaxed) { + if self.can_upload_layers() { storage_sync::schedule_layer_upload( self.tenant_id, self.timeline_id, @@ -1826,7 +1886,7 @@ impl Timeline { } drop(layers); - if self.upload_layers.load(atomic::Ordering::Relaxed) { + if self.can_upload_layers() { storage_sync::schedule_layer_upload( self.tenant_id, self.timeline_id, @@ -1930,7 +1990,7 @@ impl Timeline { /// obsolete. /// pub(super) fn gc(&self) -> anyhow::Result { - let mut result: GcResult = Default::default(); + let mut result: GcResult = GcResult::default(); let now = SystemTime::now(); fail_point!("before-timeline-gc"); @@ -2110,7 +2170,7 @@ impl Timeline { fail_point!("after-timeline-gc-removed-layers"); } - if self.upload_layers.load(atomic::Ordering::Relaxed) { + if self.can_upload_layers() { storage_sync::schedule_layer_delete( self.tenant_id, self.timeline_id, @@ -2199,6 +2259,11 @@ impl Timeline { } } } + + fn can_upload_layers(&self) -> bool { + self.upload_layers.load(atomic::Ordering::Relaxed) + && self.current_state() != TimelineState::Broken + } } /// Helper function for get_reconstruct_data() to add the path of layers traversed diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs index 030055df6d..23ce9dc699 100644 --- a/pageserver/src/tenant_tasks.rs +++ b/pageserver/src/tenant_tasks.rs @@ -175,7 +175,7 @@ async fn wait_for_active_tenant( } state => { debug!("Not running the task loop, tenant is not active with background jobs enabled: {state:?}"); - tokio::time::sleep(wait).await; + continue; } } } diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index 3a5d1c7ad6..53dd2d8eac 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -12,6 +12,7 @@ use std::{ collections::{hash_map, HashMap}, num::NonZeroU64, + ops::ControlFlow, sync::Arc, time::Duration, }; @@ -26,7 +27,8 @@ use etcd_broker::{ subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription, BrokerUpdate, Client, }; -use tokio::select; +use pageserver_api::models::TimelineState; +use tokio::{select, sync::watch}; use tracing::*; use crate::{ @@ -58,10 +60,7 @@ pub fn spawn_connection_manager_task( TaskKind::WalReceiverManager, Some(tenant_id), Some(timeline_id), - &format!( - "walreceiver for tenant {} timeline {}", - timeline.tenant_id, timeline.timeline_id - ), + &format!("walreceiver for timeline {tenant_id}/{timeline_id}"), false, async move { info!("WAL receiver broker started, connecting to etcd"); @@ -75,19 +74,21 @@ pub fn spawn_connection_manager_task( select! { _ = task_mgr::shutdown_watcher() => { info!("WAL receiver shutdown requested, shutting down"); - // Kill current connection, if any - if let Some(wal_connection) = walreceiver_state.wal_connection.take() - { - wal_connection.connection_task.shutdown().await; - } + walreceiver_state.shutdown().await; return Ok(()); }, - - _ = connection_manager_loop_step( + loop_step_result = connection_manager_loop_step( &broker_loop_prefix, &mut etcd_client, &mut walreceiver_state, - ) => {}, + ) => match loop_step_result { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(()) => { + info!("Connection manager loop ended, shutting down"); + walreceiver_state.shutdown().await; + return Ok(()); + } + }, } } } @@ -104,7 +105,17 @@ async fn connection_manager_loop_step( broker_prefix: &str, etcd_client: &mut Client, walreceiver_state: &mut WalreceiverState, -) { +) -> ControlFlow<(), ()> { + let mut timeline_state_updates = walreceiver_state.timeline.subscribe_for_state_updates(); + + match wait_for_active_timeline(&mut timeline_state_updates).await { + ControlFlow::Continue(()) => {} + ControlFlow::Break(()) => { + info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop"); + return ControlFlow::Break(()); + } + } + let id = TenantTimelineId { tenant_id: walreceiver_state.timeline.tenant_id, timeline_id: walreceiver_state.timeline.timeline_id, @@ -129,10 +140,12 @@ async fn connection_manager_loop_step( // - change connection if the rules decide so, or if the current connection dies // - receive updates from broker // - this might change the current desired connection + // - timeline state changes to something that does not allow walreceiver to run concurrently select! { broker_connection_result = &mut broker_subscription.watcher_handle => { + info!("Broker connection was closed from the other side, ending current broker loop step"); cleanup_broker_connection(broker_connection_result, walreceiver_state); - return; + return ControlFlow::Continue(()); }, Some(wal_connection_update) = async { @@ -185,11 +198,36 @@ async fn connection_manager_loop_step( (&mut broker_subscription.watcher_handle).await, walreceiver_state, ); - return; + return ControlFlow::Continue(()); } } }, + new_event = async { + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = walreceiver_state.timeline.current_state(); + match new_state { + // we're already active as walreceiver, no need to reactivate + TimelineState::Active => continue, + TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return ControlFlow::Continue(new_state), + } + } + Err(_sender_dropped_error) => return ControlFlow::Break(()), + } + } + } => match new_event { + ControlFlow::Continue(new_state) => { + info!("Timeline became inactive (new state: {new_state:?}), dropping current connections until it reactivates"); + return ControlFlow::Continue(()); + } + ControlFlow::Break(()) => { + info!("Timeline dropped state updates sender, stopping wal connection manager loop"); + return ControlFlow::Break(()); + } + }, + _ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await }, if time_until_next_retry.is_some() => {} } @@ -216,6 +254,34 @@ async fn connection_manager_loop_step( } } +async fn wait_for_active_timeline( + timeline_state_updates: &mut watch::Receiver, +) -> ControlFlow<(), ()> { + let current_state = *timeline_state_updates.borrow(); + if current_state == TimelineState::Active { + return ControlFlow::Continue(()); + } + + loop { + match timeline_state_updates.changed().await { + Ok(()) => { + let new_state = *timeline_state_updates.borrow(); + match new_state { + TimelineState::Active => { + debug!("Timeline state changed to active, continuing the walreceiver connection manager"); + return ControlFlow::Continue(()); + } + state => { + debug!("Not running the walreceiver connection manager, timeline is not active: {state:?}"); + continue; + } + } + } + Err(_sender_dropped_error) => return ControlFlow::Break(()), + } + } +} + fn cleanup_broker_connection( broker_connection_result: Result, tokio::task::JoinError>, walreceiver_state: &mut WalreceiverState, @@ -723,6 +789,12 @@ impl WalreceiverState { self.wal_connection_retries.remove(&node_id); } } + + async fn shutdown(mut self) { + if let Some(wal_connection) = self.wal_connection.take() { + wal_connection.connection_task.shutdown().await; + } + } } #[derive(Debug, PartialEq, Eq)] @@ -801,6 +873,7 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, safekeeper_connstr: None, }, etcd_version: 0, @@ -817,6 +890,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("no commit_lsn".to_string()), }, etcd_version: 0, @@ -833,6 +908,7 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, safekeeper_connstr: Some("no commit_lsn".to_string()), }, etcd_version: 0, @@ -849,6 +925,7 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, safekeeper_connstr: None, }, etcd_version: 0, @@ -908,6 +985,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -924,6 +1003,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("not advanced Lsn".to_string()), }, etcd_version: 0, @@ -940,6 +1021,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("not enough advanced Lsn".to_string()), }, etcd_version: 0, @@ -974,6 +1057,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1006,6 +1091,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("smaller commit_lsn".to_string()), }, etcd_version: 0, @@ -1022,6 +1109,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1038,6 +1127,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: None, }, etcd_version: 0, @@ -1083,6 +1174,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1099,6 +1192,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1168,6 +1263,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1184,6 +1281,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()), }, etcd_version: 0, @@ -1255,6 +1354,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, @@ -1326,6 +1427,8 @@ mod tests { backup_lsn: None, remote_consistent_lsn: None, peer_horizon_lsn: None, + local_start_lsn: None, + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), }, etcd_version: 0, diff --git a/poetry.lock b/poetry.lock index 2af0d97511..27de8508ce 100644 --- a/poetry.lock +++ b/poetry.lock @@ -514,14 +514,6 @@ python-versions = ">=3.7" [package.dependencies] typing-extensions = ">=4.1.0" -[[package]] -name = "cached-property" -version = "1.5.2" -description = "A decorator for caching properties in classes." -category = "main" -optional = false -python-versions = "*" - [[package]] name = "certifi" version = "2022.6.15" @@ -1647,10 +1639,6 @@ botocore-stubs = [ {file = "botocore-stubs-1.27.38.tar.gz", hash = "sha256:408e8b86b5d171b58f81c74ca9d3b5317a5a8e2d3bc2073aa841ac13b8939e56"}, {file = "botocore_stubs-1.27.38-py3-none-any.whl", hash = "sha256:7add7641e9a479a9c8366893bb522fd9ca3d58714201e43662a200a148a1bc38"}, ] -cached-property = [ - {file = "cached-property-1.5.2.tar.gz", hash = "sha256:9fa5755838eecbb2d234c3aa390bd80fbd3ac6b6869109bfc1b499f7bd89a130"}, - {file = "cached_property-1.5.2-py2.py3-none-any.whl", hash = "sha256:df4f613cf7ad9a588cc381aaf4a512d26265ecebd5eb9e1ba12f1319eb85a6a0"}, -] certifi = [ {file = "certifi-2022.6.15-py3-none-any.whl", hash = "sha256:fe86415d55e84719d75f8b69414f6438ac3547d2078ab91b67e779ef69378412"}, {file = "certifi-2022.6.15.tar.gz", hash = "sha256:84c85a9078b11105f04f3036a9482ae10e4621616db313fe045dd24743a0820d"}, diff --git a/pyproject.toml b/pyproject.toml index 9c2aa39c7c..1ee6fbe6f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,6 @@ requests = "^2.26.0" pytest-xdist = "^2.3.0" asyncpg = "^0.24.0" aiopg = "^1.3.1" -cached-property = "^1.5.2" Jinja2 = "^3.0.2" types-requests = "^2.28.5" types-psycopg2 = "^2.9.18" @@ -74,7 +73,6 @@ strict = true [[tool.mypy.overrides]] module = [ "asyncpg.*", - "cached_property.*", "pg8000.*", ] ignore_missing_imports = true diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 9422b55d60..67c2c62f73 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -21,7 +21,8 @@ use metrics::set_build_info_metric; use safekeeper::broker; use safekeeper::control_file; use safekeeper::defaults::{ - DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, + DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, + DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, }; use safekeeper::http; use safekeeper::remove_wal; @@ -31,8 +32,12 @@ use safekeeper::GlobalTimelines; use safekeeper::SafeKeeperConf; use utils::auth::JwtAuth; use utils::{ - http::endpoint, id::NodeId, logging, project_git_version, shutdown::exit_now, signals, - tcp_listener, + http::endpoint, + id::NodeId, + logging::{self, LogFormat}, + project_git_version, + shutdown::exit_now, + signals, tcp_listener, }; const LOCK_FILE_NAME: &str = "safekeeper.lock"; @@ -72,10 +77,6 @@ fn main() -> anyhow::Result<()> { conf.listen_http_addr = addr.to_string(); } - if let Some(recall) = arg_matches.get_one::("recall") { - conf.recall_period = humantime::parse_duration(recall)?; - } - let mut given_id = None; if let Some(given_id_str) = arg_matches.get_one::("id") { given_id = Some(NodeId( @@ -93,6 +94,16 @@ fn main() -> anyhow::Result<()> { conf.broker_etcd_prefix = prefix.to_string(); } + if let Some(heartbeat_timeout_str) = arg_matches.get_one::("heartbeat-timeout") { + conf.heartbeat_timeout = + humantime::parse_duration(heartbeat_timeout_str).with_context(|| { + format!( + "failed to parse heartbeat-timeout {}", + heartbeat_timeout_str + ) + })?; + } + if let Some(backup_threads) = arg_matches.get_one::("wal-backup-threads") { conf.backup_runtime_threads = backup_threads .parse() @@ -105,6 +116,14 @@ fn main() -> anyhow::Result<()> { let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again conf.remote_storage = Some(RemoteStorageConfig::from_toml(storage_conf_parsed_toml)?); } + if let Some(max_offloader_lag_str) = arg_matches.get_one::("max-offloader-lag") { + conf.max_offloader_lag_bytes = max_offloader_lag_str.parse().with_context(|| { + format!( + "failed to parse max offloader lag {}", + max_offloader_lag_str + ) + })?; + } // Seems like there is no better way to accept bool values explicitly in clap. conf.wal_backup_enabled = arg_matches .get_one::("enable-wal-backup") @@ -116,11 +135,15 @@ fn main() -> anyhow::Result<()> { .get_one::("auth-validation-public-key-path") .map(PathBuf::from); + if let Some(log_format) = arg_matches.get_one::("log-format") { + conf.log_format = LogFormat::from_config(log_format)?; + } + start_safekeeper(conf, given_id, arg_matches.get_flag("init")) } fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bool) -> Result<()> { - let log_file = logging::init("safekeeper.log", conf.daemonize)?; + let log_file = logging::init("safekeeper.log", conf.daemonize, conf.log_format)?; info!("version: {GIT_VERSION}"); @@ -361,11 +384,6 @@ fn cli() -> Command { .short('p') .long("pageserver"), ) - .arg( - Arg::new("recall") - .long("recall") - .help("Period for requestion pageserver to call for replication"), - ) .arg( Arg::new("daemonize") .short('d') @@ -397,6 +415,11 @@ fn cli() -> Command { .long("broker-etcd-prefix") .help("a prefix to always use when polling/pusing data in etcd from this safekeeper"), ) + .arg( + Arg::new("heartbeat-timeout") + .long("heartbeat-timeout") + .help(formatcp!("Peer is considered dead after not receiving heartbeats from it during this period (default {}s), passed as a human readable duration.", DEFAULT_HEARTBEAT_TIMEOUT.as_secs())) + ) .arg( Arg::new("wal-backup-threads").long("backup-threads").help(formatcp!("number of threads for wal backup (default {DEFAULT_WAL_BACKUP_RUNTIME_THREADS}")), ).arg( @@ -404,6 +427,11 @@ fn cli() -> Command { .long("remote-storage") .help("Remote storage configuration for WAL backup (offloading to s3) as TOML inline table, e.g. {\"max_concurrent_syncs\" = 17, \"max_sync_errors\": 13, \"bucket_name\": \"\", \"bucket_region\":\"\", \"concurrency_limit\": 119}.\nSafekeeper offloads WAL to [prefix_in_bucket/]//, mirroring structure on the file system.") ) + .arg( + Arg::new("max-offloader-lag") + .long("max-offloader-lag") + .help(formatcp!("Safekeeper won't be elected for WAL offloading if it is lagging for more than this value (default {}MB) in bytes", DEFAULT_MAX_OFFLOADER_LAG_BYTES / (1 << 20))) + ) .arg( Arg::new("enable-wal-backup") .long("enable-wal-backup") @@ -416,6 +444,11 @@ fn cli() -> Command { .long("auth-validation-public-key-path") .help("Path to an RSA .pem public key which is used to check JWT tokens") ) + .arg( + Arg::new("log-format") + .long("log-format") + .help("Format for logging, either 'plain' or 'json'") + ) } #[test] diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 6a2456ecda..76135241b9 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -1,6 +1,5 @@ //! Communication with etcd, providing safekeeper peers and pageserver coordination. -use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; @@ -12,11 +11,9 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::collections::HashSet; use std::time::Duration; -use tokio::spawn; use tokio::task::JoinHandle; use tokio::{runtime, time::sleep}; use tracing::*; -use url::Url; use crate::GlobalTimelines; use crate::SafeKeeperConf; @@ -56,113 +53,6 @@ fn timeline_safekeeper_path( ) } -pub struct Election { - pub election_name: String, - pub candidate_name: String, - pub broker_endpoints: Vec, -} - -impl Election { - pub fn new(election_name: String, candidate_name: String, broker_endpoints: Vec) -> Self { - Self { - election_name, - candidate_name, - broker_endpoints, - } - } -} - -pub struct ElectionLeader { - client: Client, - keep_alive: JoinHandle>, -} - -impl ElectionLeader { - pub async fn check_am_i( - &mut self, - election_name: String, - candidate_name: String, - ) -> Result { - let resp = self.client.leader(election_name).await?; - - let kv = resp - .kv() - .ok_or_else(|| anyhow!("failed to get leader response"))?; - let leader = kv.value_str()?; - - Ok(leader == candidate_name) - } - - pub async fn give_up(self) { - self.keep_alive.abort(); - // TODO: it'll be wise to resign here but it'll happen after lease expiration anyway - // should we await for keep alive termination? - let _ = self.keep_alive.await; - } -} - -pub async fn get_leader(req: &Election, leader: &mut Option) -> Result<()> { - let mut client = Client::connect(req.broker_endpoints.clone(), None) - .await - .context("Could not connect to etcd")?; - - let lease = client - .lease_grant(LEASE_TTL_SEC, None) - .await - .context("Could not acquire a lease"); - - let lease_id = lease.map(|l| l.id()).unwrap(); - - // kill previous keepalive, if any - if let Some(l) = leader.take() { - l.give_up().await; - } - - let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id)); - // immediately save handle to kill task if we get canceled below - *leader = Some(ElectionLeader { - client: client.clone(), - keep_alive, - }); - - client - .campaign( - req.election_name.clone(), - req.candidate_name.clone(), - lease_id, - ) - .await?; - - Ok(()) -} - -async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> { - let (mut keeper, mut ka_stream) = client - .lease_keep_alive(lease_id) - .await - .context("failed to create keepalive stream")?; - - loop { - let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC); - - keeper - .keep_alive() - .await - .context("failed to send LeaseKeepAliveRequest")?; - - ka_stream - .message() - .await - .context("failed to receive LeaseKeepAliveResponse")?; - - sleep(push_interval).await; - } -} - -pub fn get_candiate_name(system_id: NodeId) -> String { - format!("id_{system_id}") -} - async fn push_sk_info( ttid: TenantTimelineId, mut client: Client, @@ -236,7 +126,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { let handles = active_tlis .iter() .map(|tli| { - let sk_info = tli.get_public_info(&conf); + let sk_info = tli.get_safekeeper_info(&conf); let key = timeline_safekeeper_path(conf.broker_etcd_prefix.clone(), tli.ttid, conf.my_id); let lease = leases.remove(&tli.ttid).unwrap(); @@ -282,6 +172,9 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { Some(new_info) => { // note: there are blocking operations below, but it's considered fine for now if let Ok(tli) = GlobalTimelines::get(new_info.key.id) { + // Note that we also receive *our own* info. That's + // important, as it is used as an indication of live + // connection to the broker. tli.record_safekeeper_info(&new_info.value, new_info.key.node_id) .await? } diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index 1ce9186085..856c164be8 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -1,6 +1,7 @@ //! Code to deal with safekeeper control file upgrades use crate::safekeeper::{ - AcceptorState, Peers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry, + AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, + TermSwitchEntry, }; use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; @@ -134,7 +135,7 @@ pub struct SafeKeeperStateV4 { // fundamental; but state is saved here only for informational purposes and // obviously can be stale. (Currently not saved at all, but let's provision // place to have less file version upgrades). - pub peers: Peers, + pub peers: PersistedPeers, } pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result { @@ -165,7 +166,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result backup_lsn: Lsn(0), peer_horizon_lsn: oldstate.truncate_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(vec![]), + peers: PersistedPeers(vec![]), }); // migrate to hexing some ids } else if version == 2 { @@ -188,7 +189,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result backup_lsn: Lsn(0), peer_horizon_lsn: oldstate.truncate_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(vec![]), + peers: PersistedPeers(vec![]), }); // migrate to moving tenant_id/timeline_id to the top and adding some lsns } else if version == 3 { @@ -211,7 +212,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result backup_lsn: Lsn(0), peer_horizon_lsn: oldstate.truncate_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(vec![]), + peers: PersistedPeers(vec![]), }); // migrate to having timeline_start_lsn } else if version == 4 { @@ -234,7 +235,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result backup_lsn: Lsn::INVALID, peer_horizon_lsn: oldstate.peer_horizon_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(vec![]), + peers: PersistedPeers(vec![]), }); } else if version == 5 { info!("reading safekeeper control file version {}", version); diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index e38a5a4633..c3b8227e17 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -1,11 +1,16 @@ -use defaults::DEFAULT_WAL_BACKUP_RUNTIME_THREADS; +use defaults::{ + DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, +}; // use remote_storage::RemoteStorageConfig; use std::path::PathBuf; use std::time::Duration; use url::Url; -use utils::id::{NodeId, TenantId, TenantTimelineId}; +use utils::{ + id::{NodeId, TenantId, TenantTimelineId}, + logging::LogFormat, +}; pub mod broker; pub mod control_file; @@ -34,8 +39,9 @@ pub mod defaults { DEFAULT_PG_LISTEN_PORT, }; - pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10); pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8; + pub const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); + pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20); } #[derive(Debug, Clone)] @@ -52,7 +58,6 @@ pub struct SafeKeeperConf { pub no_sync: bool, pub listen_pg_addr: String, pub listen_http_addr: String, - pub recall_period: Duration, pub remote_storage: Option, pub backup_runtime_threads: usize, pub wal_backup_enabled: bool, @@ -60,6 +65,9 @@ pub struct SafeKeeperConf { pub broker_endpoints: Vec, pub broker_etcd_prefix: String, pub auth_validation_public_key_path: Option, + pub heartbeat_timeout: Duration, + pub max_offloader_lag_bytes: u64, + pub log_format: LogFormat, } impl SafeKeeperConf { @@ -85,13 +93,15 @@ impl Default for SafeKeeperConf { listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), remote_storage: None, - recall_period: defaults::DEFAULT_RECALL_PERIOD, my_id: NodeId(0), broker_endpoints: Vec::new(), broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS, wal_backup_enabled: true, auth_validation_public_key_path: None, + heartbeat_timeout: DEFAULT_HEARTBEAT_TIMEOUT, + max_offloader_lag_bytes: DEFAULT_MAX_OFFLOADER_LAG_BYTES, + log_format: LogFormat::Plain, } } } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 7b11aaf92a..3f9b70f282 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -11,6 +11,7 @@ use std::cmp::max; use std::cmp::min; use std::fmt; use std::io::Read; + use tracing::*; use crate::control_file; @@ -132,9 +133,8 @@ pub struct ServerInfo { pub wal_seg_size: u32, } -/// Data published by safekeeper to the peers #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PeerInfo { +pub struct PersistedPeerInfo { /// LSN up to which safekeeper offloaded WAL to s3. backup_lsn: Lsn, /// Term of the last entry. @@ -145,7 +145,7 @@ pub struct PeerInfo { commit_lsn: Lsn, } -impl PeerInfo { +impl PersistedPeerInfo { fn new() -> Self { Self { backup_lsn: Lsn::INVALID, @@ -156,10 +156,8 @@ impl PeerInfo { } } -// vector-based node id -> peer state map with very limited functionality we -// need/ #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Peers(pub Vec<(NodeId, PeerInfo)>); +pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>); /// Persistent information stored on safekeeper node /// On disk data is prefixed by magic and format version and followed by checksum. @@ -203,7 +201,7 @@ pub struct SafeKeeperState { // fundamental; but state is saved here only for informational purposes and // obviously can be stale. (Currently not saved at all, but let's provision // place to have less file version upgrades). - pub peers: Peers, + pub peers: PersistedPeers, } #[derive(Debug, Clone)] @@ -240,7 +238,12 @@ impl SafeKeeperState { backup_lsn: local_start_lsn, peer_horizon_lsn: local_start_lsn, remote_consistent_lsn: Lsn(0), - peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()), + peers: PersistedPeers( + peers + .iter() + .map(|p| (*p, PersistedPeerInfo::new())) + .collect(), + ), } } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 3fb77bf582..1930b3574a 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -7,7 +7,7 @@ use etcd_broker::subscription_value::SkTimelineInfo; use postgres_ffi::XLogSegNo; -use tokio::sync::watch; +use tokio::{sync::watch, time::Instant}; use std::cmp::{max, min}; @@ -26,7 +26,7 @@ use utils::{ use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, - SafekeeperMemState, ServerInfo, + SafekeeperMemState, ServerInfo, Term, }; use crate::send_wal::HotStandbyFeedback; use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION}; @@ -36,6 +36,53 @@ use crate::wal_storage; use crate::wal_storage::Storage as wal_storage_iface; use crate::SafeKeeperConf; +/// Things safekeeper should know about timeline state on peers. +#[derive(Debug, Clone)] +pub struct PeerInfo { + pub sk_id: NodeId, + /// Term of the last entry. + _last_log_term: Term, + /// LSN of the last record. + _flush_lsn: Lsn, + pub commit_lsn: Lsn, + /// Since which LSN safekeeper has WAL. TODO: remove this once we fill new + /// sk since backup_lsn. + pub local_start_lsn: Lsn, + /// When info was received. + ts: Instant, +} + +impl PeerInfo { + fn from_sk_info(sk_id: NodeId, sk_info: &SkTimelineInfo, ts: Instant) -> PeerInfo { + PeerInfo { + sk_id, + _last_log_term: sk_info.last_log_term.unwrap_or(0), + _flush_lsn: sk_info.flush_lsn.unwrap_or(Lsn::INVALID), + commit_lsn: sk_info.commit_lsn.unwrap_or(Lsn::INVALID), + local_start_lsn: sk_info.local_start_lsn.unwrap_or(Lsn::INVALID), + ts, + } + } +} + +// vector-based node id -> peer state map with very limited functionality we +// need. +#[derive(Debug, Clone, Default)] +pub struct PeersInfo(pub Vec); + +impl PeersInfo { + fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> { + self.0.iter_mut().find(|p| p.sk_id == id) + } + + fn upsert(&mut self, p: &PeerInfo) { + match self.get(p.sk_id) { + Some(rp) => *rp = p.clone(), + None => self.0.push(p.clone()), + } + } +} + /// Replica status update + hot standby feedback #[derive(Debug, Clone, Copy)] pub struct ReplicaState { @@ -74,6 +121,8 @@ impl ReplicaState { pub struct SharedState { /// Safekeeper object sk: SafeKeeper, + /// In memory list containing state of peers sent in latest messages from them. + peers_info: PeersInfo, /// State of replicas replicas: Vec>, /// True when WAL backup launcher oversees the timeline, making sure WAL is @@ -123,7 +172,8 @@ impl SharedState { Ok(Self { sk, - replicas: Vec::new(), + peers_info: PeersInfo(vec![]), + replicas: vec![], wal_backup_active: false, active: false, num_computes: 0, @@ -142,6 +192,7 @@ impl SharedState { Ok(Self { sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?, + peers_info: PeersInfo(vec![]), replicas: Vec::new(), wal_backup_active: false, active: false, @@ -201,12 +252,6 @@ impl SharedState { self.wal_backup_active } - // Can this safekeeper offload to s3? Recently joined safekeepers might not - // have necessary WAL. - fn can_wal_backup(&self) -> bool { - self.sk.state.local_start_lsn <= self.sk.inmem.backup_lsn - } - fn get_wal_seg_size(&self) -> usize { self.sk.state.server.wal_seg_size as usize } @@ -268,6 +313,24 @@ impl SharedState { self.replicas.push(Some(state)); pos } + + fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo { + SkTimelineInfo { + last_log_term: Some(self.sk.get_epoch()), + flush_lsn: Some(self.sk.wal_store.flush_lsn()), + // note: this value is not flushed to control file yet and can be lost + commit_lsn: Some(self.sk.inmem.commit_lsn), + // TODO: rework feedbacks to avoid max here + remote_consistent_lsn: Some(max( + self.get_replicas_state().remote_consistent_lsn, + self.sk.inmem.remote_consistent_lsn, + )), + peer_horizon_lsn: Some(self.sk.inmem.peer_horizon_lsn), + safekeeper_connstr: Some(conf.listen_pg_addr.clone()), + backup_lsn: Some(self.sk.inmem.backup_lsn), + local_start_lsn: Some(self.sk.state.local_start_lsn), + } + } } #[derive(Debug, thiserror::Error)] @@ -517,17 +580,6 @@ impl Timeline { self.write_shared_state().wal_backup_attend() } - /// Can this safekeeper offload to s3? Recently joined safekeepers might not - /// have necessary WAL. - pub fn can_wal_backup(&self) -> bool { - if self.is_cancelled() { - return false; - } - - let shared_state = self.write_shared_state(); - shared_state.can_wal_backup() - } - /// Returns full timeline info, required for the metrics. If the timeline is /// not active, returns None instead. pub fn info_for_metrics(&self) -> Option { @@ -632,36 +684,25 @@ impl Timeline { Ok(()) } - /// Return public safekeeper info for broadcasting to broker and other peers. - pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo { + /// Get safekeeper info for broadcasting to broker and other peers. + pub fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo { let shared_state = self.write_shared_state(); - SkTimelineInfo { - last_log_term: Some(shared_state.sk.get_epoch()), - flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()), - // note: this value is not flushed to control file yet and can be lost - commit_lsn: Some(shared_state.sk.inmem.commit_lsn), - // TODO: rework feedbacks to avoid max here - remote_consistent_lsn: Some(max( - shared_state.get_replicas_state().remote_consistent_lsn, - shared_state.sk.inmem.remote_consistent_lsn, - )), - peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn), - safekeeper_connstr: Some(conf.listen_pg_addr.clone()), - backup_lsn: Some(shared_state.sk.inmem.backup_lsn), - } + shared_state.get_safekeeper_info(conf) } /// Update timeline state with peer safekeeper data. pub async fn record_safekeeper_info( &self, sk_info: &SkTimelineInfo, - _sk_id: NodeId, + sk_id: NodeId, ) -> Result<()> { let is_wal_backup_action_pending: bool; let commit_lsn: Lsn; { let mut shared_state = self.write_shared_state(); shared_state.sk.record_safekeeper_info(sk_info)?; + let peer_info = PeerInfo::from_sk_info(sk_id, sk_info, Instant::now()); + shared_state.peers_info.upsert(&peer_info); is_wal_backup_action_pending = shared_state.update_status(self.ttid); commit_lsn = shared_state.sk.inmem.commit_lsn; } @@ -673,6 +714,22 @@ impl Timeline { Ok(()) } + /// Get our latest view of alive peers status on the timeline. + /// We pass our own info through the broker as well, so when we don't have connection + /// to the broker returned vec is empty. + pub fn get_peers(&self, conf: &SafeKeeperConf) -> Vec { + let shared_state = self.write_shared_state(); + let now = Instant::now(); + shared_state + .peers_info + .0 + .iter() + // Regard peer as absent if we haven't heard from it within heartbeat_timeout. + .filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout) + .cloned() + .collect() + } + /// Add send_wal replica to the in-memory vector of replicas. pub fn add_replica(&self, state: ReplicaState) -> usize { self.write_shared_state().add_replica(state) diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index c82a003161..0a43d6085c 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -1,8 +1,7 @@ use anyhow::{Context, Result}; -use etcd_broker::subscription_key::{ - NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind, -}; + use tokio::task::JoinHandle; +use utils::id::NodeId; use std::cmp::min; use std::collections::HashMap; @@ -26,14 +25,11 @@ use tracing::*; use utils::{id::TenantTimelineId, lsn::Lsn}; -use crate::broker::{Election, ElectionLeader}; -use crate::timeline::Timeline; -use crate::{broker, GlobalTimelines, SafeKeeperConf}; +use crate::timeline::{PeerInfo, Timeline}; +use crate::{GlobalTimelines, SafeKeeperConf}; use once_cell::sync::OnceCell; -const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000; - const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10; const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000; @@ -70,47 +66,104 @@ struct WalBackupTimelineEntry { handle: Option, } -/// Start per timeline task, if it makes sense for this safekeeper to offload. -fn consider_start_task( +async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) { + if let Some(wb_handle) = entry.handle.take() { + // Tell the task to shutdown. Error means task exited earlier, that's ok. + let _ = wb_handle.shutdown_tx.send(()).await; + // Await the task itself. TODO: restart panicked tasks earlier. + if let Err(e) = wb_handle.handle.await { + warn!("WAL backup task for {} panicked: {}", ttid, e); + } + } +} + +/// The goal is to ensure that normally only one safekeepers offloads. However, +/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short +/// time we have several ones as they PUT the same files. Also, +/// - frequently changing the offloader would be bad; +/// - electing seriously lagging safekeeper is undesirable; +/// So we deterministically choose among the reasonably caught up candidates. +/// TODO: take into account failed attempts to deal with hypothetical situation +/// where s3 is unreachable only for some sks. +fn determine_offloader( + alive_peers: &[PeerInfo], + wal_backup_lsn: Lsn, + ttid: TenantTimelineId, + conf: &SafeKeeperConf, +) -> (Option, String) { + // TODO: remove this once we fill newly joined safekeepers since backup_lsn. + let capable_peers = alive_peers + .iter() + .filter(|p| p.local_start_lsn <= wal_backup_lsn); + match capable_peers.clone().map(|p| p.commit_lsn).max() { + None => (None, "no connected peers to elect from".to_string()), + Some(max_commit_lsn) => { + let threshold = max_commit_lsn + .checked_sub(conf.max_offloader_lag_bytes) + .unwrap_or(Lsn(0)); + let mut caughtup_peers = capable_peers + .clone() + .filter(|p| p.commit_lsn >= threshold) + .collect::>(); + caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id)); + + // To distribute the load, shift by timeline_id. + let offloader = caughtup_peers + [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize] + .sk_id; + + let mut capable_peers_dbg = capable_peers + .map(|p| (p.sk_id, p.commit_lsn)) + .collect::>(); + capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0)); + ( + Some(offloader), + format!( + "elected {} among {:?} peers, with {} of them being caughtup", + offloader, + capable_peers_dbg, + caughtup_peers.len() + ), + ) + } + } +} + +/// Based on peer information determine which safekeeper should offload; if it +/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task +/// is running, kill it. +async fn update_task( conf: &SafeKeeperConf, ttid: TenantTimelineId, - task: &mut WalBackupTimelineEntry, + entry: &mut WalBackupTimelineEntry, ) { - if !task.timeline.can_wal_backup() { - return; + let alive_peers = entry.timeline.get_peers(conf); + let wal_backup_lsn = entry.timeline.get_wal_backup_lsn(); + let (offloader, election_dbg_str) = + determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf); + let elected_me = Some(conf.my_id) == offloader; + + if elected_me != (entry.handle.is_some()) { + if elected_me { + info!("elected for backup {}: {}", ttid, election_dbg_str); + + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); + let timeline_dir = conf.timeline_dir(&ttid); + + let handle = tokio::spawn( + backup_task_main(ttid, timeline_dir, shutdown_rx) + .instrument(info_span!("WAL backup task", ttid = %ttid)), + ); + + entry.handle = Some(WalBackupTaskHandle { + shutdown_tx, + handle, + }); + } else { + info!("stepping down from backup {}: {}", ttid, election_dbg_str); + shut_down_task(ttid, entry).await; + } } - info!("starting WAL backup task for {}", ttid); - - // TODO: decide who should offload right here by simply checking current - // state instead of running elections in offloading task. - let election_name = SubscriptionKey { - cluster_prefix: conf.broker_etcd_prefix.clone(), - kind: SubscriptionKind::Operation( - ttid, - NodeKind::Safekeeper, - OperationKind::Safekeeper(SkOperationKind::WalBackup), - ), - } - .watch_key(); - let my_candidate_name = broker::get_candiate_name(conf.my_id); - let election = broker::Election::new( - election_name, - my_candidate_name, - conf.broker_endpoints.clone(), - ); - - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - let timeline_dir = conf.timeline_dir(&ttid); - - let handle = tokio::spawn( - backup_task_main(ttid, timeline_dir, shutdown_rx, election) - .instrument(info_span!("WAL backup task", ttid = %ttid)), - ); - - task.handle = Some(WalBackupTaskHandle { - shutdown_tx, - handle, - }); } const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000; @@ -158,27 +211,20 @@ async fn wal_backup_launcher_main_loop( timeline, handle: None, }); - consider_start_task(&conf, ttid, entry); + update_task(&conf, ttid, entry).await; } else { // need to stop the task info!("stopping WAL backup task for {}", ttid); - - let entry = tasks.remove(&ttid).unwrap(); - if let Some(wb_handle) = entry.handle { - // Tell the task to shutdown. Error means task exited earlier, that's ok. - let _ = wb_handle.shutdown_tx.send(()).await; - // Await the task itself. TODO: restart panicked tasks earlier. - if let Err(e) = wb_handle.handle.await { - warn!("WAL backup task for {} panicked: {}", ttid, e); - } - } + let mut entry = tasks.remove(&ttid).unwrap(); + shut_down_task(ttid, &mut entry).await; } } } - // Start known tasks, if needed and possible. + // For each timeline needing offloading, check if this safekeeper + // should do the job and start/stop the task accordingly. _ = ticker.tick() => { - for (ttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) { - consider_start_task(&conf, *ttid, entry); + for (ttid, entry) in tasks.iter_mut() { + update_task(&conf, *ttid, entry).await; } } } @@ -190,17 +236,13 @@ struct WalBackupTask { timeline_dir: PathBuf, wal_seg_size: usize, commit_lsn_watch_rx: watch::Receiver, - leader: Option, - election: Election, } -/// Offload single timeline. Called only after we checked that backup -/// is required (wal_backup_attend) and possible (can_wal_backup). +/// Offload single timeline. async fn backup_task_main( ttid: TenantTimelineId, timeline_dir: PathBuf, mut shutdown_rx: Receiver<()>, - election: Election, ) { info!("started"); let res = GlobalTimelines::get(ttid); @@ -215,8 +257,6 @@ async fn backup_task_main( commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(), timeline: tli, timeline_dir, - leader: None, - election, }; // task is spinned up only when wal_seg_size already initialized @@ -229,9 +269,6 @@ async fn backup_task_main( canceled = true; } } - if let Some(l) = wb.leader { - l.give_up().await; - } info!("task {}", if canceled { "canceled" } else { "terminated" }); } @@ -239,106 +276,71 @@ impl WalBackupTask { async fn run(&mut self) { let mut backup_lsn = Lsn(0); - // election loop + let mut retry_attempt = 0u32; + // offload loop loop { - let mut retry_attempt = 0u32; + if retry_attempt == 0 { + // wait for new WAL to arrive + if let Err(e) = self.commit_lsn_watch_rx.changed().await { + // should never happen, as we hold Arc to timeline. + error!("commit_lsn watch shut down: {:?}", e); + return; + } + } else { + // or just sleep if we errored previously + let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS; + if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) + { + retry_delay = min(retry_delay, backoff_delay); + } + sleep(Duration::from_millis(retry_delay)).await; + } - info!("acquiring leadership"); - if let Err(e) = broker::get_leader(&self.election, &mut self.leader).await { - error!("error during leader election {:?}", e); - sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await; + let commit_lsn = *self.commit_lsn_watch_rx.borrow(); + + // Note that backup_lsn can be higher than commit_lsn if we + // don't have much local WAL and others already uploaded + // segments we don't even have. + if backup_lsn.segment_number(self.wal_seg_size) + >= commit_lsn.segment_number(self.wal_seg_size) + { + retry_attempt = 0; + continue; /* nothing to do, common case as we wake up on every commit_lsn bump */ + } + // Perhaps peers advanced the position, check shmem value. + backup_lsn = self.timeline.get_wal_backup_lsn(); + if backup_lsn.segment_number(self.wal_seg_size) + >= commit_lsn.segment_number(self.wal_seg_size) + { + retry_attempt = 0; continue; } - info!("acquired leadership"); - // offload loop - loop { - if retry_attempt == 0 { - // wait for new WAL to arrive - if let Err(e) = self.commit_lsn_watch_rx.changed().await { - // should never happen, as we hold Arc to timeline. - error!("commit_lsn watch shut down: {:?}", e); + match backup_lsn_range( + backup_lsn, + commit_lsn, + self.wal_seg_size, + &self.timeline_dir, + ) + .await + { + Ok(backup_lsn_result) => { + backup_lsn = backup_lsn_result; + let res = self.timeline.set_wal_backup_lsn(backup_lsn_result); + if let Err(e) = res { + error!("failed to set wal_backup_lsn: {}", e); return; } - } else { - // or just sleep if we errored previously - let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS; - if let Some(backoff_delay) = - UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) - { - retry_delay = min(retry_delay, backoff_delay); - } - sleep(Duration::from_millis(retry_delay)).await; + retry_attempt = 0; } + Err(e) => { + error!( + "failed while offloading range {}-{}: {:?}", + backup_lsn, commit_lsn, e + ); - let commit_lsn = *self.commit_lsn_watch_rx.borrow(); - - // Note that backup_lsn can be higher than commit_lsn if we - // don't have much local WAL and others already uploaded - // segments we don't even have. - if backup_lsn.segment_number(self.wal_seg_size) - >= commit_lsn.segment_number(self.wal_seg_size) - { - continue; /* nothing to do, common case as we wake up on every commit_lsn bump */ - } - // Perhaps peers advanced the position, check shmem value. - backup_lsn = self.timeline.get_wal_backup_lsn(); - if backup_lsn.segment_number(self.wal_seg_size) - >= commit_lsn.segment_number(self.wal_seg_size) - { - continue; - } - - if let Some(l) = self.leader.as_mut() { - // Optimization idea for later: - // Avoid checking election leader every time by returning current lease grant expiration time - // Re-check leadership only after expiration time, - // such approach would reduce overhead on write-intensive workloads - - match l - .check_am_i( - self.election.election_name.clone(), - self.election.candidate_name.clone(), - ) - .await - { - Ok(leader) => { - if !leader { - info!("lost leadership"); - break; - } - } - Err(e) => { - warn!("error validating leader, {:?}", e); - break; - } - } - } - - match backup_lsn_range( - backup_lsn, - commit_lsn, - self.wal_seg_size, - &self.timeline_dir, - ) - .await - { - Ok(backup_lsn_result) => { - backup_lsn = backup_lsn_result; - let res = self.timeline.set_wal_backup_lsn(backup_lsn_result); - if let Err(e) = res { - error!("backup error: {}", e); - return; - } - retry_attempt = 0; - } - Err(e) => { - error!( - "failed while offloading range {}-{}: {:?}", - backup_lsn, commit_lsn, e - ); - - retry_attempt = min(retry_attempt + 1, u32::MAX); + if retry_attempt < u32::MAX { + retry_attempt += 1; } } } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index a77b3958c9..4b2638bb2a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -17,6 +17,7 @@ import uuid from contextlib import closing, contextmanager from dataclasses import dataclass, field from enum import Flag, auto +from functools import cached_property from pathlib import Path from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, TypeVar, Union, cast @@ -27,7 +28,6 @@ import jwt import psycopg2 import pytest import requests -from cached_property import cached_property from fixtures.log_helper import log from fixtures.types import Lsn, TenantId, TimelineId diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index 101cce9ffc..b747af4d09 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -70,18 +70,14 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): # But all others are broken # First timeline would not get loaded into pageserver due to corrupt metadata file - with pytest.raises( - Exception, match=f"Timeline {timeline1} was not found for tenant {tenant1}" - ) as err: + with pytest.raises(Exception, match=f"Timeline {tenant1}/{timeline1} was not found") as err: pg1.start() log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") # Second timeline has no ancestors, only the metadata file and no layer files # We don't have the remote storage enabled, which means timeline is in an incorrect state, # it's not loaded at all - with pytest.raises( - Exception, match=f"Timeline {timeline2} was not found for tenant {tenant2}" - ) as err: + with pytest.raises(Exception, match=f"Timeline {tenant2}/{timeline2} was not found") as err: pg2.start() log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") diff --git a/test_runner/regress/test_close_fds.py b/test_runner/regress/test_close_fds.py index c7ea37f9c8..22f245f79b 100644 --- a/test_runner/regress/test_close_fds.py +++ b/test_runner/regress/test_close_fds.py @@ -1,10 +1,10 @@ import os.path import shutil import subprocess +import threading import time from contextlib import closing -from cached_property import threading from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index de05d445ed..4a78a2746e 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -65,7 +65,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): # check 404 with pytest.raises( NeonPageserverApiException, - match=f"Timeline {leaf_timeline_id} was not found for tenant {env.initial_tenant}", + match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found", ): ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)