diff --git a/.github/ansible/production.hosts.yaml b/.github/ansible/production.hosts.yaml index 22bace5ade..ecb847bd61 100644 --- a/.github/ansible/production.hosts.yaml +++ b/.github/ansible/production.hosts.yaml @@ -7,7 +7,7 @@ storage: broker_endpoint: http://storage-broker.prod.local:50051 pageserver_config_stub: pg_distrib_dir: /usr/local - metric_collection_endpoint: http://console-staging.local/billing/api/v1/usage_events + metric_collection_endpoint: http://console-release.local/billing/api/v1/usage_events metric_collection_interval: 10min remote_storage: bucket_name: "{{ bucket_name }}" diff --git a/.github/ansible/staging.eu-west-1.hosts.yaml b/.github/ansible/staging.eu-west-1.hosts.yaml index fce450ed39..f28dc8e07b 100644 --- a/.github/ansible/staging.eu-west-1.hosts.yaml +++ b/.github/ansible/staging.eu-west-1.hosts.yaml @@ -18,7 +18,7 @@ storage: ansible_aws_ssm_region: eu-west-1 ansible_aws_ssm_bucket_name: neon-dev-storage-eu-west-1 console_region_id: aws-eu-west-1 - sentry_environment: development + sentry_environment: staging children: pageservers: diff --git a/.github/ansible/staging.us-east-2.hosts.yaml b/.github/ansible/staging.us-east-2.hosts.yaml index 1d1b8dbfa4..b46e729e32 100644 --- a/.github/ansible/staging.us-east-2.hosts.yaml +++ b/.github/ansible/staging.us-east-2.hosts.yaml @@ -18,7 +18,7 @@ storage: ansible_aws_ssm_region: us-east-2 ansible_aws_ssm_bucket_name: neon-staging-storage-us-east-2 console_region_id: aws-us-east-2 - sentry_environment: development + sentry_environment: staging children: pageservers: @@ -29,6 +29,8 @@ storage: ansible_host: i-0565a8b4008aa3f40 pageserver-2.us-east-2.aws.neon.build: ansible_host: i-01e31cdf7e970586a + pageserver-3.us-east-2.aws.neon.build: + ansible_host: i-0602a0291365ef7cc safekeepers: hosts: diff --git a/.github/helm-values/dev-eu-west-1-zeta.neon-proxy-scram.yaml b/.github/helm-values/dev-eu-west-1-zeta.neon-proxy-scram.yaml index 47924456ba..c49b8d2009 100644 --- a/.github/helm-values/dev-eu-west-1-zeta.neon-proxy-scram.yaml +++ b/.github/helm-values/dev-eu-west-1-zeta.neon-proxy-scram.yaml @@ -8,7 +8,7 @@ settings: authBackend: "console" authEndpoint: "http://console-staging.local/management/api/v2" domain: "*.eu-west-1.aws.neon.build" - sentryEnvironment: "development" + sentryEnvironment: "staging" wssPort: 8443 metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events" metricCollectionInterval: "1min" diff --git a/.github/helm-values/dev-eu-west-1-zeta.neon-storage-broker.yaml b/.github/helm-values/dev-eu-west-1-zeta.neon-storage-broker.yaml index c6e682f571..ccf701f52d 100644 --- a/.github/helm-values/dev-eu-west-1-zeta.neon-storage-broker.yaml +++ b/.github/helm-values/dev-eu-west-1-zeta.neon-storage-broker.yaml @@ -49,4 +49,4 @@ extraManifests: - "{{ .Release.Namespace }}" settings: - sentryEnvironment: "development" + sentryEnvironment: "staging" diff --git a/.github/helm-values/dev-us-east-2-beta.neon-proxy-link.yaml b/.github/helm-values/dev-us-east-2-beta.neon-proxy-link.yaml index eb8fd50c0f..cb062f705d 100644 --- a/.github/helm-values/dev-us-east-2-beta.neon-proxy-link.yaml +++ b/.github/helm-values/dev-us-east-2-beta.neon-proxy-link.yaml @@ -8,7 +8,7 @@ settings: authBackend: "link" authEndpoint: "https://console.stage.neon.tech/authenticate_proxy_request/" uri: "https://console.stage.neon.tech/psql_session/" - sentryEnvironment: "development" + sentryEnvironment: "staging" metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events" metricCollectionInterval: "1min" diff --git a/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram-legacy.yaml b/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram-legacy.yaml index 8a08738d5f..99b67d75c1 100644 --- a/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram-legacy.yaml +++ b/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram-legacy.yaml @@ -8,7 +8,7 @@ settings: authBackend: "console" authEndpoint: "http://console-staging.local/management/api/v2" domain: "*.cloud.stage.neon.tech" - sentryEnvironment: "development" + sentryEnvironment: "staging" wssPort: 8443 metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events" metricCollectionInterval: "1min" diff --git a/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml b/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml index b02d46917c..764bb25b64 100644 --- a/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml +++ b/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml @@ -8,7 +8,7 @@ settings: authBackend: "console" authEndpoint: "http://console-staging.local/management/api/v2" domain: "*.us-east-2.aws.neon.build" - sentryEnvironment: "development" + sentryEnvironment: "staging" wssPort: 8443 metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events" metricCollectionInterval: "1min" diff --git a/.github/helm-values/dev-us-east-2-beta.neon-storage-broker.yaml b/.github/helm-values/dev-us-east-2-beta.neon-storage-broker.yaml index c7682d24c0..69363c5f13 100644 --- a/.github/helm-values/dev-us-east-2-beta.neon-storage-broker.yaml +++ b/.github/helm-values/dev-us-east-2-beta.neon-storage-broker.yaml @@ -49,4 +49,4 @@ extraManifests: - "{{ .Release.Namespace }}" settings: - sentryEnvironment: "development" + sentryEnvironment: "staging" diff --git a/Cargo.lock b/Cargo.lock index 59adf696a7..d8aba9ba68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -833,10 +833,8 @@ dependencies = [ "anyhow", "chrono", "clap 4.0.32", - "env_logger", "futures", "hyper", - "log", "notify", "postgres", "regex", @@ -845,6 +843,8 @@ dependencies = [ "tar", "tokio", "tokio-postgres", + "tracing", + "tracing-subscriber", "url", "workspace_hack", ] @@ -1954,7 +1954,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if", - "serde", ] [[package]] @@ -4565,6 +4564,7 @@ dependencies = [ "tower", "tracing", "tracing-core", + "tracing-subscriber", "url", ] diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 1e0aee81d7..4536604bdf 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -8,10 +8,8 @@ license.workspace = true anyhow.workspace = true chrono.workspace = true clap.workspace = true -env_logger.workspace = true futures.workspace = true hyper = { workspace = true, features = ["full"] } -log = { workspace = true, features = ["std", "serde"] } notify.workspace = true postgres.workspace = true regex.workspace = true @@ -20,6 +18,8 @@ serde_json.workspace = true tar.workspace = true tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tokio-postgres.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true url.workspace = true workspace_hack.workspace = true diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 307300cfd8..e5ab8eb153 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -40,7 +40,7 @@ use std::{thread, time::Duration}; use anyhow::{Context, Result}; use chrono::Utc; use clap::Arg; -use log::{error, info}; +use tracing::{error, info}; use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus}; use compute_tools::http::api::launch_http_server; @@ -53,7 +53,6 @@ use compute_tools::spec::*; use url::Url; fn main() -> Result<()> { - // TODO: re-use `utils::logging` later init_logger(DEFAULT_LOG_LEVEL)?; let matches = cli().get_matches(); @@ -122,29 +121,45 @@ fn main() -> Result<()> { // Also spawn the thread responsible for handling the VM informant -- if it's present let _vm_informant_handle = spawn_vm_informant_if_present().expect("cannot launch VM informant"); - // Run compute (Postgres) and hang waiting on it. - match compute.prepare_and_run() { - Ok(ec) => { - let code = ec.code().unwrap_or(1); - info!("Postgres exited with code {}, shutting down", code); - exit(code) - } - Err(error) => { - error!("could not start the compute node: {:?}", error); - + // Start Postgres + let mut delay_exit = false; + let mut exit_code = None; + let pg = match compute.start_compute() { + Ok(pg) => Some(pg), + Err(err) => { + error!("could not start the compute node: {:?}", err); let mut state = compute.state.write().unwrap(); - state.error = Some(format!("{:?}", error)); + state.error = Some(format!("{:?}", err)); state.status = ComputeStatus::Failed; drop(state); - - // Keep serving HTTP requests, so the cloud control plane was able to - // get the actual error. - info!("giving control plane 30s to collect the error before shutdown"); - thread::sleep(Duration::from_secs(30)); - info!("shutting down"); - Err(error) + delay_exit = true; + None } + }; + + // Wait for the child Postgres process forever. In this state Ctrl+C will + // propagate to Postgres and it will be shut down as well. + if let Some(mut pg) = pg { + let ecode = pg + .wait() + .expect("failed to start waiting on Postgres process"); + info!("Postgres exited with code {}, shutting down", ecode); + exit_code = ecode.code() } + + if let Err(err) = compute.check_for_core_dumps() { + error!("error while checking for core dumps: {err:?}"); + } + + // If launch failed, keep serving HTTP requests for a while, so the cloud + // control plane can get the actual error. + if delay_exit { + info!("giving control plane 30s to collect the error before shutdown"); + thread::sleep(Duration::from_secs(30)); + info!("shutting down"); + } + + exit(exit_code.unwrap_or(1)) } fn cli() -> clap::Command { diff --git a/compute_tools/src/checker.rs b/compute_tools/src/checker.rs index ee1605c814..b8413de516 100644 --- a/compute_tools/src/checker.rs +++ b/compute_tools/src/checker.rs @@ -1,10 +1,11 @@ use anyhow::{anyhow, Result}; -use log::error; use postgres::Client; use tokio_postgres::NoTls; +use tracing::{error, instrument}; use crate::compute::ComputeNode; +#[instrument(skip_all)] pub fn create_writability_check_data(client: &mut Client) -> Result<()> { let query = " CREATE TABLE IF NOT EXISTS health_check ( @@ -21,6 +22,7 @@ pub fn create_writability_check_data(client: &mut Client) -> Result<()> { Ok(()) } +#[instrument(skip_all)] pub async fn check_writability(compute: &ComputeNode) -> Result<()> { let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?; if client.is_closed() { diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index c2c9ab2230..c8af8822b7 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -17,15 +17,15 @@ use std::fs; use std::os::unix::fs::PermissionsExt; use std::path::Path; -use std::process::{Command, ExitStatus, Stdio}; +use std::process::{Command, Stdio}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::RwLock; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use log::{info, warn}; use postgres::{Client, NoTls}; use serde::{Serialize, Serializer}; +use tracing::{info, instrument, warn}; use crate::checker::create_writability_check_data; use crate::config; @@ -121,6 +121,7 @@ impl ComputeNode { // Get basebackup from the libpq connection to pageserver using `connstr` and // unarchive it to `pgdata` directory overriding all its previous content. + #[instrument(skip(self))] fn get_basebackup(&self, lsn: &str) -> Result<()> { let start_time = Utc::now(); @@ -154,6 +155,7 @@ impl ComputeNode { // Run `postgres` in a special mode with `--sync-safekeepers` argument // and return the reported LSN back to the caller. + #[instrument(skip(self))] fn sync_safekeepers(&self) -> Result { let start_time = Utc::now(); @@ -196,6 +198,7 @@ impl ComputeNode { /// Do all the preparations like PGDATA directory creation, configuration, /// safekeepers sync, basebackup, etc. + #[instrument(skip(self))] pub fn prepare_pgdata(&self) -> Result<()> { let spec = &self.spec; let pgdata_path = Path::new(&self.pgdata); @@ -229,9 +232,8 @@ impl ComputeNode { /// Start Postgres as a child process and manage DBs/roles. /// After that this will hang waiting on the postmaster process to exit. - pub fn run(&self) -> Result { - let start_time = Utc::now(); - + #[instrument(skip(self))] + pub fn start_postgres(&self) -> Result { let pgdata_path = Path::new(&self.pgdata); // Run postgres as a child process. @@ -242,10 +244,15 @@ impl ComputeNode { wait_for_postgres(&mut pg, pgdata_path)?; + Ok(pg) + } + + #[instrument(skip(self))] + pub fn apply_config(&self) -> Result<()> { // If connection fails, // it may be the old node with `zenith_admin` superuser. // - // In this case we need to connect with old `zenith_admin`name + // In this case we need to connect with old `zenith_admin` name // and create new user. We cannot simply rename connected user, // but we can create a new one and grant it all privileges. let mut client = match Client::connect(self.connstr.as_str(), NoTls) { @@ -271,6 +278,7 @@ impl ComputeNode { Ok(client) => client, }; + // Proceed with post-startup configuration. Note, that order of operations is important. handle_roles(&self.spec, &mut client)?; handle_databases(&self.spec, &mut client)?; handle_role_deletions(self, &mut client)?; @@ -279,8 +287,34 @@ impl ComputeNode { // 'Close' connection drop(client); - let startup_end_time = Utc::now(); + info!( + "finished configuration of compute for project {}", + self.spec.cluster.cluster_id + ); + + Ok(()) + } + + #[instrument(skip(self))] + pub fn start_compute(&self) -> Result { + info!( + "starting compute for project {}, operation {}, tenant {}, timeline {}", + self.spec.cluster.cluster_id, + self.spec.operation_uuid.as_ref().unwrap(), + self.tenant, + self.timeline, + ); + + self.prepare_pgdata()?; + + let start_time = Utc::now(); + + let pg = self.start_postgres()?; + + self.apply_config()?; + + let startup_end_time = Utc::now(); self.metrics.config_ms.store( startup_end_time .signed_duration_since(start_time) @@ -300,34 +334,7 @@ impl ComputeNode { self.set_status(ComputeStatus::Running); - info!( - "finished configuration of compute for project {}", - self.spec.cluster.cluster_id - ); - - // Wait for child Postgres process basically forever. In this state Ctrl+C - // will propagate to Postgres and it will be shut down as well. - let ecode = pg - .wait() - .expect("failed to start waiting on Postgres process"); - - self.check_for_core_dumps() - .expect("failed to check for core dumps"); - - Ok(ecode) - } - - pub fn prepare_and_run(&self) -> Result { - info!( - "starting compute for project {}, operation {}, tenant {}, timeline {}", - self.spec.cluster.cluster_id, - self.spec.operation_uuid.as_ref().unwrap(), - self.tenant, - self.timeline, - ); - - self.prepare_pgdata()?; - self.run() + Ok(pg) } // Look for core dumps and collect backtraces. @@ -340,7 +347,7 @@ impl ComputeNode { // // Use that as a default location and pattern, except macos where core dumps are written // to /cores/ directory by default. - fn check_for_core_dumps(&self) -> Result<()> { + pub fn check_for_core_dumps(&self) -> Result<()> { let core_dump_dir = match std::env::consts::OS { "macos" => Path::new("/cores/"), _ => Path::new(&self.pgdata), diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 44f83e5003..f2a49f332c 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -6,8 +6,8 @@ use std::thread; use anyhow::Result; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; -use log::{error, info}; use serde_json; +use tracing::{error, info}; use crate::compute::ComputeNode; diff --git a/compute_tools/src/informant.rs b/compute_tools/src/informant.rs index 09bd5e3138..8a6e3ab43a 100644 --- a/compute_tools/src/informant.rs +++ b/compute_tools/src/informant.rs @@ -1,8 +1,8 @@ -use log::{info, warn}; use std::path::Path; use std::process; use std::thread; use std::time::Duration; +use tracing::{info, warn}; use anyhow::{Context, Result}; diff --git a/compute_tools/src/logger.rs b/compute_tools/src/logger.rs index dde0a950f8..57e5496e86 100644 --- a/compute_tools/src/logger.rs +++ b/compute_tools/src/logger.rs @@ -1,42 +1,20 @@ -use std::io::Write; - use anyhow::Result; -use chrono::Utc; -use env_logger::{Builder, Env}; - -macro_rules! info_println { - ($($tts:tt)*) => { - if log_enabled!(Level::Info) { - println!($($tts)*); - } - } -} - -macro_rules! info_print { - ($($tts:tt)*) => { - if log_enabled!(Level::Info) { - print!($($tts)*); - } - } -} +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::prelude::*; /// Initialize `env_logger` using either `default_level` or /// `RUST_LOG` environment variable as default log level. pub fn init_logger(default_level: &str) -> Result<()> { - let env = Env::default().filter_or("RUST_LOG", default_level); + let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_level)); - Builder::from_env(env) - .format(|buf, record| { - let thread_handle = std::thread::current(); - writeln!( - buf, - "{} [{}] {}: {}", - Utc::now().format("%Y-%m-%d %H:%M:%S%.3f %Z"), - thread_handle.name().unwrap_or("main"), - record.level(), - record.args() - ) - }) + let fmt_layer = tracing_subscriber::fmt::layer() + .with_target(false) + .with_writer(std::io::stderr); + + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) .init(); Ok(()) diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs index c871422e78..7c9878ffcf 100644 --- a/compute_tools/src/monitor.rs +++ b/compute_tools/src/monitor.rs @@ -3,8 +3,8 @@ use std::{thread, time}; use anyhow::Result; use chrono::{DateTime, Utc}; -use log::{debug, info}; use postgres::{Client, NoTls}; +use tracing::{debug, info}; use crate::compute::ComputeNode; diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index ff422f1cf5..6ab2864721 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -11,6 +11,7 @@ use anyhow::{bail, Result}; use notify::{RecursiveMode, Watcher}; use postgres::{Client, Transaction}; use serde::Deserialize; +use tracing::{debug, instrument}; const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds @@ -129,8 +130,8 @@ impl Role { /// Serialize a list of role parameters into a Postgres-acceptable /// string of arguments. pub fn to_pg_options(&self) -> String { - // XXX: consider putting LOGIN as a default option somewhere higher, e.g. in Rails. - // For now we do not use generic `options` for roles. Once used, add + // XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane. + // For now, we do not use generic `options` for roles. Once used, add // `self.options.as_pg_options()` somewhere here. let mut params: String = "LOGIN".to_string(); @@ -229,6 +230,7 @@ pub fn get_existing_dbs(client: &mut Client) -> Result> { /// Wait for Postgres to become ready to accept connections. It's ready to /// accept connections when the state-field in `pgdata/postmaster.pid` says /// 'ready'. +#[instrument(skip(pg))] pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> { let pid_path = pgdata.join("postmaster.pid"); @@ -287,18 +289,18 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> { } let res = rx.recv_timeout(Duration::from_millis(100)); - log::debug!("woken up by notify: {res:?}"); + debug!("woken up by notify: {res:?}"); // If there are multiple events in the channel already, we only need to be // check once. Swallow the extra events before we go ahead to check the // pid file. while let Ok(res) = rx.try_recv() { - log::debug!("swallowing extra event: {res:?}"); + debug!("swallowing extra event: {res:?}"); } // Check that we can open pid file first. if let Ok(file) = File::open(&pid_path) { if !postmaster_pid_seen { - log::debug!("postmaster.pid appeared"); + debug!("postmaster.pid appeared"); watcher .unwatch(pgdata) .expect("Failed to remove pgdata dir watch"); @@ -314,7 +316,7 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> { // Pid file could be there and we could read it, but it could be empty, for example. if let Some(Ok(line)) = last_line { let status = line.trim(); - log::debug!("last line of postmaster.pid: {status:?}"); + debug!("last line of postmaster.pid: {status:?}"); // Now Postgres is ready to accept connections if status == "ready" { @@ -330,7 +332,7 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> { } } - log::info!("PostgreSQL is now running, continuing to configure it"); + tracing::info!("PostgreSQL is now running, continuing to configure it"); Ok(()) } diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 81e01fe555..97cd623052 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -1,12 +1,11 @@ use std::path::Path; use std::str::FromStr; -use std::time::Instant; use anyhow::Result; -use log::{info, log_enabled, warn, Level}; use postgres::config::Config; use postgres::{Client, NoTls}; use serde::Deserialize; +use tracing::{info, info_span, instrument, span_enabled, warn, Level}; use crate::compute::ComputeNode; use crate::config; @@ -80,23 +79,25 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> { /// Given a cluster spec json and open transaction it handles roles creation, /// deletion and update. +#[instrument(skip_all)] pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { let mut xact = client.transaction()?; let existing_roles: Vec = get_existing_roles(&mut xact)?; // Print a list of existing Postgres roles (only in debug mode) - info!("postgres roles:"); - for r in &existing_roles { - info_println!( - "{} - {}:{}", - " ".repeat(27 + 5), - r.name, - if r.encrypted_password.is_some() { - "[FILTERED]" - } else { - "(null)" - } - ); + if span_enabled!(Level::INFO) { + info!("postgres roles:"); + for r in &existing_roles { + info!( + " - {}:{}", + r.name, + if r.encrypted_password.is_some() { + "[FILTERED]" + } else { + "(null)" + } + ); + } } // Process delta operations first @@ -137,58 +138,68 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { info!("cluster spec roles:"); for role in &spec.cluster.roles { let name = &role.name; - - info_print!( - "{} - {}:{}", - " ".repeat(27 + 5), - name, - if role.encrypted_password.is_some() { - "[FILTERED]" - } else { - "(null)" - } - ); - // XXX: with a limited number of roles it is fine, but consider making it a HashMap let pg_role = existing_roles.iter().find(|r| r.name == *name); - if let Some(r) = pg_role { - let mut update_role = false; - + enum RoleAction { + None, + Update, + Create, + } + let action = if let Some(r) = pg_role { if (r.encrypted_password.is_none() && role.encrypted_password.is_some()) || (r.encrypted_password.is_some() && role.encrypted_password.is_none()) { - update_role = true; + RoleAction::Update } else if let Some(pg_pwd) = &r.encrypted_password { // Check whether password changed or not (trim 'md5:' prefix first) - update_role = pg_pwd[3..] != *role.encrypted_password.as_ref().unwrap(); + if pg_pwd[3..] != *role.encrypted_password.as_ref().unwrap() { + RoleAction::Update + } else { + RoleAction::None + } + } else { + RoleAction::None } + } else { + RoleAction::Create + }; - if update_role { + match action { + RoleAction::None => {} + RoleAction::Update => { let mut query: String = format!("ALTER ROLE {} ", name.pg_quote()); - info_print!(" -> update"); - query.push_str(&role.to_pg_options()); xact.execute(query.as_str(), &[])?; } - } else { - info!("role name: '{}'", &name); - let mut query: String = format!("CREATE ROLE {} ", name.pg_quote()); - info!("role create query: '{}'", &query); - info_print!(" -> create"); + RoleAction::Create => { + let mut query: String = format!("CREATE ROLE {} ", name.pg_quote()); + info!("role create query: '{}'", &query); + query.push_str(&role.to_pg_options()); + xact.execute(query.as_str(), &[])?; - query.push_str(&role.to_pg_options()); - xact.execute(query.as_str(), &[])?; - - let grant_query = format!( - "GRANT pg_read_all_data, pg_write_all_data TO {}", - name.pg_quote() - ); - xact.execute(grant_query.as_str(), &[])?; - info!("role grant query: '{}'", &grant_query); + let grant_query = format!( + "GRANT pg_read_all_data, pg_write_all_data TO {}", + name.pg_quote() + ); + xact.execute(grant_query.as_str(), &[])?; + info!("role grant query: '{}'", &grant_query); + } } - info_print!("\n"); + if span_enabled!(Level::INFO) { + let pwd = if role.encrypted_password.is_some() { + "[FILTERED]" + } else { + "(null)" + }; + let action_str = match action { + RoleAction::None => "", + RoleAction::Create => " -> create", + RoleAction::Update => " -> update", + }; + info!(" - {}:{}{}", name, pwd, action_str); + } } xact.commit()?; @@ -197,12 +208,25 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { } /// Reassign all dependent objects and delete requested roles. +#[instrument(skip_all)] pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> { if let Some(ops) = &node.spec.delta_operations { // First, reassign all dependent objects to db owners. info!("reassigning dependent objects of to-be-deleted roles"); + + // Fetch existing roles. We could've exported and used `existing_roles` from + // `handle_roles()`, but we only make this list there before creating new roles. + // Which is probably fine as we never create to-be-deleted roles, but that'd + // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared + // buffers already, so this shouldn't be a big deal. + let mut xact = client.transaction()?; + let existing_roles: Vec = get_existing_roles(&mut xact)?; + xact.commit()?; + for op in ops { - if op.action == "delete_role" { + // Check that role is still present in Postgres, as this could be a + // restart with the same spec after role deletion. + if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) { reassign_owned_objects(node, &op.name)?; } } @@ -261,13 +285,16 @@ fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level /// atomicity should be enough here due to the order of operations and various checks, /// which together provide us idempotency. +#[instrument(skip_all)] pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { let existing_dbs: Vec = get_existing_dbs(client)?; // Print a list of existing Postgres databases (only in debug mode) - info!("postgres databases:"); - for r in &existing_dbs { - info_println!("{} - {}:{}", " ".repeat(27 + 5), r.name, r.owner); + if span_enabled!(Level::INFO) { + info!("postgres databases:"); + for r in &existing_dbs { + info!(" {}:{}", r.name, r.owner); + } } // Process delta operations first @@ -310,13 +337,15 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { for db in &spec.cluster.databases { let name = &db.name; - info_print!("{} - {}:{}", " ".repeat(27 + 5), db.name, db.owner); - // XXX: with a limited number of databases it is fine, but consider making it a HashMap let pg_db = existing_dbs.iter().find(|r| r.name == *name); - let start_time = Instant::now(); - if let Some(r) = pg_db { + enum DatabaseAction { + None, + Update, + Create, + } + let action = if let Some(r) = pg_db { // XXX: db owner name is returned as quoted string from Postgres, // when quoting is needed. let new_owner = if r.owner.starts_with('"') { @@ -326,29 +355,42 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { }; if new_owner != r.owner { + // Update the owner + DatabaseAction::Update + } else { + DatabaseAction::None + } + } else { + DatabaseAction::Create + }; + + match action { + DatabaseAction::None => {} + DatabaseAction::Update => { let query: String = format!( "ALTER DATABASE {} OWNER TO {}", name.pg_quote(), db.owner.pg_quote() ); - info_print!(" -> update"); - + let _ = info_span!("executing", query).entered(); client.execute(query.as_str(), &[])?; - let elapsed = start_time.elapsed().as_millis(); - info_print!(" ({} ms)", elapsed); } - } else { - let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote()); - info_print!(" -> create"); + DatabaseAction::Create => { + let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote()); + query.push_str(&db.to_pg_options()); + let _ = info_span!("executing", query).entered(); + client.execute(query.as_str(), &[])?; + } + }; - query.push_str(&db.to_pg_options()); - client.execute(query.as_str(), &[])?; - - let elapsed = start_time.elapsed().as_millis(); - info_print!(" ({} ms)", elapsed); + if span_enabled!(Level::INFO) { + let action_str = match action { + DatabaseAction::None => "", + DatabaseAction::Create => " -> create", + DatabaseAction::Update => " -> update", + }; + info!(" - {}:{}{}", db.name, db.owner, action_str); } - - info_print!("\n"); } Ok(()) @@ -356,6 +398,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants /// to allow users creating trusted extensions and re-creating `public` schema, for example. +#[instrument(skip_all)] pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> { let spec = &node.spec; diff --git a/libs/tenant_size_model/src/lib.rs b/libs/tenant_size_model/src/lib.rs index 92bec8aebe..b156e1be9d 100644 --- a/libs/tenant_size_model/src/lib.rs +++ b/libs/tenant_size_model/src/lib.rs @@ -134,22 +134,25 @@ impl Storage { op: Cow<'static, str>, lsn: u64, size: Option, - ) where + ) -> anyhow::Result<()> + where K: std::borrow::Borrow, Q: std::hash::Hash + Eq + std::fmt::Debug, { - let lastseg_id = *self.branches.get(branch).unwrap(); + let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") }; let newseg_id = self.segments.len(); let lastseg = &mut self.segments[lastseg_id]; assert!(lsn > lastseg.end_lsn); + let Some(start_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") }; + let newseg = Segment { op, parent: Some(lastseg_id), start_lsn: lastseg.end_lsn, end_lsn: lsn, - start_size: lastseg.end_size.unwrap(), + start_size, end_size: size, children_after: Vec::new(), needed: false, @@ -158,6 +161,8 @@ impl Storage { self.segments.push(newseg); *self.branches.get_mut(branch).expect("read already") = newseg_id; + + Ok(()) } /// Advances the branch with the named operation, by the relative LSN and logical size bytes. @@ -167,21 +172,24 @@ impl Storage { op: Cow<'static, str>, lsn_bytes: u64, size_bytes: i64, - ) where + ) -> anyhow::Result<()> + where K: std::borrow::Borrow, - Q: std::hash::Hash + Eq, + Q: std::hash::Hash + Eq + std::fmt::Debug, { - let lastseg_id = *self.branches.get(branch).unwrap(); + let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") }; let newseg_id = self.segments.len(); let lastseg = &mut self.segments[lastseg_id]; + let Some(last_end_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") }; + let newseg = Segment { op, parent: Some(lastseg_id), start_lsn: lastseg.end_lsn, end_lsn: lastseg.end_lsn + lsn_bytes, - start_size: lastseg.end_size.unwrap(), - end_size: Some((lastseg.end_size.unwrap() as i64 + size_bytes) as u64), + start_size: last_end_size, + end_size: Some((last_end_size as i64 + size_bytes) as u64), children_after: Vec::new(), needed: false, }; @@ -189,33 +197,33 @@ impl Storage { self.segments.push(newseg); *self.branches.get_mut(branch).expect("read already") = newseg_id; + Ok(()) } - pub fn insert(&mut self, branch: &Q, bytes: u64) + pub fn insert(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> where K: std::borrow::Borrow, - Q: std::hash::Hash + Eq, + Q: std::hash::Hash + Eq + std::fmt::Debug, { - self.modify_branch(branch, "insert".into(), bytes, bytes as i64); + self.modify_branch(branch, "insert".into(), bytes, bytes as i64) } - pub fn update(&mut self, branch: &Q, bytes: u64) + pub fn update(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> where K: std::borrow::Borrow, - Q: std::hash::Hash + Eq, + Q: std::hash::Hash + Eq + std::fmt::Debug, { - self.modify_branch(branch, "update".into(), bytes, 0i64); + self.modify_branch(branch, "update".into(), bytes, 0i64) } - pub fn delete(&mut self, branch: &Q, bytes: u64) + pub fn delete(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> where K: std::borrow::Borrow, - Q: std::hash::Hash + Eq, + Q: std::hash::Hash + Eq + std::fmt::Debug, { - self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64)); + self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64)) } - /// Panics if the parent branch cannot be found. pub fn branch(&mut self, parent: &Q, name: K) -> anyhow::Result<()> where K: std::borrow::Borrow + std::fmt::Debug, @@ -236,7 +244,7 @@ impl Storage { Ok(()) } - pub fn calculate(&mut self, retention_period: u64) -> SegmentSize { + pub fn calculate(&mut self, retention_period: u64) -> anyhow::Result { // Phase 1: Mark all the segments that need to be retained for (_branch, &last_seg_id) in self.branches.iter() { let last_seg = &self.segments[last_seg_id]; @@ -261,7 +269,7 @@ impl Storage { self.size_from_snapshot_later(0) } - fn size_from_wal(&self, seg_id: usize) -> SegmentSize { + fn size_from_wal(&self, seg_id: usize) -> anyhow::Result { let seg = &self.segments[seg_id]; let this_size = seg.end_lsn - seg.start_lsn; @@ -272,10 +280,10 @@ impl Storage { for &child_id in seg.children_after.iter() { // try each child both ways let child = &self.segments[child_id]; - let p1 = self.size_from_wal(child_id); + let p1 = self.size_from_wal(child_id)?; let p = if !child.needed { - let p2 = self.size_from_snapshot_later(child_id); + let p2 = self.size_from_snapshot_later(child_id)?; if p1.total() < p2.total() { p1 } else { @@ -286,15 +294,15 @@ impl Storage { }; children.push(p); } - SegmentSize { + Ok(SegmentSize { seg_id, method: if seg.needed { WalNeeded } else { Wal }, this_size, children, - } + }) } - fn size_from_snapshot_later(&self, seg_id: usize) -> SegmentSize { + fn size_from_snapshot_later(&self, seg_id: usize) -> anyhow::Result { // If this is needed, then it's time to do the snapshot and continue // with wal method. let seg = &self.segments[seg_id]; @@ -305,10 +313,10 @@ impl Storage { for &child_id in seg.children_after.iter() { // try each child both ways let child = &self.segments[child_id]; - let p1 = self.size_from_wal(child_id); + let p1 = self.size_from_wal(child_id)?; let p = if !child.needed { - let p2 = self.size_from_snapshot_later(child_id); + let p2 = self.size_from_snapshot_later(child_id)?; if p1.total() < p2.total() { p1 } else { @@ -319,12 +327,12 @@ impl Storage { }; children.push(p); } - SegmentSize { + Ok(SegmentSize { seg_id, method: WalNeeded, this_size: seg.start_size, children, - } + }) } else { // If any of the direct children are "needed", need to be able to reconstruct here let mut children_needed = false; @@ -339,7 +347,7 @@ impl Storage { let method1 = if !children_needed { let mut children = Vec::new(); for child in seg.children_after.iter() { - children.push(self.size_from_snapshot_later(*child)); + children.push(self.size_from_snapshot_later(*child)?); } Some(SegmentSize { seg_id, @@ -355,20 +363,25 @@ impl Storage { let method2 = if children_needed || seg.children_after.len() >= 2 { let mut children = Vec::new(); for child in seg.children_after.iter() { - children.push(self.size_from_wal(*child)); + children.push(self.size_from_wal(*child)?); } + let Some(this_size) = seg.end_size else { anyhow::bail!("no end_size at junction {seg_id}") }; Some(SegmentSize { seg_id, method: SnapshotAfter, - this_size: seg.end_size.unwrap(), + this_size, children, }) } else { None }; - match (method1, method2) { - (None, None) => panic!(), + Ok(match (method1, method2) { + (None, None) => anyhow::bail!( + "neither method was applicable: children_after={}, children_needed={}", + seg.children_after.len(), + children_needed + ), (Some(method), None) => method, (None, Some(method)) => method, (Some(method1), Some(method2)) => { @@ -378,7 +391,7 @@ impl Storage { method2 } } - } + }) } } diff --git a/libs/tenant_size_model/src/main.rs b/libs/tenant_size_model/src/main.rs index 9378a98a09..e32dd055f4 100644 --- a/libs/tenant_size_model/src/main.rs +++ b/libs/tenant_size_model/src/main.rs @@ -7,118 +7,118 @@ use tenant_size_model::{Segment, SegmentSize, Storage}; // Main branch only. Some updates on it. -fn scenario_1() -> (Vec, SegmentSize) { +fn scenario_1() -> anyhow::Result<(Vec, SegmentSize)> { // Create main branch let mut storage = Storage::new("main"); // Bulk load 5 GB of data to it - storage.insert("main", 5_000); + storage.insert("main", 5_000)?; // Stream of updates for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } - let size = storage.calculate(1000); + let size = storage.calculate(1000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } // Main branch only. Some updates on it. -fn scenario_2() -> (Vec, SegmentSize) { +fn scenario_2() -> anyhow::Result<(Vec, SegmentSize)> { // Create main branch let mut storage = Storage::new("main"); // Bulk load 5 GB of data to it - storage.insert("main", 5_000); + storage.insert("main", 5_000)?; // Stream of updates for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } // Branch - storage.branch("main", "child").unwrap(); - storage.update("child", 1_000); + storage.branch("main", "child")?; + storage.update("child", 1_000)?; // More updates on parent - storage.update("main", 1_000); + storage.update("main", 1_000)?; - let size = storage.calculate(1000); + let size = storage.calculate(1000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } // Like 2, but more updates on main -fn scenario_3() -> (Vec, SegmentSize) { +fn scenario_3() -> anyhow::Result<(Vec, SegmentSize)> { // Create main branch let mut storage = Storage::new("main"); // Bulk load 5 GB of data to it - storage.insert("main", 5_000); + storage.insert("main", 5_000)?; // Stream of updates for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } // Branch - storage.branch("main", "child").unwrap(); - storage.update("child", 1_000); + storage.branch("main", "child")?; + storage.update("child", 1_000)?; // More updates on parent for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } - let size = storage.calculate(1000); + let size = storage.calculate(1000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } // Diverged branches -fn scenario_4() -> (Vec, SegmentSize) { +fn scenario_4() -> anyhow::Result<(Vec, SegmentSize)> { // Create main branch let mut storage = Storage::new("main"); // Bulk load 5 GB of data to it - storage.insert("main", 5_000); + storage.insert("main", 5_000)?; // Stream of updates for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } // Branch - storage.branch("main", "child").unwrap(); - storage.update("child", 1_000); + storage.branch("main", "child")?; + storage.update("child", 1_000)?; // More updates on parent for _ in 0..8 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } - let size = storage.calculate(1000); + let size = storage.calculate(1000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } -fn scenario_5() -> (Vec, SegmentSize) { +fn scenario_5() -> anyhow::Result<(Vec, SegmentSize)> { let mut storage = Storage::new("a"); - storage.insert("a", 5000); - storage.branch("a", "b").unwrap(); - storage.update("b", 4000); - storage.update("a", 2000); - storage.branch("a", "c").unwrap(); - storage.insert("c", 4000); - storage.insert("a", 2000); + storage.insert("a", 5000)?; + storage.branch("a", "b")?; + storage.update("b", 4000)?; + storage.update("a", 2000)?; + storage.branch("a", "c")?; + storage.insert("c", 4000)?; + storage.insert("a", 2000)?; - let size = storage.calculate(5000); + let size = storage.calculate(5000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } -fn scenario_6() -> (Vec, SegmentSize) { +fn scenario_6() -> anyhow::Result<(Vec, SegmentSize)> { use std::borrow::Cow; const NO_OP: Cow<'static, str> = Cow::Borrowed(""); @@ -133,18 +133,18 @@ fn scenario_6() -> (Vec, SegmentSize) { let mut storage = Storage::new(None); - storage.branch(&None, branches[0]).unwrap(); // at 0 - storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128); // at 108951064 - storage.branch(&branches[0], branches[1]).unwrap(); // at 108951064 - storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392); // at 124511472 - storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904); // at 283415424 - storage.branch(&branches[0], branches[2]).unwrap(); // at 283415424 - storage.modify_branch(&branches[2], NO_OP, 15906192, 8192); // at 299321616 - storage.modify_branch(&branches[0], NO_OP, 18909976, 32768); // at 302325400 + storage.branch(&None, branches[0])?; // at 0 + storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128)?; // at 108951064 + storage.branch(&branches[0], branches[1])?; // at 108951064 + storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392)?; // at 124511472 + storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904)?; // at 283415424 + storage.branch(&branches[0], branches[2])?; // at 283415424 + storage.modify_branch(&branches[2], NO_OP, 15906192, 8192)?; // at 299321616 + storage.modify_branch(&branches[0], NO_OP, 18909976, 32768)?; // at 302325400 - let size = storage.calculate(100_000); + let size = storage.calculate(100_000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } fn main() { @@ -163,7 +163,8 @@ fn main() { eprintln!("invalid scenario {}", other); std::process::exit(1); } - }; + } + .unwrap(); graphviz_tree(&segments, &size); } @@ -251,7 +252,7 @@ fn graphviz_tree(segments: &[Segment], tree: &SegmentSize) { #[test] fn scenarios_return_same_size() { - type ScenarioFn = fn() -> (Vec, SegmentSize); + type ScenarioFn = fn() -> anyhow::Result<(Vec, SegmentSize)>; let truths: &[(u32, ScenarioFn, _)] = &[ (line!(), scenario_1, 8000), (line!(), scenario_2, 9000), @@ -262,7 +263,7 @@ fn scenarios_return_same_size() { ]; for (line, scenario, expected) in truths { - let (_, size) = scenario(); + let (_, size) = scenario().unwrap(); assert_eq!(*expected, size.total_children(), "scenario on line {line}"); } } diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 3b1a1f5aff..82c9267f4a 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -8,6 +8,7 @@ use strum_macros::{EnumString, EnumVariantNames}; pub enum LogFormat { Plain, Json, + Test, } impl LogFormat { @@ -39,6 +40,7 @@ pub fn init(log_format: LogFormat) -> anyhow::Result<()> { match log_format { LogFormat::Json => base_logger.json().init(), LogFormat::Plain => base_logger.init(), + LogFormat::Test => base_logger.with_test_writer().init(), } Ok(()) diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index f8a0bc6f08..c07026261d 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -59,7 +59,7 @@ pub async fn collect_metrics( None, None, "synthetic size calculation", - true, + false, async move { calculate_synthetic_size_worker(synthetic_size_calculation_interval) .instrument(info_span!("synthetic_size_worker")) diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 6ae70e3a30..cc521c5e35 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1405,15 +1405,15 @@ fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range { Key { field1: 0x01, field2, - field3: segno, - field4: 0, + field3: 1, + field4: segno, field5: 0, field6: 0, }..Key { field1: 0x01, field2, - field3: segno, - field4: 0, + field3: 1, + field4: segno, field5: 1, field6: 0, } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 1d0d6b66ab..c18c645e5b 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2627,8 +2627,10 @@ where pub mod harness { use bytes::{Bytes, BytesMut}; use once_cell::sync::Lazy; + use once_cell::sync::OnceCell; use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::{fs, path::PathBuf}; + use utils::logging; use utils::lsn::Lsn; use crate::{ @@ -2692,6 +2694,8 @@ pub mod harness { ), } + static LOG_HANDLE: OnceCell<()> = OnceCell::new(); + impl<'a> TenantHarness<'a> { pub fn create(test_name: &'static str) -> anyhow::Result { Self::create_internal(test_name, false) @@ -2706,6 +2710,10 @@ pub mod harness { (Some(LOCK.read().unwrap()), None) }; + LOG_HANDLE.get_or_init(|| { + logging::init(logging::LogFormat::Test).expect("Failed to init test logging") + }); + let repo_dir = PageServerConf::test_repo_dir(test_name); let _ = fs::remove_dir_all(&repo_dir); fs::create_dir_all(&repo_dir)?; diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 44bed5959f..01c5359e88 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -250,15 +250,32 @@ where L: ?Sized + Layer, { /// - /// Find the latest layer that covers the given 'key', with lsn < - /// 'end_lsn'. + /// Find the latest layer (by lsn.end) that covers the given + /// 'key', with lsn.start < 'end_lsn'. /// - /// Returns the layer, if any, and an 'lsn_floor' value that - /// indicates which portion of the layer the caller should - /// check. 'lsn_floor' is normally the start-LSN of the layer, but - /// can be greater if there is an overlapping layer that might - /// contain the version, even if it's missing from the returned - /// layer. + /// The caller of this function is the page reconstruction + /// algorithm looking for the next relevant delta layer, or + /// the terminal image layer. The caller will pass the lsn_floor + /// value as end_lsn in the next call to search. + /// + /// If there's an image layer exactly below the given end_lsn, + /// search should return that layer regardless if there are + /// overlapping deltas. + /// + /// If the latest layer is a delta and there is an overlapping + /// image with it below, the lsn_floor returned should be right + /// above that image so we don't skip it in the search. Otherwise + /// the lsn_floor returned should be the bottom of the delta layer + /// because we should make as much progress down the lsn axis + /// as possible. It's fine if this way we skip some overlapping + /// deltas, because the delta we returned would contain the same + /// wal content. + /// + /// TODO: This API is convoluted and inefficient. If the caller + /// makes N search calls, we'll end up finding the same latest + /// image layer N times. We should either cache the latest image + /// layer result, or simplify the api to `get_latest_image` and + /// `get_latest_delta`, and only call `get_latest_image` once. /// /// NOTE: This only searches the 'historic' layers, *not* the /// 'open' and 'frozen' layers! @@ -401,7 +418,9 @@ where NUM_ONDISK_LAYERS.dec(); } - /// Is there a newer image layer for given key- and LSN-range? + /// Is there a newer image layer for given key- and LSN-range? Or a set + /// of image layers within the specified lsn range that cover the entire + /// specified key range? /// /// This is used for garbage collection, to determine if an old layer can /// be deleted. @@ -488,8 +507,8 @@ where /// /// Divide the whole given range of keys into sub-ranges based on the latest - /// image layer that covers each range. (This is used when creating new - /// image layers) + /// image layer that covers each range at the specified lsn (inclusive). + /// This is used when creating new image layers. /// // FIXME: clippy complains that the result type is very complex. She's probably // right... @@ -541,8 +560,15 @@ where Ok(ranges) } - /// Count how many L1 delta layers there are that overlap with the - /// given key and LSN range. + /// Count the height of the tallest stack of deltas in this 2d region. + /// + /// This number is used to compute the largest number of deltas that + /// we'll need to visit for any page reconstruction in this region. + /// We use this heuristic to decide whether to create an image layer. + /// + /// TODO currently we just return the total number of deltas in the + /// region, no matter if they're stacked on top of each other + /// or next to each other. pub fn count_deltas(&self, key_range: &Range, lsn_range: &Range) -> Result { let mut result = 0; if lsn_range.start >= lsn_range.end { diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index dd4bf768a7..61cb32fc76 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -23,7 +23,13 @@ use tracing::*; pub struct ModelInputs { updates: Vec, retention_period: u64, + + /// Relevant lsns per timeline. + /// + /// This field is not required for deserialization purposes, which is mostly used in tests. The + /// LSNs explain the outcome (updates) but are not needed in size calculation. #[serde_as(as = "HashMap")] + #[serde(default)] timeline_inputs: HashMap, } @@ -32,6 +38,8 @@ pub struct ModelInputs { #[serde_with::serde_as] #[derive(Debug, serde::Serialize, serde::Deserialize)] struct TimelineInputs { + #[serde_as(as = "serde_with::DisplayFromStr")] + ancestor_lsn: Lsn, #[serde_as(as = "serde_with::DisplayFromStr")] last_record: Lsn, #[serde_as(as = "serde_with::DisplayFromStr")] @@ -178,19 +186,20 @@ pub(super) async fn gather_inputs( // our advantage with `?` error handling. let mut joinset = tokio::task::JoinSet::new(); - let timelines = tenant + // refresh is needed to update gc related pitr_cutoff and horizon_cutoff + tenant .refresh_gc_info() .await .context("Failed to refresh gc_info before gathering inputs")?; + let timelines = tenant.list_timelines(); + if timelines.is_empty() { - // All timelines are below tenant's gc_horizon; alternative would be to use - // Tenant::list_timelines but then those gc_info's would not be updated yet, possibly - // missing GcInfo::retain_lsns or having obsolete values for cutoff's. + // perhaps the tenant has just been created, and as such doesn't have any data yet return Ok(ModelInputs { updates: vec![], retention_period: 0, - timeline_inputs: HashMap::new(), + timeline_inputs: HashMap::default(), }); } @@ -201,13 +210,25 @@ pub(super) async fn gather_inputs( let mut updates = Vec::new(); - // record the per timline values used to determine `retention_period` + // record the per timeline values useful to debug the model inputs, also used to track + // ancestor_lsn without keeping a hold of Timeline let mut timeline_inputs = HashMap::with_capacity(timelines.len()); // used to determine the `retention_period` for the size model let mut max_cutoff_distance = None; + // mapping from (TimelineId, Lsn) => if this branch point has been handled already via + // GcInfo::retain_lsns or if it needs to have its logical_size calculated. + let mut referenced_branch_froms = HashMap::<(TimelineId, Lsn), bool>::new(); + for timeline in timelines { + if !timeline.is_active() { + anyhow::bail!( + "timeline {} is not active, cannot calculate tenant_size now", + timeline.timeline_id + ); + } + let last_record_lsn = timeline.get_last_record_lsn(); let (interesting_lsns, horizon_cutoff, pitr_cutoff, next_gc_cutoff) = { @@ -273,13 +294,30 @@ pub(super) async fn gather_inputs( // all timelines branch from something, because it might be impossible to pinpoint // which is the tenant_size_model's "default" branch. + + let ancestor_lsn = timeline.get_ancestor_lsn(); + updates.push(Update { - lsn: timeline.get_ancestor_lsn(), + lsn: ancestor_lsn, command: Command::BranchFrom(timeline.get_ancestor_timeline_id()), timeline_id: timeline.timeline_id, }); + if let Some(parent_timeline_id) = timeline.get_ancestor_timeline_id() { + // refresh_gc_info will update branchpoints and pitr_cutoff but only do it for branches + // which are over gc_horizon. for example, a "main" branch which never received any + // updates apart from initdb not have branch points recorded. + referenced_branch_froms + .entry((parent_timeline_id, timeline.get_ancestor_lsn())) + .or_default(); + } + for (lsn, _kind) in &interesting_lsns { + // mark this visited so don't need to re-process this parent + *referenced_branch_froms + .entry((timeline.timeline_id, *lsn)) + .or_default() = true; + if let Some(size) = logical_size_cache.get(&(timeline.timeline_id, *lsn)) { updates.push(Update { lsn: *lsn, @@ -295,22 +333,10 @@ pub(super) async fn gather_inputs( } } - // all timelines also have an end point if they have made any progress - if last_record_lsn > timeline.get_ancestor_lsn() - && !interesting_lsns - .iter() - .any(|(lsn, _)| lsn == &last_record_lsn) - { - updates.push(Update { - lsn: last_record_lsn, - command: Command::EndOfBranch, - timeline_id: timeline.timeline_id, - }); - } - timeline_inputs.insert( timeline.timeline_id, TimelineInputs { + ancestor_lsn, last_record: last_record_lsn, // this is not used above, because it might not have updated recently enough latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(), @@ -321,6 +347,80 @@ pub(super) async fn gather_inputs( ); } + // iterate over discovered branch points and make sure we are getting logical sizes at those + // points. + for ((timeline_id, lsn), handled) in referenced_branch_froms.iter() { + if *handled { + continue; + } + + let timeline_id = *timeline_id; + let lsn = *lsn; + + match timeline_inputs.get(&timeline_id) { + Some(inputs) if inputs.ancestor_lsn == lsn => { + // we don't need an update at this branch point which is also point where + // timeline_id branch was branched from. + continue; + } + Some(_) => {} + None => { + // we should have this because we have iterated through all of the timelines + anyhow::bail!("missing timeline_input for {timeline_id}") + } + } + + if let Some(size) = logical_size_cache.get(&(timeline_id, lsn)) { + updates.push(Update { + lsn, + timeline_id, + command: Command::Update(*size), + }); + + needed_cache.insert((timeline_id, lsn)); + } else { + let timeline = tenant + .get_timeline(timeline_id, false) + .context("find referenced ancestor timeline")?; + let parallel_size_calcs = Arc::clone(limit); + joinset.spawn(calculate_logical_size( + parallel_size_calcs, + timeline.clone(), + lsn, + )); + + if let Some(parent_id) = timeline.get_ancestor_timeline_id() { + // we should not find new ones because we iterated tenants all timelines + anyhow::ensure!( + timeline_inputs.contains_key(&parent_id), + "discovered new timeline {parent_id} (parent of {timeline_id})" + ); + } + }; + } + + // finally add in EndOfBranch for all timelines where their last_record_lsn is not a branch + // point. this is needed by the model. + for (timeline_id, inputs) in timeline_inputs.iter() { + let lsn = inputs.last_record; + + if referenced_branch_froms.contains_key(&(*timeline_id, lsn)) { + // this means that the (timeline_id, last_record_lsn) represents a branch point + // we do not want to add EndOfBranch updates for these points because it doesn't fit + // into the current tenant_size_model. + continue; + } + + if lsn > inputs.ancestor_lsn { + // all timelines also have an end point if they have made any progress + updates.push(Update { + lsn, + command: Command::EndOfBranch, + timeline_id: *timeline_id, + }); + } + } + let mut have_any_error = false; while let Some(res) = joinset.join_next().await { @@ -379,6 +479,7 @@ pub(super) async fn gather_inputs( // handled by the variant order in `Command`. // updates.sort_unstable(); + // And another sort to handle Command::BranchFrom ordering // in case when there are multiple branches at the same LSN. let sorted_updates = sort_updates_in_tree_order(updates)?; @@ -413,10 +514,10 @@ impl ModelInputs { let Lsn(now) = *lsn; match op { Command::Update(sz) => { - storage.insert_point(&Some(*timeline_id), "".into(), now, Some(*sz)); + storage.insert_point(&Some(*timeline_id), "".into(), now, Some(*sz))?; } Command::EndOfBranch => { - storage.insert_point(&Some(*timeline_id), "".into(), now, None); + storage.insert_point(&Some(*timeline_id), "".into(), now, None)?; } Command::BranchFrom(parent) => { // This branch command may fail if it cannot find a parent to branch from. @@ -425,7 +526,7 @@ impl ModelInputs { } } - Ok(storage.calculate(self.retention_period).total_children()) + Ok(storage.calculate(self.retention_period)?.total_children()) } } @@ -574,7 +675,10 @@ fn updates_sort() { fn verify_size_for_multiple_branches() { // this is generated from integration test test_tenant_size_with_multiple_branches, but this way // it has the stable lsn's - let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072,"timeline_inputs":{"cd9d9409c216e64bf580904facedb01b":{"last_record":"0/18D5E40","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/18B5E40","pitr_cutoff":"0/18B5E40","next_gc_cutoff":"0/18B5E40"},"10b532a550540bc15385eac4edde416a":{"last_record":"0/1839818","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/1819818","pitr_cutoff":"0/1819818","next_gc_cutoff":"0/1819818"},"230fc9d756f7363574c0d66533564dcc":{"last_record":"0/222F438","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/220F438","pitr_cutoff":"0/220F438","next_gc_cutoff":"0/220F438"}}}"#; + // + // timelineinputs have been left out, because those explain the inputs, but don't participate + // in further size calculations. + let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072}"#; let inputs: ModelInputs = serde_json::from_str(doc).unwrap(); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index a552c05d63..fd0524016f 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -626,24 +626,20 @@ impl PostgresRedoProcess { // Create empty data directory for wal-redo postgres, deleting old one first. if datadir.exists() { - info!( - "old temporary datadir {} exists, removing", - datadir.display() - ); - fs::remove_dir_all(&datadir)?; + info!("old temporary datadir {datadir:?} exists, removing"); + fs::remove_dir_all(&datadir).map_err(|e| { + Error::new( + e.kind(), + format!("Old temporary dir {datadir:?} removal failure: {e}"), + ) + })?; } - let pg_bin_dir_path = conf.pg_bin_dir(pg_version).map_err(|e| { - Error::new( - ErrorKind::Other, - format!("incorrect pg_bin_dir path: {}", e), - ) - })?; - let pg_lib_dir_path = conf.pg_lib_dir(pg_version).map_err(|e| { - Error::new( - ErrorKind::Other, - format!("incorrect pg_lib_dir path: {}", e), - ) - })?; + let pg_bin_dir_path = conf + .pg_bin_dir(pg_version) + .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {e}")))?; + let pg_lib_dir_path = conf + .pg_lib_dir(pg_version) + .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_lib_dir path: {e}")))?; info!("running initdb in {}", datadir.display()); let initdb = Command::new(pg_bin_dir_path.join("initdb")) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index d6c4c32b0b..8476066691 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1206,6 +1206,9 @@ class PageserverHttpClient(requests.Session): return res_json def tenant_size(self, tenant_id: TenantId) -> int: + return self.tenant_size_and_modelinputs(tenant_id)[0] + + def tenant_size_and_modelinputs(self, tenant_id: TenantId) -> Tuple[int, Dict[str, Any]]: """ Returns the tenant size, together with the model inputs as the second tuple item. """ @@ -1216,9 +1219,9 @@ class PageserverHttpClient(requests.Session): assert TenantId(res["id"]) == tenant_id size = res["size"] assert type(size) == int - # there are additional inputs, which are the collected raw information before being fed to the tenant_size_model - # there are no tests for those right now. - return size + inputs = res["inputs"] + assert type(inputs) is dict + return (size, inputs) def timeline_list( self, diff --git a/test_runner/regress/test_compute_ctl.py b/test_runner/regress/test_compute_ctl.py index f973bd8e60..05ac3841dc 100644 --- a/test_runner/regress/test_compute_ctl.py +++ b/test_runner/regress/test_compute_ctl.py @@ -194,7 +194,7 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): ) except TimeoutExpired as exc: ctl_logs = (exc.stderr or b"").decode("utf-8") - log.info("compute_ctl output:\n{ctl_logs}") + log.info(f"compute_ctl stderr:\n{ctl_logs}") with ExternalProcessManager(Path(pgdata) / "postmaster.pid"): start = "starting safekeepers syncing" diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index 5747ae235f..72cfbc9dda 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -1,5 +1,6 @@ -from typing import List, Tuple +from typing import Any, List, Tuple +import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn from fixtures.types import Lsn @@ -9,28 +10,247 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv): env = neon_simple_env (tenant_id, _) = env.neon_cli.create_tenant() http_client = env.pageserver.http_client() - size = http_client.tenant_size(tenant_id) + initial_size = http_client.tenant_size(tenant_id) - # we should never have zero, because there should be the initdb however - # this is questionable if we should have anything in this case, as the - # gc_cutoff is negative - assert ( - size == 0 - ), "initial implementation returns zero tenant_size before last_record_lsn is past gc_horizon" + # we should never have zero, because there should be the initdb "changes" + assert initial_size > 0, "initial implementation returns ~initdb tenant_size" - with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + main_branch_name = "main" + + with env.postgres.create_start( + main_branch_name, + tenant_id=tenant_id, + config_lines=["autovacuum=off", "checkpoint_timeout=10min"], + ) as pg: with pg.cursor() as cur: cur.execute("SELECT 1") row = cur.fetchone() assert row is not None assert row[0] == 1 size = http_client.tenant_size(tenant_id) - assert size == 0, "starting idle compute should not change the tenant size" + # we've disabled the autovacuum and checkpoint + # so background processes should not change the size. + # If this test will flake we should probably loosen the check + assert size == initial_size, "starting idle compute should not change the tenant size" # the size should be the same, until we increase the size over the # gc_horizon - size = http_client.tenant_size(tenant_id) - assert size == 0, "tenant_size should not be affected by shutdown of compute" + size, inputs = http_client.tenant_size_and_modelinputs(tenant_id) + assert size == initial_size, "tenant_size should not be affected by shutdown of compute" + + expected_commands: List[Any] = [{"branch_from": None}, "end_of_branch"] + actual_commands: List[Any] = list(map(lambda x: x["command"], inputs["updates"])) # type: ignore + assert actual_commands == expected_commands + + +def test_branched_empty_timeline_size(neon_simple_env: NeonEnv): + """ + Issue found in production. Because the ancestor branch was under + gc_horizon, the branchpoint was "dangling" and the computation could not be + done. + + Assuming gc_horizon = 50 + root: I 0---10------>20 + branch: |-------------------I---------->150 + gc_horizon + """ + env = neon_simple_env + (tenant_id, _) = env.neon_cli.create_tenant() + http_client = env.pageserver.http_client() + + initial_size = http_client.tenant_size(tenant_id) + + first_branch_timeline_id = env.neon_cli.create_branch("first-branch", tenant_id=tenant_id) + + with env.postgres.create_start("first-branch", tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute( + "CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)" + ) + wait_for_last_flush_lsn(env, pg, tenant_id, first_branch_timeline_id) + + size_after_branching = http_client.tenant_size(tenant_id) + log.info(f"size_after_branching: {size_after_branching}") + + assert size_after_branching > initial_size + + +def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv): + """ + More general version of test_branched_empty_timeline_size + + Assuming gc_horizon = 50 + + root: I 0------10 + first: I 10 + nth_0: I 10 + nth_1: I 10 + nth_n: 10------------I--------100 + """ + env = neon_simple_env + (tenant_id, _) = env.neon_cli.create_tenant() + http_client = env.pageserver.http_client() + + initial_size = http_client.tenant_size(tenant_id) + + first_branch_name = "first" + env.neon_cli.create_branch(first_branch_name, tenant_id=tenant_id) + + size_after_branching = http_client.tenant_size(tenant_id) + + # this might be flaky like test_get_tenant_size_with_multiple_branches + # https://github.com/neondatabase/neon/issues/2962 + assert size_after_branching == initial_size + + last_branch_name = first_branch_name + last_branch = None + + for i in range(0, 4): + latest_branch_name = f"nth_{i}" + last_branch = env.neon_cli.create_branch( + latest_branch_name, ancestor_branch_name=last_branch_name, tenant_id=tenant_id + ) + last_branch_name = latest_branch_name + + size_after_branching = http_client.tenant_size(tenant_id) + assert size_after_branching == initial_size + + assert last_branch is not None + + with env.postgres.create_start(last_branch_name, tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute( + "CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)" + ) + wait_for_last_flush_lsn(env, pg, tenant_id, last_branch) + + size_after_writes = http_client.tenant_size(tenant_id) + assert size_after_writes > initial_size + + +@pytest.mark.skip("This should work, but is left out because assumed covered by other tests") +def test_branch_point_within_horizon(neon_simple_env: NeonEnv): + """ + gc_horizon = 15 + + main: 0--I-10------>20 + branch: |-------------------I---------->150 + gc_horizon + """ + + env = neon_simple_env + gc_horizon = 20_000 + (tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)}) + http_client = env.pageserver.http_client() + + with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + with pg.cursor() as cur: + cur.execute("CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)") + flushed_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + + size_before_branching = http_client.tenant_size(tenant_id) + + assert flushed_lsn.lsn_int - gc_horizon > initdb_lsn.lsn_int + + branch_id = env.neon_cli.create_branch( + "branch", tenant_id=tenant_id, ancestor_start_lsn=flushed_lsn + ) + + with env.postgres.create_start("branch", tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute("CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)") + wait_for_last_flush_lsn(env, pg, tenant_id, branch_id) + + size_after = http_client.tenant_size(tenant_id) + + assert size_before_branching < size_after + + +@pytest.mark.skip("This should work, but is left out because assumed covered by other tests") +def test_parent_within_horizon(neon_simple_env: NeonEnv): + """ + gc_horizon = 5 + + main: 0----10----I->20 + branch: |-------------------I---------->150 + gc_horizon + """ + + env = neon_simple_env + gc_horizon = 200_000 + (tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)}) + http_client = env.pageserver.http_client() + + with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + with pg.cursor() as cur: + cur.execute("CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)") + + flushed_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + + with pg.cursor() as cur: + cur.execute("CREATE TABLE t00 AS SELECT i::bigint n FROM generate_series(0, 2000) s(i)") + + wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + + size_before_branching = http_client.tenant_size(tenant_id) + + assert flushed_lsn.lsn_int - gc_horizon > initdb_lsn.lsn_int + + branch_id = env.neon_cli.create_branch( + "branch", tenant_id=tenant_id, ancestor_start_lsn=flushed_lsn + ) + + with env.postgres.create_start("branch", tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute("CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 10000) s(i)") + wait_for_last_flush_lsn(env, pg, tenant_id, branch_id) + + size_after = http_client.tenant_size(tenant_id) + + assert size_before_branching < size_after + + +@pytest.mark.skip("This should work, but is left out because assumed covered by other tests") +def test_only_heads_within_horizon(neon_simple_env: NeonEnv): + """ + gc_horizon = small + + main: 0--------10-----I>20 + first: |-----------------------------I>150 + second: |---------I>30 + """ + + env = neon_simple_env + (tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": "1024"}) + http_client = env.pageserver.http_client() + + initial_size = http_client.tenant_size(tenant_id) + + first_id = env.neon_cli.create_branch("first", tenant_id=tenant_id) + second_id = env.neon_cli.create_branch("second", tenant_id=tenant_id) + + ids = {"main": main_id, "first": first_id, "second": second_id} + + latest_size = None + + # gc is not expected to change the results + + for branch_name, amount in [("main", 2000), ("first", 15000), ("second", 3000)]: + with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute( + f"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, {amount}) s(i)" + ) + wait_for_last_flush_lsn(env, pg, tenant_id, ids[branch_name]) + size_now = http_client.tenant_size(tenant_id) + if latest_size is not None: + assert size_now > latest_size + else: + assert size_now > initial_size + + latest_size = size_now def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index 8addfcf72e..f4b71ae9b7 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -26,7 +26,7 @@ futures-util = { version = "0.3", features = ["channel", "io", "sink"] } indexmap = { version = "1", default-features = false, features = ["std"] } itertools = { version = "0.10" } libc = { version = "0.2", features = ["extra_traits"] } -log = { version = "0.4", default-features = false, features = ["serde", "std"] } +log = { version = "0.4", default-features = false, features = ["std"] } memchr = { version = "2" } nom = { version = "7" } num-bigint = { version = "0.4" } @@ -45,6 +45,7 @@ tokio-util = { version = "0.7", features = ["codec", "io"] } tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] } tracing = { version = "0.1", features = ["log"] } tracing-core = { version = "0.1" } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } url = { version = "2", features = ["serde"] } [build-dependencies] @@ -54,7 +55,7 @@ either = { version = "1" } indexmap = { version = "1", default-features = false, features = ["std"] } itertools = { version = "0.10" } libc = { version = "0.2", features = ["extra_traits"] } -log = { version = "0.4", default-features = false, features = ["serde", "std"] } +log = { version = "0.4", default-features = false, features = ["std"] } memchr = { version = "2" } nom = { version = "7" } prost = { version = "0.11" }