From b4e00b8b220b6443c117fbaf9746ab4ef9c38e55 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 13 Nov 2024 18:07:39 +0000 Subject: [PATCH 01/19] pageserver: refuse to load tenants with suspiciously old indices in old generations (#9719) ## Problem Historically, if a control component passed a pageserver "generation: 1" this could be a quick way to corrupt a tenant by loading a historic index. Follows https://github.com/neondatabase/neon/pull/9383 Closes #6951 ## Summary of changes - Introduce a Fatal variant to DownloadError, to enable index downloads to signal when they have encountered a scary enough situation that we shouldn't proceed to load the tenant. - Handle this variant by putting the tenant into a broken state (no matter which timeline within the tenant reported it) - Add a test for this case In the event that this behavior fires when we don't want it to, we have ways to intervene: - "Touch" an affected index to update its mtime (download+upload S3 object) - If this behavior is triggered, it indicates we're attaching in some old generation, so we should be able to fix that by manually bumping generation numbers in the storage controller database (this should never happen, but it's an option if it does) --- libs/remote_storage/src/error.rs | 6 +- pageserver/src/tenant.rs | 6 ++ .../src/tenant/remote_timeline_client.rs | 8 ++- .../regress/test_pageserver_generations.py | 68 ++++++++++++++++++- 4 files changed, 85 insertions(+), 3 deletions(-) diff --git a/libs/remote_storage/src/error.rs b/libs/remote_storage/src/error.rs index 17790e9f70..ec9f868998 100644 --- a/libs/remote_storage/src/error.rs +++ b/libs/remote_storage/src/error.rs @@ -15,6 +15,9 @@ pub enum DownloadError { /// /// Concurrency control is not timed within timeout. Timeout, + /// Some integrity/consistency check failed during download. This is used during + /// timeline loads to cancel the load of a tenant if some timeline detects fatal corruption. + Fatal(String), /// The file was found in the remote storage, but the download failed. Other(anyhow::Error), } @@ -29,6 +32,7 @@ impl std::fmt::Display for DownloadError { DownloadError::Unmodified => write!(f, "File was not modified"), DownloadError::Cancelled => write!(f, "Cancelled, shutting down"), DownloadError::Timeout => write!(f, "timeout"), + DownloadError::Fatal(why) => write!(f, "Fatal read error: {why}"), DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"), } } @@ -41,7 +45,7 @@ impl DownloadError { pub fn is_permanent(&self) -> bool { use DownloadError::*; match self { - BadInput(_) | NotFound | Unmodified | Cancelled => true, + BadInput(_) | NotFound | Unmodified | Fatal(_) | Cancelled => true, Timeout | Other(_) => false, } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d0a96e78a6..61bb1fe40c 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1433,6 +1433,12 @@ impl Tenant { info!(%timeline_id, "index_part not found on remote"); continue; } + Err(DownloadError::Fatal(why)) => { + // If, while loading one remote timeline, we saw an indication that our generation + // number is likely invalid, then we should not load the whole tenant. + error!(%timeline_id, "Fatal error loading timeline: {why}"); + anyhow::bail!(why.to_string()); + } Err(e) => { // Some (possibly ephemeral) error happened during index_part download. // Pretend the timeline exists to not delete the timeline directory, diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index b37c16e133..600583f6b5 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -574,12 +574,18 @@ impl RemoteTimelineClient { if latest_index_generation > index_generation { // Unexpected! Why are we loading such an old index if a more recent one exists? - tracing::warn!( + // We will refuse to proceed, as there is no reasonable scenario where this should happen, but + // there _is_ a clear bug/corruption scenario where it would happen (controller sets the generation + // backwards). + tracing::error!( ?index_generation, ?latest_index_generation, ?latest_index_mtime, "Found a newer index while loading an old one" ); + return Err(DownloadError::Fatal( + "Index age exceeds threshold and a newer index exists".into(), + )); } } diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index 8f6c9f16fd..4f59efb8b3 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -35,9 +35,10 @@ from fixtures.pageserver.utils import ( wait_for_upload, ) from fixtures.remote_storage import ( + LocalFsStorage, RemoteStorageKind, ) -from fixtures.utils import wait_until +from fixtures.utils import run_only_on_default_postgres, wait_until from fixtures.workload import Workload if TYPE_CHECKING: @@ -728,3 +729,68 @@ def test_upgrade_generationless_local_file_paths( ) # We should download into the same local path we started with assert os.path.exists(victim_path) + + +@run_only_on_default_postgres("Only tests index logic") +def test_old_index_time_threshold( + neon_env_builder: NeonEnvBuilder, +): + """ + Exercise pageserver's detection of trying to load an ancient non-latest index. + (see https://github.com/neondatabase/neon/issues/6951) + """ + + # Run with local_fs because we will interfere with mtimes by local filesystem access + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + env = neon_env_builder.init_start() + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + workload = Workload(env, tenant_id, timeline_id) + workload.init() + workload.write_rows(32) + + # Remember generation 1's index path + assert isinstance(env.pageserver_remote_storage, LocalFsStorage) + index_path = env.pageserver_remote_storage.index_path(tenant_id, timeline_id) + + # Increment generation by detaching+attaching, and write+flush some data to get a new remote index + env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"}) + env.storage_controller.tenant_policy_update(tenant_id, {"placement": {"Attached": 0}}) + env.storage_controller.reconcile_until_idle() + workload.churn_rows(32) + + # A new index should have been written + assert env.pageserver_remote_storage.index_path(tenant_id, timeline_id) != index_path + + # Hack the mtime on the generation 1 index + log.info(f"Setting old mtime on {index_path}") + os.utime(index_path, times=(time.time(), time.time() - 30 * 24 * 3600)) + env.pageserver.allowed_errors.extend( + [ + ".*Found a newer index while loading an old one.*", + ".*Index age exceeds threshold and a newer index exists.*", + ] + ) + + # Detach from storage controller + attach in an old generation directly on the pageserver. + workload.stop() + env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"}) + env.storage_controller.reconcile_until_idle() + env.storage_controller.tenant_policy_update(tenant_id, {"scheduling": "Stop"}) + env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy") + + # The controller would not do this (attach in an old generation): we are doing it to simulate + # a hypothetical profound bug in the controller. + env.pageserver.http_client().tenant_location_conf( + tenant_id, {"generation": 1, "mode": "AttachedSingle", "tenant_conf": {}} + ) + + # The pageserver should react to this situation by refusing to attach the tenant and putting + # it into Broken state + env.pageserver.allowed_errors.append(".*tenant is broken.*") + with pytest.raises( + PageserverApiException, + match="tenant is broken: Index age exceeds threshold and a newer index exists", + ): + env.pageserver.http_client().timeline_detail(tenant_id, timeline_id) From 1280b708f1636034cfe99038faab1ae628dd4b2d Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Wed, 13 Nov 2024 20:35:48 -0600 Subject: [PATCH 02/19] Improve error handling for NeonAPI fixture Move error handling to the common request function and add a debug log. Signed-off-by: Tristan Partin --- test_runner/fixtures/neon_api.py | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/test_runner/fixtures/neon_api.py b/test_runner/fixtures/neon_api.py index 89c1f324b4..9de6681beb 100644 --- a/test_runner/fixtures/neon_api.py +++ b/test_runner/fixtures/neon_api.py @@ -5,6 +5,8 @@ from typing import TYPE_CHECKING, cast, final import requests +from fixtures.log_helper import log + if TYPE_CHECKING: from typing import Any, Literal, Optional @@ -30,7 +32,11 @@ class NeonAPI: kwargs["headers"] = {} kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}" - return requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs) + resp = requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs) + log.debug("%s %s returned a %d: %s", method, endpoint, resp.status_code, resp.text) + resp.raise_for_status() + + return resp def create_project( self, @@ -66,8 +72,6 @@ class NeonAPI: json=data, ) - assert resp.status_code == 201 - return cast("dict[str, Any]", resp.json()) def get_project_details(self, project_id: str) -> dict[str, Any]: @@ -79,7 +83,7 @@ class NeonAPI: "Content-Type": "application/json", }, ) - assert resp.status_code == 200 + return cast("dict[str, Any]", resp.json()) def delete_project( @@ -95,8 +99,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def start_endpoint( @@ -112,8 +114,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def suspend_endpoint( @@ -129,8 +129,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def restart_endpoint( @@ -146,8 +144,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def create_endpoint( @@ -178,8 +174,6 @@ class NeonAPI: json=data, ) - assert resp.status_code == 201 - return cast("dict[str, Any]", resp.json()) def get_connection_uri( @@ -206,8 +200,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def get_branches(self, project_id: str) -> dict[str, Any]: @@ -219,8 +211,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def get_endpoints(self, project_id: str) -> dict[str, Any]: @@ -232,8 +222,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def get_operations(self, project_id: str) -> dict[str, Any]: @@ -246,8 +234,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def wait_for_operation_to_finish(self, project_id: str): From d06bf4b0fe6865e6bd4fadcc443945df72b6e162 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Thu, 14 Nov 2024 13:06:42 +0300 Subject: [PATCH 03/19] safekeeper: fix atomicity of WAL truncation (#9685) If WAL truncation fails in the middle it might leave some data on disk above the write/flush LSN. In theory, concatenated with previous records it might form bogus WAL (though very unlikely in practice because CRC would protect from that). To protect from that, set pending_wal_truncation flag: means before any WAL writes truncation must be retried until it succeeds. We already did that in case of safekeeper restart, now extend this mechanism for failures without restart. Also, importantly, reset LSNs in the beginning of the operation, not in the end, because once on disk deletion starts previous pointers are wrong. All this most likely haven't created any problems in practice because CRC protects from the consequences. Tests for this are hard; simulation infrastructure might be useful here in the future, but not yet. --- safekeeper/src/wal_storage.rs | 66 +++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 11f372bceb..c3bb6cd12c 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -127,23 +127,29 @@ pub struct PhysicalStorage { /// - doesn't point to the end of the segment file: Option, - /// When false, we have just initialized storage using the LSN from find_end_of_wal(). - /// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular, - /// there can be a case with unexpected .partial file. + /// When true, WAL truncation potentially has been interrupted and we need + /// to finish it before allowing WAL writes; see truncate_wal for details. + /// In this case [`write_lsn`] can be less than actually written WAL on + /// disk. In particular, there can be a case with unexpected .partial file. /// /// Imagine the following: /// - 000000010000000000000001 - /// - it was fully written, but the last record is split between 2 segments - /// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment - /// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0 + /// - it was fully written, but the last record is split between 2 + /// segments + /// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in + /// the end of this segment + /// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were + /// initialized to 0/1FFFFF0 /// - 000000010000000000000002.partial - /// - it has only 1 byte written, which is not enough to make a full WAL record + /// - it has only 1 byte written, which is not enough to make a full WAL + /// record /// - /// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal(). - /// This flag will be set to true after the first truncate_wal() call. + /// Partial segment 002 has no WAL records, and it will be removed by the + /// next truncate_wal(). This flag will be set to true after the first + /// truncate_wal() call. /// /// [`write_lsn`]: Self::write_lsn - is_truncated_after_restart: bool, + pending_wal_truncation: bool, } impl PhysicalStorage { @@ -208,7 +214,7 @@ impl PhysicalStorage { flush_record_lsn: flush_lsn, decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000), file: None, - is_truncated_after_restart: false, + pending_wal_truncation: true, }) } @@ -405,6 +411,13 @@ impl Storage for PhysicalStorage { startpos ); } + if self.pending_wal_truncation { + bail!( + "write_wal called with pending WAL truncation, write_lsn={}, startpos={}", + self.write_lsn, + startpos + ); + } let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?; // WAL is written, updating write metrics @@ -479,15 +492,34 @@ impl Storage for PhysicalStorage { ); } - // Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on - // disk (this happens on each connect). - if self.is_truncated_after_restart + // Quick exit if nothing to do and we know that the state is clean to + // avoid writing up to 16 MiB of zeros on disk (this happens on each + // connect). + if !self.pending_wal_truncation && end_pos == self.write_lsn && end_pos == self.flush_record_lsn { return Ok(()); } + // Atomicity: we start with LSNs reset because once on disk deletion is + // started it can't be reversed. However, we might crash/error in the + // middle, leaving garbage above the truncation point. In theory, + // concatenated with previous records it might form bogus WAL (though + // very unlikely in practice because CRC would guard from that). To + // protect, set pending_wal_truncation flag before beginning: it means + // truncation must be retried and WAL writes are prohibited until it + // succeeds. Flag is also set on boot because we don't know if the last + // state was clean. + // + // Protocol (HandleElected before first AppendRequest) ensures we'll + // always try to ensure clean truncation before any writes. + self.pending_wal_truncation = true; + + self.write_lsn = end_pos; + self.write_record_lsn = end_pos; + self.flush_record_lsn = end_pos; + // Close previously opened file, if any if let Some(unflushed_file) = self.file.take() { self.fdatasync_file(&unflushed_file).await?; @@ -513,11 +545,7 @@ impl Storage for PhysicalStorage { fs::rename(wal_file_path, wal_file_partial_path).await?; } - // Update LSNs - self.write_lsn = end_pos; - self.write_record_lsn = end_pos; - self.flush_record_lsn = end_pos; - self.is_truncated_after_restart = true; + self.pending_wal_truncation = false; Ok(()) } From 21282aa1134dba24aeb74bcde79a550b5b02f108 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 14 Nov 2024 15:16:43 +0000 Subject: [PATCH 04/19] cargo: use neon branch of rust-postgres (#9757) ## Problem We are pining our fork of rust-postgres to a commit hash and that prevents us from making further changes to it. The latest commit in rust-postgres requires https://github.com/neondatabase/neon/pull/8747, but that seems to have gone stale. I reverted rust-postgres `neon` branch to the pinned commit in https://github.com/neondatabase/rust-postgres/pull/31. ## Summary of changes Switch back to using the `neon` branch of the rust-postgres fork. --- Cargo.lock | 8 ++++---- Cargo.toml | 21 +++++---------------- workspace_hack/Cargo.toml | 4 ++-- 3 files changed, 11 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 64231ed11c..f6e3f9ddb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4009,7 +4009,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" dependencies = [ "bytes", "fallible-iterator", @@ -4022,7 +4022,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" dependencies = [ "base64 0.20.0", "byteorder", @@ -4041,7 +4041,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" dependencies = [ "bytes", "fallible-iterator", @@ -6227,7 +6227,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" dependencies = [ "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index 8207726caa..706d742f1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -203,21 +203,10 @@ env_logger = "0.10" log = "0.4" ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed - -# We want to use the 'neon' branch for these, but there's currently one -# incompatible change on the branch. See: -# -# - PR #8076 which contained changes that depended on the new changes in -# the rust-postgres crate, and -# - PR #8654 which reverted those changes and made the code in proxy incompatible -# with the tip of the 'neon' branch again. -# -# When those proxy changes are re-applied (see PR #8747), we can switch using -# the tip of the 'neon' branch again. -postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } +postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } ## Local libraries compute_api = { version = "0.1", path = "./libs/compute_api/" } @@ -255,7 +244,7 @@ tonic-build = "0.12" [patch.crates-io] # Needed to get `tokio-postgres-rustls` to depend on our fork. -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } ################# Binary contents sections diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index ae4018a884..d6773987ea 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -58,7 +58,7 @@ num-integer = { version = "0.1", features = ["i128"] } num-traits = { version = "0.2", features = ["i128", "libm"] } once_cell = { version = "1" } parquet = { version = "53", default-features = false, features = ["zstd"] } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", default-features = false, features = ["with-serde_json-1"] } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", default-features = false, features = ["with-serde_json-1"] } prost = { version = "0.13", features = ["prost-derive"] } rand = { version = "0.8", features = ["small_rng"] } regex = { version = "1" } @@ -78,7 +78,7 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures tikv-jemalloc-sys = { version = "0.5" } time = { version = "0.3", features = ["macros", "serde-well-known"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", features = ["with-serde_json-1"] } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", features = ["with-serde_json-1"] } tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] } tokio-stream = { version = "0.1", features = ["net"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] } From f70611c8df719a45f23abffdbc5b60a803e4f87e Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 14 Nov 2024 17:19:13 +0200 Subject: [PATCH 05/19] Correctly truncate VM (#9342) ## Problem https://github.com/neondatabase/neon/issues/9240 ## Summary of changes Correctly truncate VM page instead just replacing it with zero page. ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. ## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist --------- Co-authored-by: Konstantin Knizhnik Co-authored-by: Heikki Linnakangas --- libs/pageserver_api/src/record.rs | 5 ++++ libs/postgres_ffi/src/pg_constants.rs | 7 ++++-- pageserver/src/walingest.rs | 28 +++++++++++++++++---- pageserver/src/walredo/apply_neon.rs | 28 +++++++++++++++++++++ test_runner/regress/test_vm_truncate.py | 33 +++++++++++++++++++++++++ 5 files changed, 94 insertions(+), 7 deletions(-) create mode 100644 test_runner/regress/test_vm_truncate.py diff --git a/libs/pageserver_api/src/record.rs b/libs/pageserver_api/src/record.rs index 5c3f3deb82..bb62b35d36 100644 --- a/libs/pageserver_api/src/record.rs +++ b/libs/pageserver_api/src/record.rs @@ -41,6 +41,11 @@ pub enum NeonWalRecord { file_path: String, content: Option, }, + // Truncate visibility map page + TruncateVisibilityMap { + trunc_byte: usize, + trunc_offs: usize, + }, /// A testing record for unit testing purposes. It supports append data to an existing image, or clear it. #[cfg(feature = "testing")] diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 497d011d7a..e343473d77 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -243,8 +243,11 @@ const FSM_LEAF_NODES_PER_PAGE: usize = FSM_NODES_PER_PAGE - FSM_NON_LEAF_NODES_P pub const SLOTS_PER_FSM_PAGE: u32 = FSM_LEAF_NODES_PER_PAGE as u32; /* From visibilitymap.c */ -pub const VM_HEAPBLOCKS_PER_PAGE: u32 = - (BLCKSZ as usize - SIZEOF_PAGE_HEADER_DATA) as u32 * (8 / 2); // MAPSIZE * (BITS_PER_BYTE / BITS_PER_HEAPBLOCK) + +pub const VM_MAPSIZE: usize = BLCKSZ as usize - MAXALIGN_SIZE_OF_PAGE_HEADER_DATA; +pub const VM_BITS_PER_HEAPBLOCK: usize = 2; +pub const VM_HEAPBLOCKS_PER_BYTE: usize = 8 / VM_BITS_PER_HEAPBLOCK; +pub const VM_HEAPBLOCKS_PER_PAGE: usize = VM_MAPSIZE * VM_HEAPBLOCKS_PER_BYTE; /* From origin.c */ pub const REPLICATION_STATE_MAGIC: u32 = 0x1257DADE; diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index c3ccd8a2e4..84e553f330 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -587,11 +587,29 @@ impl WalIngest { forknum: VISIBILITYMAP_FORKNUM, }; - let mut vm_page_no = blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE; - if blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 { - // Tail of last remaining vm page has to be zeroed. - // We are not precise here and instead of digging in VM bitmap format just clear the whole page. - modification.put_rel_page_image_zero(rel, vm_page_no)?; + // last remaining block, byte, and bit + let mut vm_page_no = blkno / (pg_constants::VM_HEAPBLOCKS_PER_PAGE as u32); + let trunc_byte = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_PAGE + / pg_constants::VM_HEAPBLOCKS_PER_BYTE; + let trunc_offs = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_BYTE + * pg_constants::VM_BITS_PER_HEAPBLOCK; + + // Unless the new size is exactly at a visibility map page boundary, the + // tail bits in the last remaining map page, representing truncated heap + // blocks, need to be cleared. This is not only tidy, but also necessary + // because we don't get a chance to clear the bits if the heap is extended + // again. + if (trunc_byte != 0 || trunc_offs != 0) + && self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no)) + { + modification.put_rel_wal_record( + rel, + vm_page_no, + NeonWalRecord::TruncateVisibilityMap { + trunc_byte, + trunc_offs, + }, + )?; vm_page_no += 1; } let nblocks = get_relsize(modification, rel, ctx).await?; diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs index 78601d87af..d62e325310 100644 --- a/pageserver/src/walredo/apply_neon.rs +++ b/pageserver/src/walredo/apply_neon.rs @@ -42,6 +42,34 @@ pub(crate) fn apply_in_neon( } => { anyhow::bail!("tried to pass postgres wal record to neon WAL redo"); } + // + // Code copied from PostgreSQL `visibilitymap_prepare_truncate` function in `visibilitymap.c` + // + NeonWalRecord::TruncateVisibilityMap { + trunc_byte, + trunc_offs, + } => { + // sanity check that this is modifying the correct relation + let (rel, _) = key.to_rel_block().context("invalid record")?; + assert!( + rel.forknum == VISIBILITYMAP_FORKNUM, + "TruncateVisibilityMap record on unexpected rel {}", + rel + ); + let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; + map[*trunc_byte + 1..].fill(0u8); + /*---- + * Mask out the unwanted bits of the last remaining byte. + * + * ((1 << 0) - 1) = 00000000 + * ((1 << 1) - 1) = 00000001 + * ... + * ((1 << 6) - 1) = 00111111 + * ((1 << 7) - 1) = 01111111 + *---- + */ + map[*trunc_byte] &= (1 << *trunc_offs) - 1; + } NeonWalRecord::ClearVisibilityMapFlags { new_heap_blkno, old_heap_blkno, diff --git a/test_runner/regress/test_vm_truncate.py b/test_runner/regress/test_vm_truncate.py new file mode 100644 index 0000000000..43b4f2d8b1 --- /dev/null +++ b/test_runner/regress/test_vm_truncate.py @@ -0,0 +1,33 @@ +from fixtures.neon_fixtures import NeonEnv + + +# +# Test that VM is properly truncated +# +def test_vm_truncate(neon_simple_env: NeonEnv): + env = neon_simple_env + + endpoint = env.endpoints.create_start("main") + con = endpoint.connect() + cur = con.cursor() + cur.execute("CREATE EXTENSION neon_test_utils") + cur.execute("CREATE EXTENSION pageinspect") + + cur.execute( + "create table t(pk integer primary key, counter integer default 0, filler text default repeat('?', 200))" + ) + cur.execute("insert into t (pk) values (generate_series(1,1000))") + cur.execute("delete from t where pk>10") + cur.execute("vacuum t") # truncates the relation, including its VM and FSM + # get image of the first block of the VM excluding the page header. It's expected + # to still be in the buffer cache. + # ignore page header (24 bytes, 48 - it's hex representation) + cur.execute("select substr(encode(get_raw_page('t', 'vm', 0), 'hex'), 48)") + pg_bitmap = cur.fetchall()[0][0] + # flush shared buffers + cur.execute("SELECT clear_buffer_cache()") + # now download the first block of the VM from the pageserver ... + cur.execute("select substr(encode(get_raw_page('t', 'vm', 0), 'hex'), 48)") + ps_bitmap = cur.fetchall()[0][0] + # and check that content of bitmaps are equal, i.e. PS is producing the same VM page as Postgres + assert pg_bitmap == ps_bitmap From 8cde37bc0beb0db91c0bd15908f1a4b0f7cf6dbd Mon Sep 17 00:00:00 2001 From: Yuchen Liang <70461588+yliang412@users.noreply.github.com> Date: Thu, 14 Nov 2024 10:26:58 -0500 Subject: [PATCH 06/19] test: disable test_readonly_node_gc until proper fix (#9755) ## Problem After investigation, we think to make `test_readonly_node_gc` less flaky, we need to make a proper fix (likely involving persisting part of the lease state). See https://github.com/neondatabase/neon/issues/9754 for details. ## Summary of changes - skip the test until proper fix. Signed-off-by: Yuchen Liang --- test_runner/regress/test_readonly_node.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test_runner/regress/test_readonly_node.py b/test_runner/regress/test_readonly_node.py index f257f0853b..826136d5f9 100644 --- a/test_runner/regress/test_readonly_node.py +++ b/test_runner/regress/test_readonly_node.py @@ -122,6 +122,7 @@ def test_readonly_node(neon_simple_env: NeonEnv): ) +@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/9754") def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): """ Test static endpoint is protected from GC by acquiring and renewing lsn leases. From 49b599c1138e2cb35fa87974309dabf189e3bf84 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Thu, 14 Nov 2024 10:59:15 -0600 Subject: [PATCH 07/19] Remove the replication slot in test_snap_files at the end of the test Analysis of the LR benchmarking tests indicates that in the duration of test_subscriber_lag, a leftover 'slotter' replication slot can lead to retained WAL growing on the publisher. This replication slot is not used by any subscriber. The only purpose of the slot is to generate snapshot files for the puspose of test_snap_files. Signed-off-by: Tristan Partin --- .../performance/test_logical_replication.py | 117 +++++++++++------- 1 file changed, 74 insertions(+), 43 deletions(-) diff --git a/test_runner/performance/test_logical_replication.py b/test_runner/performance/test_logical_replication.py index 050c09c1e5..9d653d1a1e 100644 --- a/test_runner/performance/test_logical_replication.py +++ b/test_runner/performance/test_logical_replication.py @@ -1,6 +1,8 @@ from __future__ import annotations import time +from collections.abc import Iterator +from contextlib import contextmanager from typing import TYPE_CHECKING, cast import psycopg2 @@ -18,7 +20,7 @@ if TYPE_CHECKING: from fixtures.benchmark_fixture import NeonBenchmarker from fixtures.neon_api import NeonApiEndpoint from fixtures.neon_fixtures import NeonEnv, PgBin, VanillaPostgres - from psycopg2.extensions import cursor + from psycopg2.extensions import connection, cursor @pytest.mark.timeout(1000) @@ -292,6 +294,48 @@ def test_snap_files( then runs pgbench inserts while generating large numbers of snapfiles. Then restarts the node and tries to peek the replication changes. """ + + @contextmanager + def replication_slot(conn: connection, slot_name: str) -> Iterator[None]: + """ + Make sure that the replication slot doesn't outlive the test. Normally + we wouldn't want this behavior, but since the test creates and drops + the replication slot, we do. + + We've had problems in the past where this slot sticking around caused + issues with the publisher retaining WAL during the execution of the + other benchmarks in this suite. + """ + + def __drop_replication_slot(c: cursor) -> None: + c.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 + FROM pg_replication_slots + WHERE slot_name = %(slot_name)s + ) THEN + PERFORM pg_drop_replication_slot(%(slot_name)s); + END IF; + END $$; + """, + {"slot_name": slot_name}, + ) + + with conn.cursor() as c: + __drop_replication_slot(c) + c.execute( + "SELECT pg_create_logical_replication_slot(%(slot_name)s, 'test_decoding')", + {"slot_name": slot_name}, + ) + + yield + + with conn.cursor() as c: + __drop_replication_slot(c) + test_duration_min = 60 test_interval_min = 5 pgbench_duration = f"-T{test_duration_min * 60 * 2}" @@ -314,48 +358,35 @@ def test_snap_files( conn = psycopg2.connect(connstr) conn.autocommit = True - with conn.cursor() as cur: - cur.execute( - """ - DO $$ - BEGIN - IF EXISTS ( - SELECT 1 - FROM pg_replication_slots - WHERE slot_name = 'slotter' - ) THEN - PERFORM pg_drop_replication_slot('slotter'); - END IF; - END $$; - """ + with replication_slot(conn, "slotter"): + workload = pg_bin.run_nonblocking( + ["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env ) - cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')") + try: + start = time.time() + prev_measurement = time.time() + while time.time() - start < test_duration_min * 60: + conn = psycopg2.connect(connstr) + conn.autocommit = True + + with conn.cursor() as cur: + cur.execute( + "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s" + ) + check_pgbench_still_running(workload) + cur.execute( + "SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())" + ) + + conn.close() + + # Measure storage + if time.time() - prev_measurement > test_interval_min * 60: + storage = benchmark_project_pub.get_synthetic_storage_size() + zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER) + prev_measurement = time.time() + time.sleep(test_interval_min * 60 / 3) + finally: + workload.terminate() conn.close() - - workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env) - try: - start = time.time() - prev_measurement = time.time() - while time.time() - start < test_duration_min * 60: - conn = psycopg2.connect(connstr) - conn.autocommit = True - - with conn.cursor() as cur: - cur.execute( - "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s" - ) - check_pgbench_still_running(workload) - cur.execute("SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())") - - conn.close() - - # Measure storage - if time.time() - prev_measurement > test_interval_min * 60: - storage = benchmark_project_pub.get_synthetic_storage_size() - zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER) - prev_measurement = time.time() - time.sleep(test_interval_min * 60 / 3) - - finally: - workload.terminate() From 93939f123fcd76105543f7a251541712991b5a7a Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 14 Nov 2024 17:31:35 +0000 Subject: [PATCH 08/19] tests: add test_timeline_archival_chaos (#9609) ## Problem - We lack test coverage of cases where multiple timelines fight for updates to the same manifest (https://github.com/neondatabase/neon/pull/9557), and in timeline archival changes while dual-attached (https://github.com/neondatabase/neon/pull/9555) ## Summary of changes - Add a chaos test for timeline creation->archival->offload->deletion --- pageserver/src/http/routes.rs | 1 + pageserver/src/tenant.rs | 4 + .../src/tenant/remote_timeline_client.rs | 40 ++- pageserver/src/tenant/timeline/delete.rs | 44 +-- storage_controller/src/service.rs | 1 + test_runner/fixtures/neon_fixtures.py | 22 +- test_runner/regress/test_timeline_archive.py | 261 +++++++++++++++++- 7 files changed, 340 insertions(+), 33 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index dde9c5dd0b..ab170679ba 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -324,6 +324,7 @@ impl From for ApiError { .into_boxed_str(), ), a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()), + Cancelled => ApiError::ResourceUnavailable("shutting down".into()), Other(e) => ApiError::InternalServerError(e), } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 61bb1fe40c..c6fc3bfe6c 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -700,6 +700,9 @@ pub enum DeleteTimelineError { #[error("Timeline deletion is already in progress")] AlreadyInProgress(Arc>), + #[error("Cancelled")] + Cancelled, + #[error(transparent)] Other(#[from] anyhow::Error), } @@ -710,6 +713,7 @@ impl Debug for DeleteTimelineError { Self::NotFound => write!(f, "NotFound"), Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(), Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(), + Self::Cancelled => f.debug_tuple("Cancelled").finish(), Self::Other(e) => f.debug_tuple("Other").field(e).finish(), } } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 600583f6b5..94f42c7827 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -243,7 +243,7 @@ use self::index::IndexPart; use super::metadata::MetadataUpdate; use super::storage_layer::{Layer, LayerName, ResidentLayer}; use super::upload_queue::{NotInitialized, SetDeletedFlagProgress}; -use super::Generation; +use super::{DeleteTimelineError, Generation}; pub(crate) use download::{ download_index_part, download_tenant_manifest, is_temp_download_file, @@ -1550,15 +1550,17 @@ impl RemoteTimelineClient { /// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfuly set. /// The function deletes layer files one by one, then lists the prefix to see if we leaked something /// deletes leaked files if any and proceeds with deletion of index file at the end. - pub(crate) async fn delete_all(self: &Arc) -> anyhow::Result<()> { + pub(crate) async fn delete_all(self: &Arc) -> Result<(), DeleteTimelineError> { debug_assert_current_span_has_tenant_and_timeline_id(); let layers: Vec = { let mut locked = self.upload_queue.lock().unwrap(); - let stopped = locked.stopped_mut()?; + let stopped = locked.stopped_mut().map_err(DeleteTimelineError::Other)?; if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) { - anyhow::bail!("deleted_at is not set") + return Err(DeleteTimelineError::Other(anyhow::anyhow!( + "deleted_at is not set" + ))); } debug_assert!(stopped.upload_queue_for_deletion.no_pending_work()); @@ -1593,7 +1595,10 @@ impl RemoteTimelineClient { }; let layer_deletion_count = layers.len(); - self.deletion_queue_client.push_immediate(layers).await?; + self.deletion_queue_client + .push_immediate(layers) + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; // Delete the initdb.tar.zst, which is not always present, but deletion attempts of // inexistant objects are not considered errors. @@ -1601,7 +1606,8 @@ impl RemoteTimelineClient { remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id); self.deletion_queue_client .push_immediate(vec![initdb_path]) - .await?; + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; // Do not delete index part yet, it is needed for possible retry. If we remove it first // and retry will arrive to different pageserver there wont be any traces of it on remote storage @@ -1609,7 +1615,9 @@ impl RemoteTimelineClient { // Execute all pending deletions, so that when we proceed to do a listing below, we aren't // taking the burden of listing all the layers that we already know we should delete. - self.flush_deletion_queue().await?; + self.flush_deletion_queue() + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; let cancel = shutdown_token(); @@ -1672,28 +1680,32 @@ impl RemoteTimelineClient { if !remaining_layers.is_empty() { self.deletion_queue_client .push_immediate(remaining_layers) - .await?; + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; } fail::fail_point!("timeline-delete-before-index-delete", |_| { - Err(anyhow::anyhow!( + Err(DeleteTimelineError::Other(anyhow::anyhow!( "failpoint: timeline-delete-before-index-delete" - ))? + )))? }); debug!("enqueuing index part deletion"); self.deletion_queue_client .push_immediate([latest_index].to_vec()) - .await?; + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; // Timeline deletion is rare and we have probably emitted a reasonably number of objects: wait // for a flush to a persistent deletion list so that we may be sure deletion will occur. - self.flush_deletion_queue().await?; + self.flush_deletion_queue() + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; fail::fail_point!("timeline-delete-after-index-delete", |_| { - Err(anyhow::anyhow!( + Err(DeleteTimelineError::Other(anyhow::anyhow!( "failpoint: timeline-delete-after-index-delete" - ))? + )))? }); info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json"); diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 5a4c2d9da3..69001a6c40 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -5,6 +5,7 @@ use std::{ use anyhow::Context; use pageserver_api::{models::TimelineState, shard::TenantShardId}; +use remote_storage::DownloadError; use tokio::sync::OwnedMutexGuard; use tracing::{error, info, info_span, instrument, Instrument}; use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint}; @@ -16,7 +17,7 @@ use crate::{ metadata::TimelineMetadata, remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient}, CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant, - TimelineOrOffloaded, + TenantManifestError, TimelineOrOffloaded, }, virtual_file::MaybeFatalIo, }; @@ -110,13 +111,6 @@ pub(super) async fn delete_local_timeline_directory( info!("finished deleting layer files, releasing locks"); } -/// Removes remote layers and an index file after them. -async fn delete_remote_layers_and_index( - remote_client: &Arc, -) -> anyhow::Result<()> { - remote_client.delete_all().await.context("delete_all") -} - /// It is important that this gets called when DeletionGuard is being held. /// For more context see comments in [`DeleteTimelineFlow::prepare`] async fn remove_maybe_offloaded_timeline_from_tenant( @@ -221,11 +215,24 @@ impl DeleteTimelineFlow { None => { let remote_client = tenant .build_timeline_client(timeline.timeline_id(), tenant.remote_storage.clone()); - let result = remote_client + let result = match remote_client .download_index_file(&tenant.cancel) .instrument(info_span!("download_index_file")) .await - .map_err(|e| DeleteTimelineError::Other(anyhow::anyhow!("error: {:?}", e)))?; + { + Ok(r) => r, + Err(DownloadError::NotFound) => { + // Deletion is already complete + tracing::info!("Timeline already deleted in remote storage"); + return Ok(()); + } + Err(e) => { + return Err(DeleteTimelineError::Other(anyhow::anyhow!( + "error: {:?}", + e + ))); + } + }; let index_part = match result { MaybeDeletedIndexPart::Deleted(p) => { tracing::info!("Timeline already set as deleted in remote index"); @@ -406,7 +413,12 @@ impl DeleteTimelineFlow { "timeline_delete", async move { if let Err(err) = Self::background(guard, conf, &tenant, &timeline, remote_client).await { - error!("Error: {err:#}"); + // Only log as an error if it's not a cancellation. + if matches!(err, DeleteTimelineError::Cancelled) { + info!("Shutdown during timeline deletion"); + }else { + error!("Error: {err:#}"); + } if let TimelineOrOffloaded::Timeline(timeline) = timeline { timeline.set_broken(format!("{err:#}")) } @@ -438,7 +450,7 @@ impl DeleteTimelineFlow { Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))? }); - delete_remote_layers_and_index(&remote_client).await?; + remote_client.delete_all().await?; pausable_failpoint!("in_progress_delete"); @@ -449,10 +461,10 @@ impl DeleteTimelineFlow { // So indeed, the tenant manifest might refer to an offloaded timeline which has already been deleted. // However, we handle this case in tenant loading code so the next time we attach, the issue is // resolved. - tenant - .store_tenant_manifest() - .await - .map_err(|e| DeleteTimelineError::Other(anyhow::anyhow!(e)))?; + tenant.store_tenant_manifest().await.map_err(|e| match e { + TenantManifestError::Cancelled => DeleteTimelineError::Cancelled, + _ => DeleteTimelineError::Other(e.into()), + })?; *guard = Self::Finished; diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index e3a147bc06..3b85da6665 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -3642,6 +3642,7 @@ impl Service { match res { Ok(ok) => Ok(ok), Err(mgmt_api::Error::ApiError(StatusCode::CONFLICT, _)) => Ok(StatusCode::CONFLICT), + Err(mgmt_api::Error::ApiError(StatusCode::SERVICE_UNAVAILABLE, msg)) => Err(ApiError::ResourceUnavailable(msg.into())), Err(e) => { Err( ApiError::InternalServerError(anyhow::anyhow!( diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 990db1aed0..205a47a9d5 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2379,6 +2379,17 @@ class NeonPageserver(PgProtocol, LogUtils): # # The entries in the list are regular experessions. self.allowed_errors: list[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS) + # Store persistent failpoints that should be reapplied on each start + self._persistent_failpoints: dict[str, str] = {} + + def add_persistent_failpoint(self, name: str, action: str): + """ + Add a failpoint that will be automatically reapplied each time the pageserver starts. + The failpoint will be set immediately if the pageserver is running. + """ + self._persistent_failpoints[name] = action + if self.running: + self.http_client().configure_failpoints([(name, action)]) def timeline_dir( self, @@ -2446,6 +2457,15 @@ class NeonPageserver(PgProtocol, LogUtils): """ assert self.running is False + if self._persistent_failpoints: + # Tests shouldn't use this mechanism _and_ set FAILPOINTS explicitly + assert extra_env_vars is None or "FAILPOINTS" not in extra_env_vars + if extra_env_vars is None: + extra_env_vars = {} + extra_env_vars["FAILPOINTS"] = ",".join( + f"{k}={v}" for (k, v) in self._persistent_failpoints.items() + ) + storage = self.env.pageserver_remote_storage if isinstance(storage, S3Storage): s3_env_vars = storage.access_env_vars() @@ -4522,7 +4542,7 @@ def pytest_addoption(parser: Parser): SMALL_DB_FILE_NAME_REGEX: re.Pattern[str] = re.compile( - r"config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql|conf)" + r"config-v1|heatmap-v1|tenant-manifest|metadata|.+\.(?:toml|pid|json|sql|conf)" ) diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py index d3839e3d2c..c447535e10 100644 --- a/test_runner/regress/test_timeline_archive.py +++ b/test_runner/regress/test_timeline_archive.py @@ -1,10 +1,14 @@ from __future__ import annotations import json +import random +import threading +import time from typing import Optional import pytest -from fixtures.common_types import TenantId, TimelineArchivalState, TimelineId +import requests +from fixtures.common_types import TenantId, TenantShardId, TimelineArchivalState, TimelineId from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, @@ -12,8 +16,9 @@ from fixtures.neon_fixtures import ( ) from fixtures.pageserver.http import PageserverApiException from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty, list_prefix +from fixtures.pg_version import PgVersion from fixtures.remote_storage import S3Storage, s3_storage -from fixtures.utils import wait_until +from fixtures.utils import run_only_on_default_postgres, wait_until from mypy_boto3_s3.type_defs import ( ObjectTypeDef, ) @@ -378,6 +383,258 @@ def test_timeline_offload_persist(neon_env_builder: NeonEnvBuilder, delete_timel ) +@run_only_on_default_postgres("this test isn't sensitive to the contents of timelines") +def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder): + """ + A general consistency check on archival/offload timeline state, and its intersection + with tenant migrations and timeline deletions. + """ + + # Offloading is off by default at time of writing: remove this line when it's on by default + neon_env_builder.pageserver_config_override = "timeline_offloading = true" + neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + + # We will exercise migrations, so need multiple pageservers + neon_env_builder.num_pageservers = 2 + + env = neon_env_builder.init_start( + initial_tenant_conf={ + "compaction_period": "1s", + } + ) + tenant_id = env.initial_tenant + tenant_shard_id = TenantShardId(tenant_id, 0, 0) + + # Unavailable pageservers during timeline CRUD operations can be logged as errors on the storage controller + env.storage_controller.allowed_errors.append(".*error sending request.*") + + for ps in env.pageservers: + # We will do unclean restarts, which results in these messages when cleaning up files + ps.allowed_errors.extend( + [ + ".*removing local file.*because it has unexpected length.*", + ".*__temp.*", + # FIXME: there are still anyhow::Error paths in timeline creation/deletion which + # generate 500 results when called during shutdown + ".*InternalServerError.*", + # FIXME: there are still anyhow::Error paths in timeline deletion that generate + # log lines at error severity + ".*delete_timeline.*Error", + ] + ) + + class TimelineState: + def __init__(self): + self.timeline_id = TimelineId.generate() + self.created = False + self.archived = False + self.offloaded = False + self.deleted = False + + controller_ps_api = env.storage_controller.pageserver_api() + + shutdown = threading.Event() + + violations = [] + + timelines_deleted = [] + + def list_timelines(tenant_id) -> tuple[set[TimelineId], set[TimelineId]]: + """Get the list of active and offloaded TimelineId""" + listing = controller_ps_api.timeline_and_offloaded_list(tenant_id) + active_ids = set([TimelineId(t["timeline_id"]) for t in listing.timelines]) + offloaded_ids = set([TimelineId(t["timeline_id"]) for t in listing.offloaded]) + + return (active_ids, offloaded_ids) + + def timeline_objects(tenant_shard_id, timeline_id): + response = list_prefix( + env.pageserver_remote_storage, # type: ignore + prefix="/".join( + ( + "tenants", + str(tenant_shard_id), + "timelines", + str(timeline_id), + ) + ) + + "/", + ) + + return [k["Key"] for k in response.get("Contents", [])] + + def worker(): + """ + Background thread which drives timeline lifecycle operations, and checks that between steps + it obeys invariants. This should detect errors in pageserver persistence and in errors in + concurrent operations on different timelines when it is run many times in parallel. + """ + state = TimelineState() + + # Jitter worker startup, we're not interested in exercising lots of concurrent creations + # as we know that's I/O bound. + shutdown.wait(random.random() * 10) + + while not shutdown.is_set(): + # A little wait between actions to jitter out the API calls rather than having them + # all queue up at once + shutdown.wait(random.random()) + + try: + if not state.created: + log.info(f"Creating timeline {state.timeline_id}") + controller_ps_api.timeline_create( + PgVersion.NOT_SET, tenant_id=tenant_id, new_timeline_id=state.timeline_id + ) + state.created = True + + if ( + timeline_objects( + tenant_shard_id=tenant_shard_id, timeline_id=state.timeline_id + ) + == [] + ): + msg = f"Timeline {state.timeline_id} unexpectedly not present in remote storage" + violations.append(msg) + + elif state.deleted: + # Try to confirm its deletion completed. + # Deleted timeline should not appear in listing API, either as offloaded or active + (active_ids, offloaded_ids) = list_timelines(tenant_id) + if state.timeline_id in active_ids or state.timeline_id in offloaded_ids: + msg = f"Timeline {state.timeline_id} appeared in listing after deletion was acked" + violations.append(msg) + raise RuntimeError(msg) + + objects = timeline_objects(tenant_shard_id, state.timeline_id) + if len(objects) == 0: + log.info(f"Confirmed deletion of timeline {state.timeline_id}") + timelines_deleted.append(state.timeline_id) + state = TimelineState() # A new timeline ID to create on next iteration + else: + # Deletion of objects doesn't have to be synchronous, we will keep polling + log.info(f"Timeline {state.timeline_id} objects still exist: {objects}") + shutdown.wait(random.random()) + else: + # The main lifetime of a timeline: proceed active->archived->offloaded->deleted + if not state.archived: + log.info(f"Archiving timeline {state.timeline_id}") + controller_ps_api.timeline_archival_config( + tenant_id, state.timeline_id, TimelineArchivalState.ARCHIVED + ) + state.archived = True + elif state.archived and not state.offloaded: + log.info(f"Waiting for offload of timeline {state.timeline_id}") + # Wait for offload: this should happen fast because we configured a short compaction interval + while not shutdown.is_set(): + (active_ids, offloaded_ids) = list_timelines(tenant_id) + if state.timeline_id in active_ids: + log.info(f"Timeline {state.timeline_id} is still active") + shutdown.wait(0.5) + elif state.timeline_id in offloaded_ids: + log.info(f"Timeline {state.timeline_id} is now offloaded") + state.offloaded = True + break + else: + # Timeline is neither offloaded nor active, this is unexpected: the pageserver + # should ensure that the timeline appears in either the offloaded list or main list + msg = f"Timeline {state.timeline_id} disappeared!" + violations.append(msg) + raise RuntimeError(msg) + elif state.offloaded: + # Once it's offloaded it should only be in offloaded or deleted state: check + # it didn't revert back to active. This tests that the manfiest is doing its + # job to suppress loading of offloaded timelines as active. + (active_ids, offloaded_ids) = list_timelines(tenant_id) + if state.timeline_id in active_ids: + msg = f"Timeline {state.timeline_id} is active, should be offloaded or deleted" + violations.append(msg) + raise RuntimeError(msg) + + log.info(f"Deleting timeline {state.timeline_id}") + controller_ps_api.timeline_delete(tenant_id, state.timeline_id) + state.deleted = True + else: + raise RuntimeError("State should be unreachable") + except PageserverApiException as e: + # This is expected: we are injecting chaos, API calls will sometimes fail. + # TODO: can we narrow this to assert we are getting friendly 503s? + log.info(f"Iteration error, will retry: {e}") + shutdown.wait(random.random()) + except requests.exceptions.RetryError as e: + # Retryable error repeated more times than `requests` is configured to tolerate, this + # is expected when a pageserver remains unavailable for a couple seconds + log.info(f"Iteration error, will retry: {e}") + shutdown.wait(random.random()) + except Exception as e: + log.warning( + f"Unexpected worker exception (current timeline {state.timeline_id}): {e}" + ) + else: + # In the non-error case, use a jitterd but small wait, we want to keep + # a high rate of operations going + shutdown.wait(random.random() * 0.1) + + n_workers = 4 + threads = [] + for _i in range(0, n_workers): + t = threading.Thread(target=worker) + t.start() + threads.append(t) + + # Set delay failpoints so that deletions and migrations take some time, and have a good + # chance to interact with other concurrent timeline mutations. + env.storage_controller.configure_failpoints( + [("reconciler-live-migrate-pre-await-lsn", "sleep(1)")] + ) + for ps in env.pageservers: + ps.add_persistent_failpoint("in_progress_delete", "sleep(1)") + + # Generate some chaos, while our workers are trying to complete their timeline operations + rng = random.Random() + try: + chaos_rounds = 48 + for _i in range(0, chaos_rounds): + action = rng.choice([0, 1]) + if action == 0: + # Pick a random pageserver to gracefully restart + pageserver = rng.choice(env.pageservers) + + # Whether to use a graceful shutdown or SIGKILL + immediate = random.choice([True, False]) + log.info(f"Restarting pageserver {pageserver.id}, immediate={immediate}") + + t1 = time.time() + pageserver.restart(immediate=immediate) + restart_duration = time.time() - t1 + + # Make sure we're up for as long as we spent restarting, to ensure operations can make progress + log.info(f"Staying alive for {restart_duration}s") + time.sleep(restart_duration) + else: + # Migrate our tenant between pageservers + origin_ps = env.get_tenant_pageserver(tenant_shard_id) + dest_ps = rng.choice([ps for ps in env.pageservers if ps.id != origin_ps.id]) + log.info(f"Migrating {tenant_shard_id} {origin_ps.id}->{dest_ps.id}") + env.storage_controller.tenant_shard_migrate( + tenant_shard_id=tenant_shard_id, dest_ps_id=dest_ps.id + ) + + log.info(f"Full timeline lifecycles so far: {len(timelines_deleted)}") + finally: + shutdown.set() + + for thread in threads: + thread.join() + + # Sanity check that during our run we did exercise some full timeline lifecycles, in case + # one of our workers got stuck + assert len(timelines_deleted) > 10 + + # That no invariant-violations were reported by workers + assert violations == [] + + @pytest.mark.parametrize("offload_child", ["offload", "offload-corrupt", "archive", None]) def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Optional[str]): """ From 38563de7dd2aa910c4d5564a4ad8c67ab62334e3 Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 14 Nov 2024 19:41:10 +0000 Subject: [PATCH 09/19] storcon: exclude non-Active tenants from shard autosplitting (#9743) ## Problem We didn't have a neat way to prevent auto-splitting of tenants. This could be useful during incidents or for testing. Closes https://github.com/neondatabase/neon/issues/9332 ## Summary of changes - Filter splitting candidates by scheduling policy --- storage_controller/src/service.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 3b85da6665..446c476b99 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -6356,6 +6356,19 @@ impl Service { // Pick the biggest tenant to split first top_n.sort_by_key(|i| i.resident_size); + + // Filter out tenants in a prohibiting scheduling mode + { + let locked = self.inner.read().unwrap(); + top_n.retain(|i| { + if let Some(shard) = locked.tenants.get(&i.id) { + matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) + } else { + false + } + }); + } + let Some(split_candidate) = top_n.into_iter().next() else { tracing::debug!("No split-elegible shards found"); return; From 19f7d40c1d89691c9d1d43ed2616e8f1cadc2c19 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 15 Nov 2024 11:41:43 +0100 Subject: [PATCH 10/19] deny.toml: allow CDDL-1.0 license (#9766) #9764, which adds profiling support to Safekeeper, pulls in the dependency [`inferno`](https://crates.io/crates/inferno) via [`pprof-rs`](https://crates.io/crates/pprof). This is licenced under the [Common Development and Distribution License 1.0](https://spdx.org/licenses/CDDL-1.0.html), which is not allowed by `cargo-deny`. This patch allows the CDDL-1.0 license. It is a derivative of the Mozilla Public License, which we already allow, but avoids some issues around European copyright law that the MPL had. As such, I don't expect this to be problematic. --- deny.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/deny.toml b/deny.toml index 327ac58db7..8bf643f4ba 100644 --- a/deny.toml +++ b/deny.toml @@ -37,6 +37,7 @@ allow = [ "BSD-2-Clause", "BSD-3-Clause", "CC0-1.0", + "CDDL-1.0", "ISC", "MIT", "MPL-2.0", From 04938d9d559d6e5968ce7e3b71a5a86ac8f87f57 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 15 Nov 2024 13:22:05 +0000 Subject: [PATCH 11/19] tests: tolerate pageserver 500s in test_timeline_archival_chaos (#9769) ## Problem Test exposes cases where pageserver gives 500 responses, causing failures like https://neon-github-public-dev.s3.amazonaws.com/reports/pr-9766/11844529470/index.html#suites/d1acc79950edeb0563fc86236c620898/3546be2ffed99ba6 ## Summary of changes - Tolerate such messages, and link an issue for cleaning up the pageserver not to return such 500s. --- test_runner/regress/test_timeline_archive.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py index c447535e10..83631405ab 100644 --- a/test_runner/regress/test_timeline_archive.py +++ b/test_runner/regress/test_timeline_archive.py @@ -406,7 +406,13 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder): tenant_shard_id = TenantShardId(tenant_id, 0, 0) # Unavailable pageservers during timeline CRUD operations can be logged as errors on the storage controller - env.storage_controller.allowed_errors.append(".*error sending request.*") + env.storage_controller.allowed_errors.extend( + [ + ".*error sending request.*", + # FIXME: the pageserver should not return 500s on cancellation (https://github.com/neondatabase/neon/issues/97680) + ".*InternalServerError(Error deleting timeline .* on .* on .*: pageserver API: error: Cancelled", + ] + ) for ps in env.pageservers: # We will do unclean restarts, which results in these messages when cleaning up files @@ -415,10 +421,10 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder): ".*removing local file.*because it has unexpected length.*", ".*__temp.*", # FIXME: there are still anyhow::Error paths in timeline creation/deletion which - # generate 500 results when called during shutdown + # generate 500 results when called during shutdown (https://github.com/neondatabase/neon/issues/9768) ".*InternalServerError.*", # FIXME: there are still anyhow::Error paths in timeline deletion that generate - # log lines at error severity + # log lines at error severity (https://github.com/neondatabase/neon/issues/9768) ".*delete_timeline.*Error", ] ) From 7880c246f1b2b14a8d316fa757df8d2a85894414 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 15 Nov 2024 14:22:29 +0100 Subject: [PATCH 12/19] Correct mistakes in offloaded timeline retain_lsn management (#9760) PR #9308 has modified tenant activation code to take offloaded child timelines into account for populating the list of `retain_lsn` values. However, there is more places than just tenant activation where one needs to update the `retain_lsn`s. This PR fixes some bugs of the current code that could lead to corruption in the worst case: 1. Deleting of an offloaded timeline would not get its `retain_lsn` purged from its parent. With the patch we now do it, but as the parent can be offloaded as well, the situatoin is a bit trickier than for non-offloaded timelines which can just keep a pointer to their parent. Here we can't keep a pointer because the parent might get offloaded, then unoffloaded again, creating a dangling pointer situation. Keeping a pointer to the *tenant* is not good either, because we might drop the offloaded timeline in a context where a `offloaded_timelines` lock is already held: so we don't want to acquire a lock in the drop code of OffloadedTimeline. 2. Unoffloading a timeline would not get its `retain_lsn` values populated, leading to it maybe garbage collecting values that its children might need. We now call `initialize_gc_info` on the parent. 3. Offloading of a timeline would not get its `retain_lsn` values registered as offloaded at the parent. So if we drop the `Timeline` object, and its registration is removed, the parent would not have any of the child's `retain_lsn`s around. Also, before, the `Timeline` object would delete anything related to its timeline ID, now it only deletes `retain_lsn`s that have `MaybeOffloaded::No` set. Incorporates Chi's reproducer from #9753. cc https://github.com/neondatabase/cloud/issues/20199 The `test_timeline_retain_lsn` test is extended: 1. it gains a new dimension, duplicating each mode, to either have the "main" branch be the direct parent of the timeline we archive, or the "test_archived_parent" branch intermediary, creating a three timeline structure. This doesn't test anything fixed by this PR in particular, just explores the vast space of possible configurations a little bit more. 2. it gains two new modes, `offload-parent`, which tests the second point, and `offload-no-restart` which tests the third point. It's easy to verify the test actually is "sharp" by removing one of the respective `self.initialize_gc_info()`, `gc_info.insert_child()` or `ancestor_children.push()`. Part of #8088 --------- Signed-off-by: Alex Chi Z Co-authored-by: Alex Chi Z --- pageserver/src/tenant.rs | 152 +++++++++++++++++-- pageserver/src/tenant/timeline.rs | 31 ++-- pageserver/src/tenant/timeline/delete.rs | 3 +- pageserver/src/tenant/timeline/offload.rs | 12 +- test_runner/regress/test_timeline_archive.py | 93 +++++++++--- 5 files changed, 244 insertions(+), 47 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c6fc3bfe6c..909f99ea9d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -39,6 +39,7 @@ use remote_timeline_client::UploadQueueNotReadyError; use std::collections::BTreeMap; use std::fmt; use std::future::Future; +use std::sync::atomic::AtomicBool; use std::sync::Weak; use std::time::SystemTime; use storage_broker::BrokerClientChannel; @@ -524,6 +525,9 @@ pub struct OffloadedTimeline { /// Prevent two tasks from deleting the timeline at the same time. If held, the /// timeline is being deleted. If 'true', the timeline has already been deleted. pub delete_progress: TimelineDeleteProgress, + + /// Part of the `OffloadedTimeline` object's lifecycle: this needs to be set before we drop it + pub deleted_from_ancestor: AtomicBool, } impl OffloadedTimeline { @@ -533,9 +537,16 @@ impl OffloadedTimeline { /// the timeline is not in a stopped state. /// Panics if the timeline is not archived. fn from_timeline(timeline: &Timeline) -> Result { - let ancestor_retain_lsn = timeline - .get_ancestor_timeline_id() - .map(|_timeline_id| timeline.get_ancestor_lsn()); + let (ancestor_retain_lsn, ancestor_timeline_id) = + if let Some(ancestor_timeline) = timeline.ancestor_timeline() { + let ancestor_lsn = timeline.get_ancestor_lsn(); + let ancestor_timeline_id = ancestor_timeline.timeline_id; + let mut gc_info = ancestor_timeline.gc_info.write().unwrap(); + gc_info.insert_child(timeline.timeline_id, ancestor_lsn, MaybeOffloaded::Yes); + (Some(ancestor_lsn), Some(ancestor_timeline_id)) + } else { + (None, None) + }; let archived_at = timeline .remote_client .archived_at_stopped_queue()? @@ -543,14 +554,17 @@ impl OffloadedTimeline { Ok(Self { tenant_shard_id: timeline.tenant_shard_id, timeline_id: timeline.timeline_id, - ancestor_timeline_id: timeline.get_ancestor_timeline_id(), + ancestor_timeline_id, ancestor_retain_lsn, archived_at, delete_progress: timeline.delete_progress.clone(), + deleted_from_ancestor: AtomicBool::new(false), }) } fn from_manifest(tenant_shard_id: TenantShardId, manifest: &OffloadedTimelineManifest) -> Self { + // We expect to reach this case in tenant loading, where the `retain_lsn` is populated in the parent's `gc_info` + // by the `initialize_gc_info` function. let OffloadedTimelineManifest { timeline_id, ancestor_timeline_id, @@ -564,6 +578,7 @@ impl OffloadedTimeline { ancestor_retain_lsn, archived_at, delete_progress: TimelineDeleteProgress::default(), + deleted_from_ancestor: AtomicBool::new(false), } } fn manifest(&self) -> OffloadedTimelineManifest { @@ -581,6 +596,33 @@ impl OffloadedTimeline { archived_at: *archived_at, } } + /// Delete this timeline's retain_lsn from its ancestor, if present in the given tenant + fn delete_from_ancestor_with_timelines( + &self, + timelines: &std::sync::MutexGuard<'_, HashMap>>, + ) { + if let (Some(_retain_lsn), Some(ancestor_timeline_id)) = + (self.ancestor_retain_lsn, self.ancestor_timeline_id) + { + if let Some((_, ancestor_timeline)) = timelines + .iter() + .find(|(tid, _tl)| **tid == ancestor_timeline_id) + { + ancestor_timeline + .gc_info + .write() + .unwrap() + .remove_child_offloaded(self.timeline_id); + } + } + self.deleted_from_ancestor.store(true, Ordering::Release); + } + /// Call [`Self::delete_from_ancestor_with_timelines`] instead if possible. + /// + /// As the entire tenant is being dropped, don't bother deregistering the `retain_lsn` from the ancestor. + fn defuse_for_tenant_drop(&self) { + self.deleted_from_ancestor.store(true, Ordering::Release); + } } impl fmt::Debug for OffloadedTimeline { @@ -589,6 +631,17 @@ impl fmt::Debug for OffloadedTimeline { } } +impl Drop for OffloadedTimeline { + fn drop(&mut self) { + if !self.deleted_from_ancestor.load(Ordering::Acquire) { + tracing::warn!( + "offloaded timeline {} was dropped without having cleaned it up at the ancestor", + self.timeline_id + ); + } + } +} + #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)] pub enum MaybeOffloaded { Yes, @@ -1531,7 +1584,7 @@ impl Tenant { } // Complete deletions for offloaded timeline id's. offloaded_timelines_list - .retain(|(offloaded_id, _offloaded)| { + .retain(|(offloaded_id, offloaded)| { // At this point, offloaded_timeline_ids has the list of all offloaded timelines // without a prefix in S3, so they are inexistent. // In the end, existence of a timeline is finally determined by the existence of an index-part.json in remote storage. @@ -1539,6 +1592,7 @@ impl Tenant { let delete = offloaded_timeline_ids.contains(offloaded_id); if delete { tracing::info!("Removing offloaded timeline {offloaded_id} from manifest as no remote prefix was found"); + offloaded.defuse_for_tenant_drop(); } !delete }); @@ -1927,9 +1981,15 @@ impl Tenant { ))); }; let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap(); - if offloaded_timelines.remove(&timeline_id).is_none() { - warn!("timeline already removed from offloaded timelines"); + match offloaded_timelines.remove(&timeline_id) { + Some(offloaded) => { + offloaded.delete_from_ancestor_with_timelines(&timelines); + } + None => warn!("timeline already removed from offloaded timelines"), } + + self.initialize_gc_info(&timelines, &offloaded_timelines, Some(timeline_id)); + Arc::clone(timeline) }; @@ -2667,7 +2727,7 @@ impl Tenant { .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping())); // Before activation, populate each Timeline's GcInfo with information about its children - self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor); + self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None); // Spawn gc and compaction loops. The loops will shut themselves // down when they notice that the tenant is inactive. @@ -2782,8 +2842,14 @@ impl Tenant { let timeline_id = timeline.timeline_id; let span = tracing::info_span!("timeline_shutdown", %timeline_id, ?shutdown_mode); js.spawn(async move { timeline.shutdown(shutdown_mode).instrument(span).await }); - }) - }; + }); + } + { + let timelines_offloaded = self.timelines_offloaded.lock().unwrap(); + timelines_offloaded.values().for_each(|timeline| { + timeline.defuse_for_tenant_drop(); + }); + } // test_long_timeline_create_then_tenant_delete is leaning on this message tracing::info!("Waiting for timelines..."); while let Some(res) = js.join_next().await { @@ -3767,10 +3833,13 @@ impl Tenant { &self, timelines: &std::sync::MutexGuard>>, timelines_offloaded: &std::sync::MutexGuard>>, + restrict_to_timeline: Option, ) { - // This function must be called before activation: after activation timeline create/delete operations - // might happen, and this function is not safe to run concurrently with those. - assert!(!self.is_active()); + if restrict_to_timeline.is_none() { + // This function must be called before activation: after activation timeline create/delete operations + // might happen, and this function is not safe to run concurrently with those. + assert!(!self.is_active()); + } // Scan all timelines. For each timeline, remember the timeline ID and // the branch point where it was created. @@ -3803,7 +3872,12 @@ impl Tenant { let horizon = self.get_gc_horizon(); // Populate each timeline's GcInfo with information about its child branches - for timeline in timelines.values() { + let timelines_to_write = if let Some(timeline_id) = restrict_to_timeline { + itertools::Either::Left(timelines.get(&timeline_id).into_iter()) + } else { + itertools::Either::Right(timelines.values()) + }; + for timeline in timelines_to_write { let mut branchpoints: Vec<(Lsn, TimelineId, MaybeOffloaded)> = all_branchpoints .remove(&timeline.timeline_id) .unwrap_or_default(); @@ -9650,4 +9724,54 @@ mod tests { Ok(()) } + + #[cfg(feature = "testing")] + #[tokio::test] + async fn test_timeline_offload_retain_lsn() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_timeline_offload_retain_lsn") + .await + .unwrap(); + let (tenant, ctx) = harness.load().await; + let tline_parent = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + let tline_child = tenant + .branch_timeline_test(&tline_parent, NEW_TIMELINE_ID, Some(Lsn(0x20)), &ctx) + .await + .unwrap(); + { + let gc_info_parent = tline_parent.gc_info.read().unwrap(); + assert_eq!( + gc_info_parent.retain_lsns, + vec![(Lsn(0x20), tline_child.timeline_id, MaybeOffloaded::No)] + ); + } + // We have to directly call the remote_client instead of using the archive function to avoid constructing broker client... + tline_child + .remote_client + .schedule_index_upload_for_timeline_archival_state(TimelineArchivalState::Archived) + .unwrap(); + tline_child.remote_client.wait_completion().await.unwrap(); + offload_timeline(&tenant, &tline_child) + .instrument(tracing::info_span!(parent: None, "offload_test", tenant_id=%"test", shard_id=%"test", timeline_id=%"test")) + .await.unwrap(); + let child_timeline_id = tline_child.timeline_id; + Arc::try_unwrap(tline_child).unwrap(); + + { + let gc_info_parent = tline_parent.gc_info.read().unwrap(); + assert_eq!( + gc_info_parent.retain_lsns, + vec![(Lsn(0x20), child_timeline_id, MaybeOffloaded::Yes)] + ); + } + + tenant + .get_offloaded_timeline(child_timeline_id) + .unwrap() + .defuse_for_tenant_drop(); + + Ok(()) + } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 09ddb19765..2bc14ec317 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -477,8 +477,21 @@ impl GcInfo { self.retain_lsns.sort_by_key(|i| i.0); } - pub(super) fn remove_child(&mut self, child_id: TimelineId) { - self.retain_lsns.retain(|i| i.1 != child_id); + pub(super) fn remove_child_maybe_offloaded( + &mut self, + child_id: TimelineId, + maybe_offloaded: MaybeOffloaded, + ) { + self.retain_lsns + .retain(|i| !(i.1 == child_id && i.2 == maybe_offloaded)); + } + + pub(super) fn remove_child_not_offloaded(&mut self, child_id: TimelineId) { + self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::No); + } + + pub(super) fn remove_child_offloaded(&mut self, child_id: TimelineId) { + self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::Yes); } } @@ -4501,7 +4514,7 @@ impl Drop for Timeline { // This lock should never be poisoned, but in case it is we do a .map() instead of // an unwrap(), to avoid panicking in a destructor and thereby aborting the process. if let Ok(mut gc_info) = ancestor.gc_info.write() { - gc_info.remove_child(self.timeline_id) + gc_info.remove_child_not_offloaded(self.timeline_id) } } } @@ -5030,7 +5043,7 @@ impl Timeline { // 1. Is it newer than GC horizon cutoff point? if l.get_lsn_range().end > space_cutoff { - debug!( + info!( "keeping {} because it's newer than space_cutoff {}", l.layer_name(), space_cutoff, @@ -5041,7 +5054,7 @@ impl Timeline { // 2. It is newer than PiTR cutoff point? if l.get_lsn_range().end > time_cutoff { - debug!( + info!( "keeping {} because it's newer than time_cutoff {}", l.layer_name(), time_cutoff, @@ -5060,7 +5073,7 @@ impl Timeline { for retain_lsn in &retain_lsns { // start_lsn is inclusive if &l.get_lsn_range().start <= retain_lsn { - debug!( + info!( "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}", l.layer_name(), retain_lsn, @@ -5075,7 +5088,7 @@ impl Timeline { if let Some(lsn) = &max_lsn_with_valid_lease { // keep if layer start <= any of the lease if &l.get_lsn_range().start <= lsn { - debug!( + info!( "keeping {} because there is a valid lease preventing GC at {}", l.layer_name(), lsn, @@ -5107,13 +5120,13 @@ impl Timeline { if !layers .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff)) { - debug!("keeping {} because it is the latest layer", l.layer_name()); + info!("keeping {} because it is the latest layer", l.layer_name()); result.layers_not_updated += 1; continue 'outer; } // We didn't find any reason to keep this file, so remove it. - debug!( + info!( "garbage collecting {} is_dropped: xx is_incremental: {}", l.layer_name(), l.is_incremental(), diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 69001a6c40..13a8dfa51a 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -141,9 +141,10 @@ async fn remove_maybe_offloaded_timeline_from_tenant( ); } TimelineOrOffloaded::Offloaded(timeline) => { - timelines_offloaded + let offloaded_timeline = timelines_offloaded .remove(&timeline.timeline_id) .expect("timeline that we were deleting was concurrently removed from 'timelines_offloaded' map"); + offloaded_timeline.delete_from_ancestor_with_timelines(&timelines); } } diff --git a/pageserver/src/tenant/timeline/offload.rs b/pageserver/src/tenant/timeline/offload.rs index 1394843467..3595d743bc 100644 --- a/pageserver/src/tenant/timeline/offload.rs +++ b/pageserver/src/tenant/timeline/offload.rs @@ -66,7 +66,7 @@ pub(crate) async fn offload_timeline( let conf = &tenant.conf; delete_local_timeline_directory(conf, tenant.tenant_shard_id, &timeline).await; - remove_timeline_from_tenant(tenant, &timeline, &guard); + let remaining_refcount = remove_timeline_from_tenant(tenant, &timeline, &guard); { let mut offloaded_timelines = tenant.timelines_offloaded.lock().unwrap(); @@ -87,16 +87,20 @@ pub(crate) async fn offload_timeline( // not our actual state of offloaded timelines. tenant.store_tenant_manifest().await?; + tracing::info!("Timeline offload complete (remaining arc refcount: {remaining_refcount})"); + Ok(()) } /// It is important that this gets called when DeletionGuard is being held. /// For more context see comments in [`DeleteTimelineFlow::prepare`] +/// +/// Returns the strong count of the timeline `Arc` fn remove_timeline_from_tenant( tenant: &Tenant, timeline: &Timeline, _: &DeletionGuard, // using it as a witness -) { +) -> usize { // Remove the timeline from the map. let mut timelines = tenant.timelines.lock().unwrap(); let children_exist = timelines @@ -109,7 +113,9 @@ fn remove_timeline_from_tenant( panic!("Timeline grew children while we removed layer files"); } - timelines + let timeline = timelines .remove(&timeline.timeline_id) .expect("timeline that we were deleting was concurrently removed from 'timelines' map"); + + Arc::strong_count(&timeline) } diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py index 83631405ab..ba4e79c343 100644 --- a/test_runner/regress/test_timeline_archive.py +++ b/test_runner/regress/test_timeline_archive.py @@ -15,13 +15,19 @@ from fixtures.neon_fixtures import ( last_flush_lsn_upload, ) from fixtures.pageserver.http import PageserverApiException -from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty, list_prefix +from fixtures.pageserver.utils import ( + assert_prefix_empty, + assert_prefix_not_empty, + list_prefix, + wait_until_tenant_active, +) from fixtures.pg_version import PgVersion from fixtures.remote_storage import S3Storage, s3_storage from fixtures.utils import run_only_on_default_postgres, wait_until from mypy_boto3_s3.type_defs import ( ObjectTypeDef, ) +from psycopg2.errors import IoError, UndefinedTable @pytest.mark.parametrize("shard_count", [0, 4]) @@ -641,8 +647,21 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder): assert violations == [] -@pytest.mark.parametrize("offload_child", ["offload", "offload-corrupt", "archive", None]) -def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Optional[str]): +@pytest.mark.parametrize("with_intermediary", [False, True]) +@pytest.mark.parametrize( + "offload_child", + [ + "offload", + "offload-corrupt", + "offload-no-restart", + "offload-parent", + "archive", + None, + ], +) +def test_timeline_retain_lsn( + neon_env_builder: NeonEnvBuilder, with_intermediary: bool, offload_child: Optional[str] +): """ Ensure that retain_lsn functionality for timelines works, both for offloaded and non-offloaded ones """ @@ -650,6 +669,7 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op # Our corruption code only works with S3 compatible storage neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + neon_env_builder.rust_log_override = "info,[gc_timeline]=debug" env = neon_env_builder.init_start() ps_http = env.pageserver.http_client() @@ -657,22 +677,30 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op tenant_id, root_timeline_id = env.create_tenant( conf={ # small checkpointing and compaction targets to ensure we generate many upload operations - "checkpoint_distance": 128 * 1024, + "checkpoint_distance": 32 * 1024, "compaction_threshold": 1, - "compaction_target_size": 128 * 1024, + "compaction_target_size": 32 * 1024, # set small image creation thresholds so that gc deletes data - "image_creation_threshold": 2, + "image_creation_threshold": 1, # disable background compaction and GC. We invoke it manually when we want it to happen. "gc_period": "0s", "compaction_period": "0s", # Disable pitr, we only want the latest lsn "pitr_interval": "0s", + "gc_horizon": 0, # Don't rely on endpoint lsn leases "lsn_lease_length": "0s", } ) - with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: + if with_intermediary: + parent_branch_name = "test_archived_parent" + parent_timeline_id = env.create_branch("test_archived_parent", tenant_id) + else: + parent_branch_name = "main" + parent_timeline_id = root_timeline_id + + with env.endpoints.create_start(parent_branch_name, tenant_id=tenant_id) as endpoint: endpoint.safe_psql_many( [ "CREATE TABLE foo(v int, key serial primary key, t text default 'data_content')", @@ -682,14 +710,16 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op ) pre_branch_sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200") log.info(f"Pre branch sum: {pre_branch_sum}") - last_flush_lsn_upload(env, endpoint, tenant_id, root_timeline_id) + last_flush_lsn_upload(env, endpoint, tenant_id, parent_timeline_id) # Create a branch and write some additional data to the parent - child_timeline_id = env.create_branch("test_archived_branch", tenant_id) + child_timeline_id = env.create_branch( + "test_archived_branch", tenant_id, ancestor_branch_name=parent_branch_name + ) - with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: - # Do some churn of the data. This is important so that we can overwrite image layers. - for i in range(10): + with env.endpoints.create_start(parent_branch_name, tenant_id=tenant_id) as endpoint: + # Do some overwriting churn with compactions in between. This is important so that we can overwrite image layers. + for i in range(5): endpoint.safe_psql_many( [ f"SELECT setseed(0.23{i})", @@ -698,9 +728,9 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op "UPDATE foo SET v=(random() * 409600)::int WHERE v % 3 = 0", ] ) + last_flush_lsn_upload(env, endpoint, tenant_id, parent_timeline_id) post_branch_sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200") log.info(f"Post branch sum: {post_branch_sum}") - last_flush_lsn_upload(env, endpoint, tenant_id, root_timeline_id) if offload_child is not None: ps_http.timeline_archival_config( @@ -715,9 +745,19 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op assert leaf_detail["is_archived"] is True if "offload" in offload_child: ps_http.timeline_offload(tenant_id, child_timeline_id) + if "offload-parent" in offload_child: + # Also offload the parent to ensure the retain_lsn of the child + # is entered in the parent at unoffloading + ps_http.timeline_archival_config( + tenant_id, + parent_timeline_id, + state=TimelineArchivalState.ARCHIVED, + ) + ps_http.timeline_offload(tenant_id, parent_timeline_id) # Do a restart to get rid of any in-memory objects (we only init gc info once, at attach) - env.pageserver.stop() + if offload_child is None or "no-restart" not in offload_child: + env.pageserver.stop() if offload_child == "offload-corrupt": assert isinstance(env.pageserver_remote_storage, S3Storage) listing = list_prefix( @@ -752,13 +792,21 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op ".*page_service_conn_main.*could not find data for key.*", ] ) - env.pageserver.start() + if offload_child is None or "no-restart" not in offload_child: + env.pageserver.start() + if offload_child == "offload-parent": + wait_until_tenant_active(ps_http, tenant_id=tenant_id) + ps_http.timeline_archival_config( + tenant_id, + parent_timeline_id, + state=TimelineArchivalState.UNARCHIVED, + ) # Do an agressive gc and compaction of the parent branch - ps_http.timeline_gc(tenant_id=tenant_id, timeline_id=root_timeline_id, gc_horizon=0) + ps_http.timeline_gc(tenant_id=tenant_id, timeline_id=parent_timeline_id, gc_horizon=0) ps_http.timeline_checkpoint( tenant_id, - root_timeline_id, + parent_timeline_id, force_l0_compaction=True, force_repartition=True, wait_until_uploaded=True, @@ -774,10 +822,15 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op # Now, after unarchival, the child timeline should still have its data accessible (or corrupted) if offload_child == "offload-corrupt": - with pytest.raises(RuntimeError, match=".*failed to get basebackup.*"): - env.endpoints.create_start( + if with_intermediary: + error_regex = "(.*could not read .* from page server.*|.*relation .* does not exist)" + else: + error_regex = ".*failed to get basebackup.*" + with pytest.raises((RuntimeError, IoError, UndefinedTable), match=error_regex): + with env.endpoints.create_start( "test_archived_branch", tenant_id=tenant_id, basebackup_request_tries=1 - ) + ) as endpoint: + endpoint.safe_psql("SELECT sum(key) from foo where v < 51200") else: with env.endpoints.create_start("test_archived_branch", tenant_id=tenant_id) as endpoint: sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200") From e12628fe936769217dd28f537e834d10d5901289 Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Fri, 15 Nov 2024 17:42:41 +0000 Subject: [PATCH 13/19] Collect max_connections metric (#9770) This will further allow us to expose this metric to users --- compute/etc/neon_collector.jsonnet | 1 + .../etc/sql_exporter/compute_max_connections.libsonnet | 10 ++++++++++ compute/etc/sql_exporter/compute_max_connections.sql | 1 + 3 files changed, 12 insertions(+) create mode 100644 compute/etc/sql_exporter/compute_max_connections.libsonnet create mode 100644 compute/etc/sql_exporter/compute_max_connections.sql diff --git a/compute/etc/neon_collector.jsonnet b/compute/etc/neon_collector.jsonnet index c6fa645b41..75d69c7b68 100644 --- a/compute/etc/neon_collector.jsonnet +++ b/compute/etc/neon_collector.jsonnet @@ -6,6 +6,7 @@ import 'sql_exporter/compute_backpressure_throttling_seconds.libsonnet', import 'sql_exporter/compute_current_lsn.libsonnet', import 'sql_exporter/compute_logical_snapshot_files.libsonnet', + import 'sql_exporter/compute_max_connections.libsonnet', import 'sql_exporter/compute_receive_lsn.libsonnet', import 'sql_exporter/compute_subscriptions_count.libsonnet', import 'sql_exporter/connection_counts.libsonnet', diff --git a/compute/etc/sql_exporter/compute_max_connections.libsonnet b/compute/etc/sql_exporter/compute_max_connections.libsonnet new file mode 100644 index 0000000000..69cfa1f19c --- /dev/null +++ b/compute/etc/sql_exporter/compute_max_connections.libsonnet @@ -0,0 +1,10 @@ +{ + metric_name: 'compute_max_connections', + type: 'gauge', + help: 'Max connections allowed for Postgres', + key_labels: null, + values: [ + 'max_connections', + ], + query: importstr 'sql_exporter/compute_max_connections.sql', +} diff --git a/compute/etc/sql_exporter/compute_max_connections.sql b/compute/etc/sql_exporter/compute_max_connections.sql new file mode 100644 index 0000000000..99a49483a6 --- /dev/null +++ b/compute/etc/sql_exporter/compute_max_connections.sql @@ -0,0 +1 @@ +SELECT current_setting('max_connections') as max_connections; From 2af791ba83ee402c2ebb9b1f436f9edb9351d468 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 15 Nov 2024 20:34:48 +0000 Subject: [PATCH 14/19] wal_decoder: make InterpretedWalRecord serde (#9775) ## Problem We want to serialize interpreted records to send them over the wire from safekeeper to pageserver. ## Summary of changes Make `InterpretedWalRecord` ser/de. This is a temporary change to get the bulk of the lift merged in https://github.com/neondatabase/neon/pull/9746. For going to prod, we don't want to use bincode since we can't evolve the schema. Questions on serialization will be tackled separately. --- libs/pageserver_api/src/key.rs | 2 +- libs/pageserver_api/src/reltag.rs | 2 +- libs/postgres_ffi/src/walrecord.rs | 16 ++++++------- libs/wal_decoder/src/models.rs | 29 ++++++++++++++++++++++++ libs/wal_decoder/src/serialized_batch.rs | 5 ++++ 5 files changed, 44 insertions(+), 10 deletions(-) diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index b3fcaae62f..4505101ea6 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -24,7 +24,7 @@ pub struct Key { /// When working with large numbers of Keys in-memory, it is more efficient to handle them as i128 than as /// a struct of fields. -#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd)] +#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] pub struct CompactKey(i128); /// The storage key size. diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs index 010a9c2932..09d1fae221 100644 --- a/libs/pageserver_api/src/reltag.rs +++ b/libs/pageserver_api/src/reltag.rs @@ -24,7 +24,7 @@ use postgres_ffi::Oid; // FIXME: should move 'forknum' as last field to keep this consistent with Postgres. // Then we could replace the custom Ord and PartialOrd implementations below with // deriving them. This will require changes in walredoproc.c. -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)] pub struct RelTag { pub forknum: u8, pub spcnode: Oid, diff --git a/libs/postgres_ffi/src/walrecord.rs b/libs/postgres_ffi/src/walrecord.rs index dedbaef64d..b32106632a 100644 --- a/libs/postgres_ffi/src/walrecord.rs +++ b/libs/postgres_ffi/src/walrecord.rs @@ -16,7 +16,7 @@ use utils::bin_ser::DeserializeError; use utils::lsn::Lsn; #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlMultiXactCreate { pub mid: MultiXactId, /* new MultiXact's ID */ @@ -46,7 +46,7 @@ impl XlMultiXactCreate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlMultiXactTruncate { pub oldest_multi_db: Oid, /* to-be-truncated range of multixact offsets */ @@ -72,7 +72,7 @@ impl XlMultiXactTruncate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlRelmapUpdate { pub dbid: Oid, /* database ID, or 0 for shared map */ pub tsid: Oid, /* database's tablespace, or pg_global */ @@ -90,7 +90,7 @@ impl XlRelmapUpdate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlReploriginDrop { pub node_id: RepOriginId, } @@ -104,7 +104,7 @@ impl XlReploriginDrop { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlReploriginSet { pub remote_lsn: Lsn, pub node_id: RepOriginId, @@ -120,7 +120,7 @@ impl XlReploriginSet { } #[repr(C)] -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct RelFileNode { pub spcnode: Oid, /* tablespace */ pub dbnode: Oid, /* database */ @@ -911,7 +911,7 @@ impl XlSmgrCreate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlSmgrTruncate { pub blkno: BlockNumber, pub rnode: RelFileNode, @@ -984,7 +984,7 @@ impl XlDropDatabase { /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same /// struct for commits and aborts. /// -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlXactParsedRecord { pub xid: TransactionId, pub info: u8, diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 5d90eeb69c..88371fe51e 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -32,16 +32,19 @@ use postgres_ffi::walrecord::{ XlSmgrTruncate, XlXactParsedRecord, }; use postgres_ffi::{Oid, TransactionId}; +use serde::{Deserialize, Serialize}; use utils::lsn::Lsn; use crate::serialized_batch::SerializedValueBatch; +#[derive(Serialize, Deserialize)] pub enum FlushUncommittedRecords { Yes, No, } /// An interpreted Postgres WAL record, ready to be handled by the pageserver +#[derive(Serialize, Deserialize)] pub struct InterpretedWalRecord { /// Optional metadata record - may cause writes to metadata keys /// in the storage engine @@ -62,6 +65,7 @@ pub struct InterpretedWalRecord { /// The interpreted part of the Postgres WAL record which requires metadata /// writes to the underlying storage engine. +#[derive(Serialize, Deserialize)] pub enum MetadataRecord { Heapam(HeapamRecord), Neonrmgr(NeonrmgrRecord), @@ -77,10 +81,12 @@ pub enum MetadataRecord { Replorigin(ReploriginRecord), } +#[derive(Serialize, Deserialize)] pub enum HeapamRecord { ClearVmBits(ClearVmBits), } +#[derive(Serialize, Deserialize)] pub struct ClearVmBits { pub new_heap_blkno: Option, pub old_heap_blkno: Option, @@ -88,24 +94,29 @@ pub struct ClearVmBits { pub flags: u8, } +#[derive(Serialize, Deserialize)] pub enum NeonrmgrRecord { ClearVmBits(ClearVmBits), } +#[derive(Serialize, Deserialize)] pub enum SmgrRecord { Create(SmgrCreate), Truncate(XlSmgrTruncate), } +#[derive(Serialize, Deserialize)] pub struct SmgrCreate { pub rel: RelTag, } +#[derive(Serialize, Deserialize)] pub enum DbaseRecord { Create(DbaseCreate), Drop(DbaseDrop), } +#[derive(Serialize, Deserialize)] pub struct DbaseCreate { pub db_id: Oid, pub tablespace_id: Oid, @@ -113,27 +124,32 @@ pub struct DbaseCreate { pub src_tablespace_id: Oid, } +#[derive(Serialize, Deserialize)] pub struct DbaseDrop { pub db_id: Oid, pub tablespace_ids: Vec, } +#[derive(Serialize, Deserialize)] pub enum ClogRecord { ZeroPage(ClogZeroPage), Truncate(ClogTruncate), } +#[derive(Serialize, Deserialize)] pub struct ClogZeroPage { pub segno: u32, pub rpageno: u32, } +#[derive(Serialize, Deserialize)] pub struct ClogTruncate { pub pageno: u32, pub oldest_xid: TransactionId, pub oldest_xid_db: Oid, } +#[derive(Serialize, Deserialize)] pub enum XactRecord { Commit(XactCommon), Abort(XactCommon), @@ -142,6 +158,7 @@ pub enum XactRecord { Prepare(XactPrepare), } +#[derive(Serialize, Deserialize)] pub struct XactCommon { pub parsed: XlXactParsedRecord, pub origin_id: u16, @@ -150,61 +167,73 @@ pub struct XactCommon { pub lsn: Lsn, } +#[derive(Serialize, Deserialize)] pub struct XactPrepare { pub xl_xid: TransactionId, pub data: Bytes, } +#[derive(Serialize, Deserialize)] pub enum MultiXactRecord { ZeroPage(MultiXactZeroPage), Create(XlMultiXactCreate), Truncate(XlMultiXactTruncate), } +#[derive(Serialize, Deserialize)] pub struct MultiXactZeroPage { pub slru_kind: SlruKind, pub segno: u32, pub rpageno: u32, } +#[derive(Serialize, Deserialize)] pub enum RelmapRecord { Update(RelmapUpdate), } +#[derive(Serialize, Deserialize)] pub struct RelmapUpdate { pub update: XlRelmapUpdate, pub buf: Bytes, } +#[derive(Serialize, Deserialize)] pub enum XlogRecord { Raw(RawXlogRecord), } +#[derive(Serialize, Deserialize)] pub struct RawXlogRecord { pub info: u8, pub lsn: Lsn, pub buf: Bytes, } +#[derive(Serialize, Deserialize)] pub enum LogicalMessageRecord { Put(PutLogicalMessage), #[cfg(feature = "testing")] Failpoint, } +#[derive(Serialize, Deserialize)] pub struct PutLogicalMessage { pub path: String, pub buf: Bytes, } +#[derive(Serialize, Deserialize)] pub enum StandbyRecord { RunningXacts(StandbyRunningXacts), } +#[derive(Serialize, Deserialize)] pub struct StandbyRunningXacts { pub oldest_running_xid: TransactionId, } +#[derive(Serialize, Deserialize)] pub enum ReploriginRecord { Set(XlReploriginSet), Drop(XlReploriginDrop), diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs index 8f33291023..632603cc8b 100644 --- a/libs/wal_decoder/src/serialized_batch.rs +++ b/libs/wal_decoder/src/serialized_batch.rs @@ -16,6 +16,7 @@ use pageserver_api::shard::ShardIdentity; use pageserver_api::{key::CompactKey, value::Value}; use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord}; use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ}; +use serde::{Deserialize, Serialize}; use utils::bin_ser::BeSer; use utils::lsn::Lsn; @@ -29,6 +30,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); /// relation sizes. In the case of "observed" values, we only need to know /// the key and LSN, so two types of metadata are supported to save on network /// bandwidth. +#[derive(Serialize, Deserialize)] pub enum ValueMeta { Serialized(SerializedValueMeta), Observed(ObservedValueMeta), @@ -75,6 +77,7 @@ impl PartialEq for OrderedValueMeta { impl Eq for OrderedValueMeta {} /// Metadata for a [`Value`] serialized into the batch. +#[derive(Serialize, Deserialize)] pub struct SerializedValueMeta { pub key: CompactKey, pub lsn: Lsn, @@ -86,12 +89,14 @@ pub struct SerializedValueMeta { } /// Metadata for a [`Value`] observed by the batch +#[derive(Serialize, Deserialize)] pub struct ObservedValueMeta { pub key: CompactKey, pub lsn: Lsn, } /// Batch of serialized [`Value`]s. +#[derive(Serialize, Deserialize)] pub struct SerializedValueBatch { /// [`Value`]s serialized in EphemeralFile's native format, /// ready for disk write by the pageserver From 23eabb9919a0457253455bfa844b1c6503b3f8d6 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Fri, 15 Nov 2024 15:17:23 -0600 Subject: [PATCH 15/19] Fix PG_MAJORVERSION_NUM typo In ea32f1d0a36a4d77c1181d623f14a91f2a06d6dd, Matthias added a feature to our extension to expose more granular wait events. However, due to the typo, those wait events were never registered, so we used the more generic wait events instead. Signed-off-by: Tristan Partin --- pgxn/neon/neon.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index dc87d79e87..f207ed61f9 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -512,7 +512,7 @@ neon_shmem_startup_hook(void) if (prev_shmem_startup_hook) prev_shmem_startup_hook(); -#if PG_PG_MAJORVERSION_NUM >= 17 +#if PG_MAJORVERSION_NUM >= 17 WAIT_EVENT_NEON_LFC_MAINTENANCE = WaitEventExtensionNew("Neon/FileCache_Maintenance"); WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read"); WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate"); From ac689ab01406a797f5347a6799d809f375468d52 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 15 Nov 2024 21:53:11 +0000 Subject: [PATCH 16/19] wal_decoder: rename end_lsn to next_record_lsn (#9776) ## Problem It turns out that `WalStreamDecoder::poll_decode` returns the start LSN of the next record and not the end LSN of the current record. They are not always equal. For example, they're not equal when the record in question is an XLOG SWITCH record. ## Summary of changes Rename things to reflect that. --- libs/wal_decoder/src/decoder.rs | 18 +++++++++++------- libs/wal_decoder/src/models.rs | 6 ++++-- libs/wal_decoder/src/serialized_batch.rs | 18 +++++++++++------- .../walreceiver/walreceiver_connection.rs | 12 ++++++------ pageserver/src/walingest.rs | 2 +- 5 files changed, 33 insertions(+), 23 deletions(-) diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs index 684718d220..1895f25bfc 100644 --- a/libs/wal_decoder/src/decoder.rs +++ b/libs/wal_decoder/src/decoder.rs @@ -19,7 +19,7 @@ impl InterpretedWalRecord { pub fn from_bytes_filtered( buf: Bytes, shard: &ShardIdentity, - record_end_lsn: Lsn, + next_record_lsn: Lsn, pg_version: u32, ) -> anyhow::Result { let mut decoded = DecodedWALRecord::default(); @@ -32,18 +32,18 @@ impl InterpretedWalRecord { FlushUncommittedRecords::No }; - let metadata_record = MetadataRecord::from_decoded(&decoded, record_end_lsn, pg_version)?; + let metadata_record = MetadataRecord::from_decoded(&decoded, next_record_lsn, pg_version)?; let batch = SerializedValueBatch::from_decoded_filtered( decoded, shard, - record_end_lsn, + next_record_lsn, pg_version, )?; Ok(InterpretedWalRecord { metadata_record, batch, - end_lsn: record_end_lsn, + next_record_lsn, flush_uncommitted, xid, }) @@ -53,7 +53,7 @@ impl InterpretedWalRecord { impl MetadataRecord { fn from_decoded( decoded: &DecodedWALRecord, - record_end_lsn: Lsn, + next_record_lsn: Lsn, pg_version: u32, ) -> anyhow::Result> { // Note: this doesn't actually copy the bytes since @@ -74,7 +74,9 @@ impl MetadataRecord { Ok(None) } pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version), - pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, record_end_lsn), + pg_constants::RM_XACT_ID => { + Self::decode_xact_record(&mut buf, decoded, next_record_lsn) + } pg_constants::RM_MULTIXACT_ID => { Self::decode_multixact_record(&mut buf, decoded, pg_version) } @@ -86,7 +88,9 @@ impl MetadataRecord { // // Alternatively, one can make the checkpoint part of the subscription protocol // to the pageserver. This should work fine, but can be done at a later point. - pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, record_end_lsn), + pg_constants::RM_XLOG_ID => { + Self::decode_xlog_record(&mut buf, decoded, next_record_lsn) + } pg_constants::RM_LOGICALMSG_ID => { Self::decode_logical_message_record(&mut buf, decoded) } diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 88371fe51e..c69f8c869a 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -52,8 +52,10 @@ pub struct InterpretedWalRecord { /// A pre-serialized batch along with the required metadata for ingestion /// by the pageserver pub batch: SerializedValueBatch, - /// Byte offset within WAL for the end of the original PG WAL record - pub end_lsn: Lsn, + /// Byte offset within WAL for the start of the next PG WAL record. + /// Usually this is the end LSN of the current record, but in case of + /// XLOG SWITCH records it will be within the next segment. + pub next_record_lsn: Lsn, /// Whether to flush all uncommitted modifications to the storage engine /// before ingesting this record. This is currently only used for legacy PG /// database creations which read pages from a template database. Such WAL diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs index 632603cc8b..9c0708ebbe 100644 --- a/libs/wal_decoder/src/serialized_batch.rs +++ b/libs/wal_decoder/src/serialized_batch.rs @@ -137,7 +137,7 @@ impl SerializedValueBatch { pub(crate) fn from_decoded_filtered( decoded: DecodedWALRecord, shard: &ShardIdentity, - record_end_lsn: Lsn, + next_record_lsn: Lsn, pg_version: u32, ) -> anyhow::Result { // First determine how big the buffer needs to be and allocate it up-front. @@ -161,13 +161,17 @@ impl SerializedValueBatch { let key = rel_block_to_key(rel, blk.blkno); if !key.is_valid_key_on_write_path() { - anyhow::bail!("Unsupported key decoded at LSN {}: {}", record_end_lsn, key); + anyhow::bail!( + "Unsupported key decoded at LSN {}: {}", + next_record_lsn, + key + ); } let key_is_local = shard.is_key_local(&key); tracing::debug!( - lsn=%record_end_lsn, + lsn=%next_record_lsn, key=%key, "ingest: shard decision {}", if !key_is_local { "drop" } else { "keep" }, @@ -179,7 +183,7 @@ impl SerializedValueBatch { // its blkno in case it implicitly extends a relation. metadata.push(ValueMeta::Observed(ObservedValueMeta { key: key.to_compact(), - lsn: record_end_lsn, + lsn: next_record_lsn, })) } @@ -210,7 +214,7 @@ impl SerializedValueBatch { // that would corrupt the page. // if !page_is_new(&image) { - page_set_lsn(&mut image, record_end_lsn) + page_set_lsn(&mut image, next_record_lsn) } assert_eq!(image.len(), BLCKSZ as usize); @@ -229,12 +233,12 @@ impl SerializedValueBatch { metadata.push(ValueMeta::Serialized(SerializedValueMeta { key: key.to_compact(), - lsn: record_end_lsn, + lsn: next_record_lsn, batch_offset: relative_off, len: val_ser_size, will_init: val.will_init(), })); - max_lsn = std::cmp::max(max_lsn, record_end_lsn); + max_lsn = std::cmp::max(max_lsn, next_record_lsn); len += 1; } diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 34bf959058..6ac6920d47 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -331,11 +331,11 @@ pub(super) async fn handle_walreceiver_connection( Ok(()) } - while let Some((record_end_lsn, recdata)) = waldecoder.poll_decode()? { + while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? { // It is important to deal with the aligned records as lsn in getPage@LSN is // aligned and can be several bytes bigger. Without this alignment we are // at risk of hitting a deadlock. - if !record_end_lsn.is_aligned() { + if !next_record_lsn.is_aligned() { return Err(WalReceiverError::Other(anyhow!("LSN not aligned"))); } @@ -343,7 +343,7 @@ pub(super) async fn handle_walreceiver_connection( let interpreted = InterpretedWalRecord::from_bytes_filtered( recdata, modification.tline.get_shard_identity(), - record_end_lsn, + next_record_lsn, modification.tline.pg_version, )?; @@ -367,10 +367,10 @@ pub(super) async fn handle_walreceiver_connection( .ingest_record(interpreted, &mut modification, &ctx) .await .with_context(|| { - format!("could not ingest record at {record_end_lsn}") + format!("could not ingest record at {next_record_lsn}") })?; if !ingested { - tracing::debug!("ingest: filtered out record @ LSN {record_end_lsn}"); + tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}"); WAL_INGEST.records_filtered.inc(); filtered_records += 1; } @@ -380,7 +380,7 @@ pub(super) async fn handle_walreceiver_connection( // to timeout the tests. fail_point!("walreceiver-after-ingest"); - last_rec_lsn = record_end_lsn; + last_rec_lsn = next_record_lsn; // Commit every ingest_batch_size records. Even if we filtered out // all records, we still need to call commit to advance the LSN. diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 84e553f330..38d69760f2 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -154,7 +154,7 @@ impl WalIngest { WAL_INGEST.records_received.inc(); let prev_len = modification.len(); - modification.set_lsn(interpreted.end_lsn)?; + modification.set_lsn(interpreted.next_record_lsn)?; if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) { // Records of this type should always be preceded by a commit(), as they From de7e4a34ca61cd1dc671b08239c2042d676e4043 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 17 Nov 2024 18:19:14 +0100 Subject: [PATCH 17/19] safekeeper: send `AppendResponse` on segment flush (#9692) ## Problem When processing pipelined `AppendRequest`s, we explicitly flush the WAL every second and return an `AppendResponse`. However, the WAL is also implicitly flushed on segment bounds, but this does not result in an `AppendResponse`. Because of this, concurrent transactions may take up to 1 second to commit and writes may take up to 1 second before sending to the pageserver. ## Summary of changes Advance `flush_lsn` when a WAL segment is closed and flushed, and emit an `AppendResponse`. To accommodate this, track the `flush_lsn` in addition to the `flush_record_lsn`. --- safekeeper/src/receive_wal.rs | 3 ++ safekeeper/src/safekeeper.rs | 5 ++- safekeeper/src/wal_storage.rs | 71 +++++++++++++++++++++++------------ 3 files changed, 54 insertions(+), 25 deletions(-) diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index a0a96c6e99..2edcc4ef6f 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -562,6 +562,9 @@ impl WalAcceptor { // Don't flush the WAL on every append, only periodically via flush_ticker. // This batches multiple appends per fsync. If the channel is empty after // sending the reply, we'll schedule an immediate flush. + // + // Note that a flush can still happen on segment bounds, which will result + // in an AppendResponse. if let ProposerAcceptorMessage::AppendRequest(append_request) = msg { msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request); dirty = true; diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index f4983d44d0..6eb69f0b7c 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -947,6 +947,7 @@ where // while first connection still gets some packets later. It might be // better to not log this as error! above. let write_lsn = self.wal_store.write_lsn(); + let flush_lsn = self.wal_store.flush_lsn(); if write_lsn > msg.h.begin_lsn { bail!( "append request rewrites WAL written before, write_lsn={}, msg lsn={}", @@ -1004,7 +1005,9 @@ where ); // If flush_lsn hasn't updated, AppendResponse is not very useful. - if !require_flush { + // This is the common case for !require_flush, but a flush can still + // happen on segment bounds. + if !require_flush && flush_lsn == self.flush_lsn() { return Ok(None); } diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index c3bb6cd12c..e338d70731 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -113,6 +113,13 @@ pub struct PhysicalStorage { /// non-aligned chunks of data. write_record_lsn: Lsn, + /// The last LSN flushed to disk. May be in the middle of a record. + /// + /// NB: when the rest of the system refers to `flush_lsn`, it usually + /// actually refers to `flush_record_lsn`. This ambiguity can be dangerous + /// and should be resolved. + flush_lsn: Lsn, + /// The LSN of the last WAL record flushed to disk. flush_record_lsn: Lsn, @@ -211,6 +218,7 @@ impl PhysicalStorage { system_id: state.server.system_id, write_lsn, write_record_lsn: write_lsn, + flush_lsn, flush_record_lsn: flush_lsn, decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000), file: None, @@ -295,8 +303,9 @@ impl PhysicalStorage { } } - /// Write WAL bytes, which are known to be located in a single WAL segment. - async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> { + /// Write WAL bytes, which are known to be located in a single WAL segment. Returns true if the + /// segment was completed, closed, and flushed to disk. + async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result { let mut file = if let Some(file) = self.file.take() { file } else { @@ -320,20 +329,24 @@ impl PhysicalStorage { let (wal_file_path, wal_file_partial_path) = wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size); fs::rename(wal_file_partial_path, wal_file_path).await?; + Ok(true) } else { // otherwise, file can be reused later self.file = Some(file); + Ok(false) } - - Ok(()) } /// Writes WAL to the segment files, until everything is writed. If some segments /// are fully written, they are flushed to disk. The last (partial) segment can /// be flushed separately later. /// - /// Updates `write_lsn`. + /// Updates `write_lsn` and `flush_lsn`. async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> { + // TODO: this shouldn't be possible, except possibly with write_lsn == 0. + // Rename this method to `append_exact`, and make it append-only, removing + // the `pos` parameter and this check. For this reason, we don't update + // `flush_lsn` here. if self.write_lsn != pos { // need to flush the file before discarding it if let Some(file) = self.file.take() { @@ -355,9 +368,13 @@ impl PhysicalStorage { buf.len() }; - self.write_in_segment(segno, xlogoff, &buf[..bytes_write]) + let flushed = self + .write_in_segment(segno, xlogoff, &buf[..bytes_write]) .await?; self.write_lsn += bytes_write as u64; + if flushed { + self.flush_lsn = self.write_lsn; + } buf = &buf[bytes_write..]; } @@ -371,6 +388,9 @@ impl Storage for PhysicalStorage { self.write_lsn } /// flush_lsn returns LSN of last durably stored WAL record. + /// + /// TODO: flush_lsn() returns flush_record_lsn, but write_lsn() returns write_lsn: confusing. + #[allow(clippy::misnamed_getters)] fn flush_lsn(&self) -> Lsn { self.flush_record_lsn } @@ -424,8 +444,9 @@ impl Storage for PhysicalStorage { self.metrics.observe_write_seconds(write_seconds); self.metrics.observe_write_bytes(buf.len()); - // figure out last record's end lsn for reporting (if we got the - // whole record) + // Figure out the last record's end LSN and update `write_record_lsn` + // (if we got a whole record). The write may also have closed and + // flushed a segment, so update `flush_record_lsn` as well. if self.decoder.available() != startpos { info!( "restart decoder from {} to {}", @@ -436,12 +457,15 @@ impl Storage for PhysicalStorage { self.decoder = WalStreamDecoder::new(startpos, pg_version); } self.decoder.feed_bytes(buf); - loop { - match self.decoder.poll_decode()? { - None => break, // no full record yet - Some((lsn, _rec)) => { - self.write_record_lsn = lsn; - } + + if self.write_record_lsn <= self.flush_lsn { + // We may have flushed a previously written record. + self.flush_record_lsn = self.write_record_lsn; + } + while let Some((lsn, _rec)) = self.decoder.poll_decode()? { + self.write_record_lsn = lsn; + if lsn <= self.flush_lsn { + self.flush_record_lsn = lsn; } } @@ -458,19 +482,17 @@ impl Storage for PhysicalStorage { self.fdatasync_file(&unflushed_file).await?; self.file = Some(unflushed_file); } else { - // We have unflushed data (write_lsn != flush_lsn), but no file. - // This should only happen if last file was fully written and flushed, - // but haven't updated flush_lsn yet. - if self.write_lsn.segment_offset(self.wal_seg_size) != 0 { - bail!( - "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}", - self.write_lsn, - self.flush_record_lsn - ); - } + // We have unflushed data (write_lsn != flush_lsn), but no file. This + // shouldn't happen, since the segment is flushed on close. + bail!( + "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}", + self.write_lsn, + self.flush_record_lsn + ); } // everything is flushed now, let's update flush_lsn + self.flush_lsn = self.write_lsn; self.flush_record_lsn = self.write_record_lsn; Ok(()) } @@ -517,6 +539,7 @@ impl Storage for PhysicalStorage { self.pending_wal_truncation = true; self.write_lsn = end_pos; + self.flush_lsn = end_pos; self.write_record_lsn = end_pos; self.flush_record_lsn = end_pos; From 88801341717bb51ca779ba5c9866cccf5f7fbd58 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 17 Nov 2024 19:52:05 +0100 Subject: [PATCH 18/19] Cargo.toml: upgrade tikv-jemallocator to 0.6.0 (#9779) --- Cargo.lock | 12 ++++++------ Cargo.toml | 4 ++-- workspace_hack/Cargo.toml | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f6e3f9ddb1..954bac1c24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6074,9 +6074,9 @@ dependencies = [ [[package]] name = "tikv-jemalloc-ctl" -version = "0.5.4" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c" +checksum = "f21f216790c8df74ce3ab25b534e0718da5a1916719771d3fec23315c99e468b" dependencies = [ "libc", "paste", @@ -6085,9 +6085,9 @@ dependencies = [ [[package]] name = "tikv-jemalloc-sys" -version = "0.5.4+5.3.0-patched" +version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1" +checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d" dependencies = [ "cc", "libc", @@ -6095,9 +6095,9 @@ dependencies = [ [[package]] name = "tikv-jemallocator" -version = "0.5.4" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca" +checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865" dependencies = [ "libc", "tikv-jemalloc-sys", diff --git a/Cargo.toml b/Cargo.toml index 706d742f1b..dbda930535 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -168,8 +168,8 @@ sync_wrapper = "0.1.2" tar = "0.4" test-context = "0.3" thiserror = "1.0" -tikv-jemallocator = "0.5" -tikv-jemalloc-ctl = "0.5" +tikv-jemallocator = { version = "0.6", features = ["stats"] } +tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] } tokio = { version = "1.17", features = ["macros"] } tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" } tokio-io-timeout = "1.2.0" diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index d6773987ea..53d3a7364b 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -75,7 +75,7 @@ smallvec = { version = "1", default-features = false, features = ["const_new", " spki = { version = "0.7", default-features = false, features = ["pem", "std"] } subtle = { version = "2" } sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] } -tikv-jemalloc-sys = { version = "0.5" } +tikv-jemalloc-sys = { version = "0.6", features = ["stats"] } time = { version = "0.3", features = ["macros", "serde-well-known"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] } tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", features = ["with-serde_json-1"] } From b6154b03f4b5a04e0dc364019c9d463c324df312 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 17 Nov 2024 21:25:16 +0100 Subject: [PATCH 19/19] build(deps): bump smallvec to 1.13.2 to get UB fix (#9781) Smallvec 1.13.2 contains [an UB fix](https://github.com/servo/rust-smallvec/pull/345). Upstream opened [a request](https://github.com/rustsec/advisory-db/issues/1960) for this in the advisory-db but it never got acted upon. Found while working on https://github.com/neondatabase/neon/pull/9321. --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 954bac1c24..f92da5ec51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5663,9 +5663,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "smol_str"