diff --git a/.circleci/ansible/deploy.yaml b/.circleci/ansible/deploy.yaml index a8154ba3b0..b47db6a9b5 100644 --- a/.circleci/ansible/deploy.yaml +++ b/.circleci/ansible/deploy.yaml @@ -57,7 +57,7 @@ args: creates: "/storage/pageserver/data/tenants" environment: - ZENITH_REPO_DIR: "/storage/pageserver/data" + NEON_REPO_DIR: "/storage/pageserver/data" LD_LIBRARY_PATH: "/usr/local/lib" become: true tags: @@ -131,7 +131,7 @@ args: creates: "/storage/safekeeper/data/safekeeper.id" environment: - ZENITH_REPO_DIR: "/storage/safekeeper/data" + NEON_REPO_DIR: "/storage/safekeeper/data" LD_LIBRARY_PATH: "/usr/local/lib" become: true tags: diff --git a/.circleci/ansible/systemd/pageserver.service b/.circleci/ansible/systemd/pageserver.service index 54a7b1ba0a..bb78054fa3 100644 --- a/.circleci/ansible/systemd/pageserver.service +++ b/.circleci/ansible/systemd/pageserver.service @@ -5,7 +5,7 @@ After=network.target auditd.service [Service] Type=simple User=pageserver -Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/lib +Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/lib ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoints=['{{ etcd_endpoints }}']" -D /storage/pageserver/data ExecReload=/bin/kill -HUP $MAINPID KillMode=mixed diff --git a/.circleci/ansible/systemd/safekeeper.service b/.circleci/ansible/systemd/safekeeper.service index e4a395a60e..9b1159d812 100644 --- a/.circleci/ansible/systemd/safekeeper.service +++ b/.circleci/ansible/systemd/safekeeper.service @@ -5,7 +5,7 @@ After=network.target auditd.service [Service] Type=simple User=safekeeper -Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib +Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="wal"}' ExecReload=/bin/kill -HUP $MAINPID KillMode=mixed diff --git a/.dockerignore b/.dockerignore index 352336496f..0667d8870e 100644 --- a/.dockerignore +++ b/.dockerignore @@ -9,8 +9,8 @@ tmp_install tmp_check_cli test_output .vscode -.zenith -integration_tests/.zenith +.neon +integration_tests/.neon .mypy_cache Dockerfile diff --git a/.gitignore b/.gitignore index 291504ea81..ed718c8c79 100644 --- a/.gitignore +++ b/.gitignore @@ -6,8 +6,8 @@ __pycache__/ test_output/ .vscode .idea -/.zenith -/integration_tests/.zenith +/.neon +/integration_tests/.neon # Coverage *.profraw diff --git a/.yapfignore b/.yapfignore index 258f6c59cd..149428e452 100644 --- a/.yapfignore +++ b/.yapfignore @@ -6,5 +6,5 @@ target/ tmp_install/ __pycache__/ test_output/ -.zenith/ +.neon/ .git/ diff --git a/README.md b/README.md index be5032e87d..de9070ac0f 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ brew link --force libpq ```sh git clone --recursive https://github.com/neondatabase/neon.git cd neon -make -j5 +make -j`nproc` ``` #### dependency installation notes @@ -93,7 +93,7 @@ Python (3.9 or higher), and install python3 packages using `./scripts/pysync` (r #### running neon database 1. 
Start pageserver and postgres on top of it (should be called from repo root): ```sh -# Create repository in .zenith with proper paths to binaries and data +# Create repository in .neon with proper paths to binaries and data # Later that would be responsibility of a package install script > ./target/debug/neon_local init initializing tenantid 9ef87a5bf0d92544f6fafeeb3239695c @@ -103,16 +103,16 @@ pageserver init succeeded # start pageserver and safekeeper > ./target/debug/neon_local start -Starting pageserver at '127.0.0.1:64000' in '.zenith' +Starting pageserver at '127.0.0.1:64000' in '.neon' Pageserver started initializing for sk 1 for 7676 -Starting safekeeper at '127.0.0.1:5454' in '.zenith/safekeepers/sk1' +Starting safekeeper at '127.0.0.1:5454' in '.neon/safekeepers/sk1' Safekeeper started # start postgres compute node > ./target/debug/neon_local pg start main Starting new postgres main on timeline de200bd42b49cc1814412c7e592dd6e9 ... -Extracting base backup to create postgres instance: path=.zenith/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/main port=55432 +Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/main port=55432 Starting postgres node at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres' # check list of running postgres instances @@ -149,7 +149,7 @@ Created timeline 'b3b863fa45fa9e57e615f9f2d944e601' at Lsn 0/16F9A00 for tenant: # start postgres on that branch > ./target/debug/neon_local pg start migration_check --branch-name migration_check Starting new postgres migration_check on timeline b3b863fa45fa9e57e615f9f2d944e601 ... -Extracting base backup to create postgres instance: path=.zenith/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/migration_check port=55433 +Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/migration_check port=55433 Starting postgres node at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres' # check the new list of running postgres instances diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index f7bb890893..08389d29ba 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -21,9 +21,9 @@ use utils::{ use crate::safekeeper::SafekeeperNode; // -// This data structures represents zenith CLI config +// This data structures represents neon_local CLI config // -// It is deserialized from the .zenith/config file, or the config file passed +// It is deserialized from the .neon/config file, or the config file passed // to 'zenith init --config=' option. See control_plane/simple.conf for // an example. // @@ -34,8 +34,8 @@ pub struct LocalEnv { // compute nodes). // // This is not stored in the config file. Rather, this is the path where the - // config file itself is. It is read from the ZENITH_REPO_DIR env variable or - // '.zenith' if not given. + // config file itself is. It is read from the NEON_REPO_DIR env variable or + // '.neon' if not given. 
#[serde(skip)] pub base_data_dir: PathBuf, @@ -177,6 +177,7 @@ pub struct SafekeeperConf { pub sync: bool, pub remote_storage: Option, pub backup_threads: Option, + pub auth_enabled: bool, } impl Default for SafekeeperConf { @@ -188,6 +189,7 @@ impl Default for SafekeeperConf { sync: true, remote_storage: None, backup_threads: None, + auth_enabled: false, } } } @@ -337,7 +339,7 @@ impl LocalEnv { pub fn persist_config(&self, base_path: &Path) -> anyhow::Result<()> { // Currently, the user first passes a config file with 'zenith init --config=' // We read that in, in `create_config`, and fill any missing defaults. Then it's saved - // to .zenith/config. TODO: We lose any formatting and comments along the way, which is + // to .neon/config. TODO: We lose any formatting and comments along the way, which is // a bit sad. let mut conf_content = r#"# This file describes a locale deployment of the page server # and safekeeeper node. It is read by the 'zenith' command-line @@ -481,9 +483,9 @@ impl LocalEnv { } fn base_path() -> PathBuf { - match std::env::var_os("ZENITH_REPO_DIR") { + match std::env::var_os("NEON_REPO_DIR") { Some(val) => PathBuf::from(val), - None => PathBuf::from(".zenith"), + None => PathBuf::from(".neon"), } } diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 972b6d48ae..c90f36d104 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -149,6 +149,11 @@ impl SafekeeperNode { if let Some(ref remote_storage) = self.conf.remote_storage { cmd.args(&["--remote-storage", remote_storage]); } + if self.conf.auth_enabled { + cmd.arg("--auth-validation-public-key-path"); + // PathBuf is better be passed as is, not via `String`. + cmd.arg(self.env.base_data_dir.join("auth_public_key.pem")); + } fill_aws_secrets_vars(&mut cmd); diff --git a/docs/rfcs/cluster-size-limits.md b/docs/rfcs/cluster-size-limits.md index bd12fb6eee..bd4cb9ef32 100644 --- a/docs/rfcs/cluster-size-limits.md +++ b/docs/rfcs/cluster-size-limits.md @@ -36,12 +36,12 @@ This is how the `LOGICAL_TIMELINE_SIZE` metric is implemented in the pageserver. Alternatively, we could count only relation data. As in pg_database_size(). This approach is somewhat more user-friendly because it is the data that is really affected by the user. On the other hand, it puts us in a weaker position than other services, i.e., RDS. -We will need to refactor the timeline_size counter or add another counter to implement it. +We will need to refactor the timeline_size counter or add another counter to implement it. Timeline size is updated during wal digestion. It is not versioned and is valid at the last_received_lsn moment. Then this size should be reported to compute node. -`current_timeline_size` value is included in the walreceiver's custom feedback message: `ZenithFeedback.` +`current_timeline_size` value is included in the walreceiver's custom feedback message: `ReplicationFeedback.` (PR about protocol changes https://github.com/zenithdb/zenith/pull/1037). @@ -64,11 +64,11 @@ We should warn users if the limit is soon to be reached. ### **Reliability, failure modes and corner cases** 1. `current_timeline_size` is valid at the last received and digested by pageserver lsn. - + If pageserver lags behind compute node, `current_timeline_size` will lag too. This lag can be tuned using backpressure, but it is not expected to be 0 all the time. - + So transactions that happen in this lsn range may cause limit overflow. 
Especially operations that generate (i.e., CREATE DATABASE) or free (i.e., TRUNCATE) a lot of data pages while generating a small amount of WAL. Are there other operations like this? - + Currently, CREATE DATABASE operations are restricted in the console. So this is not an issue. diff --git a/docs/settings.md b/docs/settings.md index 4d828f22bc..f2aaab75a8 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -154,7 +154,7 @@ The default distrib dir is `./tmp_install/`. #### workdir (-D) A directory in the file system, where pageserver will store its files. -The default is `./.zenith/`. +The default is `./.neon/`. This parameter has a special CLI alias (`-D`) and can not be overridden with regular `-c` way. diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs index 6b3293ec40..38d4a403c2 100644 --- a/libs/etcd_broker/src/lib.rs +++ b/libs/etcd_broker/src/lib.rs @@ -1,91 +1,43 @@ //! A set of primitives to access a shared data/updates, propagated via etcd broker (not persistent). //! Intended to connect services to each other, not to store their data. + +/// All broker keys, that are used when dealing with etcd. +pub mod subscription_key; +/// All broker values, possible to use when dealing with etcd. +pub mod subscription_value; + use std::{ collections::{hash_map, HashMap}, - fmt::Display, str::FromStr, }; -use once_cell::sync::Lazy; -use regex::{Captures, Regex}; -use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; - -pub use etcd_client::*; +use serde::de::DeserializeOwned; +use subscription_key::SubscriptionKey; use tokio::{sync::mpsc, task::JoinHandle}; use tracing::*; -use utils::{ - lsn::Lsn, - zid::{NodeId, ZTenantId, ZTenantTimelineId}, -}; +use utils::zid::{NodeId, ZTenantTimelineId}; + +use crate::subscription_key::SubscriptionFullKey; + +pub use etcd_client::*; /// Default value to use for prefixing to all etcd keys with. /// This way allows isolating safekeeper/pageserver groups in the same etcd cluster. pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon"; -#[derive(Debug, Deserialize, Serialize)] -struct SafekeeperTimeline { - safekeeper_id: NodeId, - info: SkTimelineInfo, -} - -/// Published data about safekeeper's timeline. Fields made optional for easy migrations. -#[serde_as] -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct SkTimelineInfo { - /// Term of the last entry. - pub last_log_term: Option, - /// LSN of the last record. - #[serde_as(as = "Option")] - #[serde(default)] - pub flush_lsn: Option, - /// Up to which LSN safekeeper regards its WAL as committed. - #[serde_as(as = "Option")] - #[serde(default)] - pub commit_lsn: Option, - /// LSN up to which safekeeper has backed WAL. - #[serde_as(as = "Option")] - #[serde(default)] - pub backup_lsn: Option, - /// LSN of last checkpoint uploaded by pageserver. - #[serde_as(as = "Option")] - #[serde(default)] - pub remote_consistent_lsn: Option, - #[serde_as(as = "Option")] - #[serde(default)] - pub peer_horizon_lsn: Option, - #[serde(default)] - pub safekeeper_connstr: Option, -} - -#[derive(Debug, thiserror::Error)] -pub enum BrokerError { - #[error("Etcd client error: {0}. Context: {1}")] - EtcdClient(etcd_client::Error, String), - #[error("Error during parsing etcd key: {0}")] - InvalidKey(String), - #[error("Error during parsing etcd value: {0}")] - ParsingError(String), - #[error("Internal error: {0}")] - InternalError(String), -} - /// A way to control the data retrieval from a certain subscription. 
-pub struct SkTimelineSubscription {
-    safekeeper_timeline_updates:
-        mpsc::UnboundedReceiver<HashMap<ZTenantTimelineId, HashMap<NodeId, SkTimelineInfo>>>,
-    kind: SkTimelineSubscriptionKind,
+pub struct BrokerSubscription<V> {
+    value_updates: mpsc::UnboundedReceiver<HashMap<ZTenantTimelineId, HashMap<NodeId, V>>>,
+    key: SubscriptionKey,
     watcher_handle: JoinHandle<Result<(), BrokerError>>,
     watcher: Watcher,
 }
 
-impl SkTimelineSubscription {
+impl<V> BrokerSubscription<V> {
     /// Asynchronously polls for more data from the subscription, suspending the current future if there's no data sent yet.
-    pub async fn fetch_data(
-        &mut self,
-    ) -> Option<HashMap<ZTenantTimelineId, HashMap<NodeId, SkTimelineInfo>>> {
-        self.safekeeper_timeline_updates.recv().await
+    pub async fn fetch_data(&mut self) -> Option<HashMap<ZTenantTimelineId, HashMap<NodeId, V>>> {
+        self.value_updates.recv().await
     }
 
     /// Cancels the subscription, stopping the data poller and waiting for it to shut down.
@@ -93,117 +45,90 @@ impl SkTimelineSubscription {
         self.watcher.cancel().await.map_err(|e| {
             BrokerError::EtcdClient(
                 e,
-                format!(
-                    "Failed to cancel timeline subscription, kind: {:?}",
-                    self.kind
-                ),
+                format!("Failed to cancel broker subscription, kind: {:?}", self.key),
             )
         })?;
         self.watcher_handle.await.map_err(|e| {
             BrokerError::InternalError(format!(
-                "Failed to join the timeline updates task, kind: {:?}, error: {e}",
-                self.kind
+                "Failed to join the broker value updates task, kind: {:?}, error: {e}",
+                self.key
             ))
         })?
     }
 }
 
-/// The subscription kind to the timeline updates from safekeeper.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct SkTimelineSubscriptionKind {
-    broker_etcd_prefix: String,
-    kind: SubscriptionKind,
-}
-
-impl SkTimelineSubscriptionKind {
-    pub fn all(broker_etcd_prefix: String) -> Self {
-        Self {
-            broker_etcd_prefix,
-            kind: SubscriptionKind::All,
-        }
-    }
-
-    pub fn tenant(broker_etcd_prefix: String, tenant: ZTenantId) -> Self {
-        Self {
-            broker_etcd_prefix,
-            kind: SubscriptionKind::Tenant(tenant),
-        }
-    }
-
-    pub fn timeline(broker_etcd_prefix: String, timeline: ZTenantTimelineId) -> Self {
-        Self {
-            broker_etcd_prefix,
-            kind: SubscriptionKind::Timeline(timeline),
-        }
-    }
-
-    /// Etcd key to use for watching a certain timeline updates from safekeepers.
-    pub fn watch_key(&self) -> String {
-        match self.kind {
-            SubscriptionKind::All => self.broker_etcd_prefix.to_string(),
-            SubscriptionKind::Tenant(tenant_id) => {
-                format!("{}/{tenant_id}/safekeeper", self.broker_etcd_prefix)
-            }
-            SubscriptionKind::Timeline(ZTenantTimelineId {
-                tenant_id,
-                timeline_id,
-            }) => format!(
-                "{}/{tenant_id}/{timeline_id}/safekeeper",
-                self.broker_etcd_prefix
-            ),
-        }
-    }
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-enum SubscriptionKind {
-    /// Get every timeline update.
-    All,
-    /// Get certain tenant timelines' updates.
-    Tenant(ZTenantId),
-    /// Get certain timeline updates.
-    Timeline(ZTenantTimelineId),
+#[derive(Debug, thiserror::Error)]
+pub enum BrokerError {
+    #[error("Etcd client error: {0}. Context: {1}")]
+    EtcdClient(etcd_client::Error, String),
+    #[error("Error during parsing etcd key: {0}")]
+    KeyNotParsed(String),
+    #[error("Internal error: {0}")]
+    InternalError(String),
 }
 
 /// Creates a background task to poll etcd for timeline updates from safekeepers.
 /// Stops and returns `Err` on any error during etcd communication.
 /// Watches the key changes until either the watcher is cancelled via etcd or the subscription cancellation handle,
 /// exiting normally in such cases.
-pub async fn subscribe_to_safekeeper_timeline_updates(
+/// Etcd values are parsed as json values into a type, specified in the generic parameter.
+pub async fn subscribe_for_json_values<V>(
     client: &mut Client,
-    subscription: SkTimelineSubscriptionKind,
-) -> Result<SkTimelineSubscription, BrokerError> {
-    info!("Subscribing to timeline updates, subscription kind: {subscription:?}");
-    let kind = subscription.clone();
+    key: SubscriptionKey,
+) -> Result<BrokerSubscription<V>, BrokerError>
+where
+    V: DeserializeOwned + Send + 'static,
+{
+    subscribe_for_values(client, key, |_, value_str| {
+        match serde_json::from_str::<V>(value_str) {
+            Ok(value) => Some(value),
+            Err(e) => {
+                error!("Failed to parse value str '{value_str}': {e}");
+                None
+            }
+        }
+    })
+    .await
+}
+
+/// Same as [`subscribe_for_json_values`], but allows specifying a custom parser for an etcd value string.
+pub async fn subscribe_for_values<V, P>(
+    client: &mut Client,
+    key: SubscriptionKey,
+    value_parser: P,
+) -> Result<BrokerSubscription<V>, BrokerError>
+where
+    V: Send + 'static,
+    P: Fn(SubscriptionFullKey, &str) -> Option<V> + Send + 'static,
+{
+    info!("Subscribing to broker value updates, key: {key:?}");
+    let subscription_key = key.clone();
     let (watcher, mut stream) = client
-        .watch(
-            subscription.watch_key(),
-            Some(WatchOptions::new().with_prefix()),
-        )
+        .watch(key.watch_key(), Some(WatchOptions::new().with_prefix()))
         .await
         .map_err(|e| {
             BrokerError::EtcdClient(
                 e,
-                format!("Failed to init the watch for subscription {subscription:?}"),
+                format!("Failed to init the watch for subscription {key:?}"),
             )
         })?;
-    let (timeline_updates_sender, safekeeper_timeline_updates) = mpsc::unbounded_channel();
+    let (value_updates_sender, value_updates_receiver) = mpsc::unbounded_channel();
     let watcher_handle = tokio::spawn(async move {
         while let Some(resp) = stream.message().await.map_err(|e| BrokerError::InternalError(format!(
-            "Failed to get messages from the subscription stream, kind: {:?}, error: {e}", subscription.kind
+            "Failed to get messages from the subscription stream, kind: {:?}, error: {e}", key.kind
        )))? {
             if resp.canceled() {
                 info!("Watch for timeline updates subscription was canceled, exiting");
                 break;
             }
-            let mut timeline_updates: HashMap<ZTenantTimelineId, HashMap<NodeId, SkTimelineInfo>> = HashMap::new();
+            let mut value_updates: HashMap<ZTenantTimelineId, HashMap<NodeId, V>> = HashMap::new();
             // Keep track that the timeline data updates from etcd arrive in the right order.
             // https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas
             // > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering.
- let mut timeline_etcd_versions: HashMap = HashMap::new(); + let mut value_etcd_versions: HashMap = HashMap::new(); let events = resp.events(); @@ -213,182 +138,78 @@ pub async fn subscribe_to_safekeeper_timeline_updates( if EventType::Put == event.event_type() { if let Some(new_etcd_kv) = event.kv() { let new_kv_version = new_etcd_kv.version(); - let (key_str, value_str) = match extract_key_value_str(new_etcd_kv) { - Ok(strs) => strs, - Err(e) => { - error!("Failed to represent etcd KV {new_etcd_kv:?} as pair of str: {e}"); - continue; - }, - }; - match parse_safekeeper_timeline(&subscription, key_str, value_str) { - Ok((zttid, timeline)) => { - match timeline_updates - .entry(zttid) - .or_default() - .entry(timeline.safekeeper_id) - { - hash_map::Entry::Occupied(mut o) => { - let old_etcd_kv_version = timeline_etcd_versions.get(&zttid).copied().unwrap_or(i64::MIN); - if old_etcd_kv_version < new_kv_version { - o.insert(timeline.info); - timeline_etcd_versions.insert(zttid,new_kv_version); - } else { - debug!("Skipping etcd timeline update due to older version compared to one that's already stored"); + match parse_etcd_kv(new_etcd_kv, &value_parser, &key.cluster_prefix) { + Ok(Some((key, value))) => match value_updates + .entry(key.id) + .or_default() + .entry(key.node_id) + { + hash_map::Entry::Occupied(mut o) => { + let old_etcd_kv_version = value_etcd_versions.get(&key.id).copied().unwrap_or(i64::MIN); + if old_etcd_kv_version < new_kv_version { + o.insert(value); + value_etcd_versions.insert(key.id,new_kv_version); + } else { + debug!("Skipping etcd timeline update due to older version compared to one that's already stored"); + } } - } - hash_map::Entry::Vacant(v) => { - v.insert(timeline.info); - timeline_etcd_versions.insert(zttid,new_kv_version); - } - } - } - // it is normal to get other keys when we subscribe to everything - Err(BrokerError::InvalidKey(e)) => debug!("Unexpected key for timeline update: {e}"), - Err(e) => error!("Failed to parse timeline update: {e}"), + hash_map::Entry::Vacant(v) => { + v.insert(value); + value_etcd_versions.insert(key.id,new_kv_version); + } + }, + Ok(None) => debug!("Ignoring key {key:?} : no value was returned by the parser"), + Err(BrokerError::KeyNotParsed(e)) => debug!("Unexpected key {key:?} for timeline update: {e}"), + Err(e) => error!("Failed to represent etcd KV {new_etcd_kv:?}: {e}"), }; } } } - if let Err(e) = timeline_updates_sender.send(timeline_updates) { - info!("Timeline updates sender got dropped, exiting: {e}"); - break; + if !value_updates.is_empty() { + if let Err(e) = value_updates_sender.send(value_updates) { + info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}"); + break; + } } } Ok(()) }.instrument(info_span!("etcd_broker"))); - Ok(SkTimelineSubscription { - kind, - safekeeper_timeline_updates, + Ok(BrokerSubscription { + key: subscription_key, + value_updates: value_updates_receiver, watcher_handle, watcher, }) } -fn extract_key_value_str(kv: &KeyValue) -> Result<(&str, &str), BrokerError> { - let key = kv.key_str().map_err(|e| { +fn parse_etcd_kv( + kv: &KeyValue, + value_parser: &P, + cluster_prefix: &str, +) -> Result, BrokerError> +where + P: Fn(SubscriptionFullKey, &str) -> Option, +{ + let key_str = kv.key_str().map_err(|e| { BrokerError::EtcdClient(e, "Failed to extract key str out of etcd KV".to_string()) })?; - let value = kv.value_str().map_err(|e| { + let value_str = kv.value_str().map_err(|e| { BrokerError::EtcdClient(e, "Failed to extract value str out of etcd KV".to_string()) 
})?; - Ok((key, value)) -} -static SK_TIMELINE_KEY_REGEX: Lazy = Lazy::new(|| { - Regex::new("/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]]+)$") - .expect("wrong regex for safekeeper timeline etcd key") -}); - -fn parse_safekeeper_timeline( - subscription: &SkTimelineSubscriptionKind, - key_str: &str, - value_str: &str, -) -> Result<(ZTenantTimelineId, SafekeeperTimeline), BrokerError> { - let broker_prefix = subscription.broker_etcd_prefix.as_str(); - if !key_str.starts_with(broker_prefix) { - return Err(BrokerError::InvalidKey(format!( - "KV has unexpected key '{key_str}' that does not start with broker prefix {broker_prefix}" + if !key_str.starts_with(cluster_prefix) { + return Err(BrokerError::KeyNotParsed(format!( + "KV has unexpected key '{key_str}' that does not start with cluster prefix {cluster_prefix}" ))); } - let key_part = &key_str[broker_prefix.len()..]; - let key_captures = match SK_TIMELINE_KEY_REGEX.captures(key_part) { - Some(captures) => captures, - None => { - return Err(BrokerError::InvalidKey(format!( - "KV has unexpected key part '{key_part}' that does not match required regex {}", - SK_TIMELINE_KEY_REGEX.as_str() - ))); - } - }; - let info = serde_json::from_str(value_str).map_err(|e| { - BrokerError::ParsingError(format!( - "Failed to parse '{value_str}' as safekeeper timeline info: {e}" - )) + let key = SubscriptionFullKey::from_str(&key_str[cluster_prefix.len()..]).map_err(|e| { + BrokerError::KeyNotParsed(format!("Failed to parse KV key '{key_str}': {e}")) })?; - let zttid = ZTenantTimelineId::new( - parse_capture(&key_captures, 1).map_err(BrokerError::ParsingError)?, - parse_capture(&key_captures, 2).map_err(BrokerError::ParsingError)?, - ); - let safekeeper_id = NodeId(parse_capture(&key_captures, 3).map_err(BrokerError::ParsingError)?); - - Ok(( - zttid, - SafekeeperTimeline { - safekeeper_id, - info, - }, - )) -} - -fn parse_capture(caps: &Captures, index: usize) -> Result -where - T: FromStr, - ::Err: Display, -{ - let capture_match = caps - .get(index) - .ok_or_else(|| format!("Failed to get capture match at index {index}"))? 
- .as_str(); - capture_match.parse().map_err(|e| { - format!( - "Failed to parse {} from {capture_match}: {e}", - std::any::type_name::() - ) - }) -} - -#[cfg(test)] -mod tests { - use utils::zid::ZTimelineId; - - use super::*; - - #[test] - fn typical_etcd_prefix_should_be_parsed() { - let prefix = "neon"; - let tenant_id = ZTenantId::generate(); - let timeline_id = ZTimelineId::generate(); - let all_subscription = SkTimelineSubscriptionKind { - broker_etcd_prefix: prefix.to_string(), - kind: SubscriptionKind::All, - }; - let tenant_subscription = SkTimelineSubscriptionKind { - broker_etcd_prefix: prefix.to_string(), - kind: SubscriptionKind::Tenant(tenant_id), - }; - let timeline_subscription = SkTimelineSubscriptionKind { - broker_etcd_prefix: prefix.to_string(), - kind: SubscriptionKind::Timeline(ZTenantTimelineId::new(tenant_id, timeline_id)), - }; - - let typical_etcd_kv_strs = [ - ( - format!("{prefix}/{tenant_id}/{timeline_id}/safekeeper/1"), - r#"{"last_log_term":231,"flush_lsn":"0/241BB70","commit_lsn":"0/241BB70","backup_lsn":"0/2000000","remote_consistent_lsn":"0/0","peer_horizon_lsn":"0/16960E8","safekeeper_connstr":"something.local:1234","pageserver_connstr":"postgresql://(null):@somethine.else.local:3456"}"#, - ), - ( - format!("{prefix}/{tenant_id}/{timeline_id}/safekeeper/13"), - r#"{"last_log_term":231,"flush_lsn":"0/241BB70","commit_lsn":"0/241BB70","backup_lsn":"0/2000000","remote_consistent_lsn":"0/0","peer_horizon_lsn":"0/16960E8","safekeeper_connstr":"something.local:1234","pageserver_connstr":"postgresql://(null):@somethine.else.local:3456"}"#, - ), - ]; - - for (key_string, value_str) in typical_etcd_kv_strs { - for subscription in [ - &all_subscription, - &tenant_subscription, - &timeline_subscription, - ] { - let (id, _timeline) = - parse_safekeeper_timeline(subscription, &key_string, value_str) - .unwrap_or_else(|e| panic!("Should be able to parse etcd key string '{key_string}' and etcd value string '{value_str}' for subscription {subscription:?}, but got: {e}")); - assert_eq!(id, ZTenantTimelineId::new(tenant_id, timeline_id)); - } - } - } + Ok(value_parser(key, value_str).map(|value| (key, value))) } diff --git a/libs/etcd_broker/src/subscription_key.rs b/libs/etcd_broker/src/subscription_key.rs new file mode 100644 index 0000000000..8f8579f4e5 --- /dev/null +++ b/libs/etcd_broker/src/subscription_key.rs @@ -0,0 +1,310 @@ +//! Etcd broker keys, used in the project and shared between instances. +//! The keys are split into two categories: +//! +//! * [`SubscriptionFullKey`] full key format: `/////` +//! Always returned from etcd in this form, always start with the user key provided. +//! +//! * [`SubscriptionKey`] user input key format: always partial, since it's unknown which `node_id`'s are available. +//! Full key always starts with the user input one, due to etcd subscription properties. + +use std::{fmt::Display, str::FromStr}; + +use once_cell::sync::Lazy; +use regex::{Captures, Regex}; +use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId}; + +/// The subscription kind to the timeline updates from safekeeper. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct SubscriptionKey { + /// Generic cluster prefix, allowing to use the same etcd instance by multiple logic groups. + pub cluster_prefix: String, + /// The subscription kind. + pub kind: SubscriptionKind, +} + +/// All currently possible key kinds of a etcd broker subscription. 
+/// Etcd works such that every key that starts with the given subscription key is considered matching and
+/// is returned as part of the subscription.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum SubscriptionKind {
+    /// Get every update in etcd.
+    All,
+    /// Get etcd updates for any timeline of a certain tenant, affected by any operation from any node kind.
+    TenantTimelines(ZTenantId),
+    /// Get etcd updates for a certain timeline of a tenant, affected by any operation from any node kind.
+    Timeline(ZTenantTimelineId),
+    /// Get etcd timeline updates, specific to a certain node kind.
+    Node(ZTenantTimelineId, NodeKind),
+    /// Get etcd timeline updates for a certain operation on specific nodes.
+    Operation(ZTenantTimelineId, NodeKind, OperationKind),
+}
+
+/// All kinds of nodes, able to write into etcd.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum NodeKind {
+    Safekeeper,
+    Pageserver,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum OperationKind {
+    Safekeeper(SkOperationKind),
+}
+
+/// Current operations, running inside the safekeeper node.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum SkOperationKind {
+    TimelineInfo,
+    WalBackup,
+}
+
+static SUBSCRIPTION_FULL_KEY_REGEX: Lazy<Regex> = Lazy::new(|| {
+    Regex::new("/([[:xdigit:]]+)/([[:xdigit:]]+)/([^/]+)/([^/]+)/([[:digit:]]+)$")
+        .expect("wrong subscription full etcd key regex")
+});
+
+/// Full key, received from etcd during any of the component's work.
+/// No other etcd keys are considered during system's work.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct SubscriptionFullKey {
+    pub id: ZTenantTimelineId,
+    pub node_kind: NodeKind,
+    pub operation: OperationKind,
+    pub node_id: NodeId,
+}
+
+impl SubscriptionKey {
+    /// Subscribes for all etcd updates.
+    pub fn all(cluster_prefix: String) -> Self {
+        SubscriptionKey {
+            cluster_prefix,
+            kind: SubscriptionKind::All,
+        }
+    }
+
+    /// Subscribes to a given timeline's info updates from safekeepers.
+    pub fn sk_timeline_info(cluster_prefix: String, timeline: ZTenantTimelineId) -> Self {
+        Self {
+            cluster_prefix,
+            kind: SubscriptionKind::Operation(
+                timeline,
+                NodeKind::Safekeeper,
+                OperationKind::Safekeeper(SkOperationKind::TimelineInfo),
+            ),
+        }
+    }
+
+    /// Subscribes to all timeline updates during specific operations, running on the corresponding nodes.
+    pub fn operation(
+        cluster_prefix: String,
+        timeline: ZTenantTimelineId,
+        node_kind: NodeKind,
+        operation: OperationKind,
+    ) -> Self {
+        Self {
+            cluster_prefix,
+            kind: SubscriptionKind::Operation(timeline, node_kind, operation),
+        }
+    }
+
+    /// Etcd key to use for watching certain timeline updates from safekeepers.
+ pub fn watch_key(&self) -> String { + let cluster_prefix = &self.cluster_prefix; + match self.kind { + SubscriptionKind::All => cluster_prefix.to_string(), + SubscriptionKind::TenantTimelines(tenant_id) => { + format!("{cluster_prefix}/{tenant_id}") + } + SubscriptionKind::Timeline(id) => { + format!("{cluster_prefix}/{id}") + } + SubscriptionKind::Node(id, node_kind) => { + format!("{cluster_prefix}/{id}/{node_kind}") + } + SubscriptionKind::Operation(id, node_kind, operation_kind) => { + format!("{cluster_prefix}/{id}/{node_kind}/{operation_kind}") + } + } + } +} + +impl Display for OperationKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OperationKind::Safekeeper(o) => o.fmt(f), + } + } +} + +impl FromStr for OperationKind { + type Err = String; + + fn from_str(operation_kind_str: &str) -> Result { + match operation_kind_str { + "timeline_info" => Ok(OperationKind::Safekeeper(SkOperationKind::TimelineInfo)), + "wal_backup" => Ok(OperationKind::Safekeeper(SkOperationKind::WalBackup)), + _ => Err(format!("Unknown operation kind: {operation_kind_str}")), + } + } +} + +impl Display for SubscriptionFullKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + id, + node_kind, + operation, + node_id, + } = self; + write!(f, "{id}/{node_kind}/{operation}/{node_id}") + } +} + +impl FromStr for SubscriptionFullKey { + type Err = String; + + fn from_str(subscription_kind_str: &str) -> Result { + let key_captures = match SUBSCRIPTION_FULL_KEY_REGEX.captures(subscription_kind_str) { + Some(captures) => captures, + None => { + return Err(format!( + "Subscription kind str does not match a subscription full key regex {}", + SUBSCRIPTION_FULL_KEY_REGEX.as_str() + )); + } + }; + + Ok(Self { + id: ZTenantTimelineId::new( + parse_capture(&key_captures, 1)?, + parse_capture(&key_captures, 2)?, + ), + node_kind: parse_capture(&key_captures, 3)?, + operation: parse_capture(&key_captures, 4)?, + node_id: NodeId(parse_capture(&key_captures, 5)?), + }) + } +} + +fn parse_capture(caps: &Captures, index: usize) -> Result +where + T: FromStr, + ::Err: Display, +{ + let capture_match = caps + .get(index) + .ok_or_else(|| format!("Failed to get capture match at index {index}"))? 
+ .as_str(); + capture_match.parse().map_err(|e| { + format!( + "Failed to parse {} from {capture_match}: {e}", + std::any::type_name::() + ) + }) +} + +impl Display for NodeKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Safekeeper => write!(f, "safekeeper"), + Self::Pageserver => write!(f, "pageserver"), + } + } +} + +impl FromStr for NodeKind { + type Err = String; + + fn from_str(node_kind_str: &str) -> Result { + match node_kind_str { + "safekeeper" => Ok(Self::Safekeeper), + "pageserver" => Ok(Self::Pageserver), + _ => Err(format!("Invalid node kind: {node_kind_str}")), + } + } +} + +impl Display for SkOperationKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::TimelineInfo => write!(f, "timeline_info"), + Self::WalBackup => write!(f, "wal_backup"), + } + } +} + +impl FromStr for SkOperationKind { + type Err = String; + + fn from_str(operation_str: &str) -> Result { + match operation_str { + "timeline_info" => Ok(Self::TimelineInfo), + "wal_backup" => Ok(Self::WalBackup), + _ => Err(format!("Invalid operation: {operation_str}")), + } + } +} + +#[cfg(test)] +mod tests { + use utils::zid::ZTimelineId; + + use super::*; + + #[test] + fn full_cluster_key_parsing() { + let prefix = "neon"; + let node_kind = NodeKind::Safekeeper; + let operation_kind = OperationKind::Safekeeper(SkOperationKind::WalBackup); + let tenant_id = ZTenantId::generate(); + let timeline_id = ZTimelineId::generate(); + let id = ZTenantTimelineId::new(tenant_id, timeline_id); + let node_id = NodeId(1); + + let timeline_subscription_keys = [ + SubscriptionKey { + cluster_prefix: prefix.to_string(), + kind: SubscriptionKind::All, + }, + SubscriptionKey { + cluster_prefix: prefix.to_string(), + kind: SubscriptionKind::TenantTimelines(tenant_id), + }, + SubscriptionKey { + cluster_prefix: prefix.to_string(), + kind: SubscriptionKind::Timeline(id), + }, + SubscriptionKey { + cluster_prefix: prefix.to_string(), + kind: SubscriptionKind::Node(id, node_kind), + }, + SubscriptionKey { + cluster_prefix: prefix.to_string(), + kind: SubscriptionKind::Operation(id, node_kind, operation_kind), + }, + ]; + + let full_key_string = format!( + "{}/{node_id}", + timeline_subscription_keys.last().unwrap().watch_key() + ); + + for key in timeline_subscription_keys { + assert!(full_key_string.starts_with(&key.watch_key()), "Full key '{full_key_string}' should start with any of the keys, keys, but {key:?} did not match"); + } + + let full_key = SubscriptionFullKey::from_str(&full_key_string).unwrap_or_else(|e| { + panic!("Failed to parse {full_key_string} as a subscription full key: {e}") + }); + + assert_eq!( + full_key, + SubscriptionFullKey { + id, + node_kind, + operation: operation_kind, + node_id + } + ) + } +} diff --git a/libs/etcd_broker/src/subscription_value.rs b/libs/etcd_broker/src/subscription_value.rs new file mode 100644 index 0000000000..d3e2011761 --- /dev/null +++ b/libs/etcd_broker/src/subscription_value.rs @@ -0,0 +1,35 @@ +//! Module for the values to put into etcd. + +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; +use utils::lsn::Lsn; + +/// Data about safekeeper's timeline. Fields made optional for easy migrations. +#[serde_as] +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SkTimelineInfo { + /// Term of the last entry. + pub last_log_term: Option, + /// LSN of the last record. 
+ #[serde_as(as = "Option")] + #[serde(default)] + pub flush_lsn: Option, + /// Up to which LSN safekeeper regards its WAL as committed. + #[serde_as(as = "Option")] + #[serde(default)] + pub commit_lsn: Option, + /// LSN up to which safekeeper has backed WAL. + #[serde_as(as = "Option")] + #[serde(default)] + pub backup_lsn: Option, + /// LSN of last checkpoint uploaded by pageserver. + #[serde_as(as = "Option")] + #[serde(default)] + pub remote_consistent_lsn: Option, + #[serde_as(as = "Option")] + #[serde(default)] + pub peer_horizon_lsn: Option, + /// A connection string to use for WAL receiving. + #[serde(default)] + pub safekeeper_connstr: Option, +} diff --git a/libs/postgres_ffi/wal_generate/src/lib.rs b/libs/postgres_ffi/wal_generate/src/lib.rs index 3b19afb826..2b3f5ef703 100644 --- a/libs/postgres_ffi/wal_generate/src/lib.rs +++ b/libs/postgres_ffi/wal_generate/src/lib.rs @@ -4,6 +4,7 @@ use log::*; use postgres::types::PgLsn; use postgres::Client; use std::cmp::Ordering; +use std::fs; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::time::Instant; @@ -69,6 +70,12 @@ impl Conf { pub fn start_server(&self) -> Result { info!("Starting Postgres server in {:?}", self.datadir); + let log_file = fs::File::create(self.datadir.join("pg.log")).with_context(|| { + format!( + "Failed to create pg.log file in directory {}", + self.datadir.display() + ) + })?; let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols) let unix_socket_dir_path = unix_socket_dir.path().to_owned(); let server_process = self @@ -84,7 +91,7 @@ impl Conf { // Disable background processes as much as possible .args(&["-c", "wal_writer_delay=10s"]) .args(&["-c", "autovacuum=off"]) - .stderr(Stdio::null()) + .stderr(Stdio::from(log_file)) .spawn()?; let server = PostgresServer { process: server_process, diff --git a/libs/utils/src/pq_proto.rs b/libs/utils/src/pq_proto.rs index 599af3fc68..0a320f123c 100644 --- a/libs/utils/src/pq_proto.rs +++ b/libs/utils/src/pq_proto.rs @@ -926,10 +926,10 @@ impl<'a> BeMessage<'a> { } } -// Zenith extension of postgres replication protocol -// See ZENITH_STATUS_UPDATE_TAG_BYTE +// Neon extension of postgres replication protocol +// See NEON_STATUS_UPDATE_TAG_BYTE #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] -pub struct ZenithFeedback { +pub struct ReplicationFeedback { // Last known size of the timeline. Used to enforce timeline size limit. pub current_timeline_size: u64, // Parts of StandbyStatusUpdate we resend to compute via safekeeper @@ -939,13 +939,13 @@ pub struct ZenithFeedback { pub ps_replytime: SystemTime, } -// NOTE: Do not forget to increment this number when adding new fields to ZenithFeedback. +// NOTE: Do not forget to increment this number when adding new fields to ReplicationFeedback. // Do not remove previously available fields because this might be backwards incompatible. -pub const ZENITH_FEEDBACK_FIELDS_NUMBER: u8 = 5; +pub const REPLICATION_FEEDBACK_FIELDS_NUMBER: u8 = 5; -impl ZenithFeedback { - pub fn empty() -> ZenithFeedback { - ZenithFeedback { +impl ReplicationFeedback { + pub fn empty() -> ReplicationFeedback { + ReplicationFeedback { current_timeline_size: 0, ps_writelsn: 0, ps_applylsn: 0, @@ -954,7 +954,7 @@ impl ZenithFeedback { } } - // Serialize ZenithFeedback using custom format + // Serialize ReplicationFeedback using custom format // to support protocol extensibility. 
// // Following layout is used: @@ -965,7 +965,7 @@ impl ZenithFeedback { // uint32 - value length in bytes // value itself pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> { - buf.put_u8(ZENITH_FEEDBACK_FIELDS_NUMBER); // # of keys + buf.put_u8(REPLICATION_FEEDBACK_FIELDS_NUMBER); // # of keys write_cstr(&Bytes::from("current_timeline_size"), buf)?; buf.put_i32(8); buf.put_u64(self.current_timeline_size); @@ -992,9 +992,9 @@ impl ZenithFeedback { Ok(()) } - // Deserialize ZenithFeedback message - pub fn parse(mut buf: Bytes) -> ZenithFeedback { - let mut zf = ZenithFeedback::empty(); + // Deserialize ReplicationFeedback message + pub fn parse(mut buf: Bytes) -> ReplicationFeedback { + let mut zf = ReplicationFeedback::empty(); let nfields = buf.get_u8(); let mut i = 0; while i < nfields { @@ -1035,14 +1035,14 @@ impl ZenithFeedback { _ => { let len = buf.get_i32(); warn!( - "ZenithFeedback parse. unknown key {} of len {}. Skip it.", + "ReplicationFeedback parse. unknown key {} of len {}. Skip it.", key, len ); buf.advance(len as usize); } } } - trace!("ZenithFeedback parsed is {:?}", zf); + trace!("ReplicationFeedback parsed is {:?}", zf); zf } } @@ -1052,8 +1052,8 @@ mod tests { use super::*; #[test] - fn test_zenithfeedback_serialization() { - let mut zf = ZenithFeedback::empty(); + fn test_replication_feedback_serialization() { + let mut zf = ReplicationFeedback::empty(); // Fill zf with some values zf.current_timeline_size = 12345678; // Set rounded time to be able to compare it with deserialized value, @@ -1062,13 +1062,13 @@ mod tests { let mut data = BytesMut::new(); zf.serialize(&mut data).unwrap(); - let zf_parsed = ZenithFeedback::parse(data.freeze()); + let zf_parsed = ReplicationFeedback::parse(data.freeze()); assert_eq!(zf, zf_parsed); } #[test] - fn test_zenithfeedback_unknown_key() { - let mut zf = ZenithFeedback::empty(); + fn test_replication_feedback_unknown_key() { + let mut zf = ReplicationFeedback::empty(); // Fill zf with some values zf.current_timeline_size = 12345678; // Set rounded time to be able to compare it with deserialized value, @@ -1079,7 +1079,7 @@ mod tests { // Add an extra field to the buffer and adjust number of keys if let Some(first) = data.first_mut() { - *first = ZENITH_FEEDBACK_FIELDS_NUMBER + 1; + *first = REPLICATION_FEEDBACK_FIELDS_NUMBER + 1; } write_cstr(&Bytes::from("new_field_one"), &mut data).unwrap(); @@ -1087,7 +1087,7 @@ mod tests { data.put_u64(42); // Parse serialized data and check that new field is not parsed - let zf_parsed = ZenithFeedback::parse(data.freeze()); + let zf_parsed = ReplicationFeedback::parse(data.freeze()); assert_eq!(zf, zf_parsed); } diff --git a/pageserver/README.md b/pageserver/README.md index cf841d1e46..cb752881af 100644 --- a/pageserver/README.md +++ b/pageserver/README.md @@ -69,7 +69,7 @@ Repository The repository stores all the page versions, or WAL records needed to reconstruct them. Each tenant has a separate Repository, which is -stored in the .zenith/tenants/ directory. +stored in the .neon/tenants/ directory. Repository is an abstract trait, defined in `repository.rs`. It is implemented by the LayeredRepository object in @@ -92,7 +92,7 @@ Each repository also has a WAL redo manager associated with it, see records, whenever we need to reconstruct a page version from WAL to satisfy a GetPage@LSN request, or to avoid accumulating too much WAL for a page. 
The WAL redo manager uses a Postgres process running in -special zenith wal-redo mode to do the actual WAL redo, and +special Neon wal-redo mode to do the actual WAL redo, and communicates with the process using a pipe. diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index ac90500b97..1d407a29bc 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -104,7 +104,7 @@ fn main() -> anyhow::Result<()> { return Ok(()); } - let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith")); + let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".neon")); let workdir = workdir .canonicalize() .with_context(|| format!("Error opening workdir '{}'", workdir.display()))?; diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 7696f0d021..5c5b03268a 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -4,7 +4,7 @@ //! The functions here are responsible for locating the correct layer for the //! get/put call, tracing timeline branching history as needed. //! -//! The files are stored in the .zenith/tenants//timelines/ +//! The files are stored in the .neon/tenants//timelines/ //! directory. See layered_repository/README for how the files are managed. //! In addition to the layer files, there is a metadata file in the same //! directory that contains information about the timeline, in particular its @@ -148,7 +148,7 @@ lazy_static! { .expect("failed to define a metric"); } -/// Parts of the `.zenith/tenants//timelines/` directory prefix. +/// Parts of the `.neon/tenants//timelines/` directory prefix. pub const TIMELINES_SEGMENT_NAME: &str = "timelines"; /// diff --git a/pageserver/src/layered_repository/README.md b/pageserver/src/layered_repository/README.md index 15040d21b2..bd5fa59257 100644 --- a/pageserver/src/layered_repository/README.md +++ b/pageserver/src/layered_repository/README.md @@ -123,7 +123,7 @@ The files are called "layer files". Each layer file covers a range of keys, and a range of LSNs (or a single LSN, in case of image layers). You can think of it as a rectangle in the two-dimensional key-LSN space. The layer files for each timeline are stored in the timeline's subdirectory under -`.zenith/tenants//timelines`. +`.neon/tenants//timelines`. There are two kind of layer files: images, and delta layers. An image file contains a snapshot of all keys at a particular LSN, whereas a delta file @@ -178,7 +178,7 @@ version, and how branching and GC works is still valid. The full path of a delta file looks like this: ``` - .zenith/tenants/941ddc8604413b88b3d208bddf90396c/timelines/4af489b06af8eed9e27a841775616962/rel_1663_13990_2609_0_10_000000000169C348_0000000001702000 + .neon/tenants/941ddc8604413b88b3d208bddf90396c/timelines/4af489b06af8eed9e27a841775616962/rel_1663_13990_2609_0_10_000000000169C348_0000000001702000 ``` For simplicity, the examples below use a simplified notation for the diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index f687f24c6e..756c3b8191 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -197,7 +197,7 @@ impl Display for TimelineSyncStatusUpdate { } /// -/// A repository corresponds to one .zenith directory. One repository holds multiple +/// A repository corresponds to one .neon directory. One repository holds multiple /// timelines, forked off from the same initial call to 'initdb'. 
pub trait Repository: Send + Sync { type Timeline: Timeline; diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index a140149c23..5fe2cde3b7 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -186,8 +186,8 @@ use crate::{ }; use metrics::{ - register_histogram_vec, register_int_counter, register_int_gauge, HistogramVec, IntCounter, - IntGauge, + register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge, + HistogramVec, IntCounter, IntCounterVec, IntGauge, }; use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; @@ -208,14 +208,17 @@ lazy_static! { static ref IMAGE_SYNC_TIME: HistogramVec = register_histogram_vec!( "pageserver_remote_storage_image_sync_seconds", "Time took to synchronize (download or upload) a whole pageserver image. \ - Grouped by `operation_kind` (upload|download) and `status` (success|failure)", - &["operation_kind", "status"], - vec![ - 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0, 6.0, 7.0, - 8.0, 9.0, 10.0, 12.5, 15.0, 17.5, 20.0 - ] + Grouped by tenant and timeline ids, `operation_kind` (upload|download) and `status` (success|failure)", + &["tenant_id", "timeline_id", "operation_kind", "status"], + vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 3.0, 10.0, 20.0] ) .expect("failed to register pageserver image sync time histogram vec"); + static ref REMOTE_INDEX_UPLOAD: IntCounterVec = register_int_counter_vec!( + "pageserver_remote_storage_remote_index_uploads_total", + "Number of remote index uploads", + &["tenant_id", "timeline_id"], + ) + .expect("failed to register pageserver remote index upload vec"); } static SYNC_QUEUE: OnceCell = OnceCell::new(); @@ -1146,19 +1149,19 @@ where .await { DownloadedTimeline::Abort => { - register_sync_status(sync_start, task_name, None); + register_sync_status(sync_id, sync_start, task_name, None); if let Err(e) = index.write().await.set_awaits_download(&sync_id, false) { error!("Timeline {sync_id} was expected to be in the remote index after a download attempt, but it's absent: {e:?}"); } } DownloadedTimeline::FailedAndRescheduled => { - register_sync_status(sync_start, task_name, Some(false)); + register_sync_status(sync_id, sync_start, task_name, Some(false)); } DownloadedTimeline::Successful(mut download_data) => { match update_local_metadata(conf, sync_id, current_remote_timeline).await { Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) { Ok(()) => { - register_sync_status(sync_start, task_name, Some(true)); + register_sync_status(sync_id, sync_start, task_name, Some(true)); return Some(TimelineSyncStatusUpdate::Downloaded); } Err(e) => { @@ -1169,7 +1172,7 @@ where error!("Failed to update local timeline metadata: {e:?}"); download_data.retries += 1; sync_queue.push(sync_id, SyncTask::Download(download_data)); - register_sync_status(sync_start, task_name, Some(false)); + register_sync_status(sync_id, sync_start, task_name, Some(false)); } } } @@ -1265,14 +1268,14 @@ async fn delete_timeline_data( error!("Failed to update remote timeline {sync_id}: {e:?}"); new_delete_data.retries += 1; sync_queue.push(sync_id, SyncTask::Delete(new_delete_data)); - register_sync_status(sync_start, task_name, Some(false)); + register_sync_status(sync_id, sync_start, task_name, Some(false)); return; } } timeline_delete.deletion_registered = true; let sync_status = delete_timeline_layers(storage, sync_queue, sync_id, new_delete_data).await; - register_sync_status(sync_start, task_name, 
Some(sync_status)); + register_sync_status(sync_id, sync_start, task_name, Some(sync_status)); } async fn read_metadata_file(metadata_path: &Path) -> anyhow::Result { @@ -1306,7 +1309,7 @@ async fn upload_timeline_data( .await { UploadedTimeline::FailedAndRescheduled => { - register_sync_status(sync_start, task_name, Some(false)); + register_sync_status(sync_id, sync_start, task_name, Some(false)); return; } UploadedTimeline::Successful(upload_data) => upload_data, @@ -1325,13 +1328,13 @@ async fn upload_timeline_data( .await { Ok(()) => { - register_sync_status(sync_start, task_name, Some(true)); + register_sync_status(sync_id, sync_start, task_name, Some(true)); } Err(e) => { error!("Failed to update remote timeline {sync_id}: {e:?}"); uploaded_data.retries += 1; sync_queue.push(sync_id, SyncTask::Upload(uploaded_data)); - register_sync_status(sync_start, task_name, Some(false)); + register_sync_status(sync_id, sync_start, task_name, Some(false)); } } } @@ -1421,7 +1424,14 @@ where IndexPart::from_remote_timeline(&timeline_path, updated_remote_timeline) .context("Failed to create an index part from the updated remote timeline")?; - info!("Uploading remote index for the timeline"); + debug!("Uploading remote index for the timeline"); + REMOTE_INDEX_UPLOAD + .with_label_values(&[ + &sync_id.tenant_id.to_string(), + &sync_id.timeline_id.to_string(), + ]) + .inc(); + upload_index_part(conf, storage, sync_id, new_index_part) .await .context("Failed to upload new index part") @@ -1590,12 +1600,24 @@ fn compare_local_and_remote_timeline( (initial_timeline_status, awaits_download) } -fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Option) { +fn register_sync_status( + sync_id: ZTenantTimelineId, + sync_start: Instant, + sync_name: &str, + sync_status: Option, +) { let secs_elapsed = sync_start.elapsed().as_secs_f64(); - info!("Processed a sync task in {secs_elapsed:.2} seconds"); + debug!("Processed a sync task in {secs_elapsed:.2} seconds"); + + let tenant_id = sync_id.tenant_id.to_string(); + let timeline_id = sync_id.timeline_id.to_string(); match sync_status { - Some(true) => IMAGE_SYNC_TIME.with_label_values(&[sync_name, "success"]), - Some(false) => IMAGE_SYNC_TIME.with_label_values(&[sync_name, "failure"]), + Some(true) => { + IMAGE_SYNC_TIME.with_label_values(&[&tenant_id, &timeline_id, sync_name, "success"]) + } + Some(false) => { + IMAGE_SYNC_TIME.with_label_values(&[&tenant_id, &timeline_id, sync_name, "failure"]) + } None => return, } .observe(secs_elapsed) diff --git a/pageserver/src/storage_sync/upload.rs b/pageserver/src/storage_sync/upload.rs index 2f88fa95ba..f9ab3b7471 100644 --- a/pageserver/src/storage_sync/upload.rs +++ b/pageserver/src/storage_sync/upload.rs @@ -4,6 +4,7 @@ use std::{fmt::Debug, path::PathBuf}; use anyhow::Context; use futures::stream::{FuturesUnordered, StreamExt}; +use lazy_static::lazy_static; use remote_storage::RemoteStorage; use tokio::fs; use tracing::{debug, error, info, warn}; @@ -17,6 +18,16 @@ use super::{ use crate::{ config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask, }; +use metrics::{register_int_counter_vec, IntCounterVec}; + +lazy_static! 
{ + static ref NO_LAYERS_UPLOAD: IntCounterVec = register_int_counter_vec!( + "pageserver_remote_storage_no_layers_uploads_total", + "Number of skipped uploads due to no layers", + &["tenant_id", "timeline_id"], + ) + .expect("failed to register pageserver no layers upload vec"); +} /// Serializes and uploads the given index part data to the remote storage. pub(super) async fn upload_index_part( @@ -102,7 +113,13 @@ where .collect::>(); if layers_to_upload.is_empty() { - info!("No layers to upload after filtering, aborting"); + debug!("No layers to upload after filtering, aborting"); + NO_LAYERS_UPLOAD + .with_label_values(&[ + &sync_id.tenant_id.to_string(), + &sync_id.timeline_id.to_string(), + ]) + .inc(); return UploadedTimeline::Successful(upload_data); } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 202a13545d..82401e1d8c 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -50,7 +50,10 @@ use crate::thread_mgr::ThreadKind; use crate::{thread_mgr, DatadirTimelineImpl}; use anyhow::{ensure, Context}; use chrono::{NaiveDateTime, Utc}; -use etcd_broker::{Client, SkTimelineInfo, SkTimelineSubscription, SkTimelineSubscriptionKind}; +use etcd_broker::{ + subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription, + Client, +}; use itertools::Itertools; use once_cell::sync::Lazy; use std::cell::Cell; @@ -68,7 +71,7 @@ use tokio::{ use tracing::*; use url::Url; use utils::lsn::Lsn; -use utils::pq_proto::ZenithFeedback; +use utils::pq_proto::ReplicationFeedback; use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId}; use self::connection_handler::{WalConnectionEvent, WalReceiverConnection}; @@ -403,7 +406,7 @@ async fn timeline_wal_broker_loop_step( // Endlessly try to subscribe for broker updates for a given timeline. // If there are no safekeepers to maintain the lease, the timeline subscription will be inavailable in the broker and the operation will fail constantly. // This is ok, pageservers should anyway try subscribing (with some backoff) since it's the only way they can get the timeline WAL anyway. - let mut broker_subscription: SkTimelineSubscription; + let mut broker_subscription: BrokerSubscription; let mut attempt = 0; loop { select! 
{ @@ -420,9 +423,9 @@ async fn timeline_wal_broker_loop_step( info!("Broker subscription loop cancelled, shutting down"); return Ok(ControlFlow::Break(())); }, - new_subscription = etcd_broker::subscribe_to_safekeeper_timeline_updates( + new_subscription = etcd_broker::subscribe_for_json_values( etcd_client, - SkTimelineSubscriptionKind::timeline(broker_prefix.to_owned(), id), + SubscriptionKey::sk_timeline_info(broker_prefix.to_owned(), id), ) .instrument(info_span!("etcd_subscription")) => match new_subscription { Ok(new_subscription) => { @@ -518,7 +521,7 @@ struct WalConnectionData { safekeeper_id: NodeId, connection: WalReceiverConnection, connection_init_time: NaiveDateTime, - last_wal_receiver_data: Option<(ZenithFeedback, NaiveDateTime)>, + last_wal_receiver_data: Option<(ReplicationFeedback, NaiveDateTime)>, } #[derive(Debug, PartialEq, Eq)] @@ -843,7 +846,7 @@ mod tests { .await; let now = Utc::now().naive_utc(); dummy_connection_data.last_wal_receiver_data = Some(( - ZenithFeedback { + ReplicationFeedback { current_timeline_size: 1, ps_writelsn: 1, ps_applylsn: current_lsn, @@ -1014,7 +1017,7 @@ mod tests { let time_over_threshold = Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout; dummy_connection_data.last_wal_receiver_data = Some(( - ZenithFeedback { + ReplicationFeedback { current_timeline_size: 1, ps_writelsn: current_lsn.0, ps_applylsn: 1, diff --git a/pageserver/src/walreceiver/connection_handler.rs b/pageserver/src/walreceiver/connection_handler.rs index aaccee9730..97b9b8cc9b 100644 --- a/pageserver/src/walreceiver/connection_handler.rs +++ b/pageserver/src/walreceiver/connection_handler.rs @@ -19,7 +19,7 @@ use tokio_stream::StreamExt; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use utils::{ lsn::Lsn, - pq_proto::ZenithFeedback, + pq_proto::ReplicationFeedback, zid::{NodeId, ZTenantTimelineId}, }; @@ -33,7 +33,7 @@ use crate::{ #[derive(Debug, Clone)] pub enum WalConnectionEvent { Started, - NewWal(ZenithFeedback), + NewWal(ReplicationFeedback), End(Result<(), String>), } @@ -328,7 +328,7 @@ async fn handle_walreceiver_connection( // Send zenith feedback message. // Regular standby_status_update fields are put into this message. - let zenith_status_update = ZenithFeedback { + let zenith_status_update = ReplicationFeedback { current_timeline_size: timeline.get_current_logical_size() as u64, ps_writelsn: write_lsn, ps_flushlsn: flush_lsn, diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 5ce2591ff3..6c9c59c76b 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -10,6 +10,7 @@ use remote_storage::RemoteStorageConfig; use std::fs::{self, File}; use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::thread; use tokio::sync::mpsc; use toml_edit::Document; @@ -27,6 +28,7 @@ use safekeeper::timeline::GlobalTimelines; use safekeeper::wal_backup; use safekeeper::wal_service; use safekeeper::SafeKeeperConf; +use utils::auth::JwtAuth; use utils::{ http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener, zid::NodeId, @@ -132,6 +134,12 @@ fn main() -> anyhow::Result<()> { .default_missing_value("true") .help("Enable/disable WAL backup to s3. 
When disabled, safekeeper removes WAL ignoring WAL backup horizon."), ) + .arg( + Arg::new("auth-validation-public-key-path") + .long("auth-validation-public-key-path") + .takes_value(true) + .help("Path to an RSA .pem public key which is used to check JWT tokens") + ) .get_matches(); if let Some(addr) = arg_matches.value_of("dump-control-file") { @@ -204,6 +212,10 @@ fn main() -> anyhow::Result<()> { .parse() .context("failed to parse bool enable-s3-offload bool")?; + conf.auth_validation_public_key_path = arg_matches + .value_of("auth-validation-public-key-path") + .map(PathBuf::from); + start_safekeeper(conf, given_id, arg_matches.is_present("init")) } @@ -239,6 +251,19 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo e })?; + let auth = match conf.auth_validation_public_key_path.as_ref() { + None => { + info!("Auth is disabled"); + None + } + Some(path) => { + info!("Loading JWT auth key from {}", path.display()); + Some(Arc::new( + JwtAuth::from_key_path(path).context("failed to load the auth key")?, + )) + } + }; + // XXX: Don't spawn any threads before daemonizing! if conf.daemonize { info!("daemonizing..."); @@ -280,8 +305,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo thread::Builder::new() .name("http_endpoint_thread".into()) .spawn(|| { - // TODO authentication - let router = http::make_router(conf_); + let router = http::make_router(conf_, auth); endpoint::serve_thread_main( router, http_listener, @@ -295,6 +319,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo let safekeeper_thread = thread::Builder::new() .name("Safekeeper thread".into()) .spawn(|| { + // TODO: add auth if let Err(e) = wal_service::thread_main(conf_cloned, pg_listener) { info!("safekeeper thread terminated: {e}"); } @@ -309,6 +334,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo thread::Builder::new() .name("broker thread".into()) .spawn(|| { + // TODO: add auth? broker::thread_main(conf_); })?, ); @@ -321,6 +347,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo thread::Builder::new() .name("WAL removal thread".into()) .spawn(|| { + // TODO: add auth? remove_wal::thread_main(conf_); })?, ); @@ -330,6 +357,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo thread::Builder::new() .name("wal backup launcher thread".into()) .spawn(move || { + // TODO: add auth? 
wal_backup::wal_backup_launcher_thread_main(conf_, wal_backup_launcher_rx); })?, ); diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 3d75fec587..169b106aa9 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -4,9 +4,7 @@ use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; -use etcd_broker::Client; -use etcd_broker::PutOptions; -use etcd_broker::SkTimelineSubscriptionKind; +use etcd_broker::subscription_value::SkTimelineInfo; use std::time::Duration; use tokio::spawn; use tokio::task::JoinHandle; @@ -15,6 +13,10 @@ use tracing::*; use url::Url; use crate::{timeline::GlobalTimelines, SafeKeeperConf}; +use etcd_broker::{ + subscription_key::{OperationKind, SkOperationKind, SubscriptionKey}, + Client, PutOptions, +}; use utils::zid::{NodeId, ZTenantTimelineId}; const RETRY_INTERVAL_MSEC: u64 = 1000; @@ -43,7 +45,7 @@ fn timeline_safekeeper_path( ) -> String { format!( "{}/{sk_id}", - SkTimelineSubscriptionKind::timeline(broker_etcd_prefix, zttid).watch_key() + SubscriptionKey::sk_timeline_info(broker_etcd_prefix, zttid).watch_key() ) } @@ -148,14 +150,6 @@ async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> { } } -pub fn get_campaign_name( - election_name: &str, - broker_prefix: &str, - id: ZTenantTimelineId, -) -> String { - format!("{broker_prefix}/{id}/{election_name}") -} - pub fn get_candiate_name(system_id: NodeId) -> String { format!("id_{system_id}") } @@ -209,9 +203,20 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { let mut client = Client::connect(&conf.broker_endpoints, None).await?; - let mut subscription = etcd_broker::subscribe_to_safekeeper_timeline_updates( + let mut subscription = etcd_broker::subscribe_for_values( &mut client, - SkTimelineSubscriptionKind::all(conf.broker_etcd_prefix.clone()), + SubscriptionKey::all(conf.broker_etcd_prefix.clone()), + |full_key, value_str| { + if full_key.operation == OperationKind::Safekeeper(SkOperationKind::TimelineInfo) { + match serde_json::from_str::<SkTimelineInfo>(value_str) { + Ok(new_info) => return Some(new_info), + Err(e) => { + error!("Failed to parse timeline info from value str '{value_str}': {e}") + } + } + } + None + }, ) .await .context("failed to subscribe for safekeeper info")?; diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index b0197a9a2a..33581c6c31 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -1,9 +1,9 @@ -use etcd_broker::SkTimelineInfo; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{Body, Request, Response, StatusCode, Uri}; +use once_cell::sync::Lazy; use serde::Serialize; use serde::Serializer; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::Display; use std::sync::Arc; @@ -11,9 +11,11 @@ use crate::safekeeper::Term; use crate::safekeeper::TermHistory; use crate::timeline::{GlobalTimelines, TimelineDeleteForceResult}; use crate::SafeKeeperConf; +use etcd_broker::subscription_value::SkTimelineInfo; use utils::{ + auth::JwtAuth, http::{ - endpoint, + endpoint::{self, auth_middleware, check_permission}, error::ApiError, json::{json_request, json_response}, request::{ensure_no_body, parse_request_param}, @@ -32,6 +34,7 @@ struct SafekeeperStatus { /// Healthcheck handler.
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> { + check_permission(&request, None)?; let conf = get_conf(&request); let status = SafekeeperStatus { id: conf.my_id }; json_response(StatusCode::OK, status) @@ -91,6 +94,7 @@ async fn timeline_status_handler(request: Request) -> Result) -> Result, ) -> Result, ApiError> { let tenant_id = parse_request_param(&request, "tenant_id")?; + check_permission(&request, Some(tenant_id))?; ensure_no_body(&mut request).await?; json_response( StatusCode::OK, @@ -178,6 +185,7 @@ async fn record_safekeeper_info(mut request: Request) -> Result) -> Result RouterBuilder { - let router = endpoint::make_router(); +pub fn make_router( + conf: SafeKeeperConf, + auth: Option<Arc<JwtAuth>>, +) -> RouterBuilder<hyper::Body, ApiError> { + let mut router = endpoint::make_router(); + if auth.is_some() { + router = router.middleware(auth_middleware(|request| { + #[allow(clippy::mutable_key_type)] + static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = + Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect()); + if ALLOWLIST_ROUTES.contains(request.uri()) { + None + } else { + // Option<Arc<JwtAuth>> is always provided as data below, hence unwrap(). + request.data::<Option<Arc<JwtAuth>>>().unwrap().as_deref() + } + })) + } router .data(Arc::new(conf)) + .data(auth) .get("/v1/status", status_handler) .get( "/v1/timeline/:tenant_id/:timeline_id", timeline_status_handler, ) + // Will be used in the future instead of implicit timeline creation .post("/v1/timeline", timeline_create_handler) .delete( "/v1/tenant/:tenant_id/timeline/:timeline_id", diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index f328d2e85a..0335d61d3f 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -57,6 +57,7 @@ pub struct SafeKeeperConf { pub my_id: NodeId, pub broker_endpoints: Vec, pub broker_etcd_prefix: String, + pub auth_validation_public_key_path: Option<PathBuf>, } impl SafeKeeperConf { @@ -88,6 +89,7 @@ impl Default for SafeKeeperConf { broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS, wal_backup_enabled: true, + auth_validation_public_key_path: None, } } } diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index 5a2e5f125f..fe4f9d231c 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -242,9 +242,9 @@ impl Collector for TimelineCollector { let timeline_id = tli.zttid.timeline_id.to_string(); let labels = &[tenant_id.as_str(), timeline_id.as_str()]; - let mut most_advanced: Option<ZenithFeedback> = None; + let mut most_advanced: Option<ReplicationFeedback> = None; for replica in tli.replicas.iter() { - if let Some(replica_feedback) = replica.zenith_feedback { + if let Some(replica_feedback) = replica.pageserver_feedback { if let Some(current) = most_advanced { if current.ps_writelsn < replica_feedback.ps_writelsn { most_advanced = Some(replica_feedback); diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 1c00af7043..7986fa5834 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -4,7 +4,7 @@ use anyhow::{bail, Context, Result}; use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use etcd_broker::SkTimelineInfo; +use etcd_broker::subscription_value::SkTimelineInfo; use postgres_ffi::xlog_utils::TimeLineID; use postgres_ffi::xlog_utils::XLogSegNo; @@ -23,7 +23,7 @@ use postgres_ffi::xlog_utils::MAX_SEND_SIZE; use utils::{ bin_ser::LeSer, lsn::Lsn, - pq_proto::{SystemId, ZenithFeedback}, + pq_proto::{ReplicationFeedback, SystemId}, zid::{NodeId, ZTenantId,
ZTenantTimelineId, ZTimelineId}, }; @@ -348,7 +348,7 @@ pub struct AppendResponse { // a criterion for walproposer --sync mode exit pub commit_lsn: Lsn, pub hs_feedback: HotStandbyFeedback, - pub zenith_feedback: ZenithFeedback, + pub pageserver_feedback: ReplicationFeedback, } impl AppendResponse { @@ -358,7 +358,7 @@ impl AppendResponse { flush_lsn: Lsn(0), commit_lsn: Lsn(0), hs_feedback: HotStandbyFeedback::empty(), - zenith_feedback: ZenithFeedback::empty(), + pageserver_feedback: ReplicationFeedback::empty(), } } } @@ -476,7 +476,7 @@ impl AcceptorProposerMessage { buf.put_u64_le(msg.hs_feedback.xmin); buf.put_u64_le(msg.hs_feedback.catalog_xmin); - msg.zenith_feedback.serialize(buf)? + msg.pageserver_feedback.serialize(buf)? } } @@ -677,7 +677,7 @@ where commit_lsn: self.state.commit_lsn, // will be filled by the upper code to avoid bothering safekeeper hs_feedback: HotStandbyFeedback::empty(), - zenith_feedback: ZenithFeedback::empty(), + pageserver_feedback: ReplicationFeedback::empty(), }; trace!("formed AppendResponse {:?}", ar); ar diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index fd82a55efa..11e5b963c9 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -21,7 +21,7 @@ use utils::{ bin_ser::BeSer, lsn::Lsn, postgres_backend::PostgresBackend, - pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody, ZenithFeedback}, + pq_proto::{BeMessage, FeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody}, sock_split::ReadStream, }; @@ -29,7 +29,7 @@ use utils::{ const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h'; const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; // zenith extension of replication protocol -const ZENITH_STATUS_UPDATE_TAG_BYTE: u8 = b'z'; +const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z'; type FullTransactionId = u64; @@ -122,15 +122,15 @@ impl ReplicationConn { warn!("unexpected StandbyReply. Read-only postgres replicas are not supported in safekeepers yet."); // timeline.update_replica_state(replica_id, Some(state)); } - Some(ZENITH_STATUS_UPDATE_TAG_BYTE) => { + Some(NEON_STATUS_UPDATE_TAG_BYTE) => { // Note: deserializing is on m[9..] because we skip the tag byte and len bytes. let buf = Bytes::copy_from_slice(&m[9..]); - let reply = ZenithFeedback::parse(buf); + let reply = ReplicationFeedback::parse(buf); - trace!("ZenithFeedback is {:?}", reply); - // Only pageserver sends ZenithFeedback, so set the flag. + trace!("ReplicationFeedback is {:?}", reply); + // Only pageserver sends ReplicationFeedback, so set the flag. // This replica is the source of information to resend to compute. 
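The `m[9..]` slice in the send_wal.rs hunk above (which continues just below) reflects how the status-update message is framed: one tag byte, `b'z'` alias NEON_STATUS_UPDATE_TAG_BYTE, followed by eight length bytes, after which the serialized ReplicationFeedback payload begins. A minimal sketch of that split; the 1 + 8 byte layout is taken from the code comment above, while the helper name and the choice not to decode the length here are purely illustrative:

```rust
/// Illustrative only: split a b'z' status-update frame into tag, raw length
/// bytes and feedback payload, mirroring the `&m[9..]` offset used above.
fn split_status_update(m: &[u8]) -> Option<(u8, &[u8], &[u8])> {
    if m.len() < 9 {
        return None; // too short to hold the tag byte plus 8 length bytes
    }
    let tag = m[0]; // b'z' for the neon status update
    let len_bytes = &m[1..9]; // 8 length bytes; the real code simply skips them
    let payload = &m[9..]; // what ReplicationFeedback::parse consumes
    Some((tag, len_bytes, payload))
}
```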
- state.zenith_feedback = Some(reply); + state.pageserver_feedback = Some(reply); timeline.update_replica_state(replica_id, state); } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 30c94f2543..39f2593dbc 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -3,7 +3,7 @@ use anyhow::{bail, Context, Result}; -use etcd_broker::SkTimelineInfo; +use etcd_broker::subscription_value::SkTimelineInfo; use lazy_static::lazy_static; use postgres_ffi::xlog_utils::XLogSegNo; @@ -21,7 +21,7 @@ use tracing::*; use utils::{ lsn::Lsn, - pq_proto::ZenithFeedback, + pq_proto::ReplicationFeedback, zid::{NodeId, ZTenantId, ZTenantTimelineId}, }; @@ -48,8 +48,8 @@ pub struct ReplicaState { pub remote_consistent_lsn: Lsn, /// combined hot standby feedback from all replicas pub hs_feedback: HotStandbyFeedback, - /// Zenith specific feedback received from pageserver, if any - pub zenith_feedback: Option<ZenithFeedback>, + /// Replication specific feedback received from pageserver, if any + pub pageserver_feedback: Option<ReplicationFeedback>, } impl Default for ReplicaState { @@ -68,7 +68,7 @@ impl ReplicaState { xmin: u64::MAX, catalog_xmin: u64::MAX, }, - zenith_feedback: None, + pageserver_feedback: None, } } } @@ -149,8 +149,12 @@ impl SharedState { /// Mark timeline active/inactive and return whether s3 offloading requires /// start/stop action. - fn update_status(&mut self) -> bool { - self.active = self.is_active(); + fn update_status(&mut self, ttid: ZTenantTimelineId) -> bool { + let is_active = self.is_active(); + if self.active != is_active { + info!("timeline {} active={} now", ttid, is_active); + } + self.active = is_active; self.is_wal_backup_action_pending() } @@ -187,6 +191,12 @@ impl SharedState { self.wal_backup_active } + // Can this safekeeper offload to s3? Recently joined safekeepers might not + // have necessary WAL. + fn can_wal_backup(&self) -> bool { + self.sk.state.local_start_lsn <= self.sk.inmem.backup_lsn + } + fn get_wal_seg_size(&self) -> usize { self.sk.state.server.wal_seg_size as usize } @@ -211,25 +221,25 @@ impl SharedState { // we need to know which pageserver compute node considers to be main. // See https://github.com/zenithdb/zenith/issues/1171 // - if let Some(zenith_feedback) = state.zenith_feedback { - if let Some(acc_feedback) = acc.zenith_feedback { - if acc_feedback.ps_writelsn < zenith_feedback.ps_writelsn { + if let Some(pageserver_feedback) = state.pageserver_feedback { + if let Some(acc_feedback) = acc.pageserver_feedback { + if acc_feedback.ps_writelsn < pageserver_feedback.ps_writelsn { warn!("More than one pageserver is streaming WAL for the timeline. Feedback resolving is not fully supported yet."); - acc.zenith_feedback = Some(zenith_feedback); + acc.pageserver_feedback = Some(pageserver_feedback); } } else { - acc.zenith_feedback = Some(zenith_feedback); + acc.pageserver_feedback = Some(pageserver_feedback); } // last lsn received by pageserver // FIXME if multiple pageservers are streaming WAL, last_received_lsn must be tracked per pageserver. // See https://github.com/zenithdb/zenith/issues/1171 - acc.last_received_lsn = Lsn::from(zenith_feedback.ps_writelsn); + acc.last_received_lsn = Lsn::from(pageserver_feedback.ps_writelsn); // When at least one pageserver has preserved data up to remote_consistent_lsn, // safekeeper is free to delete it, so choose max of all pageservers.
acc.remote_consistent_lsn = max( - Lsn::from(zenith_feedback.ps_applylsn), + Lsn::from(pageserver_feedback.ps_applylsn), acc.remote_consistent_lsn, ); } @@ -291,7 +301,7 @@ impl Timeline { { let mut shared_state = self.mutex.lock().unwrap(); shared_state.num_computes += 1; - is_wal_backup_action_pending = shared_state.update_status(); + is_wal_backup_action_pending = shared_state.update_status(self.zttid); } // Wake up wal backup launcher, if offloading not started yet. if is_wal_backup_action_pending { @@ -308,7 +318,7 @@ impl Timeline { { let mut shared_state = self.mutex.lock().unwrap(); shared_state.num_computes -= 1; - is_wal_backup_action_pending = shared_state.update_status(); + is_wal_backup_action_pending = shared_state.update_status(self.zttid); } // Wake up wal backup launcher, if it is time to stop the offloading. if is_wal_backup_action_pending { @@ -327,7 +337,7 @@ impl Timeline { (replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet. replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn); if stop { - shared_state.update_status(); + shared_state.update_status(self.zttid); return Ok(true); } } @@ -341,6 +351,12 @@ impl Timeline { shared_state.wal_backup_attend() } + // Can this safekeeper offload to s3? Recently joined safekeepers might not + // have necessary WAL. + pub fn can_wal_backup(&self) -> bool { + self.mutex.lock().unwrap().can_wal_backup() + } + /// Deactivates the timeline, assuming it is being deleted. /// Returns whether the timeline was already active. /// @@ -441,8 +457,8 @@ impl Timeline { if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg { let state = shared_state.get_replicas_state(); resp.hs_feedback = state.hs_feedback; - if let Some(zenith_feedback) = state.zenith_feedback { - resp.zenith_feedback = zenith_feedback; + if let Some(pageserver_feedback) = state.pageserver_feedback { + resp.pageserver_feedback = pageserver_feedback; } } @@ -509,7 +525,7 @@ impl Timeline { } shared_state.sk.record_safekeeper_info(sk_info)?; self.notify_wal_senders(&mut shared_state); - is_wal_backup_action_pending = shared_state.update_status(); + is_wal_backup_action_pending = shared_state.update_status(self.zttid); commit_lsn = shared_state.sk.inmem.commit_lsn; } self.commit_lsn_watch_tx.send(commit_lsn)?; diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 1f2e9c303a..1d7c8de3b8 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -1,4 +1,7 @@ use anyhow::{Context, Result}; +use etcd_broker::subscription_key::{ + NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind, +}; use tokio::task::JoinHandle; use std::cmp::min; @@ -26,8 +29,6 @@ use crate::{broker, SafeKeeperConf}; use once_cell::sync::OnceCell; -const BACKUP_ELECTION_NAME: &str = "WAL_BACKUP"; - const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000; const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10; @@ -48,14 +49,10 @@ pub fn wal_backup_launcher_thread_main( }); } -/// Check whether wal backup is required for timeline and mark that launcher is -/// aware of current status (if timeline exists). -fn is_wal_backup_required(zttid: ZTenantTimelineId) -> bool { - if let Some(tli) = GlobalTimelines::get_loaded(zttid) { - tli.wal_backup_attend() - } else { - false - } +/// Check whether wal backup is required for timeline. If yes, mark that launcher is +/// aware of current status and return the timeline. 
+fn is_wal_backup_required(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> { + GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup_attend()) } struct WalBackupTaskHandle { @@ -63,6 +60,56 @@ struct WalBackupTaskHandle { handle: JoinHandle<()>, } +struct WalBackupTimelineEntry { + timeline: Arc<Timeline>, + handle: Option<WalBackupTaskHandle>, +} + +/// Start per timeline task, if it makes sense for this safekeeper to offload. +fn consider_start_task( + conf: &SafeKeeperConf, + zttid: ZTenantTimelineId, + task: &mut WalBackupTimelineEntry, +) { + if !task.timeline.can_wal_backup() { + return; + } + info!("starting WAL backup task for {}", zttid); + + // TODO: decide who should offload right here by simply checking current + // state instead of running elections in offloading task. + let election_name = SubscriptionKey { + cluster_prefix: conf.broker_etcd_prefix.clone(), + kind: SubscriptionKind::Operation( + zttid, + NodeKind::Safekeeper, + OperationKind::Safekeeper(SkOperationKind::WalBackup), + ), + } + .watch_key(); + let my_candidate_name = broker::get_candiate_name(conf.my_id); + let election = broker::Election::new( + election_name, + my_candidate_name, + conf.broker_endpoints.clone(), + ); + + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); + let timeline_dir = conf.timeline_dir(&zttid); + + let handle = tokio::spawn( + backup_task_main(zttid, timeline_dir, shutdown_rx, election) + .instrument(info_span!("WAL backup task", zttid = %zttid)), + ); + + task.handle = Some(WalBackupTaskHandle { + shutdown_tx, + handle, + }); +} + +const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000; + /// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup /// tasks. Having this in separate task simplifies locking, allows to reap /// panics and separate elections from offloading itself. @@ -71,7 +118,7 @@ async fn wal_backup_launcher_main_loop( mut wal_backup_launcher_rx: Receiver<ZTenantTimelineId>, ) { info!( - "WAL backup launcher: started, remote config {:?}", + "WAL backup launcher started, remote config {:?}", conf.remote_storage ); @@ -82,60 +129,50 @@ async fn wal_backup_launcher_main_loop( }) }); - let mut tasks: HashMap<ZTenantTimelineId, WalBackupTaskHandle> = HashMap::new(); + // Presence in this map means launcher is aware s3 offloading is needed for + // the timeline, but task is started only if it makes sense to offload + // from this safekeeper. + let mut tasks: HashMap<ZTenantTimelineId, WalBackupTimelineEntry> = HashMap::new(); + let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC)); loop { - // channel is never expected to get closed - let zttid = wal_backup_launcher_rx.recv().await.unwrap(); - let is_wal_backup_required = is_wal_backup_required(zttid); - if conf.remote_storage.is_none() || !conf.wal_backup_enabled { - continue; /* just drain the channel and do nothing */ - } - // do we need to do anything at all? - if is_wal_backup_required != tasks.contains_key(&zttid) { - if is_wal_backup_required { - // need to start the task - info!("starting WAL backup task for {}", zttid); + tokio::select! { + zttid = wal_backup_launcher_rx.recv() => { + // channel is never expected to get closed + let zttid = zttid.unwrap(); + if conf.remote_storage.is_none() || !conf.wal_backup_enabled { + continue; /* just drain the channel and do nothing */ + } + let timeline = is_wal_backup_required(zttid); + // do we need to do anything at all?
+ if timeline.is_some() != tasks.contains_key(&zttid) { + if let Some(timeline) = timeline { + // need to start the task + let entry = tasks.entry(zttid).or_insert(WalBackupTimelineEntry { + timeline, + handle: None, + }); + consider_start_task(&conf, zttid, entry); + } else { + // need to stop the task + info!("stopping WAL backup task for {}", zttid); - // TODO: decide who should offload in launcher itself by simply checking current state - let election_name = broker::get_campaign_name( - BACKUP_ELECTION_NAME, - &conf.broker_etcd_prefix, - zttid, - ); - let my_candidate_name = broker::get_candiate_name(conf.my_id); - let election = broker::Election::new( - election_name, - my_candidate_name, - conf.broker_endpoints.clone(), - ); - - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - let timeline_dir = conf.timeline_dir(&zttid); - - let handle = tokio::spawn( - backup_task_main(zttid, timeline_dir, shutdown_rx, election) - .instrument(info_span!("WAL backup task", zttid = %zttid)), - ); - - tasks.insert( - zttid, - WalBackupTaskHandle { - shutdown_tx, - handle, - }, - ); - } else { - // need to stop the task - info!("stopping WAL backup task for {}", zttid); - - let wb_handle = tasks.remove(&zttid).unwrap(); - // Tell the task to shutdown. Error means task exited earlier, that's ok. - let _ = wb_handle.shutdown_tx.send(()).await; - // Await the task itself. TODO: restart panicked tasks earlier. - // Hm, why I can't await on reference to handle? - if let Err(e) = wb_handle.handle.await { - warn!("WAL backup task for {} panicked: {}", zttid, e); + let entry = tasks.remove(&zttid).unwrap(); + if let Some(wb_handle) = entry.handle { + // Tell the task to shutdown. Error means task exited earlier, that's ok. + let _ = wb_handle.shutdown_tx.send(()).await; + // Await the task itself. TODO: restart panicked tasks earlier. + if let Err(e) = wb_handle.handle.await { + warn!("WAL backup task for {} panicked: {}", zttid, e); + } + } + } + } + } + // Start known tasks, if needed and possible. 
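The `ticker.tick()` arm that follows, together with the channel arm above it, is what gives the reworked launcher its shape: every timeline id received on `wal_backup_launcher_rx` is remembered in `tasks`, and entries whose backup task could not be started yet are retried once per CHECK_TASKS_INTERVAL_MSEC. A condensed, self-contained sketch of that control flow with simplified stand-in types (a plain `u64` id and a bool instead of WalBackupTimelineEntry; this is not the safekeeper's actual code and needs tokio's full feature set):

```rust
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::mpsc;

/// Simplified stand-in for the launcher loop: ids arrive on `rx`, starts may
/// be deferred, and a ticker periodically retries whatever is still pending.
async fn launcher_loop(mut rx: mpsc::Receiver<u64>) {
    let mut pending: HashMap<u64, bool> = HashMap::new(); // id -> started?
    let mut ticker = tokio::time::interval(Duration::from_millis(1000));
    loop {
        tokio::select! {
            maybe_id = rx.recv() => {
                match maybe_id {
                    Some(id) => { pending.entry(id).or_insert(false); } // remember, start lazily
                    None => break, // demo only; the real channel is never expected to close
                }
            }
            _ = ticker.tick() => {
                // the consider_start_task() step: start whatever is still pending
                for (id, started) in pending.iter_mut().filter(|(_, s)| !**s) {
                    println!("starting WAL backup task for {id}");
                    *started = true;
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    let launcher = tokio::spawn(launcher_loop(rx));
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    tokio::time::sleep(Duration::from_millis(1500)).await; // let the ticker fire
    drop(tx); // close the channel so the demo loop exits
    launcher.await.unwrap();
}
```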
+ _ = ticker.tick() => { + for (zttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) { + consider_start_task(&conf, *zttid, entry); } } } diff --git a/scripts/generate_perf_report_page.py b/scripts/generate_perf_report_page.py index a15d04e056..23fa4b76a3 100755 --- a/scripts/generate_perf_report_page.py +++ b/scripts/generate_perf_report_page.py @@ -26,6 +26,7 @@ KEY_EXCLUDE_FIELDS = frozenset({ }) NEGATIVE_COLOR = 'negative' POSITIVE_COLOR = 'positive' +EPS = 1e-6 @dataclass @@ -120,7 +121,8 @@ def get_row_values(columns: List[str], run_result: SuitRun, # this might happen when new metric is added and there is no value for it in previous run # let this be here, TODO add proper handling when this actually happens raise ValueError(f'{column} not found in previous result') - ratio = float(value) / float(prev_value['value']) - 1 + # adding `EPS` to each term to avoid ZeroDivisionError when the denominator is zero + ratio = (float(value) + EPS) / (float(prev_value['value']) + EPS) - 1 ratio_display, color = format_ratio(ratio, current_value['report']) row_values.append(RowValue(value, color, ratio_display)) return row_values diff --git a/test_runner/batch_others/test_auth.py b/test_runner/batch_others/test_auth.py index 73120880d3..b9eb9d7cee 100644 --- a/test_runner/batch_others/test_auth.py +++ b/test_runner/batch_others/test_auth.py @@ -7,7 +7,7 @@ import pytest def test_pageserver_auth(neon_env_builder: NeonEnvBuilder): - neon_env_builder.pageserver_auth_enabled = True + neon_env_builder.auth_enabled = True env = neon_env_builder.init_start() ps = env.pageserver @@ -54,7 +54,7 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder): @pytest.mark.parametrize('with_safekeepers', [False, True]) def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder, with_safekeepers: bool): - neon_env_builder.pageserver_auth_enabled = True + neon_env_builder.auth_enabled = True if with_safekeepers: neon_env_builder.num_safekeepers = 3 env = neon_env_builder.init_start() diff --git a/test_runner/batch_others/test_normal_work.py b/test_runner/batch_others/test_normal_work.py index c0f44ce7a9..4635a70de6 100644 --- a/test_runner/batch_others/test_normal_work.py +++ b/test_runner/batch_others/test_normal_work.py @@ -42,8 +42,8 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_s Repeat check for several tenants/timelines. 
""" - env = neon_env_builder.init_start() neon_env_builder.num_safekeepers = num_safekeepers + env = neon_env_builder.init_start() pageserver_http = env.pageserver.http_client() for _ in range(num_timelines): diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index 289eec74c5..7f9cb9493d 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -114,7 +114,7 @@ def test_pageserver_http_api_client(neon_simple_env: NeonEnv): def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilder): - neon_env_builder.pageserver_auth_enabled = True + neon_env_builder.auth_enabled = True env = neon_env_builder.init_start() management_token = env.auth_keys.generate_management_token() diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py index d55c0f2bcc..af1956e196 100644 --- a/test_runner/batch_others/test_restart_compute.py +++ b/test_runner/batch_others/test_restart_compute.py @@ -10,7 +10,7 @@ from fixtures.log_helper import log # @pytest.mark.parametrize('with_safekeepers', [False, True]) def test_restart_compute(neon_env_builder: NeonEnvBuilder, with_safekeepers: bool): - neon_env_builder.pageserver_auth_enabled = True + neon_env_builder.auth_enabled = True if with_safekeepers: neon_env_builder.num_safekeepers = 3 env = neon_env_builder.init_start() diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 1932c3e450..e4970272d4 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -16,6 +16,7 @@ from fixtures.neon_fixtures import PgBin, Etcd, Postgres, RemoteStorageUsers, Sa from fixtures.utils import get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex from fixtures.log_helper import log from typing import List, Optional, Any +from uuid import uuid4 @dataclass @@ -349,10 +350,12 @@ def test_broker(neon_env_builder: NeonEnvBuilder): # Test that old WAL consumed by peers and pageserver is removed from safekeepers. 
-def test_wal_removal(neon_env_builder: NeonEnvBuilder): +@pytest.mark.parametrize('auth_enabled', [False, True]) +def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): neon_env_builder.num_safekeepers = 2 # to advance remote_consistent_llsn neon_env_builder.enable_local_fs_remote_storage() + neon_env_builder.auth_enabled = auth_enabled env = neon_env_builder.init_start() env.neon_cli.create_branch('test_safekeepers_wal_removal') @@ -369,7 +372,10 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder): timeline_id = pg.safe_psql("show neon.timeline_id")[0][0] # force checkpoint to advance remote_consistent_lsn - with closing(env.pageserver.connect()) as psconn: + pageserver_conn_options = {} + if auth_enabled: + pageserver_conn_options['password'] = env.auth_keys.generate_tenant_token(tenant_id) + with closing(env.pageserver.connect(**pageserver_conn_options)) as psconn: with psconn.cursor() as pscur: pscur.execute(f"checkpoint {tenant_id} {timeline_id}") @@ -380,9 +386,29 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder): ] assert all(os.path.exists(p) for p in first_segments) - http_cli = env.safekeepers[0].http_client() + if not auth_enabled: + http_cli = env.safekeepers[0].http_client() + else: + http_cli = env.safekeepers[0].http_client( + auth_token=env.auth_keys.generate_tenant_token(tenant_id)) + http_cli_other = env.safekeepers[0].http_client( + auth_token=env.auth_keys.generate_tenant_token(uuid4().hex)) + http_cli_noauth = env.safekeepers[0].http_client() + # Pretend WAL is offloaded to s3. + if auth_enabled: + old_backup_lsn = http_cli.timeline_status(tenant_id=tenant_id, + timeline_id=timeline_id).backup_lsn + assert 'FFFFFFFF/FEFFFFFF' != old_backup_lsn + for cli in [http_cli_other, http_cli_noauth]: + with pytest.raises(cli.HTTPError, match='Forbidden|Unauthorized'): + cli.record_safekeeper_info(tenant_id, + timeline_id, {'backup_lsn': 'FFFFFFFF/FEFFFFFF'}) + assert old_backup_lsn == http_cli.timeline_status(tenant_id=tenant_id, + timeline_id=timeline_id).backup_lsn http_cli.record_safekeeper_info(tenant_id, timeline_id, {'backup_lsn': 'FFFFFFFF/FEFFFFFF'}) + assert 'FFFFFFFF/FEFFFFFF' == http_cli.timeline_status(tenant_id=tenant_id, + timeline_id=timeline_id).backup_lsn # wait till first segment is removed on all safekeepers started_at = time.time() @@ -596,25 +622,42 @@ def test_sync_safekeepers(neon_env_builder: NeonEnvBuilder, assert all(lsn_after_sync == lsn for lsn in lsn_after_append) -def test_timeline_status(neon_env_builder: NeonEnvBuilder): +@pytest.mark.parametrize('auth_enabled', [False, True]) +def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): + neon_env_builder.auth_enabled = auth_enabled env = neon_env_builder.init_start() env.neon_cli.create_branch('test_timeline_status') pg = env.postgres.create_start('test_timeline_status') wa = env.safekeepers[0] - wa_http_cli = wa.http_client() - wa_http_cli.check_status() # learn neon timeline from compute tenant_id = pg.safe_psql("show neon.tenant_id")[0][0] timeline_id = pg.safe_psql("show neon.timeline_id")[0][0] + if not auth_enabled: + wa_http_cli = wa.http_client() + wa_http_cli.check_status() + else: + wa_http_cli = wa.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id)) + wa_http_cli.check_status() + wa_http_cli_bad = wa.http_client( + auth_token=env.auth_keys.generate_tenant_token(uuid4().hex)) + wa_http_cli_bad.check_status() + wa_http_cli_noauth = wa.http_client() + wa_http_cli_noauth.check_status() + # fetch something sensible 
from status tli_status = wa_http_cli.timeline_status(tenant_id, timeline_id) epoch = tli_status.acceptor_epoch timeline_start_lsn = tli_status.timeline_start_lsn + if auth_enabled: + for cli in [wa_http_cli_bad, wa_http_cli_noauth]: + with pytest.raises(cli.HTTPError, match='Forbidden|Unauthorized'): + cli.timeline_status(tenant_id, timeline_id) + pg.safe_psql("create table t(i int)") # ensure epoch goes up after reboot @@ -894,8 +937,10 @@ def test_wal_deleted_after_broadcast(neon_env_builder: NeonEnvBuilder): assert wal_size_after_checkpoint < 16 * 2.5 -def test_delete_force(neon_env_builder: NeonEnvBuilder): +@pytest.mark.parametrize('auth_enabled', [False, True]) +def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): neon_env_builder.num_safekeepers = 1 + neon_env_builder.auth_enabled = auth_enabled env = neon_env_builder.init_start() # Create two tenants: one will be deleted, other should be preserved. @@ -921,7 +966,14 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder): cur.execute('CREATE TABLE t(key int primary key)') sk = env.safekeepers[0] sk_data_dir = Path(sk.data_dir()) - sk_http = sk.http_client() + if not auth_enabled: + sk_http = sk.http_client() + sk_http_other = sk_http + else: + sk_http = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id)) + sk_http_other = sk.http_client( + auth_token=env.auth_keys.generate_tenant_token(tenant_id_other)) + sk_http_noauth = sk.http_client() assert (sk_data_dir / tenant_id / timeline_id_1).is_dir() assert (sk_data_dir / tenant_id / timeline_id_2).is_dir() assert (sk_data_dir / tenant_id / timeline_id_3).is_dir() @@ -961,6 +1013,15 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder): assert (sk_data_dir / tenant_id / timeline_id_4).is_dir() assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + if auth_enabled: + # Ensure we cannot delete the other tenant + for sk_h in [sk_http, sk_http_noauth]: + with pytest.raises(sk_h.HTTPError, match='Forbidden|Unauthorized'): + assert sk_h.timeline_delete_force(tenant_id_other, timeline_id_other) + with pytest.raises(sk_h.HTTPError, match='Forbidden|Unauthorized'): + assert sk_h.tenant_delete_force(tenant_id_other) + assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + # Remove initial tenant's br2 (inactive) assert sk_http.timeline_delete_force(tenant_id, timeline_id_2) == { "dir_existed": True, @@ -1001,7 +1062,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder): assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() # Ensure the other tenant still works - sk_http.timeline_status(tenant_id_other, timeline_id_other) + sk_http_other.timeline_status(tenant_id_other, timeline_id_other) with closing(pg_other.connect()) as conn: with conn.cursor() as cur: cur.execute('INSERT INTO t (key) VALUES (123)') diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 63ee6ec57d..4c0715bac3 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -500,7 +500,7 @@ class NeonEnvBuilder: num_safekeepers: int = 1, # Use non-standard SK ids to check for various parsing bugs safekeepers_id_start: int = 0, - pageserver_auth_enabled: bool = False, + auth_enabled: bool = False, rust_log_override: Optional[str] = None, default_branch_name=DEFAULT_BRANCH_NAME): self.repo_dir = repo_dir @@ -513,7 +513,7 @@ class NeonEnvBuilder: self.pageserver_config_override = pageserver_config_override self.num_safekeepers = num_safekeepers 
self.safekeepers_id_start = safekeepers_id_start - self.pageserver_auth_enabled = pageserver_auth_enabled + self.auth_enabled = auth_enabled self.default_branch_name = default_branch_name self.env: Optional[NeonEnv] = None @@ -639,7 +639,7 @@ class NeonEnv: pg=self.port_distributor.get_port(), http=self.port_distributor.get_port(), ) - pageserver_auth_type = "ZenithJWT" if config.pageserver_auth_enabled else "Trust" + pageserver_auth_type = "ZenithJWT" if config.auth_enabled else "Trust" toml += textwrap.dedent(f""" [pageserver] @@ -667,6 +667,10 @@ class NeonEnv: pg_port = {port.pg} http_port = {port.http} sync = false # Disable fsyncs to make the tests go faster""") + if config.auth_enabled: + toml += textwrap.dedent(f""" + auth_enabled = true + """) if bool(self.remote_storage_users & RemoteStorageUsers.SAFEKEEPER) and self.remote_storage is not None: toml += textwrap.dedent(f""" @@ -1197,7 +1201,7 @@ class NeonCli: log.info(f'Running in "{self.env.repo_dir}"') env_vars = os.environ.copy() - env_vars['ZENITH_REPO_DIR'] = str(self.env.repo_dir) + env_vars['NEON_REPO_DIR'] = str(self.env.repo_dir) env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir) if self.env.rust_log_override is not None: env_vars['RUST_LOG'] = self.env.rust_log_override @@ -1757,7 +1761,6 @@ class Safekeeper: env: NeonEnv port: SafekeeperPort id: int - auth_token: Optional[str] = None running: bool = False def start(self) -> 'Safekeeper': @@ -1813,8 +1816,8 @@ class Safekeeper: assert isinstance(res, dict) return res - def http_client(self) -> SafekeeperHttpClient: - return SafekeeperHttpClient(port=self.port.http) + def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient: + return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token) def data_dir(self) -> str: return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}") @@ -1838,9 +1841,15 @@ class SafekeeperMetrics: class SafekeeperHttpClient(requests.Session): - def __init__(self, port: int): + HTTPError = requests.HTTPError + + def __init__(self, port: int, auth_token: Optional[str] = None): super().__init__() self.port = port + self.auth_token = auth_token + + if auth_token is not None: + self.headers['Authorization'] = f'Bearer {auth_token}' def check_status(self): self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
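With the fixture changes above, authenticated safekeeper access in a test reduces to constructing SafekeeperHttpClient with a tenant-scoped token, which the client then sends as a Bearer header on every request. A usage sketch in the style of the tests in this diff; the helper name check_safekeeper_auth is made up here, but the fixture calls mirror the ones used above:

```python
# Illustrative test snippet: talk to a safekeeper with and without a token.
import pytest

def check_safekeeper_auth(env, tenant_id: str, timeline_id: str):
    sk = env.safekeepers[0]

    # Token scoped to this tenant: allowed to read the timeline status.
    http_cli = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
    http_cli.timeline_status(tenant_id, timeline_id)

    # No token at all: anything outside the /v1/status allowlist is rejected.
    http_cli_noauth = sk.http_client()
    with pytest.raises(http_cli_noauth.HTTPError, match='Forbidden|Unauthorized'):
        http_cli_noauth.timeline_status(tenant_id, timeline_id)
```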