From d90c5a03af3763949442686e118581e5cdd4dd90 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Tue, 17 Jan 2023 22:07:38 +0200 Subject: [PATCH 01/15] Add more io::Error context when fail to operate on a path (#3254) I have a test failure that shows ``` Caused by: 0: Failed to reconstruct a page image: 1: Directory not empty (os error 39) ``` but does not really show where exactly that happens. https://neon-github-public-dev.s3.amazonaws.com/reports/pr-3227/release/3823785365/index.html#categories/c0057473fc9ec8fb70876fd29a171ce8/7088dab272f2c7b7/?attachment=60fe6ed2add4d82d The PR aims to add more context in debugging that issue. --- pageserver/src/walredo.rs | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index a552c05d63..fd0524016f 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -626,24 +626,20 @@ impl PostgresRedoProcess { // Create empty data directory for wal-redo postgres, deleting old one first. 
if datadir.exists() { - info!( - "old temporary datadir {} exists, removing", - datadir.display() - ); - fs::remove_dir_all(&datadir)?; + info!("old temporary datadir {datadir:?} exists, removing"); + fs::remove_dir_all(&datadir).map_err(|e| { + Error::new( + e.kind(), + format!("Old temporary dir {datadir:?} removal failure: {e}"), + ) + })?; } - let pg_bin_dir_path = conf.pg_bin_dir(pg_version).map_err(|e| { - Error::new( - ErrorKind::Other, - format!("incorrect pg_bin_dir path: {}", e), - ) - })?; - let pg_lib_dir_path = conf.pg_lib_dir(pg_version).map_err(|e| { - Error::new( - ErrorKind::Other, - format!("incorrect pg_lib_dir path: {}", e), - ) - })?; + let pg_bin_dir_path = conf + .pg_bin_dir(pg_version) + .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {e}")))?; + let pg_lib_dir_path = conf + .pg_lib_dir(pg_version) + .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_lib_dir path: {e}")))?; info!("running initdb in {}", datadir.display()); let initdb = Command::new(pg_bin_dir_path.join("initdb")) From bd535b3371f4e45f13a3f9abbacc3efbf931616f Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 18 Jan 2023 02:29:05 +0200 Subject: [PATCH 02/15] If an error happens while checking for core dumps, don't panic. If we panic, we skip the 30s wait in 'main', and don't give the console a chance to observe the error. Which is not nice. 
Spotted by @ololobus at https://github.com/neondatabase/neon/pull/3352#discussion_r1072806981 --- compute_tools/src/compute.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index c2c9ab2230..d652084e00 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -23,7 +23,7 @@ use std::sync::RwLock; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use log::{info, warn}; +use log::{error, info, warn}; use postgres::{Client, NoTls}; use serde::{Serialize, Serializer}; @@ -311,8 +311,9 @@ impl ComputeNode { .wait() .expect("failed to start waiting on Postgres process"); - self.check_for_core_dumps() - .expect("failed to check for core dumps"); + if let Err(err) = self.check_for_core_dumps() { + error!("error while checking for core dumps: {err:?}"); + } Ok(ecode) } From 4992160677509996064df4f3dacc79c64fe2c9c2 Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Wed, 18 Jan 2023 14:58:55 +0200 Subject: [PATCH 03/15] Fix metric_collection_endpoint for prod. 
It was incorrectly set to staging url --- .github/ansible/production.hosts.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/ansible/production.hosts.yaml b/.github/ansible/production.hosts.yaml index 22bace5ade..ecb847bd61 100644 --- a/.github/ansible/production.hosts.yaml +++ b/.github/ansible/production.hosts.yaml @@ -7,7 +7,7 @@ storage: broker_endpoint: http://storage-broker.prod.local:50051 pageserver_config_stub: pg_distrib_dir: /usr/local - metric_collection_endpoint: http://console-staging.local/billing/api/v1/usage_events + metric_collection_endpoint: http://console-release.local/billing/api/v1/usage_events metric_collection_interval: 10min remote_storage: bucket_name: "{{ bucket_name }}" From c85374295fd38e081c1a683281016355471852bd Mon Sep 17 00:00:00 2001 From: Vadim Kharitonov Date: Wed, 18 Jan 2023 14:49:59 +0100 Subject: [PATCH 04/15] Change SENTRY_ENVIRONMENT from "development" to "staging" --- .github/ansible/staging.eu-west-1.hosts.yaml | 2 +- .github/ansible/staging.us-east-2.hosts.yaml | 2 +- .github/helm-values/dev-eu-west-1-zeta.neon-proxy-scram.yaml | 2 +- .github/helm-values/dev-eu-west-1-zeta.neon-storage-broker.yaml | 2 +- .github/helm-values/dev-us-east-2-beta.neon-proxy-link.yaml | 2 +- .../helm-values/dev-us-east-2-beta.neon-proxy-scram-legacy.yaml | 2 +- .github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml | 2 +- .github/helm-values/dev-us-east-2-beta.neon-storage-broker.yaml | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/ansible/staging.eu-west-1.hosts.yaml b/.github/ansible/staging.eu-west-1.hosts.yaml index fce450ed39..f28dc8e07b 100644 --- a/.github/ansible/staging.eu-west-1.hosts.yaml +++ b/.github/ansible/staging.eu-west-1.hosts.yaml @@ -18,7 +18,7 @@ storage: ansible_aws_ssm_region: eu-west-1 ansible_aws_ssm_bucket_name: neon-dev-storage-eu-west-1 console_region_id: aws-eu-west-1 - sentry_environment: development + sentry_environment: staging children: 
pageservers: diff --git a/.github/ansible/staging.us-east-2.hosts.yaml b/.github/ansible/staging.us-east-2.hosts.yaml index 1d1b8dbfa4..4891875369 100644 --- a/.github/ansible/staging.us-east-2.hosts.yaml +++ b/.github/ansible/staging.us-east-2.hosts.yaml @@ -18,7 +18,7 @@ storage: ansible_aws_ssm_region: us-east-2 ansible_aws_ssm_bucket_name: neon-staging-storage-us-east-2 console_region_id: aws-us-east-2 - sentry_environment: development + sentry_environment: staging children: pageservers: diff --git a/.github/helm-values/dev-eu-west-1-zeta.neon-proxy-scram.yaml b/.github/helm-values/dev-eu-west-1-zeta.neon-proxy-scram.yaml index 47924456ba..c49b8d2009 100644 --- a/.github/helm-values/dev-eu-west-1-zeta.neon-proxy-scram.yaml +++ b/.github/helm-values/dev-eu-west-1-zeta.neon-proxy-scram.yaml @@ -8,7 +8,7 @@ settings: authBackend: "console" authEndpoint: "http://console-staging.local/management/api/v2" domain: "*.eu-west-1.aws.neon.build" - sentryEnvironment: "development" + sentryEnvironment: "staging" wssPort: 8443 metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events" metricCollectionInterval: "1min" diff --git a/.github/helm-values/dev-eu-west-1-zeta.neon-storage-broker.yaml b/.github/helm-values/dev-eu-west-1-zeta.neon-storage-broker.yaml index c6e682f571..ccf701f52d 100644 --- a/.github/helm-values/dev-eu-west-1-zeta.neon-storage-broker.yaml +++ b/.github/helm-values/dev-eu-west-1-zeta.neon-storage-broker.yaml @@ -49,4 +49,4 @@ extraManifests: - "{{ .Release.Namespace }}" settings: - sentryEnvironment: "development" + sentryEnvironment: "staging" diff --git a/.github/helm-values/dev-us-east-2-beta.neon-proxy-link.yaml b/.github/helm-values/dev-us-east-2-beta.neon-proxy-link.yaml index eb8fd50c0f..cb062f705d 100644 --- a/.github/helm-values/dev-us-east-2-beta.neon-proxy-link.yaml +++ b/.github/helm-values/dev-us-east-2-beta.neon-proxy-link.yaml @@ -8,7 +8,7 @@ settings: authBackend: "link" authEndpoint: 
"https://console.stage.neon.tech/authenticate_proxy_request/" uri: "https://console.stage.neon.tech/psql_session/" - sentryEnvironment: "development" + sentryEnvironment: "staging" metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events" metricCollectionInterval: "1min" diff --git a/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram-legacy.yaml b/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram-legacy.yaml index 8a08738d5f..99b67d75c1 100644 --- a/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram-legacy.yaml +++ b/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram-legacy.yaml @@ -8,7 +8,7 @@ settings: authBackend: "console" authEndpoint: "http://console-staging.local/management/api/v2" domain: "*.cloud.stage.neon.tech" - sentryEnvironment: "development" + sentryEnvironment: "staging" wssPort: 8443 metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events" metricCollectionInterval: "1min" diff --git a/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml b/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml index b02d46917c..764bb25b64 100644 --- a/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml +++ b/.github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml @@ -8,7 +8,7 @@ settings: authBackend: "console" authEndpoint: "http://console-staging.local/management/api/v2" domain: "*.us-east-2.aws.neon.build" - sentryEnvironment: "development" + sentryEnvironment: "staging" wssPort: 8443 metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events" metricCollectionInterval: "1min" diff --git a/.github/helm-values/dev-us-east-2-beta.neon-storage-broker.yaml b/.github/helm-values/dev-us-east-2-beta.neon-storage-broker.yaml index c7682d24c0..69363c5f13 100644 --- a/.github/helm-values/dev-us-east-2-beta.neon-storage-broker.yaml +++ b/.github/helm-values/dev-us-east-2-beta.neon-storage-broker.yaml @@ -49,4 +49,4 @@ extraManifests: - "{{ 
.Release.Namespace }}" settings: - sentryEnvironment: "development" + sentryEnvironment: "staging" From cb356f325978241e20a82703b04b784cdf8bb605 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Sat, 7 Jan 2023 00:23:35 +0200 Subject: [PATCH 05/15] Use actual temporary dir for pageserver unit tests --- .gitignore | 2 - control_plane/.gitignore | 1 - pageserver/src/config.rs | 5 - pageserver/src/tenant.rs | 105 ++++++++---------- pageserver/src/tenant/ephemeral_file.rs | 36 +++--- .../src/tenant/remote_timeline_client.rs | 2 +- pageserver/src/virtual_file.rs | 8 +- pageserver/src/walingest.rs | 12 +- .../src/walreceiver/connection_manager.rs | 16 +-- test_runner/sql_regress/.gitignore | 1 - 10 files changed, 80 insertions(+), 108 deletions(-) delete mode 100644 control_plane/.gitignore diff --git a/.gitignore b/.gitignore index f1afdee599..2e241ee8cd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,5 @@ /pg_install /target -/tmp_check -/tmp_check_cli __pycache__/ test_output/ .vscode diff --git a/control_plane/.gitignore b/control_plane/.gitignore deleted file mode 100644 index c1e54a6bcb..0000000000 --- a/control_plane/.gitignore +++ /dev/null @@ -1 +0,0 @@ -tmp_check/ diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 51d1664e52..f3e5fb8c1a 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -693,11 +693,6 @@ impl PageServerConf { Ok(t_conf) } - #[cfg(test)] - pub fn test_repo_dir(test_name: &str) -> PathBuf { - PathBuf::from(format!("../tmp_check/test_{test_name}")) - } - pub fn dummy_conf(repo_dir: PathBuf) -> Self { let pg_distrib_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install"); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 1d0d6b66ab..0dd6735993 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2626,9 +2626,9 @@ where #[cfg(test)] pub mod harness { use bytes::{Bytes, BytesMut}; - use once_cell::sync::Lazy; - use std::sync::{Arc, RwLock, 
RwLockReadGuard, RwLockWriteGuard}; + use std::sync::Arc; use std::{fs, path::PathBuf}; + use tempfile::TempDir; use utils::lsn::Lsn; use crate::{ @@ -2659,8 +2659,6 @@ pub mod harness { buf.freeze() } - static LOCK: Lazy> = Lazy::new(|| RwLock::new(())); - impl From for TenantConfOpt { fn from(tenant_conf: TenantConf) -> Self { Self { @@ -2681,36 +2679,27 @@ pub mod harness { } } - pub struct TenantHarness<'a> { + /// The harness saves some boilerplate and provides a way to create functional tenant + /// without running pageserver binary. It uses temporary directory to store data in it. + /// Tempdir gets removed on harness drop. + pub struct TenantHarness { + // keep the struct to not to remove tmp dir during the test + _temp_repo_dir: TempDir, pub conf: &'static PageServerConf, pub tenant_conf: TenantConf, pub tenant_id: TenantId, - - pub lock_guard: ( - Option>, - Option>, - ), } - impl<'a> TenantHarness<'a> { - pub fn create(test_name: &'static str) -> anyhow::Result { - Self::create_internal(test_name, false) - } - pub fn create_exclusive(test_name: &'static str) -> anyhow::Result { - Self::create_internal(test_name, true) - } - fn create_internal(test_name: &'static str, exclusive: bool) -> anyhow::Result { - let lock_guard = if exclusive { - (None, Some(LOCK.write().unwrap())) - } else { - (Some(LOCK.read().unwrap()), None) - }; + static LOG_HANDLE: OnceCell<()> = OnceCell::new(); - let repo_dir = PageServerConf::test_repo_dir(test_name); - let _ = fs::remove_dir_all(&repo_dir); - fs::create_dir_all(&repo_dir)?; + impl TenantHarness { + pub fn new() -> anyhow::Result { + let temp_repo_dir = tempfile::tempdir()?; + // `TempDir` uses a randomly generated subdirectory of a system tmp dir, + // so far it's enough to take care of concurrently running tests. + let repo_dir = temp_repo_dir.path(); - let conf = PageServerConf::dummy_conf(repo_dir); + let conf = PageServerConf::dummy_conf(repo_dir.to_path_buf()); // Make a static copy of the config. 
This can never be free'd, but that's // OK in a test. let conf: &'static PageServerConf = Box::leak(Box::new(conf)); @@ -2728,10 +2717,10 @@ pub mod harness { fs::create_dir_all(conf.timelines_path(&tenant_id))?; Ok(Self { + _temp_repo_dir: temp_repo_dir, conf, tenant_conf, tenant_id, - lock_guard, }) } @@ -2825,7 +2814,8 @@ mod tests { #[tokio::test] async fn test_basic() -> anyhow::Result<()> { - let tenant = TenantHarness::create("test_basic")?.load().await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -2858,9 +2848,8 @@ mod tests { #[tokio::test] async fn no_duplicate_timelines() -> anyhow::Result<()> { - let tenant = TenantHarness::create("no_duplicate_timelines")? - .load() - .await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let _ = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -2891,7 +2880,8 @@ mod tests { /// #[tokio::test] async fn test_branch() -> anyhow::Result<()> { - let tenant = TenantHarness::create("test_branch")?.load().await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -2988,10 +2978,8 @@ mod tests { #[tokio::test] async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> { - let tenant = - TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")? - .load() - .await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? 
.initialize()?; @@ -3026,9 +3014,8 @@ mod tests { #[tokio::test] async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> { - let tenant = TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")? - .load() - .await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; tenant .create_empty_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION)? @@ -3077,9 +3064,8 @@ mod tests { #[tokio::test] async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> { - let tenant = TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")? - .load() - .await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -3101,9 +3087,8 @@ mod tests { } #[tokio::test] async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> { - let tenant = TenantHarness::create("test_parent_keeps_data_forever_after_branching")? - .load() - .await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? 
.initialize()?; @@ -3134,8 +3119,7 @@ mod tests { #[tokio::test] async fn timeline_load() -> anyhow::Result<()> { - const TEST_NAME: &str = "timeline_load"; - let harness = TenantHarness::create(TEST_NAME)?; + let harness = TenantHarness::new()?; { let tenant = harness.load().await; let tline = tenant @@ -3154,8 +3138,7 @@ mod tests { #[tokio::test] async fn timeline_load_with_ancestor() -> anyhow::Result<()> { - const TEST_NAME: &str = "timeline_load_with_ancestor"; - let harness = TenantHarness::create(TEST_NAME)?; + let harness = TenantHarness::new()?; // create two timelines { let tenant = harness.load().await; @@ -3193,8 +3176,7 @@ mod tests { #[tokio::test] async fn corrupt_metadata() -> anyhow::Result<()> { - const TEST_NAME: &str = "corrupt_metadata"; - let harness = TenantHarness::create(TEST_NAME)?; + let harness = TenantHarness::new()?; let tenant = harness.load().await; tenant @@ -3235,7 +3217,8 @@ mod tests { #[tokio::test] async fn test_images() -> anyhow::Result<()> { - let tenant = TenantHarness::create("test_images")?.load().await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -3302,7 +3285,8 @@ mod tests { // #[tokio::test] async fn test_bulk_insert() -> anyhow::Result<()> { - let tenant = TenantHarness::create("test_bulk_insert")?.load().await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -3346,7 +3330,8 @@ mod tests { #[tokio::test] async fn test_random_updates() -> anyhow::Result<()> { - let tenant = TenantHarness::create("test_random_updates")?.load().await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? 
.initialize()?; @@ -3419,9 +3404,8 @@ mod tests { #[tokio::test] async fn test_traverse_branches() -> anyhow::Result<()> { - let tenant = TenantHarness::create("test_traverse_branches")? - .load() - .await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let mut tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -3505,9 +3489,8 @@ mod tests { #[tokio::test] async fn test_traverse_ancestors() -> anyhow::Result<()> { - let tenant = TenantHarness::create("test_traverse_ancestors")? - .load() - .await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let mut tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index c433e65ad2..0debeaff1c 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -76,7 +76,7 @@ impl EphemeralFile { }) } - fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), io::Error> { + fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> io::Result<()> { let mut off = 0; while off < PAGE_SZ { let n = self @@ -277,7 +277,7 @@ impl Drop for EphemeralFile { } } -pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), io::Error> { +pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> io::Result<()> { if let Some(file) = EPHEMERAL_FILES.read().unwrap().files.get(&file_id) { match file.write_all_at(buf, blkno as u64 * PAGE_SZ as u64) { Ok(_) => Ok(()), @@ -332,25 +332,17 @@ mod tests { use super::*; use crate::tenant::blob_io::{BlobCursor, BlobWriter}; use crate::tenant::block_io::BlockCursor; + use crate::tenant::harness::TenantHarness; use rand::{seq::SliceRandom, thread_rng, RngCore}; use std::fs; use std::str::FromStr; - fn harness( - test_name: &str, - ) -> Result<(&'static PageServerConf, TenantId, TimelineId), io::Error> { - let repo_dir = 
PageServerConf::test_repo_dir(test_name); - let _ = fs::remove_dir_all(&repo_dir); - let conf = PageServerConf::dummy_conf(repo_dir); - // Make a static copy of the config. This can never be free'd, but that's - // OK in a test. - let conf: &'static PageServerConf = Box::leak(Box::new(conf)); - - let tenant_id = TenantId::from_str("11000000000000000000000000000000").unwrap(); + fn harness() -> Result<(TenantHarness, TimelineId), io::Error> { + let harness = TenantHarness::new().expect("Failed to create tenant harness"); let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap(); - fs::create_dir_all(conf.timeline_path(&timeline_id, &tenant_id))?; + fs::create_dir_all(harness.timeline_path(&timeline_id))?; - Ok((conf, tenant_id, timeline_id)) + Ok((harness, timeline_id)) } // Helper function to slurp contents of a file, starting at the current position, @@ -367,10 +359,10 @@ mod tests { } #[test] - fn test_ephemeral_files() -> Result<(), io::Error> { - let (conf, tenant_id, timeline_id) = harness("ephemeral_files")?; + fn test_ephemeral_files() -> io::Result<()> { + let (harness, timeline_id) = harness()?; - let file_a = EphemeralFile::create(conf, tenant_id, timeline_id)?; + let file_a = EphemeralFile::create(harness.conf, harness.tenant_id, timeline_id)?; file_a.write_all_at(b"foo", 0)?; assert_eq!("foo", read_string(&file_a, 0, 20)?); @@ -381,7 +373,7 @@ mod tests { // Open a lot of files, enough to cause some page evictions. 
let mut efiles = Vec::new(); for fileno in 0..100 { - let efile = EphemeralFile::create(conf, tenant_id, timeline_id)?; + let efile = EphemeralFile::create(harness.conf, harness.tenant_id, timeline_id)?; efile.write_all_at(format!("file {}", fileno).as_bytes(), 0)?; assert_eq!(format!("file {}", fileno), read_string(&efile, 0, 10)?); efiles.push((fileno, efile)); @@ -398,10 +390,10 @@ mod tests { } #[test] - fn test_ephemeral_blobs() -> Result<(), io::Error> { - let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?; + fn test_ephemeral_blobs() -> io::Result<()> { + let (harness, timeline_id) = harness()?; - let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?; + let mut file = EphemeralFile::create(harness.conf, harness.tenant_id, timeline_id)?; let pos_foo = file.write_blob(b"foo")?; assert_eq!(b"foo", file.block_cursor().read_blob(pos_foo)?.as_slice()); diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 013591caee..58b7eea1eb 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1064,7 +1064,7 @@ mod tests { // Test scheduling #[test] fn upload_scheduling() -> anyhow::Result<()> { - let harness = TenantHarness::create("upload_scheduling")?; + let harness = TenantHarness::new()?; let timeline_path = harness.timeline_path(&TIMELINE_ID); std::fs::create_dir_all(&timeline_path)?; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index fb216123c1..3ad049cc21 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -525,12 +525,13 @@ mod tests { }) } - fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> + fn test_files(test_name: &str, openfunc: OF) -> Result<(), Error> where FD: Read + Write + Seek + FileExt, OF: Fn(&Path, &OpenOptions) -> Result, { - let testdir = crate::config::PageServerConf::test_repo_dir(testname); + let temp_repo_dir = 
tempfile::tempdir()?; + let testdir = temp_repo_dir.path().join(test_name); std::fs::create_dir_all(&testdir)?; let path_a = testdir.join("file_a"); @@ -632,7 +633,8 @@ mod tests { const THREADS: usize = 100; const SAMPLE: [u8; SIZE] = [0xADu8; SIZE]; - let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency"); + let temp_repo_dir = tempfile::tempdir()?; + let testdir = temp_repo_dir.path().join("vfile_concurrency"); std::fs::create_dir_all(&testdir)?; // Create a test file. diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 0de2e6654d..77fce95160 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1146,7 +1146,8 @@ mod tests { #[tokio::test] async fn test_relsize() -> Result<()> { - let tenant = TenantHarness::create("test_relsize")?.load().await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; let mut walingest = init_walingest_test(&tline).await?; @@ -1323,7 +1324,8 @@ mod tests { // and then created it again within the same layer. #[tokio::test] async fn test_drop_extend() -> Result<()> { - let tenant = TenantHarness::create("test_drop_extend")?.load().await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; let mut walingest = init_walingest_test(&tline).await?; @@ -1376,7 +1378,8 @@ mod tests { // and then extended it again within the same layer. #[tokio::test] async fn test_truncate_extend() -> Result<()> { - let tenant = TenantHarness::create("test_truncate_extend")?.load().await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; let mut walingest = init_walingest_test(&tline).await?; @@ -1497,7 +1500,8 @@ mod tests { /// split into multiple 1 GB segments in Postgres. 
#[tokio::test] async fn test_large_rel() -> Result<()> { - let tenant = TenantHarness::create("test_large_rel")?.load().await; + let harness = TenantHarness::new()?; + let tenant = harness.load().await; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; let mut walingest = init_walingest_test(&tline).await?; diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index 8b60e59305..be58aa0e07 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -846,7 +846,7 @@ mod tests { #[tokio::test] async fn no_connection_no_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::create("no_connection_no_candidate")?; + let harness = TenantHarness::new()?; let mut state = dummy_state(&harness).await; let now = Utc::now().naive_utc(); @@ -879,7 +879,7 @@ mod tests { #[tokio::test] async fn connection_no_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::create("connection_no_candidate")?; + let harness = TenantHarness::new()?; let mut state = dummy_state(&harness).await; let now = Utc::now().naive_utc(); @@ -942,7 +942,7 @@ mod tests { #[tokio::test] async fn no_connection_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::create("no_connection_candidate")?; + let harness = TenantHarness::new()?; let mut state = dummy_state(&harness).await; let now = Utc::now().naive_utc(); @@ -1001,7 +1001,7 @@ mod tests { #[tokio::test] async fn candidate_with_many_connection_failures() -> anyhow::Result<()> { - let harness = TenantHarness::create("candidate_with_many_connection_failures")?; + let harness = TenantHarness::new()?; let mut state = dummy_state(&harness).await; let now = Utc::now().naive_utc(); @@ -1041,7 +1041,7 @@ mod tests { #[tokio::test] async fn lsn_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = 
TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?; + let harness = TenantHarness::new()?; let mut state = dummy_state(&harness).await; let current_lsn = Lsn(100_000).align(); let now = Utc::now().naive_utc(); @@ -1105,7 +1105,7 @@ mod tests { #[tokio::test] async fn timeout_connection_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::create("timeout_connection_threshhold_current_candidate")?; + let harness = TenantHarness::new()?; let mut state = dummy_state(&harness).await; let current_lsn = Lsn(100_000).align(); let now = Utc::now().naive_utc(); @@ -1166,7 +1166,7 @@ mod tests { #[tokio::test] async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::create("timeout_wal_over_threshhold_current_candidate")?; + let harness = TenantHarness::new()?; let mut state = dummy_state(&harness).await; let current_lsn = Lsn(100_000).align(); let new_lsn = Lsn(100_100).align(); @@ -1232,7 +1232,7 @@ mod tests { const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr"; - async fn dummy_state(harness: &TenantHarness<'_>) -> WalreceiverState { + async fn dummy_state(harness: &TenantHarness) -> WalreceiverState { WalreceiverState { id: TenantTimelineId { tenant_id: harness.tenant_id, diff --git a/test_runner/sql_regress/.gitignore b/test_runner/sql_regress/.gitignore index 89129d7358..83186b5c86 100644 --- a/test_runner/sql_regress/.gitignore +++ b/test_runner/sql_regress/.gitignore @@ -2,7 +2,6 @@ /pg_regress # Generated subdirectories -/tmp_check/ /results/ /log/ From ffca97bc1e10b6c351067b14bdee6bbc68369c4b Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Thu, 12 Jan 2023 15:14:04 +0200 Subject: [PATCH 06/15] Enable logs in unit tests --- libs/utils/src/logging.rs | 2 ++ pageserver/src/tenant.rs | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 3b1a1f5aff..82c9267f4a 100644 --- a/libs/utils/src/logging.rs 
+++ b/libs/utils/src/logging.rs @@ -8,6 +8,7 @@ use strum_macros::{EnumString, EnumVariantNames}; pub enum LogFormat { Plain, Json, + Test, } impl LogFormat { @@ -39,6 +40,7 @@ pub fn init(log_format: LogFormat) -> anyhow::Result<()> { match log_format { LogFormat::Json => base_logger.json().init(), LogFormat::Plain => base_logger.init(), + LogFormat::Test => base_logger.with_test_writer().init(), } Ok(()) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0dd6735993..c53c9bc3e1 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2626,9 +2626,11 @@ where #[cfg(test)] pub mod harness { use bytes::{Bytes, BytesMut}; + use once_cell::sync::OnceCell; use std::sync::Arc; use std::{fs, path::PathBuf}; use tempfile::TempDir; + use utils::logging; use utils::lsn::Lsn; use crate::{ @@ -2694,6 +2696,10 @@ pub mod harness { impl TenantHarness { pub fn new() -> anyhow::Result { + LOG_HANDLE.get_or_init(|| { + logging::init(logging::LogFormat::Test).expect("Failed to init test logging") + }); + let temp_repo_dir = tempfile::tempdir()?; // `TempDir` uses a randomly generated subdirectory of a system tmp dir, // so far it's enough to take care of concurrently running tests. From 7b22b5c43321bf729caea4766c7f98b589c405ab Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 18 Jan 2023 11:30:02 +0200 Subject: [PATCH 07/15] Switch to 'tracing' for logging, restructure code to make use of spans. Refactors Compute::prepare_and_run. It's split into subroutines differently, to make it easier to attach tracing spans to the different stages. The high-level logic for waiting for Postgres to exit is moved to the caller. Replace 'env_logger' with 'tracing', and add `#instrument` directives to different stages of the startup process. This is a fairly mechanical change, except for the changes in 'spec.rs'. 'spec.rs' contained some complicated formatting, where parts of log messages were printed directly to stdout with `print`s. 
That was a bit messed up because the log normally goes to stderr, but those lines were printed to stdout. In our docker images, stderr and stdout both go to the same place so you wouldn't notice, but I don't think it was intentional. This changes the log format to the default 'tracing_subscriber::format' format. It's different from the Postgres log format, however, and because both compute_tools and Postgres print to the same log, it's now a mix of two different formats. I'm not sure how the Grafana log parsing pipeline can handle that. If it's a problem, we can build custom formatter to change the compute_tools log format to be the same as Postgres's, like it was before this commit, or we can change the Postgres log format to match tracing_formatter's, or we can start printing compute_tool's log output to a different destination than Postgres --- Cargo.lock | 6 +- compute_tools/Cargo.toml | 4 +- compute_tools/src/bin/compute_ctl.rs | 55 +++++--- compute_tools/src/checker.rs | 4 +- compute_tools/src/compute.rs | 77 ++++++----- compute_tools/src/http/api.rs | 2 +- compute_tools/src/informant.rs | 2 +- compute_tools/src/logger.rs | 44 ++---- compute_tools/src/monitor.rs | 2 +- compute_tools/src/pg_helpers.rs | 12 +- compute_tools/src/spec.rs | 171 ++++++++++++++---------- test_runner/regress/test_compute_ctl.py | 2 +- workspace_hack/Cargo.toml | 5 +- 13 files changed, 210 insertions(+), 176 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 59adf696a7..d8aba9ba68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -833,10 +833,8 @@ dependencies = [ "anyhow", "chrono", "clap 4.0.32", - "env_logger", "futures", "hyper", - "log", "notify", "postgres", "regex", @@ -845,6 +843,8 @@ dependencies = [ "tar", "tokio", "tokio-postgres", + "tracing", + "tracing-subscriber", "url", "workspace_hack", ] @@ -1954,7 +1954,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ 
"cfg-if", - "serde", ] [[package]] @@ -4565,6 +4564,7 @@ dependencies = [ "tower", "tracing", "tracing-core", + "tracing-subscriber", "url", ] diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 1e0aee81d7..4536604bdf 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -8,10 +8,8 @@ license.workspace = true anyhow.workspace = true chrono.workspace = true clap.workspace = true -env_logger.workspace = true futures.workspace = true hyper = { workspace = true, features = ["full"] } -log = { workspace = true, features = ["std", "serde"] } notify.workspace = true postgres.workspace = true regex.workspace = true @@ -20,6 +18,8 @@ serde_json.workspace = true tar.workspace = true tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tokio-postgres.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true url.workspace = true workspace_hack.workspace = true diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 307300cfd8..e5ab8eb153 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -40,7 +40,7 @@ use std::{thread, time::Duration}; use anyhow::{Context, Result}; use chrono::Utc; use clap::Arg; -use log::{error, info}; +use tracing::{error, info}; use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus}; use compute_tools::http::api::launch_http_server; @@ -53,7 +53,6 @@ use compute_tools::spec::*; use url::Url; fn main() -> Result<()> { - // TODO: re-use `utils::logging` later init_logger(DEFAULT_LOG_LEVEL)?; let matches = cli().get_matches(); @@ -122,29 +121,45 @@ fn main() -> Result<()> { // Also spawn the thread responsible for handling the VM informant -- if it's present let _vm_informant_handle = spawn_vm_informant_if_present().expect("cannot launch VM informant"); - // Run compute (Postgres) and hang waiting on it. 
- match compute.prepare_and_run() { - Ok(ec) => { - let code = ec.code().unwrap_or(1); - info!("Postgres exited with code {}, shutting down", code); - exit(code) - } - Err(error) => { - error!("could not start the compute node: {:?}", error); - + // Start Postgres + let mut delay_exit = false; + let mut exit_code = None; + let pg = match compute.start_compute() { + Ok(pg) => Some(pg), + Err(err) => { + error!("could not start the compute node: {:?}", err); let mut state = compute.state.write().unwrap(); - state.error = Some(format!("{:?}", error)); + state.error = Some(format!("{:?}", err)); state.status = ComputeStatus::Failed; drop(state); - - // Keep serving HTTP requests, so the cloud control plane was able to - // get the actual error. - info!("giving control plane 30s to collect the error before shutdown"); - thread::sleep(Duration::from_secs(30)); - info!("shutting down"); - Err(error) + delay_exit = true; + None } + }; + + // Wait for the child Postgres process forever. In this state Ctrl+C will + // propagate to Postgres and it will be shut down as well. + if let Some(mut pg) = pg { + let ecode = pg + .wait() + .expect("failed to start waiting on Postgres process"); + info!("Postgres exited with code {}, shutting down", ecode); + exit_code = ecode.code() } + + if let Err(err) = compute.check_for_core_dumps() { + error!("error while checking for core dumps: {err:?}"); + } + + // If launch failed, keep serving HTTP requests for a while, so the cloud + // control plane can get the actual error. 
+ if delay_exit { + info!("giving control plane 30s to collect the error before shutdown"); + thread::sleep(Duration::from_secs(30)); + info!("shutting down"); + } + + exit(exit_code.unwrap_or(1)) } fn cli() -> clap::Command { diff --git a/compute_tools/src/checker.rs b/compute_tools/src/checker.rs index ee1605c814..b8413de516 100644 --- a/compute_tools/src/checker.rs +++ b/compute_tools/src/checker.rs @@ -1,10 +1,11 @@ use anyhow::{anyhow, Result}; -use log::error; use postgres::Client; use tokio_postgres::NoTls; +use tracing::{error, instrument}; use crate::compute::ComputeNode; +#[instrument(skip_all)] pub fn create_writability_check_data(client: &mut Client) -> Result<()> { let query = " CREATE TABLE IF NOT EXISTS health_check ( @@ -21,6 +22,7 @@ pub fn create_writability_check_data(client: &mut Client) -> Result<()> { Ok(()) } +#[instrument(skip_all)] pub async fn check_writability(compute: &ComputeNode) -> Result<()> { let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?; if client.is_closed() { diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index d652084e00..e229bb1cc2 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -17,15 +17,15 @@ use std::fs; use std::os::unix::fs::PermissionsExt; use std::path::Path; -use std::process::{Command, ExitStatus, Stdio}; +use std::process::{Command, Stdio}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::RwLock; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use log::{error, info, warn}; use postgres::{Client, NoTls}; use serde::{Serialize, Serializer}; +use tracing::{info, instrument, warn}; use crate::checker::create_writability_check_data; use crate::config; @@ -121,6 +121,7 @@ impl ComputeNode { // Get basebackup from the libpq connection to pageserver using `connstr` and // unarchive it to `pgdata` directory overriding all its previous content. 
+ #[instrument(skip(self))] fn get_basebackup(&self, lsn: &str) -> Result<()> { let start_time = Utc::now(); @@ -154,6 +155,7 @@ impl ComputeNode { // Run `postgres` in a special mode with `--sync-safekeepers` argument // and return the reported LSN back to the caller. + #[instrument(skip(self))] fn sync_safekeepers(&self) -> Result { let start_time = Utc::now(); @@ -196,6 +198,7 @@ impl ComputeNode { /// Do all the preparations like PGDATA directory creation, configuration, /// safekeepers sync, basebackup, etc. + #[instrument(skip(self))] pub fn prepare_pgdata(&self) -> Result<()> { let spec = &self.spec; let pgdata_path = Path::new(&self.pgdata); @@ -229,9 +232,8 @@ impl ComputeNode { /// Start Postgres as a child process and manage DBs/roles. /// After that this will hang waiting on the postmaster process to exit. - pub fn run(&self) -> Result { - let start_time = Utc::now(); - + #[instrument(skip(self))] + pub fn start_postgres(&self) -> Result { let pgdata_path = Path::new(&self.pgdata); // Run postgres as a child process. @@ -242,6 +244,11 @@ impl ComputeNode { wait_for_postgres(&mut pg, pgdata_path)?; + Ok(pg) + } + + #[instrument(skip(self))] + pub fn apply_config(&self) -> Result<()> { // If connection fails, // it may be the old node with `zenith_admin` superuser. 
// @@ -279,8 +286,34 @@ impl ComputeNode { // 'Close' connection drop(client); - let startup_end_time = Utc::now(); + info!( + "finished configuration of compute for project {}", + self.spec.cluster.cluster_id + ); + + Ok(()) + } + + #[instrument(skip(self))] + pub fn start_compute(&self) -> Result { + info!( + "starting compute for project {}, operation {}, tenant {}, timeline {}", + self.spec.cluster.cluster_id, + self.spec.operation_uuid.as_ref().unwrap(), + self.tenant, + self.timeline, + ); + + self.prepare_pgdata()?; + + let start_time = Utc::now(); + + let pg = self.start_postgres()?; + + self.apply_config()?; + + let startup_end_time = Utc::now(); self.metrics.config_ms.store( startup_end_time .signed_duration_since(start_time) @@ -300,35 +333,7 @@ impl ComputeNode { self.set_status(ComputeStatus::Running); - info!( - "finished configuration of compute for project {}", - self.spec.cluster.cluster_id - ); - - // Wait for child Postgres process basically forever. In this state Ctrl+C - // will propagate to Postgres and it will be shut down as well. - let ecode = pg - .wait() - .expect("failed to start waiting on Postgres process"); - - if let Err(err) = self.check_for_core_dumps() { - error!("error while checking for core dumps: {err:?}"); - } - - Ok(ecode) - } - - pub fn prepare_and_run(&self) -> Result { - info!( - "starting compute for project {}, operation {}, tenant {}, timeline {}", - self.spec.cluster.cluster_id, - self.spec.operation_uuid.as_ref().unwrap(), - self.tenant, - self.timeline, - ); - - self.prepare_pgdata()?; - self.run() + Ok(pg) } // Look for core dumps and collect backtraces. @@ -341,7 +346,7 @@ impl ComputeNode { // // Use that as a default location and pattern, except macos where core dumps are written // to /cores/ directory by default. 
- fn check_for_core_dumps(&self) -> Result<()> { + pub fn check_for_core_dumps(&self) -> Result<()> { let core_dump_dir = match std::env::consts::OS { "macos" => Path::new("/cores/"), _ => Path::new(&self.pgdata), diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 44f83e5003..f2a49f332c 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -6,8 +6,8 @@ use std::thread; use anyhow::Result; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; -use log::{error, info}; use serde_json; +use tracing::{error, info}; use crate::compute::ComputeNode; diff --git a/compute_tools/src/informant.rs b/compute_tools/src/informant.rs index 09bd5e3138..8a6e3ab43a 100644 --- a/compute_tools/src/informant.rs +++ b/compute_tools/src/informant.rs @@ -1,8 +1,8 @@ -use log::{info, warn}; use std::path::Path; use std::process; use std::thread; use std::time::Duration; +use tracing::{info, warn}; use anyhow::{Context, Result}; diff --git a/compute_tools/src/logger.rs b/compute_tools/src/logger.rs index dde0a950f8..57e5496e86 100644 --- a/compute_tools/src/logger.rs +++ b/compute_tools/src/logger.rs @@ -1,42 +1,20 @@ -use std::io::Write; - use anyhow::Result; -use chrono::Utc; -use env_logger::{Builder, Env}; - -macro_rules! info_println { - ($($tts:tt)*) => { - if log_enabled!(Level::Info) { - println!($($tts)*); - } - } -} - -macro_rules! info_print { - ($($tts:tt)*) => { - if log_enabled!(Level::Info) { - print!($($tts)*); - } - } -} +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::prelude::*; /// Initialize `env_logger` using either `default_level` or /// `RUST_LOG` environment variable as default log level. 
pub fn init_logger(default_level: &str) -> Result<()> { - let env = Env::default().filter_or("RUST_LOG", default_level); + let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_level)); - Builder::from_env(env) - .format(|buf, record| { - let thread_handle = std::thread::current(); - writeln!( - buf, - "{} [{}] {}: {}", - Utc::now().format("%Y-%m-%d %H:%M:%S%.3f %Z"), - thread_handle.name().unwrap_or("main"), - record.level(), - record.args() - ) - }) + let fmt_layer = tracing_subscriber::fmt::layer() + .with_target(false) + .with_writer(std::io::stderr); + + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) .init(); Ok(()) diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs index c871422e78..7c9878ffcf 100644 --- a/compute_tools/src/monitor.rs +++ b/compute_tools/src/monitor.rs @@ -3,8 +3,8 @@ use std::{thread, time}; use anyhow::Result; use chrono::{DateTime, Utc}; -use log::{debug, info}; use postgres::{Client, NoTls}; +use tracing::{debug, info}; use crate::compute::ComputeNode; diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index ff422f1cf5..921289d7c2 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -11,6 +11,7 @@ use anyhow::{bail, Result}; use notify::{RecursiveMode, Watcher}; use postgres::{Client, Transaction}; use serde::Deserialize; +use tracing::{debug, instrument}; const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds @@ -229,6 +230,7 @@ pub fn get_existing_dbs(client: &mut Client) -> Result> { /// Wait for Postgres to become ready to accept connections. It's ready to /// accept connections when the state-field in `pgdata/postmaster.pid` says /// 'ready'. 
+#[instrument(skip(pg))] pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> { let pid_path = pgdata.join("postmaster.pid"); @@ -287,18 +289,18 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> { } let res = rx.recv_timeout(Duration::from_millis(100)); - log::debug!("woken up by notify: {res:?}"); + debug!("woken up by notify: {res:?}"); // If there are multiple events in the channel already, we only need to be // check once. Swallow the extra events before we go ahead to check the // pid file. while let Ok(res) = rx.try_recv() { - log::debug!("swallowing extra event: {res:?}"); + debug!("swallowing extra event: {res:?}"); } // Check that we can open pid file first. if let Ok(file) = File::open(&pid_path) { if !postmaster_pid_seen { - log::debug!("postmaster.pid appeared"); + debug!("postmaster.pid appeared"); watcher .unwatch(pgdata) .expect("Failed to remove pgdata dir watch"); @@ -314,7 +316,7 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> { // Pid file could be there and we could read it, but it could be empty, for example. 
if let Some(Ok(line)) = last_line { let status = line.trim(); - log::debug!("last line of postmaster.pid: {status:?}"); + debug!("last line of postmaster.pid: {status:?}"); // Now Postgres is ready to accept connections if status == "ready" { @@ -330,7 +332,7 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> { } } - log::info!("PostgreSQL is now running, continuing to configure it"); + tracing::info!("PostgreSQL is now running, continuing to configure it"); Ok(()) } diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 81e01fe555..40c8366bf4 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -1,12 +1,11 @@ use std::path::Path; use std::str::FromStr; -use std::time::Instant; use anyhow::Result; -use log::{info, log_enabled, warn, Level}; use postgres::config::Config; use postgres::{Client, NoTls}; use serde::Deserialize; +use tracing::{info, info_span, instrument, span_enabled, warn, Level}; use crate::compute::ComputeNode; use crate::config; @@ -80,23 +79,25 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> { /// Given a cluster spec json and open transaction it handles roles creation, /// deletion and update. 
+#[instrument(skip_all)] pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { let mut xact = client.transaction()?; let existing_roles: Vec = get_existing_roles(&mut xact)?; // Print a list of existing Postgres roles (only in debug mode) - info!("postgres roles:"); - for r in &existing_roles { - info_println!( - "{} - {}:{}", - " ".repeat(27 + 5), - r.name, - if r.encrypted_password.is_some() { - "[FILTERED]" - } else { - "(null)" - } - ); + if span_enabled!(Level::INFO) { + info!("postgres roles:"); + for r in &existing_roles { + info!( + " - {}:{}", + r.name, + if r.encrypted_password.is_some() { + "[FILTERED]" + } else { + "(null)" + } + ); + } } // Process delta operations first @@ -137,58 +138,68 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { info!("cluster spec roles:"); for role in &spec.cluster.roles { let name = &role.name; - - info_print!( - "{} - {}:{}", - " ".repeat(27 + 5), - name, - if role.encrypted_password.is_some() { - "[FILTERED]" - } else { - "(null)" - } - ); - // XXX: with a limited number of roles it is fine, but consider making it a HashMap let pg_role = existing_roles.iter().find(|r| r.name == *name); - if let Some(r) = pg_role { - let mut update_role = false; - + enum RoleAction { + None, + Update, + Create, + } + let action = if let Some(r) = pg_role { if (r.encrypted_password.is_none() && role.encrypted_password.is_some()) || (r.encrypted_password.is_some() && role.encrypted_password.is_none()) { - update_role = true; + RoleAction::Update } else if let Some(pg_pwd) = &r.encrypted_password { // Check whether password changed or not (trim 'md5:' prefix first) - update_role = pg_pwd[3..] != *role.encrypted_password.as_ref().unwrap(); + if pg_pwd[3..] 
!= *role.encrypted_password.as_ref().unwrap() { + RoleAction::Update + } else { + RoleAction::None + } + } else { + RoleAction::None } + } else { + RoleAction::Create + }; - if update_role { + match action { + RoleAction::None => {} + RoleAction::Update => { let mut query: String = format!("ALTER ROLE {} ", name.pg_quote()); - info_print!(" -> update"); - query.push_str(&role.to_pg_options()); xact.execute(query.as_str(), &[])?; } - } else { - info!("role name: '{}'", &name); - let mut query: String = format!("CREATE ROLE {} ", name.pg_quote()); - info!("role create query: '{}'", &query); - info_print!(" -> create"); + RoleAction::Create => { + let mut query: String = format!("CREATE ROLE {} ", name.pg_quote()); + info!("role create query: '{}'", &query); + query.push_str(&role.to_pg_options()); + xact.execute(query.as_str(), &[])?; - query.push_str(&role.to_pg_options()); - xact.execute(query.as_str(), &[])?; - - let grant_query = format!( - "GRANT pg_read_all_data, pg_write_all_data TO {}", - name.pg_quote() - ); - xact.execute(grant_query.as_str(), &[])?; - info!("role grant query: '{}'", &grant_query); + let grant_query = format!( + "GRANT pg_read_all_data, pg_write_all_data TO {}", + name.pg_quote() + ); + xact.execute(grant_query.as_str(), &[])?; + info!("role grant query: '{}'", &grant_query); + } } - info_print!("\n"); + if span_enabled!(Level::INFO) { + let pwd = if role.encrypted_password.is_some() { + "[FILTERED]" + } else { + "(null)" + }; + let action_str = match action { + RoleAction::None => "", + RoleAction::Create => " -> create", + RoleAction::Update => " -> update", + }; + info!(" - {}:{}{}", name, pwd, action_str); + } } xact.commit()?; @@ -197,6 +208,7 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { } /// Reassign all dependent objects and delete requested roles. 
+#[instrument(skip_all)] pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> { if let Some(ops) = &node.spec.delta_operations { // First, reassign all dependent objects to db owners. @@ -261,13 +273,16 @@ fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level /// atomicity should be enough here due to the order of operations and various checks, /// which together provide us idempotency. +#[instrument(skip_all)] pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { let existing_dbs: Vec = get_existing_dbs(client)?; // Print a list of existing Postgres databases (only in debug mode) - info!("postgres databases:"); - for r in &existing_dbs { - info_println!("{} - {}:{}", " ".repeat(27 + 5), r.name, r.owner); + if span_enabled!(Level::INFO) { + info!("postgres databases:"); + for r in &existing_dbs { + info!(" {}:{}", r.name, r.owner); + } } // Process delta operations first @@ -310,13 +325,15 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { for db in &spec.cluster.databases { let name = &db.name; - info_print!("{} - {}:{}", " ".repeat(27 + 5), db.name, db.owner); - // XXX: with a limited number of databases it is fine, but consider making it a HashMap let pg_db = existing_dbs.iter().find(|r| r.name == *name); - let start_time = Instant::now(); - if let Some(r) = pg_db { + enum DatabaseAction { + None, + Update, + Create, + } + let action = if let Some(r) = pg_db { // XXX: db owner name is returned as quoted string from Postgres, // when quoting is needed. 
let new_owner = if r.owner.starts_with('"') { @@ -326,29 +343,42 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { }; if new_owner != r.owner { + // Update the owner + DatabaseAction::Update + } else { + DatabaseAction::None + } + } else { + DatabaseAction::Create + }; + + match action { + DatabaseAction::None => {} + DatabaseAction::Update => { let query: String = format!( "ALTER DATABASE {} OWNER TO {}", name.pg_quote(), db.owner.pg_quote() ); - info_print!(" -> update"); - + let _ = info_span!("executing", query).entered(); client.execute(query.as_str(), &[])?; - let elapsed = start_time.elapsed().as_millis(); - info_print!(" ({} ms)", elapsed); } - } else { - let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote()); - info_print!(" -> create"); + DatabaseAction::Create => { + let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote()); + query.push_str(&db.to_pg_options()); + let _ = info_span!("executing", query).entered(); + client.execute(query.as_str(), &[])?; + } + }; - query.push_str(&db.to_pg_options()); - client.execute(query.as_str(), &[])?; - - let elapsed = start_time.elapsed().as_millis(); - info_print!(" ({} ms)", elapsed); + if span_enabled!(Level::INFO) { + let action_str = match action { + DatabaseAction::None => "", + DatabaseAction::Create => " -> create", + DatabaseAction::Update => " -> update", + }; + info!(" - {}:{}{}", db.name, db.owner, action_str); } - - info_print!("\n"); } Ok(()) @@ -356,6 +386,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants /// to allow users creating trusted extensions and re-creating `public` schema, for example. 
+#[instrument(skip_all)] pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> { let spec = &node.spec; diff --git a/test_runner/regress/test_compute_ctl.py b/test_runner/regress/test_compute_ctl.py index f973bd8e60..05ac3841dc 100644 --- a/test_runner/regress/test_compute_ctl.py +++ b/test_runner/regress/test_compute_ctl.py @@ -194,7 +194,7 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): ) except TimeoutExpired as exc: ctl_logs = (exc.stderr or b"").decode("utf-8") - log.info("compute_ctl output:\n{ctl_logs}") + log.info(f"compute_ctl stderr:\n{ctl_logs}") with ExternalProcessManager(Path(pgdata) / "postmaster.pid"): start = "starting safekeepers syncing" diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index 8addfcf72e..f4b71ae9b7 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -26,7 +26,7 @@ futures-util = { version = "0.3", features = ["channel", "io", "sink"] } indexmap = { version = "1", default-features = false, features = ["std"] } itertools = { version = "0.10" } libc = { version = "0.2", features = ["extra_traits"] } -log = { version = "0.4", default-features = false, features = ["serde", "std"] } +log = { version = "0.4", default-features = false, features = ["std"] } memchr = { version = "2" } nom = { version = "7" } num-bigint = { version = "0.4" } @@ -45,6 +45,7 @@ tokio-util = { version = "0.7", features = ["codec", "io"] } tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] } tracing = { version = "0.1", features = ["log"] } tracing-core = { version = "0.1" } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } url = { version = "2", features = ["serde"] } [build-dependencies] @@ -54,7 +55,7 @@ either = { version = "1" } indexmap = { version = "1", default-features = false, features = ["std"] } itertools = { version = "0.10" } libc = { version = "0.2", features = ["extra_traits"] } 
-log = { version = "0.4", default-features = false, features = ["serde", "std"] } +log = { version = "0.4", default-features = false, features = ["std"] } memchr = { version = "2" } nom = { version = "7" } prost = { version = "0.11" } From 300da5b872e105e404eb0842c9302f5742fb164b Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Thu, 19 Jan 2023 10:29:15 -0500 Subject: [PATCH 08/15] Improve layer map docstrings (#3382) --- pageserver/src/tenant/layer_map.rs | 52 ++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 44bed5959f..01c5359e88 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -250,15 +250,32 @@ where L: ?Sized + Layer, { /// - /// Find the latest layer that covers the given 'key', with lsn < - /// 'end_lsn'. + /// Find the latest layer (by lsn.end) that covers the given + /// 'key', with lsn.start < 'end_lsn'. /// - /// Returns the layer, if any, and an 'lsn_floor' value that - /// indicates which portion of the layer the caller should - /// check. 'lsn_floor' is normally the start-LSN of the layer, but - /// can be greater if there is an overlapping layer that might - /// contain the version, even if it's missing from the returned - /// layer. + /// The caller of this function is the page reconstruction + /// algorithm looking for the next relevant delta layer, or + /// the terminal image layer. The caller will pass the lsn_floor + /// value as end_lsn in the next call to search. + /// + /// If there's an image layer exactly below the given end_lsn, + /// search should return that layer regardless if there are + /// overlapping deltas. + /// + /// If the latest layer is a delta and there is an overlapping + /// image with it below, the lsn_floor returned should be right + /// above that image so we don't skip it in the search. 
Otherwise + /// the lsn_floor returned should be the bottom of the delta layer + /// because we should make as much progress down the lsn axis + /// as possible. It's fine if this way we skip some overlapping + /// deltas, because the delta we returned would contain the same + /// wal content. + /// + /// TODO: This API is convoluted and inefficient. If the caller + /// makes N search calls, we'll end up finding the same latest + /// image layer N times. We should either cache the latest image + /// layer result, or simplify the api to `get_latest_image` and + /// `get_latest_delta`, and only call `get_latest_image` once. /// /// NOTE: This only searches the 'historic' layers, *not* the /// 'open' and 'frozen' layers! @@ -401,7 +418,9 @@ where NUM_ONDISK_LAYERS.dec(); } - /// Is there a newer image layer for given key- and LSN-range? + /// Is there a newer image layer for given key- and LSN-range? Or a set + /// of image layers within the specified lsn range that cover the entire + /// specified key range? /// /// This is used for garbage collection, to determine if an old layer can /// be deleted. @@ -488,8 +507,8 @@ where /// /// Divide the whole given range of keys into sub-ranges based on the latest - /// image layer that covers each range. (This is used when creating new - /// image layers) + /// image layer that covers each range at the specified lsn (inclusive). + /// This is used when creating new image layers. /// // FIXME: clippy complains that the result type is very complex. She's probably // right... @@ -541,8 +560,15 @@ where Ok(ranges) } - /// Count how many L1 delta layers there are that overlap with the - /// given key and LSN range. + /// Count the height of the tallest stack of deltas in this 2d region. + /// + /// This number is used to compute the largest number of deltas that + /// we'll need to visit for any page reconstruction in this region. + /// We use this heuristic to decide whether to create an image layer. 
+ /// + /// TODO currently we just return the total number of deltas in the + /// region, no matter if they're stacked on top of each other + /// or next to each other. pub fn count_deltas(&self, key_range: &Range, lsn_range: &Range) -> Result { let mut result = 0; if lsn_range.start >= lsn_range.end { From 262265daad0d1a000d1f425d71ddfb723c3a05e8 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 19 Jan 2023 18:49:36 +0100 Subject: [PATCH 09/15] Revert "Use actual temporary dir for pageserver unit tests" This reverts commit 826e89b9ce43ce2c4d046b2c5d6376c3de8dbbac. The problem with that commit was that it deletes the TempDir while there are still EphemeralFile instances open. At first I thought this could be fixed by simply adding Handle::current().block_on(task_mgr::shutdown(None, Some(tenant_id), None)) to TenantHarness::drop, but it turned out to be insufficient. So, reverting the commit until we find a proper solution. refs https://github.com/neondatabase/neon/issues/3385 --- .gitignore | 2 + control_plane/.gitignore | 1 + pageserver/src/config.rs | 5 + pageserver/src/tenant.rs | 105 +++++++++++------- pageserver/src/tenant/ephemeral_file.rs | 38 ++++--- .../src/tenant/remote_timeline_client.rs | 2 +- pageserver/src/virtual_file.rs | 8 +- pageserver/src/walingest.rs | 12 +- .../src/walreceiver/connection_manager.rs | 16 +-- test_runner/sql_regress/.gitignore | 1 + 10 files changed, 110 insertions(+), 80 deletions(-) create mode 100644 control_plane/.gitignore diff --git a/.gitignore b/.gitignore index 2e241ee8cd..f1afdee599 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ /pg_install /target +/tmp_check +/tmp_check_cli __pycache__/ test_output/ .vscode diff --git a/control_plane/.gitignore b/control_plane/.gitignore new file mode 100644 index 0000000000..c1e54a6bcb --- /dev/null +++ b/control_plane/.gitignore @@ -0,0 +1 @@ +tmp_check/ diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index f3e5fb8c1a..51d1664e52 100644 --- 
a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -693,6 +693,11 @@ impl PageServerConf { Ok(t_conf) } + #[cfg(test)] + pub fn test_repo_dir(test_name: &str) -> PathBuf { + PathBuf::from(format!("../tmp_check/test_{test_name}")) + } + pub fn dummy_conf(repo_dir: PathBuf) -> Self { let pg_distrib_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install"); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c53c9bc3e1..c18c645e5b 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2626,10 +2626,10 @@ where #[cfg(test)] pub mod harness { use bytes::{Bytes, BytesMut}; + use once_cell::sync::Lazy; use once_cell::sync::OnceCell; - use std::sync::Arc; + use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::{fs, path::PathBuf}; - use tempfile::TempDir; use utils::logging; use utils::lsn::Lsn; @@ -2661,6 +2661,8 @@ pub mod harness { buf.freeze() } + static LOCK: Lazy> = Lazy::new(|| RwLock::new(())); + impl From for TenantConfOpt { fn from(tenant_conf: TenantConf) -> Self { Self { @@ -2681,31 +2683,42 @@ pub mod harness { } } - /// The harness saves some boilerplate and provides a way to create functional tenant - /// without running pageserver binary. It uses temporary directory to store data in it. - /// Tempdir gets removed on harness drop. 
- pub struct TenantHarness { - // keep the struct to not to remove tmp dir during the test - _temp_repo_dir: TempDir, + pub struct TenantHarness<'a> { pub conf: &'static PageServerConf, pub tenant_conf: TenantConf, pub tenant_id: TenantId, + + pub lock_guard: ( + Option>, + Option>, + ), } static LOG_HANDLE: OnceCell<()> = OnceCell::new(); - impl TenantHarness { - pub fn new() -> anyhow::Result { + impl<'a> TenantHarness<'a> { + pub fn create(test_name: &'static str) -> anyhow::Result { + Self::create_internal(test_name, false) + } + pub fn create_exclusive(test_name: &'static str) -> anyhow::Result { + Self::create_internal(test_name, true) + } + fn create_internal(test_name: &'static str, exclusive: bool) -> anyhow::Result { + let lock_guard = if exclusive { + (None, Some(LOCK.write().unwrap())) + } else { + (Some(LOCK.read().unwrap()), None) + }; + LOG_HANDLE.get_or_init(|| { logging::init(logging::LogFormat::Test).expect("Failed to init test logging") }); - let temp_repo_dir = tempfile::tempdir()?; - // `TempDir` uses a randomly generated subdirectory of a system tmp dir, - // so far it's enough to take care of concurrently running tests. - let repo_dir = temp_repo_dir.path(); + let repo_dir = PageServerConf::test_repo_dir(test_name); + let _ = fs::remove_dir_all(&repo_dir); + fs::create_dir_all(&repo_dir)?; - let conf = PageServerConf::dummy_conf(repo_dir.to_path_buf()); + let conf = PageServerConf::dummy_conf(repo_dir); // Make a static copy of the config. This can never be free'd, but that's // OK in a test. 
let conf: &'static PageServerConf = Box::leak(Box::new(conf)); @@ -2723,10 +2736,10 @@ pub mod harness { fs::create_dir_all(conf.timelines_path(&tenant_id))?; Ok(Self { - _temp_repo_dir: temp_repo_dir, conf, tenant_conf, tenant_id, + lock_guard, }) } @@ -2820,8 +2833,7 @@ mod tests { #[tokio::test] async fn test_basic() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_basic")?.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -2854,8 +2866,9 @@ mod tests { #[tokio::test] async fn no_duplicate_timelines() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("no_duplicate_timelines")? + .load() + .await; let _ = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -2886,8 +2899,7 @@ mod tests { /// #[tokio::test] async fn test_branch() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_branch")?.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -2984,8 +2996,10 @@ mod tests { #[tokio::test] async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = + TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")? + .load() + .await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? 
.initialize()?; @@ -3020,8 +3034,9 @@ mod tests { #[tokio::test] async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")? + .load() + .await; tenant .create_empty_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION)? @@ -3070,8 +3085,9 @@ mod tests { #[tokio::test] async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")? + .load() + .await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -3093,8 +3109,9 @@ mod tests { } #[tokio::test] async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_parent_keeps_data_forever_after_branching")? + .load() + .await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? 
.initialize()?; @@ -3125,7 +3142,8 @@ mod tests { #[tokio::test] async fn timeline_load() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; + const TEST_NAME: &str = "timeline_load"; + let harness = TenantHarness::create(TEST_NAME)?; { let tenant = harness.load().await; let tline = tenant @@ -3144,7 +3162,8 @@ mod tests { #[tokio::test] async fn timeline_load_with_ancestor() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; + const TEST_NAME: &str = "timeline_load_with_ancestor"; + let harness = TenantHarness::create(TEST_NAME)?; // create two timelines { let tenant = harness.load().await; @@ -3182,7 +3201,8 @@ mod tests { #[tokio::test] async fn corrupt_metadata() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; + const TEST_NAME: &str = "corrupt_metadata"; + let harness = TenantHarness::create(TEST_NAME)?; let tenant = harness.load().await; tenant @@ -3223,8 +3243,7 @@ mod tests { #[tokio::test] async fn test_images() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_images")?.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -3291,8 +3310,7 @@ mod tests { // #[tokio::test] async fn test_bulk_insert() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_bulk_insert")?.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -3336,8 +3354,7 @@ mod tests { #[tokio::test] async fn test_random_updates() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_random_updates")?.load().await; let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? 
.initialize()?; @@ -3410,8 +3427,9 @@ mod tests { #[tokio::test] async fn test_traverse_branches() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_traverse_branches")? + .load() + .await; let mut tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; @@ -3495,8 +3513,9 @@ mod tests { #[tokio::test] async fn test_traverse_ancestors() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_traverse_ancestors")? + .load() + .await; let mut tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 0debeaff1c..c433e65ad2 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -76,7 +76,7 @@ impl EphemeralFile { }) } - fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> io::Result<()> { + fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), io::Error> { let mut off = 0; while off < PAGE_SZ { let n = self @@ -277,7 +277,7 @@ impl Drop for EphemeralFile { } } -pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> io::Result<()> { +pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), io::Error> { if let Some(file) = EPHEMERAL_FILES.read().unwrap().files.get(&file_id) { match file.write_all_at(buf, blkno as u64 * PAGE_SZ as u64) { Ok(_) => Ok(()), @@ -332,17 +332,25 @@ mod tests { use super::*; use crate::tenant::blob_io::{BlobCursor, BlobWriter}; use crate::tenant::block_io::BlockCursor; - use crate::tenant::harness::TenantHarness; use rand::{seq::SliceRandom, thread_rng, RngCore}; use std::fs; use std::str::FromStr; - fn harness() -> Result<(TenantHarness, TimelineId), io::Error> { - let harness = TenantHarness::new().expect("Failed to 
create tenant harness"); - let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap(); - fs::create_dir_all(harness.timeline_path(&timeline_id))?; + fn harness( + test_name: &str, + ) -> Result<(&'static PageServerConf, TenantId, TimelineId), io::Error> { + let repo_dir = PageServerConf::test_repo_dir(test_name); + let _ = fs::remove_dir_all(&repo_dir); + let conf = PageServerConf::dummy_conf(repo_dir); + // Make a static copy of the config. This can never be free'd, but that's + // OK in a test. + let conf: &'static PageServerConf = Box::leak(Box::new(conf)); - Ok((harness, timeline_id)) + let tenant_id = TenantId::from_str("11000000000000000000000000000000").unwrap(); + let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap(); + fs::create_dir_all(conf.timeline_path(&timeline_id, &tenant_id))?; + + Ok((conf, tenant_id, timeline_id)) } // Helper function to slurp contents of a file, starting at the current position, @@ -359,10 +367,10 @@ mod tests { } #[test] - fn test_ephemeral_files() -> io::Result<()> { - let (harness, timeline_id) = harness()?; + fn test_ephemeral_files() -> Result<(), io::Error> { + let (conf, tenant_id, timeline_id) = harness("ephemeral_files")?; - let file_a = EphemeralFile::create(harness.conf, harness.tenant_id, timeline_id)?; + let file_a = EphemeralFile::create(conf, tenant_id, timeline_id)?; file_a.write_all_at(b"foo", 0)?; assert_eq!("foo", read_string(&file_a, 0, 20)?); @@ -373,7 +381,7 @@ mod tests { // Open a lot of files, enough to cause some page evictions. 
let mut efiles = Vec::new(); for fileno in 0..100 { - let efile = EphemeralFile::create(harness.conf, harness.tenant_id, timeline_id)?; + let efile = EphemeralFile::create(conf, tenant_id, timeline_id)?; efile.write_all_at(format!("file {}", fileno).as_bytes(), 0)?; assert_eq!(format!("file {}", fileno), read_string(&efile, 0, 10)?); efiles.push((fileno, efile)); @@ -390,10 +398,10 @@ mod tests { } #[test] - fn test_ephemeral_blobs() -> io::Result<()> { - let (harness, timeline_id) = harness()?; + fn test_ephemeral_blobs() -> Result<(), io::Error> { + let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?; - let mut file = EphemeralFile::create(harness.conf, harness.tenant_id, timeline_id)?; + let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?; let pos_foo = file.write_blob(b"foo")?; assert_eq!(b"foo", file.block_cursor().read_blob(pos_foo)?.as_slice()); diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 58b7eea1eb..013591caee 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1064,7 +1064,7 @@ mod tests { // Test scheduling #[test] fn upload_scheduling() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; + let harness = TenantHarness::create("upload_scheduling")?; let timeline_path = harness.timeline_path(&TIMELINE_ID); std::fs::create_dir_all(&timeline_path)?; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 3ad049cc21..fb216123c1 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -525,13 +525,12 @@ mod tests { }) } - fn test_files(test_name: &str, openfunc: OF) -> Result<(), Error> + fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> where FD: Read + Write + Seek + FileExt, OF: Fn(&Path, &OpenOptions) -> Result, { - let temp_repo_dir = tempfile::tempdir()?; - let testdir = temp_repo_dir.path().join(test_name); + 
let testdir = crate::config::PageServerConf::test_repo_dir(testname); std::fs::create_dir_all(&testdir)?; let path_a = testdir.join("file_a"); @@ -633,8 +632,7 @@ mod tests { const THREADS: usize = 100; const SAMPLE: [u8; SIZE] = [0xADu8; SIZE]; - let temp_repo_dir = tempfile::tempdir()?; - let testdir = temp_repo_dir.path().join("vfile_concurrency"); + let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency"); std::fs::create_dir_all(&testdir)?; // Create a test file. diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 77fce95160..0de2e6654d 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1146,8 +1146,7 @@ mod tests { #[tokio::test] async fn test_relsize() -> Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_relsize")?.load().await; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; let mut walingest = init_walingest_test(&tline).await?; @@ -1324,8 +1323,7 @@ mod tests { // and then created it again within the same layer. #[tokio::test] async fn test_drop_extend() -> Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_drop_extend")?.load().await; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; let mut walingest = init_walingest_test(&tline).await?; @@ -1378,8 +1376,7 @@ mod tests { // and then extended it again within the same layer. #[tokio::test] async fn test_truncate_extend() -> Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_truncate_extend")?.load().await; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; let mut walingest = init_walingest_test(&tline).await?; @@ -1500,8 +1497,7 @@ mod tests { /// split into multiple 1 GB segments in Postgres. 
#[tokio::test] async fn test_large_rel() -> Result<()> { - let harness = TenantHarness::new()?; - let tenant = harness.load().await; + let tenant = TenantHarness::create("test_large_rel")?.load().await; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; let mut walingest = init_walingest_test(&tline).await?; diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index be58aa0e07..8b60e59305 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -846,7 +846,7 @@ mod tests { #[tokio::test] async fn no_connection_no_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; + let harness = TenantHarness::create("no_connection_no_candidate")?; let mut state = dummy_state(&harness).await; let now = Utc::now().naive_utc(); @@ -879,7 +879,7 @@ mod tests { #[tokio::test] async fn connection_no_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; + let harness = TenantHarness::create("connection_no_candidate")?; let mut state = dummy_state(&harness).await; let now = Utc::now().naive_utc(); @@ -942,7 +942,7 @@ mod tests { #[tokio::test] async fn no_connection_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; + let harness = TenantHarness::create("no_connection_candidate")?; let mut state = dummy_state(&harness).await; let now = Utc::now().naive_utc(); @@ -1001,7 +1001,7 @@ mod tests { #[tokio::test] async fn candidate_with_many_connection_failures() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; + let harness = TenantHarness::create("candidate_with_many_connection_failures")?; let mut state = dummy_state(&harness).await; let now = Utc::now().naive_utc(); @@ -1041,7 +1041,7 @@ mod tests { #[tokio::test] async fn lsn_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; + let harness = 
TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?; let mut state = dummy_state(&harness).await; let current_lsn = Lsn(100_000).align(); let now = Utc::now().naive_utc(); @@ -1105,7 +1105,7 @@ mod tests { #[tokio::test] async fn timeout_connection_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; + let harness = TenantHarness::create("timeout_connection_threshhold_current_candidate")?; let mut state = dummy_state(&harness).await; let current_lsn = Lsn(100_000).align(); let now = Utc::now().naive_utc(); @@ -1166,7 +1166,7 @@ mod tests { #[tokio::test] async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = TenantHarness::new()?; + let harness = TenantHarness::create("timeout_wal_over_threshhold_current_candidate")?; let mut state = dummy_state(&harness).await; let current_lsn = Lsn(100_000).align(); let new_lsn = Lsn(100_100).align(); @@ -1232,7 +1232,7 @@ mod tests { const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr"; - async fn dummy_state(harness: &TenantHarness) -> WalreceiverState { + async fn dummy_state(harness: &TenantHarness<'_>) -> WalreceiverState { WalreceiverState { id: TenantTimelineId { tenant_id: harness.tenant_id, diff --git a/test_runner/sql_regress/.gitignore b/test_runner/sql_regress/.gitignore index 83186b5c86..89129d7358 100644 --- a/test_runner/sql_regress/.gitignore +++ b/test_runner/sql_regress/.gitignore @@ -2,6 +2,7 @@ /pg_regress # Generated subdirectories +/tmp_check/ /results/ /log/ From 47f9890bae75c63fc4b29c009cf9020ac2bcbafa Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Fri, 20 Jan 2023 15:37:24 +0100 Subject: [PATCH 10/15] [compute_ctl] Make role deletion spec processing idempotent (#3380) Previously, we were trying to re-assign owned objects of the already deleted role. This was causing a crash loop in the case when compute was restarted with a spec that includes delta operation for role deletion.
To avoid such cases, check that role is still present before calling `reassign_owned_objects`. Resolves neondatabase/cloud#3553 --- compute_tools/src/compute.rs | 3 ++- compute_tools/src/pg_helpers.rs | 4 ++-- compute_tools/src/spec.rs | 14 +++++++++++++- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index e229bb1cc2..c8af8822b7 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -252,7 +252,7 @@ impl ComputeNode { // If connection fails, // it may be the old node with `zenith_admin` superuser. // - // In this case we need to connect with old `zenith_admin`name + // In this case we need to connect with old `zenith_admin` name // and create new user. We cannot simply rename connected user, // but we can create a new one and grant it all privileges. let mut client = match Client::connect(self.connstr.as_str(), NoTls) { @@ -278,6 +278,7 @@ impl ComputeNode { Ok(client) => client, }; + // Proceed with post-startup configuration. Note, that order of operations is important. handle_roles(&self.spec, &mut client)?; handle_databases(&self.spec, &mut client)?; handle_role_deletions(self, &mut client)?; diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index 921289d7c2..6ab2864721 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -130,8 +130,8 @@ impl Role { /// Serialize a list of role parameters into a Postgres-acceptable /// string of arguments. pub fn to_pg_options(&self) -> String { - // XXX: consider putting LOGIN as a default option somewhere higher, e.g. in Rails. - // For now we do not use generic `options` for roles. Once used, add + // XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane. + // For now, we do not use generic `options` for roles. Once used, add // `self.options.as_pg_options()` somewhere here. 
let mut params: String = "LOGIN".to_string(); diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 40c8366bf4..97cd623052 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -213,8 +213,20 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result< if let Some(ops) = &node.spec.delta_operations { // First, reassign all dependent objects to db owners. info!("reassigning dependent objects of to-be-deleted roles"); + + // Fetch existing roles. We could've exported and used `existing_roles` from + // `handle_roles()`, but we only make this list there before creating new roles. + // Which is probably fine as we never create to-be-deleted roles, but that'd + // just look a bit untidy. Anyway, the entire `pg_roles` should be in shared + // buffers already, so this shouldn't be a big deal. + let mut xact = client.transaction()?; + let existing_roles: Vec = get_existing_roles(&mut xact)?; + xact.commit()?; + for op in ops { - if op.action == "delete_role" { + // Check that role is still present in Postgres, as this could be a + // restart with the same spec after role deletion. 
+ if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) { reassign_owned_objects(node, &op.name)?; } } From 802f17407259bf9ad027480721083662f66b13b1 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Fri, 20 Jan 2023 18:19:52 +0200 Subject: [PATCH 11/15] fix: dont stop pageserver if we fail to calculate synthetic size --- pageserver/src/consumption_metrics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index f8a0bc6f08..c07026261d 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -59,7 +59,7 @@ pub async fn collect_metrics( None, None, "synthetic size calculation", - true, + false, async move { calculate_synthetic_size_worker(synthetic_size_calculation_interval) .instrument(info_span!("synthetic_size_worker")) From 478322ebf90f8580258b02e1fb5c899c7f8ad279 Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Fri, 20 Jan 2023 20:21:36 +0200 Subject: [PATCH 12/15] Fix tenant size orphans (#3377) Before, only the timelines which have passed the `gc_horizon` were processed, which failed with orphans at the tree_sort phase. Example input in added `test_branched_empty_timeline_size` test case. The PR changes iteration to happen through all timelines, and in addition to that, any learned branch points will be calculated as they would have been in the original implementation if the ancestor branch had been over the `gc_horizon`. This also changes how tenants where all timelines are below `gc_horizon` are handled. Previously tenant_size 0 was returned, but now they will have approximately `initdb_lsn` worth of tenant_size. The PR also adds several new tenant size tests that describe various corner cases of branching structure and `gc_horizon` setting. They are currently disabled to not consume time during CI.
Co-authored-by: Joonas Koivunen Co-authored-by: Anastasia Lubennikova --- pageserver/src/tenant/size.rs | 149 ++++++++++++--- test_runner/fixtures/neon_fixtures.py | 9 +- test_runner/regress/test_tenant_size.py | 244 ++++++++++++++++++++++-- 3 files changed, 360 insertions(+), 42 deletions(-) diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index dd4bf768a7..2181d6d4dc 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -23,7 +23,13 @@ use tracing::*; pub struct ModelInputs { updates: Vec, retention_period: u64, + + /// Relevant lsns per timeline. + /// + /// This field is not required for deserialization purposes, which is mostly used in tests. The + /// LSNs explain the outcome (updates) but are not needed in size calculation. #[serde_as(as = "HashMap")] + #[serde(default)] timeline_inputs: HashMap, } @@ -32,6 +38,8 @@ pub struct ModelInputs { #[serde_with::serde_as] #[derive(Debug, serde::Serialize, serde::Deserialize)] struct TimelineInputs { + #[serde_as(as = "serde_with::DisplayFromStr")] + ancestor_lsn: Lsn, #[serde_as(as = "serde_with::DisplayFromStr")] last_record: Lsn, #[serde_as(as = "serde_with::DisplayFromStr")] @@ -178,21 +186,13 @@ pub(super) async fn gather_inputs( // our advantage with `?` error handling. let mut joinset = tokio::task::JoinSet::new(); - let timelines = tenant + // refresh is needed to update gc related pitr_cutoff and horizon_cutoff + tenant .refresh_gc_info() .await .context("Failed to refresh gc_info before gathering inputs")?; - if timelines.is_empty() { - // All timelines are below tenant's gc_horizon; alternative would be to use - // Tenant::list_timelines but then those gc_info's would not be updated yet, possibly - // missing GcInfo::retain_lsns or having obsolete values for cutoff's. 
- return Ok(ModelInputs { - updates: vec![], - retention_period: 0, - timeline_inputs: HashMap::new(), - }); - } + let timelines = tenant.list_timelines(); // record the used/inserted cache keys here, to remove extras not to start leaking // after initial run the cache should be quite stable, but live timelines will eventually @@ -201,13 +201,25 @@ pub(super) async fn gather_inputs( let mut updates = Vec::new(); - // record the per timline values used to determine `retention_period` + // record the per timeline values useful to debug the model inputs, also used to track + // ancestor_lsn without keeping a hold of Timeline let mut timeline_inputs = HashMap::with_capacity(timelines.len()); // used to determine the `retention_period` for the size model let mut max_cutoff_distance = None; + // mapping from (TimelineId, Lsn) => if this branch point has been handled already via + // GcInfo::retain_lsns or if it needs to have its logical_size calculated. + let mut referenced_branch_froms = HashMap::<(TimelineId, Lsn), bool>::new(); + for timeline in timelines { + if !timeline.is_active() { + anyhow::bail!( + "timeline {} is not active, cannot calculate tenant_size now", + timeline.timeline_id + ); + } + let last_record_lsn = timeline.get_last_record_lsn(); let (interesting_lsns, horizon_cutoff, pitr_cutoff, next_gc_cutoff) = { @@ -273,13 +285,30 @@ pub(super) async fn gather_inputs( // all timelines branch from something, because it might be impossible to pinpoint // which is the tenant_size_model's "default" branch. + + let ancestor_lsn = timeline.get_ancestor_lsn(); + updates.push(Update { - lsn: timeline.get_ancestor_lsn(), + lsn: ancestor_lsn, command: Command::BranchFrom(timeline.get_ancestor_timeline_id()), timeline_id: timeline.timeline_id, }); + if let Some(parent_timeline_id) = timeline.get_ancestor_timeline_id() { + // refresh_gc_info will update branchpoints and pitr_cutoff but only do it for branches + // which are over gc_horizon. 
for example, a "main" branch which never received any + // updates apart from initdb not have branch points recorded. + referenced_branch_froms + .entry((parent_timeline_id, timeline.get_ancestor_lsn())) + .or_default(); + } + for (lsn, _kind) in &interesting_lsns { + // mark this visited so don't need to re-process this parent + *referenced_branch_froms + .entry((timeline.timeline_id, *lsn)) + .or_default() = true; + if let Some(size) = logical_size_cache.get(&(timeline.timeline_id, *lsn)) { updates.push(Update { lsn: *lsn, @@ -295,22 +324,10 @@ pub(super) async fn gather_inputs( } } - // all timelines also have an end point if they have made any progress - if last_record_lsn > timeline.get_ancestor_lsn() - && !interesting_lsns - .iter() - .any(|(lsn, _)| lsn == &last_record_lsn) - { - updates.push(Update { - lsn: last_record_lsn, - command: Command::EndOfBranch, - timeline_id: timeline.timeline_id, - }); - } - timeline_inputs.insert( timeline.timeline_id, TimelineInputs { + ancestor_lsn, last_record: last_record_lsn, // this is not used above, because it might not have updated recently enough latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(), @@ -321,6 +338,80 @@ pub(super) async fn gather_inputs( ); } + // iterate over discovered branch points and make sure we are getting logical sizes at those + // points. + for ((timeline_id, lsn), handled) in referenced_branch_froms.iter() { + if *handled { + continue; + } + + let timeline_id = *timeline_id; + let lsn = *lsn; + + match timeline_inputs.get(&timeline_id) { + Some(inputs) if inputs.ancestor_lsn == lsn => { + // we don't need an update at this branch point which is also point where + // timeline_id branch was branched from. 
+ continue; + } + Some(_) => {} + None => { + // we should have this because we have iterated through all of the timelines + anyhow::bail!("missing timeline_input for {timeline_id}") + } + } + + if let Some(size) = logical_size_cache.get(&(timeline_id, lsn)) { + updates.push(Update { + lsn, + timeline_id, + command: Command::Update(*size), + }); + + needed_cache.insert((timeline_id, lsn)); + } else { + let timeline = tenant + .get_timeline(timeline_id, false) + .context("find referenced ancestor timeline")?; + let parallel_size_calcs = Arc::clone(limit); + joinset.spawn(calculate_logical_size( + parallel_size_calcs, + timeline.clone(), + lsn, + )); + + if let Some(parent_id) = timeline.get_ancestor_timeline_id() { + // we should not find new ones because we iterated tenants all timelines + anyhow::ensure!( + timeline_inputs.contains_key(&parent_id), + "discovered new timeline {parent_id} (parent of {timeline_id})" + ); + } + }; + } + + // finally add in EndOfBranch for all timelines where their last_record_lsn is not a branch + // point. this is needed by the model. + for (timeline_id, inputs) in timeline_inputs.iter() { + let lsn = inputs.last_record; + + if referenced_branch_froms.contains_key(&(*timeline_id, lsn)) { + // this means that the (timeline_id, last_record_lsn) represents a branch point + // we do not want to add EndOfBranch updates for these points because it doesn't fit + // into the current tenant_size_model. + continue; + } + + if lsn > inputs.ancestor_lsn { + // all timelines also have an end point if they have made any progress + updates.push(Update { + lsn, + command: Command::EndOfBranch, + timeline_id: *timeline_id, + }); + } + } + let mut have_any_error = false; while let Some(res) = joinset.join_next().await { @@ -379,6 +470,7 @@ pub(super) async fn gather_inputs( // handled by the variant order in `Command`. 
// updates.sort_unstable(); + // And another sort to handle Command::BranchFrom ordering // in case when there are multiple branches at the same LSN. let sorted_updates = sort_updates_in_tree_order(updates)?; @@ -574,7 +666,10 @@ fn updates_sort() { fn verify_size_for_multiple_branches() { // this is generated from integration test test_tenant_size_with_multiple_branches, but this way // it has the stable lsn's - let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072,"timeline_inputs":{"cd9d9409c216e64bf580904facedb01b":{"last_record":"0/18D5E40","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/18B5E40","pitr_cutoff":"0/18B5E40","next_gc_cutoff":"0/18B5E40"},"10b532a550540bc15385eac4edde416a":{"last_record":"0/1839818","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/1819818","pitr_cutoff":"0/1819818","next_gc_cutoff":"0/1819818"},"230fc9d756f7363574c0d66533564dcc":{"last_record":"0/222F438","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/220F438","pitr_cutoff":"0/220F438","next_gc_cutoff":"0/220F438"}}}"#; + // + // timelineinputs have been left out, because those explain the inputs, but don't participate + // in further size 
calculations. + let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072}"#; let inputs: ModelInputs = serde_json::from_str(doc).unwrap(); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index d6c4c32b0b..8476066691 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1206,6 +1206,9 @@ class PageserverHttpClient(requests.Session): return res_json def tenant_size(self, tenant_id: TenantId) -> int: + return self.tenant_size_and_modelinputs(tenant_id)[0] + + def tenant_size_and_modelinputs(self, tenant_id: TenantId) -> Tuple[int, Dict[str, Any]]: """ Returns the tenant size, together with the model inputs as the second tuple item. """ @@ -1216,9 +1219,9 @@ class PageserverHttpClient(requests.Session): assert TenantId(res["id"]) == tenant_id size = res["size"] assert type(size) == int - # there are additional inputs, which are the collected raw information before being fed to the tenant_size_model - # there are no tests for those right now. 
- return size + inputs = res["inputs"] + assert type(inputs) is dict + return (size, inputs) def timeline_list( self, diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index 5747ae235f..72cfbc9dda 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -1,5 +1,6 @@ -from typing import List, Tuple +from typing import Any, List, Tuple +import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn from fixtures.types import Lsn @@ -9,28 +10,247 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv): env = neon_simple_env (tenant_id, _) = env.neon_cli.create_tenant() http_client = env.pageserver.http_client() - size = http_client.tenant_size(tenant_id) + initial_size = http_client.tenant_size(tenant_id) - # we should never have zero, because there should be the initdb however - # this is questionable if we should have anything in this case, as the - # gc_cutoff is negative - assert ( - size == 0 - ), "initial implementation returns zero tenant_size before last_record_lsn is past gc_horizon" + # we should never have zero, because there should be the initdb "changes" + assert initial_size > 0, "initial implementation returns ~initdb tenant_size" - with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + main_branch_name = "main" + + with env.postgres.create_start( + main_branch_name, + tenant_id=tenant_id, + config_lines=["autovacuum=off", "checkpoint_timeout=10min"], + ) as pg: with pg.cursor() as cur: cur.execute("SELECT 1") row = cur.fetchone() assert row is not None assert row[0] == 1 size = http_client.tenant_size(tenant_id) - assert size == 0, "starting idle compute should not change the tenant size" + # we've disabled the autovacuum and checkpoint + # so background processes should not change the size. 
+ # If this test will flake we should probably loosen the check + assert size == initial_size, "starting idle compute should not change the tenant size" # the size should be the same, until we increase the size over the # gc_horizon - size = http_client.tenant_size(tenant_id) - assert size == 0, "tenant_size should not be affected by shutdown of compute" + size, inputs = http_client.tenant_size_and_modelinputs(tenant_id) + assert size == initial_size, "tenant_size should not be affected by shutdown of compute" + + expected_commands: List[Any] = [{"branch_from": None}, "end_of_branch"] + actual_commands: List[Any] = list(map(lambda x: x["command"], inputs["updates"])) # type: ignore + assert actual_commands == expected_commands + + +def test_branched_empty_timeline_size(neon_simple_env: NeonEnv): + """ + Issue found in production. Because the ancestor branch was under + gc_horizon, the branchpoint was "dangling" and the computation could not be + done. + + Assuming gc_horizon = 50 + root: I 0---10------>20 + branch: |-------------------I---------->150 + gc_horizon + """ + env = neon_simple_env + (tenant_id, _) = env.neon_cli.create_tenant() + http_client = env.pageserver.http_client() + + initial_size = http_client.tenant_size(tenant_id) + + first_branch_timeline_id = env.neon_cli.create_branch("first-branch", tenant_id=tenant_id) + + with env.postgres.create_start("first-branch", tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute( + "CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)" + ) + wait_for_last_flush_lsn(env, pg, tenant_id, first_branch_timeline_id) + + size_after_branching = http_client.tenant_size(tenant_id) + log.info(f"size_after_branching: {size_after_branching}") + + assert size_after_branching > initial_size + + +def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv): + """ + More general version of test_branched_empty_timeline_size + + Assuming gc_horizon = 50 + + root: I 0------10 + 
first: I 10 + nth_0: I 10 + nth_1: I 10 + nth_n: 10------------I--------100 + """ + env = neon_simple_env + (tenant_id, _) = env.neon_cli.create_tenant() + http_client = env.pageserver.http_client() + + initial_size = http_client.tenant_size(tenant_id) + + first_branch_name = "first" + env.neon_cli.create_branch(first_branch_name, tenant_id=tenant_id) + + size_after_branching = http_client.tenant_size(tenant_id) + + # this might be flaky like test_get_tenant_size_with_multiple_branches + # https://github.com/neondatabase/neon/issues/2962 + assert size_after_branching == initial_size + + last_branch_name = first_branch_name + last_branch = None + + for i in range(0, 4): + latest_branch_name = f"nth_{i}" + last_branch = env.neon_cli.create_branch( + latest_branch_name, ancestor_branch_name=last_branch_name, tenant_id=tenant_id + ) + last_branch_name = latest_branch_name + + size_after_branching = http_client.tenant_size(tenant_id) + assert size_after_branching == initial_size + + assert last_branch is not None + + with env.postgres.create_start(last_branch_name, tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute( + "CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)" + ) + wait_for_last_flush_lsn(env, pg, tenant_id, last_branch) + + size_after_writes = http_client.tenant_size(tenant_id) + assert size_after_writes > initial_size + + +@pytest.mark.skip("This should work, but is left out because assumed covered by other tests") +def test_branch_point_within_horizon(neon_simple_env: NeonEnv): + """ + gc_horizon = 15 + + main: 0--I-10------>20 + branch: |-------------------I---------->150 + gc_horizon + """ + + env = neon_simple_env + gc_horizon = 20_000 + (tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)}) + http_client = env.pageserver.http_client() + + with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + with 
pg.cursor() as cur: + cur.execute("CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)") + flushed_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + + size_before_branching = http_client.tenant_size(tenant_id) + + assert flushed_lsn.lsn_int - gc_horizon > initdb_lsn.lsn_int + + branch_id = env.neon_cli.create_branch( + "branch", tenant_id=tenant_id, ancestor_start_lsn=flushed_lsn + ) + + with env.postgres.create_start("branch", tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute("CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)") + wait_for_last_flush_lsn(env, pg, tenant_id, branch_id) + + size_after = http_client.tenant_size(tenant_id) + + assert size_before_branching < size_after + + +@pytest.mark.skip("This should work, but is left out because assumed covered by other tests") +def test_parent_within_horizon(neon_simple_env: NeonEnv): + """ + gc_horizon = 5 + + main: 0----10----I->20 + branch: |-------------------I---------->150 + gc_horizon + """ + + env = neon_simple_env + gc_horizon = 200_000 + (tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)}) + http_client = env.pageserver.http_client() + + with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + with pg.cursor() as cur: + cur.execute("CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)") + + flushed_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + + with pg.cursor() as cur: + cur.execute("CREATE TABLE t00 AS SELECT i::bigint n FROM generate_series(0, 2000) s(i)") + + wait_for_last_flush_lsn(env, pg, tenant_id, main_id) + + size_before_branching = http_client.tenant_size(tenant_id) + + assert flushed_lsn.lsn_int - gc_horizon > initdb_lsn.lsn_int + + branch_id = env.neon_cli.create_branch( + "branch", tenant_id=tenant_id, ancestor_start_lsn=flushed_lsn + ) + + with 
env.postgres.create_start("branch", tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute("CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 10000) s(i)") + wait_for_last_flush_lsn(env, pg, tenant_id, branch_id) + + size_after = http_client.tenant_size(tenant_id) + + assert size_before_branching < size_after + + +@pytest.mark.skip("This should work, but is left out because assumed covered by other tests") +def test_only_heads_within_horizon(neon_simple_env: NeonEnv): + """ + gc_horizon = small + + main: 0--------10-----I>20 + first: |-----------------------------I>150 + second: |---------I>30 + """ + + env = neon_simple_env + (tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": "1024"}) + http_client = env.pageserver.http_client() + + initial_size = http_client.tenant_size(tenant_id) + + first_id = env.neon_cli.create_branch("first", tenant_id=tenant_id) + second_id = env.neon_cli.create_branch("second", tenant_id=tenant_id) + + ids = {"main": main_id, "first": first_id, "second": second_id} + + latest_size = None + + # gc is not expected to change the results + + for branch_name, amount in [("main", 2000), ("first", 15000), ("second", 3000)]: + with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute( + f"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, {amount}) s(i)" + ) + wait_for_last_flush_lsn(env, pg, tenant_id, ids[branch_name]) + size_now = http_client.tenant_size(tenant_id) + if latest_size is not None: + assert size_now > latest_size + else: + assert size_now > initial_size + + latest_size = size_now def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): From 664a69e65ba79c8a3b6c5bd8d428de41f2243bd7 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Mon, 23 Jan 2023 10:51:09 +0200 Subject: [PATCH 13/15] Fix slru_segment_key_range function: segno was assigned to incorrect Key field (#3354) --- 
pageserver/src/pgdatadir_mapping.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 6ae70e3a30..cc521c5e35 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1405,15 +1405,15 @@ fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range { Key { field1: 0x01, field2, - field3: segno, - field4: 0, + field3: 1, + field4: segno, field5: 0, field6: 0, }..Key { field1: 0x01, field2, - field3: segno, - field4: 0, + field3: 1, + field4: segno, field5: 1, field6: 0, } From edb02d3299772e9075561a3b41141d42894e3287 Mon Sep 17 00:00:00 2001 From: Shany Pozin Date: Mon, 23 Jan 2023 15:08:48 +0200 Subject: [PATCH 14/15] Adding pageserver3 to staging (#3403) --- .github/ansible/staging.us-east-2.hosts.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/ansible/staging.us-east-2.hosts.yaml b/.github/ansible/staging.us-east-2.hosts.yaml index 4891875369..b46e729e32 100644 --- a/.github/ansible/staging.us-east-2.hosts.yaml +++ b/.github/ansible/staging.us-east-2.hosts.yaml @@ -29,6 +29,8 @@ storage: ansible_host: i-0565a8b4008aa3f40 pageserver-2.us-east-2.aws.neon.build: ansible_host: i-01e31cdf7e970586a + pageserver-3.us-east-2.aws.neon.build: + ansible_host: i-0602a0291365ef7cc safekeepers: hosts: From f67f0c1c11b4f5226225195b5be956c154ea4200 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 23 Jan 2023 17:12:51 +0200 Subject: [PATCH 15/15] More tenant size fixes (#3410) Small changes, but hopefully this will help with the panic detected in staging, for which we cannot get the debugging information right now (end-of-branch before branch-point). 
--- libs/tenant_size_model/src/lib.rs | 83 ++++++++++++---------- libs/tenant_size_model/src/main.rs | 107 +++++++++++++++-------------- pageserver/src/tenant/size.rs | 15 +++- 3 files changed, 114 insertions(+), 91 deletions(-) diff --git a/libs/tenant_size_model/src/lib.rs b/libs/tenant_size_model/src/lib.rs index 92bec8aebe..b156e1be9d 100644 --- a/libs/tenant_size_model/src/lib.rs +++ b/libs/tenant_size_model/src/lib.rs @@ -134,22 +134,25 @@ impl Storage { op: Cow<'static, str>, lsn: u64, size: Option, - ) where + ) -> anyhow::Result<()> + where K: std::borrow::Borrow, Q: std::hash::Hash + Eq + std::fmt::Debug, { - let lastseg_id = *self.branches.get(branch).unwrap(); + let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") }; let newseg_id = self.segments.len(); let lastseg = &mut self.segments[lastseg_id]; assert!(lsn > lastseg.end_lsn); + let Some(start_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") }; + let newseg = Segment { op, parent: Some(lastseg_id), start_lsn: lastseg.end_lsn, end_lsn: lsn, - start_size: lastseg.end_size.unwrap(), + start_size, end_size: size, children_after: Vec::new(), needed: false, @@ -158,6 +161,8 @@ impl Storage { self.segments.push(newseg); *self.branches.get_mut(branch).expect("read already") = newseg_id; + + Ok(()) } /// Advances the branch with the named operation, by the relative LSN and logical size bytes. 
@@ -167,21 +172,24 @@ impl Storage { op: Cow<'static, str>, lsn_bytes: u64, size_bytes: i64, - ) where + ) -> anyhow::Result<()> + where K: std::borrow::Borrow, - Q: std::hash::Hash + Eq, + Q: std::hash::Hash + Eq + std::fmt::Debug, { - let lastseg_id = *self.branches.get(branch).unwrap(); + let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") }; let newseg_id = self.segments.len(); let lastseg = &mut self.segments[lastseg_id]; + let Some(last_end_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") }; + let newseg = Segment { op, parent: Some(lastseg_id), start_lsn: lastseg.end_lsn, end_lsn: lastseg.end_lsn + lsn_bytes, - start_size: lastseg.end_size.unwrap(), - end_size: Some((lastseg.end_size.unwrap() as i64 + size_bytes) as u64), + start_size: last_end_size, + end_size: Some((last_end_size as i64 + size_bytes) as u64), children_after: Vec::new(), needed: false, }; @@ -189,33 +197,33 @@ impl Storage { self.segments.push(newseg); *self.branches.get_mut(branch).expect("read already") = newseg_id; + Ok(()) } - pub fn insert(&mut self, branch: &Q, bytes: u64) + pub fn insert(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> where K: std::borrow::Borrow, - Q: std::hash::Hash + Eq, + Q: std::hash::Hash + Eq + std::fmt::Debug, { - self.modify_branch(branch, "insert".into(), bytes, bytes as i64); + self.modify_branch(branch, "insert".into(), bytes, bytes as i64) } - pub fn update(&mut self, branch: &Q, bytes: u64) + pub fn update(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> where K: std::borrow::Borrow, - Q: std::hash::Hash + Eq, + Q: std::hash::Hash + Eq + std::fmt::Debug, { - self.modify_branch(branch, "update".into(), bytes, 0i64); + self.modify_branch(branch, "update".into(), bytes, 0i64) } - pub fn delete(&mut self, branch: &Q, bytes: u64) + pub fn delete(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> where K: std::borrow::Borrow, - Q: 
std::hash::Hash + Eq, + Q: std::hash::Hash + Eq + std::fmt::Debug, { - self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64)); + self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64)) } - /// Panics if the parent branch cannot be found. pub fn branch(&mut self, parent: &Q, name: K) -> anyhow::Result<()> where K: std::borrow::Borrow + std::fmt::Debug, @@ -236,7 +244,7 @@ impl Storage { Ok(()) } - pub fn calculate(&mut self, retention_period: u64) -> SegmentSize { + pub fn calculate(&mut self, retention_period: u64) -> anyhow::Result { // Phase 1: Mark all the segments that need to be retained for (_branch, &last_seg_id) in self.branches.iter() { let last_seg = &self.segments[last_seg_id]; @@ -261,7 +269,7 @@ impl Storage { self.size_from_snapshot_later(0) } - fn size_from_wal(&self, seg_id: usize) -> SegmentSize { + fn size_from_wal(&self, seg_id: usize) -> anyhow::Result { let seg = &self.segments[seg_id]; let this_size = seg.end_lsn - seg.start_lsn; @@ -272,10 +280,10 @@ impl Storage { for &child_id in seg.children_after.iter() { // try each child both ways let child = &self.segments[child_id]; - let p1 = self.size_from_wal(child_id); + let p1 = self.size_from_wal(child_id)?; let p = if !child.needed { - let p2 = self.size_from_snapshot_later(child_id); + let p2 = self.size_from_snapshot_later(child_id)?; if p1.total() < p2.total() { p1 } else { @@ -286,15 +294,15 @@ impl Storage { }; children.push(p); } - SegmentSize { + Ok(SegmentSize { seg_id, method: if seg.needed { WalNeeded } else { Wal }, this_size, children, - } + }) } - fn size_from_snapshot_later(&self, seg_id: usize) -> SegmentSize { + fn size_from_snapshot_later(&self, seg_id: usize) -> anyhow::Result { // If this is needed, then it's time to do the snapshot and continue // with wal method. 
let seg = &self.segments[seg_id]; @@ -305,10 +313,10 @@ impl Storage { for &child_id in seg.children_after.iter() { // try each child both ways let child = &self.segments[child_id]; - let p1 = self.size_from_wal(child_id); + let p1 = self.size_from_wal(child_id)?; let p = if !child.needed { - let p2 = self.size_from_snapshot_later(child_id); + let p2 = self.size_from_snapshot_later(child_id)?; if p1.total() < p2.total() { p1 } else { @@ -319,12 +327,12 @@ impl Storage { }; children.push(p); } - SegmentSize { + Ok(SegmentSize { seg_id, method: WalNeeded, this_size: seg.start_size, children, - } + }) } else { // If any of the direct children are "needed", need to be able to reconstruct here let mut children_needed = false; @@ -339,7 +347,7 @@ impl Storage { let method1 = if !children_needed { let mut children = Vec::new(); for child in seg.children_after.iter() { - children.push(self.size_from_snapshot_later(*child)); + children.push(self.size_from_snapshot_later(*child)?); } Some(SegmentSize { seg_id, @@ -355,20 +363,25 @@ impl Storage { let method2 = if children_needed || seg.children_after.len() >= 2 { let mut children = Vec::new(); for child in seg.children_after.iter() { - children.push(self.size_from_wal(*child)); + children.push(self.size_from_wal(*child)?); } + let Some(this_size) = seg.end_size else { anyhow::bail!("no end_size at junction {seg_id}") }; Some(SegmentSize { seg_id, method: SnapshotAfter, - this_size: seg.end_size.unwrap(), + this_size, children, }) } else { None }; - match (method1, method2) { - (None, None) => panic!(), + Ok(match (method1, method2) { + (None, None) => anyhow::bail!( + "neither method was applicable: children_after={}, children_needed={}", + seg.children_after.len(), + children_needed + ), (Some(method), None) => method, (None, Some(method)) => method, (Some(method1), Some(method2)) => { @@ -378,7 +391,7 @@ impl Storage { method2 } } - } + }) } } diff --git a/libs/tenant_size_model/src/main.rs 
b/libs/tenant_size_model/src/main.rs index 9378a98a09..e32dd055f4 100644 --- a/libs/tenant_size_model/src/main.rs +++ b/libs/tenant_size_model/src/main.rs @@ -7,118 +7,118 @@ use tenant_size_model::{Segment, SegmentSize, Storage}; // Main branch only. Some updates on it. -fn scenario_1() -> (Vec, SegmentSize) { +fn scenario_1() -> anyhow::Result<(Vec, SegmentSize)> { // Create main branch let mut storage = Storage::new("main"); // Bulk load 5 GB of data to it - storage.insert("main", 5_000); + storage.insert("main", 5_000)?; // Stream of updates for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } - let size = storage.calculate(1000); + let size = storage.calculate(1000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } // Main branch only. Some updates on it. -fn scenario_2() -> (Vec, SegmentSize) { +fn scenario_2() -> anyhow::Result<(Vec, SegmentSize)> { // Create main branch let mut storage = Storage::new("main"); // Bulk load 5 GB of data to it - storage.insert("main", 5_000); + storage.insert("main", 5_000)?; // Stream of updates for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } // Branch - storage.branch("main", "child").unwrap(); - storage.update("child", 1_000); + storage.branch("main", "child")?; + storage.update("child", 1_000)?; // More updates on parent - storage.update("main", 1_000); + storage.update("main", 1_000)?; - let size = storage.calculate(1000); + let size = storage.calculate(1000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } // Like 2, but more updates on main -fn scenario_3() -> (Vec, SegmentSize) { +fn scenario_3() -> anyhow::Result<(Vec, SegmentSize)> { // Create main branch let mut storage = Storage::new("main"); // Bulk load 5 GB of data to it - storage.insert("main", 5_000); + storage.insert("main", 5_000)?; // Stream of updates for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } 
// Branch - storage.branch("main", "child").unwrap(); - storage.update("child", 1_000); + storage.branch("main", "child")?; + storage.update("child", 1_000)?; // More updates on parent for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } - let size = storage.calculate(1000); + let size = storage.calculate(1000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } // Diverged branches -fn scenario_4() -> (Vec, SegmentSize) { +fn scenario_4() -> anyhow::Result<(Vec, SegmentSize)> { // Create main branch let mut storage = Storage::new("main"); // Bulk load 5 GB of data to it - storage.insert("main", 5_000); + storage.insert("main", 5_000)?; // Stream of updates for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } // Branch - storage.branch("main", "child").unwrap(); - storage.update("child", 1_000); + storage.branch("main", "child")?; + storage.update("child", 1_000)?; // More updates on parent for _ in 0..8 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } - let size = storage.calculate(1000); + let size = storage.calculate(1000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } -fn scenario_5() -> (Vec, SegmentSize) { +fn scenario_5() -> anyhow::Result<(Vec, SegmentSize)> { let mut storage = Storage::new("a"); - storage.insert("a", 5000); - storage.branch("a", "b").unwrap(); - storage.update("b", 4000); - storage.update("a", 2000); - storage.branch("a", "c").unwrap(); - storage.insert("c", 4000); - storage.insert("a", 2000); + storage.insert("a", 5000)?; + storage.branch("a", "b")?; + storage.update("b", 4000)?; + storage.update("a", 2000)?; + storage.branch("a", "c")?; + storage.insert("c", 4000)?; + storage.insert("a", 2000)?; - let size = storage.calculate(5000); + let size = storage.calculate(5000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } -fn scenario_6() -> (Vec, SegmentSize) { +fn scenario_6() 
-> anyhow::Result<(Vec, SegmentSize)> { use std::borrow::Cow; const NO_OP: Cow<'static, str> = Cow::Borrowed(""); @@ -133,18 +133,18 @@ fn scenario_6() -> (Vec, SegmentSize) { let mut storage = Storage::new(None); - storage.branch(&None, branches[0]).unwrap(); // at 0 - storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128); // at 108951064 - storage.branch(&branches[0], branches[1]).unwrap(); // at 108951064 - storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392); // at 124511472 - storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904); // at 283415424 - storage.branch(&branches[0], branches[2]).unwrap(); // at 283415424 - storage.modify_branch(&branches[2], NO_OP, 15906192, 8192); // at 299321616 - storage.modify_branch(&branches[0], NO_OP, 18909976, 32768); // at 302325400 + storage.branch(&None, branches[0])?; // at 0 + storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128)?; // at 108951064 + storage.branch(&branches[0], branches[1])?; // at 108951064 + storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392)?; // at 124511472 + storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904)?; // at 283415424 + storage.branch(&branches[0], branches[2])?; // at 283415424 + storage.modify_branch(&branches[2], NO_OP, 15906192, 8192)?; // at 299321616 + storage.modify_branch(&branches[0], NO_OP, 18909976, 32768)?; // at 302325400 - let size = storage.calculate(100_000); + let size = storage.calculate(100_000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } fn main() { @@ -163,7 +163,8 @@ fn main() { eprintln!("invalid scenario {}", other); std::process::exit(1); } - }; + } + .unwrap(); graphviz_tree(&segments, &size); } @@ -251,7 +252,7 @@ fn graphviz_tree(segments: &[Segment], tree: &SegmentSize) { #[test] fn scenarios_return_same_size() { - type ScenarioFn = fn() -> (Vec, SegmentSize); + type ScenarioFn = fn() -> anyhow::Result<(Vec, SegmentSize)>; let truths: &[(u32, ScenarioFn, _)] = &[ 
(line!(), scenario_1, 8000), (line!(), scenario_2, 9000), @@ -262,7 +263,7 @@ fn scenarios_return_same_size() { ]; for (line, scenario, expected) in truths { - let (_, size) = scenario(); + let (_, size) = scenario().unwrap(); assert_eq!(*expected, size.total_children(), "scenario on line {line}"); } } diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index 2181d6d4dc..61cb32fc76 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -194,6 +194,15 @@ pub(super) async fn gather_inputs( let timelines = tenant.list_timelines(); + if timelines.is_empty() { + // perhaps the tenant has just been created, and as such doesn't have any data yet + return Ok(ModelInputs { + updates: vec![], + retention_period: 0, + timeline_inputs: HashMap::default(), + }); + } + // record the used/inserted cache keys here, to remove extras not to start leaking // after initial run the cache should be quite stable, but live timelines will eventually // require new lsns to be inspected. @@ -505,10 +514,10 @@ impl ModelInputs { let Lsn(now) = *lsn; match op { Command::Update(sz) => { - storage.insert_point(&Some(*timeline_id), "".into(), now, Some(*sz)); + storage.insert_point(&Some(*timeline_id), "".into(), now, Some(*sz))?; } Command::EndOfBranch => { - storage.insert_point(&Some(*timeline_id), "".into(), now, None); + storage.insert_point(&Some(*timeline_id), "".into(), now, None)?; } Command::BranchFrom(parent) => { // This branch command may fail if it cannot find a parent to branch from. @@ -517,7 +526,7 @@ impl ModelInputs { } } - Ok(storage.calculate(self.retention_period).total_children()) + Ok(storage.calculate(self.retention_period)?.total_children()) } }