From fef77b0cc981f71238e1117d392ea55ec867e61f Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 15 Aug 2024 09:02:33 +0100 Subject: [PATCH 01/14] safekeeper: consider partial uploads when pulling timeline (#8628) ## Problem The control file contains the id of the safekeeper that uploaded it. Previously, when sending a snapshot of the control file to another sk, it would eventually be gc-ed by the receiving sk. This is incorrect because the original sk might still need it later. ## Summary of Changes When sending a snapshot and the control file contains an uploaded segment: * Create a copy of the segment in s3 with the destination sk in the object name * Tweak the streamed control file to point to the object create in the previous step Note that the snapshot endpoint now has to know the id of the requestor, so the api has been extended to include the node if of the destination sk. Closes https://github.com/neondatabase/neon/issues/8542 --- safekeeper/src/control_file.rs | 42 +++--- safekeeper/src/http/client.rs | 7 +- safekeeper/src/http/routes.rs | 11 +- safekeeper/src/pull_timeline.rs | 64 +++++++--- safekeeper/src/wal_backup.rs | 10 ++ safekeeper/src/wal_backup_partial.rs | 57 ++++++++- test_runner/fixtures/neon_fixtures.py | 23 +++- test_runner/regress/test_wal_acceptor.py | 155 ++++++++++++++++++++++- 8 files changed, 327 insertions(+), 42 deletions(-) diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index d574bb438f..c551cd3122 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -164,6 +164,30 @@ impl Deref for FileStorage { } } +impl TimelinePersistentState { + pub(crate) fn write_to_buf(&self) -> Result> { + let mut buf: Vec = Vec::new(); + WriteBytesExt::write_u32::(&mut buf, SK_MAGIC)?; + + if self.eviction_state == EvictionState::Present { + // temp hack for forward compatibility + const PREV_FORMAT_VERSION: u32 = 8; + let prev = downgrade_v9_to_v8(self); + WriteBytesExt::write_u32::(&mut buf, PREV_FORMAT_VERSION)?; + prev.ser_into(&mut buf)?; + } else { + // otherwise, we write the current format version + WriteBytesExt::write_u32::(&mut buf, SK_FORMAT_VERSION)?; + self.ser_into(&mut buf)?; + } + + // calculate checksum before resize + let checksum = crc32c::crc32c(&buf); + buf.extend_from_slice(&checksum.to_le_bytes()); + Ok(buf) + } +} + #[async_trait::async_trait] impl Storage for FileStorage { /// Persists state durably to the underlying storage. @@ -180,24 +204,8 @@ impl Storage for FileStorage { &control_partial_path ) })?; - let mut buf: Vec = Vec::new(); - WriteBytesExt::write_u32::(&mut buf, SK_MAGIC)?; - if s.eviction_state == EvictionState::Present { - // temp hack for forward compatibility - const PREV_FORMAT_VERSION: u32 = 8; - let prev = downgrade_v9_to_v8(s); - WriteBytesExt::write_u32::(&mut buf, PREV_FORMAT_VERSION)?; - prev.ser_into(&mut buf)?; - } else { - // otherwise, we write the current format version - WriteBytesExt::write_u32::(&mut buf, SK_FORMAT_VERSION)?; - s.ser_into(&mut buf)?; - } - - // calculate checksum before resize - let checksum = crc32c::crc32c(&buf); - buf.extend_from_slice(&checksum.to_le_bytes()); + let buf: Vec = s.write_to_buf()?; control_partial.write_all(&buf).await.with_context(|| { format!( diff --git a/safekeeper/src/http/client.rs b/safekeeper/src/http/client.rs index 0bb31c200d..c56f7880d4 100644 --- a/safekeeper/src/http/client.rs +++ b/safekeeper/src/http/client.rs @@ -10,7 +10,7 @@ use reqwest::{IntoUrl, Method, StatusCode}; use utils::{ http::error::HttpErrorBody, - id::{TenantId, TimelineId}, + id::{NodeId, TenantId, TimelineId}, logging::SecretString, }; @@ -97,10 +97,11 @@ impl Client { &self, tenant_id: TenantId, timeline_id: TimelineId, + stream_to: NodeId, ) -> Result { let uri = format!( - "{}/v1/tenant/{}/timeline/{}/snapshot", - self.mgmt_api_endpoint, tenant_id, timeline_id + "{}/v1/tenant/{}/timeline/{}/snapshot/{}", + self.mgmt_api_endpoint, tenant_id, timeline_id, stream_to.0 ); self.get(&uri).await } diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index fe6d325cee..c9defb0bcf 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -205,6 +205,7 @@ async fn timeline_pull_handler(mut request: Request) -> Result) -> Result, ApiError> { + let destination = parse_request_param(&request, "destination_id")?; let ttid = TenantTimelineId::new( parse_request_param(&request, "tenant_id")?, parse_request_param(&request, "timeline_id")?, @@ -225,7 +226,13 @@ async fn timeline_snapshot_handler(request: Request) -> Result RouterBuilder request_span(r, tenant_delete_handler) }) .get( - "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot", + "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id", |r| request_span(r, timeline_snapshot_handler), ) .post("/v1/pull_timeline", |r| { diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 618c6b278f..1eacec9981 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -11,13 +11,8 @@ use std::{ io::{self, ErrorKind}, sync::Arc, }; -use tokio::{ - fs::{File, OpenOptions}, - io::AsyncWrite, - sync::mpsc, - task, -}; -use tokio_tar::{Archive, Builder}; +use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task}; +use tokio_tar::{Archive, Builder, Header}; use tokio_util::{ io::{CopyToBytes, SinkWriter}, sync::PollSender, @@ -32,13 +27,15 @@ use crate::{ routes::TimelineStatus, }, safekeeper::Term, + state::TimelinePersistentState, timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline}, + wal_backup, wal_storage::{self, open_wal_file, Storage}, GlobalTimelines, SafeKeeperConf, }; use utils::{ crashsafe::{durable_rename, fsync_async_opt}, - id::{TenantId, TenantTimelineId, TimelineId}, + id::{NodeId, TenantId, TenantTimelineId, TimelineId}, logging::SecretString, lsn::Lsn, pausable_failpoint, @@ -46,8 +43,13 @@ use utils::{ /// Stream tar archive of timeline to tx. #[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))] -pub async fn stream_snapshot(tli: WalResidentTimeline, tx: mpsc::Sender>) { - if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await { +pub async fn stream_snapshot( + tli: WalResidentTimeline, + source: NodeId, + destination: NodeId, + tx: mpsc::Sender>, +) { + if let Err(e) = stream_snapshot_guts(tli, source, destination, tx.clone()).await { // Error type/contents don't matter as they won't can't reach the client // (hyper likely doesn't do anything with it), but http stream will be // prematurely terminated. It would be nice to try to send the error in @@ -81,6 +83,8 @@ impl Drop for SnapshotContext { pub async fn stream_snapshot_guts( tli: WalResidentTimeline, + source: NodeId, + destination: NodeId, tx: mpsc::Sender>, ) -> Result<()> { // tokio-tar wants Write implementor, but we have mpsc tx >; @@ -104,7 +108,7 @@ pub async fn stream_snapshot_guts( // which is also likely suboptimal. let mut ar = Builder::new_non_terminated(pinned_writer); - let bctx = tli.start_snapshot(&mut ar).await?; + let bctx = tli.start_snapshot(&mut ar, source, destination).await?; pausable_failpoint!("sk-snapshot-after-list-pausable"); let tli_dir = tli.get_timeline_dir(); @@ -158,13 +162,43 @@ impl WalResidentTimeline { async fn start_snapshot( &self, ar: &mut tokio_tar::Builder, + source: NodeId, + destination: NodeId, ) -> Result { let mut shared_state = self.write_shared_state().await; let wal_seg_size = shared_state.get_wal_seg_size(); - let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME); - let mut cf = File::open(cf_path).await?; - ar.append_file(CONTROL_FILE_NAME, &mut cf).await?; + let mut control_store = TimelinePersistentState::clone(shared_state.sk.state()); + // Modify the partial segment of the in-memory copy for the control file to + // point to the destination safekeeper. + let replace = control_store + .partial_backup + .replace_uploaded_segment(source, destination)?; + + if let Some(replace) = replace { + // The deserialized control file has an uploaded partial. We upload a copy + // of it to object storage for the destination safekeeper and send an updated + // control file in the snapshot. + tracing::info!( + "Replacing uploaded partial segment in in-mem control file: {replace:?}" + ); + + let remote_timeline_path = wal_backup::remote_timeline_path(&self.tli.ttid)?; + wal_backup::copy_partial_segment( + &replace.previous.remote_path(&remote_timeline_path), + &replace.current.remote_path(&remote_timeline_path), + ) + .await?; + } + + let buf = control_store + .write_to_buf() + .with_context(|| "failed to serialize control store")?; + let mut header = Header::new_gnu(); + header.set_size(buf.len().try_into().expect("never breaches u64")); + ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice()) + .await + .with_context(|| "failed to append to archive")?; // We need to stream since the oldest segment someone (s3 or pageserver) // still needs. This duplicates calc_horizon_lsn logic. @@ -342,7 +376,7 @@ async fn pull_timeline( let client = Client::new(host.clone(), sk_auth_token.clone()); // Request stream with basebackup archive. let bb_resp = client - .snapshot(status.tenant_id, status.timeline_id) + .snapshot(status.tenant_id, status.timeline_id, conf.my_id) .await?; // Make Stream of Bytes from it... diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 234273e133..aa1a6696a1 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -483,6 +483,16 @@ pub(crate) async fn backup_partial_segment( .await } +pub(crate) async fn copy_partial_segment( + source: &RemotePath, + destination: &RemotePath, +) -> Result<()> { + let storage = get_configured_remote_storage(); + let cancel = CancellationToken::new(); + + storage.copy_object(source, destination, &cancel).await +} + pub async fn read_object( file_path: &RemotePath, offset: u64, diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs index 52765b0e98..675a051887 100644 --- a/safekeeper/src/wal_backup_partial.rs +++ b/safekeeper/src/wal_backup_partial.rs @@ -17,14 +17,13 @@ //! file. Code updates state in the control file before doing any S3 operations. //! This way control file stores information about all potentially existing //! remote partial segments and can clean them up after uploading a newer version. - use camino::Utf8PathBuf; use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI}; use remote_storage::RemotePath; use serde::{Deserialize, Serialize}; use tracing::{debug, error, info, instrument, warn}; -use utils::lsn::Lsn; +use utils::{id::NodeId, lsn::Lsn}; use crate::{ metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS}, @@ -82,6 +81,12 @@ pub struct State { pub segments: Vec, } +#[derive(Debug)] +pub(crate) struct ReplaceUploadedSegment { + pub(crate) previous: PartialRemoteSegment, + pub(crate) current: PartialRemoteSegment, +} + impl State { /// Find an Uploaded segment. There should be only one Uploaded segment at a time. pub(crate) fn uploaded_segment(&self) -> Option { @@ -90,6 +95,54 @@ impl State { .find(|seg| seg.status == UploadStatus::Uploaded) .cloned() } + + /// Replace the name of the Uploaded segment (if one exists) in order to match + /// it with `destination` safekeeper. Returns a description of the change or None + /// wrapped in anyhow::Result. + pub(crate) fn replace_uploaded_segment( + &mut self, + source: NodeId, + destination: NodeId, + ) -> anyhow::Result> { + let current = self + .segments + .iter_mut() + .find(|seg| seg.status == UploadStatus::Uploaded); + + let current = match current { + Some(some) => some, + None => { + return anyhow::Ok(None); + } + }; + + // Sanity check that the partial segment we are replacing is belongs + // to the `source` SK. + if !current + .name + .ends_with(format!("sk{}.partial", source.0).as_str()) + { + anyhow::bail!( + "Partial segment name ({}) doesn't match self node id ({})", + current.name, + source + ); + } + + let previous = current.clone(); + + let new_name = current.name.replace( + format!("_sk{}", source.0).as_str(), + format!("_sk{}", destination.0).as_str(), + ); + + current.name = new_name; + + anyhow::Ok(Some(ReplaceUploadedSegment { + previous, + current: current.clone(), + })) + } } struct PartialBackup { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index aaa1f21997..b76432127d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -67,6 +67,7 @@ from fixtures.pageserver.utils import ( from fixtures.pg_version import PgVersion from fixtures.port_distributor import PortDistributor from fixtures.remote_storage import ( + LocalFsStorage, MockS3Server, RemoteStorage, RemoteStorageKind, @@ -4425,14 +4426,32 @@ class Safekeeper(LogUtils): def timeline_dir(self, tenant_id, timeline_id) -> Path: return self.data_dir / str(tenant_id) / str(timeline_id) + def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId): + tline_path = ( + self.env.repo_dir + / "local_fs_remote_storage" + / "safekeeper" + / str(tenant_id) + / str(timeline_id) + ) + assert isinstance(self.env.safekeepers_remote_storage, LocalFsStorage) + return self._list_segments_in_dir( + tline_path, lambda name: ".metadata" not in name and ".___temp" not in name + ) + def list_segments(self, tenant_id, timeline_id) -> List[str]: """ Get list of segment names of the given timeline. """ tli_dir = self.timeline_dir(tenant_id, timeline_id) + return self._list_segments_in_dir( + tli_dir, lambda name: not name.startswith("safekeeper.control") + ) + + def _list_segments_in_dir(self, path: Path, keep_filter: Callable[[str], bool]) -> list[str]: segments = [] - for _, _, filenames in os.walk(tli_dir): - segments.extend([f for f in filenames if not f.startswith("safekeeper.control")]) + for _, _, filenames in os.walk(path): + segments.extend([f for f in filenames if keep_filter(f)]) segments.sort() return segments diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index bf7829fc84..5d3b263936 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -49,7 +49,13 @@ from fixtures.remote_storage import ( ) from fixtures.safekeeper.http import SafekeeperHttpClient from fixtures.safekeeper.utils import are_walreceivers_absent -from fixtures.utils import PropagatingThread, get_dir_size, query_scalar, start_in_background +from fixtures.utils import ( + PropagatingThread, + get_dir_size, + query_scalar, + start_in_background, + wait_until, +) def wait_lsn_force_checkpoint( @@ -63,6 +69,18 @@ def wait_lsn_force_checkpoint( lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver") + wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options) + + +def wait_lsn_force_checkpoint_at( + lsn: Lsn, + tenant_id: TenantId, + timeline_id: TimelineId, + ps: NeonPageserver, + pageserver_conn_options=None, +): + pageserver_conn_options = pageserver_conn_options or {} + auth_token = None if "password" in pageserver_conn_options: auth_token = pageserver_conn_options["password"] @@ -2304,3 +2322,138 @@ def test_s3_eviction( ) assert event_metrics_seen + + +def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilder): + """ + Verify that pulling timeline from a SK with an uploaded partial segment + does not lead to consistency issues: + 1. Start 3 SKs - only use two + 2. Ingest a bit of WAL + 3. Wait for partial to be uploaded + 4. Pull timeline to the third SK + 6. Replace source with destination SK and start compute + 5. Wait for source SK to evict timeline + 6. Go back to initial compute SK config and validate that + source SK can unevict the timeline (S3 state is consistent) + """ + neon_env_builder.auth_enabled = True + neon_env_builder.num_safekeepers = 3 + neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) + + neon_env_builder.safekeeper_extra_opts = [ + "--enable-offload", + "--delete-offloaded-wal", + "--partial-backup-timeout", + "500ms", + "--control-file-save-interval", + "500ms", + "--eviction-min-resident=500ms", + ] + + env = neon_env_builder.init_start(initial_tenant_conf={"checkpoint_timeout": "100ms"}) + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + (src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2]) + + log.info("use only first 2 safekeepers, 3rd will be seeded") + endpoint = env.endpoints.create("main") + endpoint.active_safekeepers = [1, 2] + endpoint.start() + endpoint.safe_psql("create table t(key int, value text)") + endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'") + + endpoint.stop() + + def source_partial_segment_uploaded(): + first_segment_name = "000000010000000000000001" + segs = src_sk.list_uploaded_segments(tenant_id, timeline_id) + + candidate_seg = None + for seg in segs: + if "partial" in seg and "sk1" in seg and not seg.startswith(first_segment_name): + candidate_seg = seg + + if candidate_seg is not None: + # The term might change, causing the segment to be gc-ed shortly after, + # so give it a bit of time to make sure it's stable. + time.sleep(2) + + segs = src_sk.list_uploaded_segments(tenant_id, timeline_id) + assert candidate_seg in segs + return candidate_seg + + raise Exception("Partial segment not uploaded yet") + + source_partial_segment = wait_until(15, 1, source_partial_segment_uploaded) + log.info( + f"Uploaded segments before pull are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}" + ) + log.info(f"Tracking source partial segment: {source_partial_segment}") + + src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id) + log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}") + + pageserver_conn_options = {"password": env.auth_keys.generate_tenant_token(tenant_id)} + wait_lsn_force_checkpoint_at( + src_flush_lsn, tenant_id, timeline_id, env.pageserver, pageserver_conn_options + ) + + dst_sk.pull_timeline([src_sk], tenant_id, timeline_id) + + def evicted(): + evictions = src_sk.http_client().get_metric_value( + "safekeeper_eviction_events_completed_total", {"kind": "evict"} + ) + + if evictions is None or evictions == 0: + raise Exception("Eviction did not happen on source safekeeper yet") + + wait_until(30, 1, evicted) + + endpoint.start(safekeepers=[2, 3]) + + def new_partial_segment_uploaded(): + segs = src_sk.list_uploaded_segments(tenant_id, timeline_id) + for seg in segs: + if "partial" in seg and "sk3" in seg: + return seg + + raise Exception("Partial segment not uploaded yet") + + log.info( + f"Uploaded segments before post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}" + ) + + endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'") + wait_until(15, 1, new_partial_segment_uploaded) + + log.info( + f"Uploaded segments after post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}" + ) + + # Allow for some gc iterations to happen and assert that the original + # uploaded partial segment remains in place. + time.sleep(5) + segs = src_sk.list_uploaded_segments(tenant_id, timeline_id) + assert source_partial_segment in segs + + log.info( + f"Uploaded segments at the end are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}" + ) + + # Restart the endpoint in order to check that the source safekeeper + # can unevict the timeline + endpoint.stop() + endpoint.start(safekeepers=[1, 2]) + + def unevicted(): + unevictions = src_sk.http_client().get_metric_value( + "safekeeper_eviction_events_completed_total", {"kind": "restore"} + ) + + if unevictions is None or unevictions == 0: + raise Exception("Uneviction did not happen on source safekeeper yet") + + wait_until(10, 1, unevicted) From a9c28be7d02226032f153edf6c7b527aec9fa5db Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Thu, 15 Aug 2024 10:06:28 +0100 Subject: [PATCH 02/14] fix(pageserver): allow unused_imports in download.rs on macOS (#8733) ## Problem On macOS, clippy fails with the following error: ``` error: unused import: `crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt` --> pageserver/src/tenant/remote_timeline_client/download.rs:26:5 | 26 | use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: `-D unused-imports` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(unused_imports)]` ``` Introduced in https://github.com/neondatabase/neon/pull/8717 ## Summary of changes - allow `unused_imports` for `crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt` on macOS in download.rs --- pageserver/src/tenant/remote_timeline_client/download.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 8199218c3c..d9725ad756 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -23,6 +23,7 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::storage_layer::LayerName; use crate::tenant::Generation; +#[cfg_attr(target_os = "macos", allow(unused_imports))] use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile}; use crate::TEMP_FILE_SUFFIX; From d9a57aeed9ca9b0e2134e7183355d52fb6a089d1 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 15 Aug 2024 12:54:05 +0300 Subject: [PATCH 03/14] storcon: deny external node configuration if an operation is ongoing (#8727) Per #8674, disallow node configuration while drain/fill are ongoing. Implement it by adding a only-http wrapper `Service::external_node_configure` which checks for operation existing before configuring. Additionally: - allow cancelling drain/fill after a pageserver has restarted and transitioned to WarmingUp Fixes: #8674 --- libs/pageserver_api/src/controller_api.rs | 3 -- storage_controller/src/http.rs | 2 +- storage_controller/src/service.rs | 42 +++++++++++------- .../regress/test_storage_controller.py | 44 +++++++++++++++++++ 4 files changed, 70 insertions(+), 21 deletions(-) diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index a5b452da83..a50707a1b8 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -313,20 +313,17 @@ pub struct MetadataHealthUpdateRequest { pub struct MetadataHealthUpdateResponse {} #[derive(Serialize, Deserialize, Debug)] - pub struct MetadataHealthListUnhealthyResponse { pub unhealthy_tenant_shards: Vec, } #[derive(Serialize, Deserialize, Debug)] - pub struct MetadataHealthListOutdatedRequest { #[serde(with = "humantime_serde")] pub not_scrubbed_for: Duration, } #[derive(Serialize, Deserialize, Debug)] - pub struct MetadataHealthListOutdatedResponse { pub health_records: Vec, } diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index e8513b31eb..e755aaed19 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -500,7 +500,7 @@ async fn handle_node_configure(mut req: Request) -> Result, StatusCode::OK, state .service - .node_configure( + .external_node_configure( config_req.node_id, config_req.availability.map(NodeAvailability::from), config_req.scheduling, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index ef4cd91efd..d717924ae6 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -4912,6 +4912,26 @@ impl Service { Ok(()) } + /// Wrapper around [`Self::node_configure`] which only allows changes while there is no ongoing + /// operation for HTTP api. + pub(crate) async fn external_node_configure( + &self, + node_id: NodeId, + availability: Option, + scheduling: Option, + ) -> Result<(), ApiError> { + { + let locked = self.inner.read().unwrap(); + if let Some(op) = locked.ongoing_operation.as_ref().map(|op| op.operation) { + return Err(ApiError::PreconditionFailed( + format!("Ongoing background operation forbids configuring: {op}").into(), + )); + } + } + + self.node_configure(node_id, availability, scheduling).await + } + pub(crate) async fn start_node_drain( self: &Arc, node_id: NodeId, @@ -5017,14 +5037,14 @@ impl Service { } pub(crate) async fn cancel_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> { - let (node_available, node_policy) = { + let node_available = { let locked = self.inner.read().unwrap(); let nodes = &locked.nodes; let node = nodes.get(&node_id).ok_or(ApiError::NotFound( anyhow::anyhow!("Node {} not registered", node_id).into(), ))?; - (node.is_available(), node.get_scheduling()) + node.is_available() }; if !node_available { @@ -5033,12 +5053,6 @@ impl Service { )); } - if !matches!(node_policy, NodeSchedulingPolicy::Draining) { - return Err(ApiError::PreconditionFailed( - format!("Node {node_id} has no drain in progress").into(), - )); - } - if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() { if let Operation::Drain(drain) = op_handler.operation { if drain.node_id == node_id { @@ -5152,14 +5166,14 @@ impl Service { } pub(crate) async fn cancel_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> { - let (node_available, node_policy) = { + let node_available = { let locked = self.inner.read().unwrap(); let nodes = &locked.nodes; let node = nodes.get(&node_id).ok_or(ApiError::NotFound( anyhow::anyhow!("Node {} not registered", node_id).into(), ))?; - (node.is_available(), node.get_scheduling()) + node.is_available() }; if !node_available { @@ -5168,12 +5182,6 @@ impl Service { )); } - if !matches!(node_policy, NodeSchedulingPolicy::Filling) { - return Err(ApiError::PreconditionFailed( - format!("Node {node_id} has no fill in progress").into(), - )); - } - if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() { if let Operation::Fill(fill) = op_handler.operation { if fill.node_id == node_id { @@ -5982,7 +5990,7 @@ impl Service { .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT) .await; - failpoint_support::sleep_millis_async!("sleepy-drain-loop"); + failpoint_support::sleep_millis_async!("sleepy-drain-loop", &cancel); } while !waiters.is_empty() { diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 9b2557a165..7d98ff2923 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -2091,3 +2091,47 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder): ) == 0 ) + + +def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder): + # single unsharded tenant, two locations + neon_env_builder.num_pageservers = 2 + + env = neon_env_builder.init_start() + + env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}}) + env.storage_controller.reconcile_until_idle() + + attached_id = int(env.storage_controller.locate(env.initial_tenant)[0]["node_id"]) + attached = next((ps for ps in env.pageservers if ps.id == attached_id)) + + def attached_is_draining(): + details = env.storage_controller.node_status(attached.id) + assert details["scheduling"] == "Draining" + + env.storage_controller.configure_failpoints(("sleepy-drain-loop", "return(10000)")) + env.storage_controller.node_drain(attached.id) + + wait_until(10, 0.5, attached_is_draining) + + attached.restart() + + # we are unable to reconfigure node while the operation is still ongoing + with pytest.raises( + StorageControllerApiException, + match="Precondition failed: Ongoing background operation forbids configuring: drain.*", + ): + env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"}) + with pytest.raises( + StorageControllerApiException, + match="Precondition failed: Ongoing background operation forbids configuring: drain.*", + ): + env.storage_controller.node_configure(attached.id, {"availability": "Offline"}) + + env.storage_controller.cancel_node_drain(attached.id) + + def reconfigure_node_again(): + env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"}) + + # allow for small delay between actually having cancelled and being able reconfigure again + wait_until(4, 0.5, reconfigure_node_again) From 52641eb8533ec0bdd70523f2595a0265c9208dc7 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 15 Aug 2024 15:30:04 +0300 Subject: [PATCH 04/14] storcon: add spans to drain/fill ops (#8735) this way we do not need to repeat the %node_id everywhere, and we get no stray messages in logs from within the op. --- storage_controller/src/service.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index d717924ae6..84db088a42 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -4989,6 +4989,8 @@ impl Service { cancel: cancel.clone(), }); + let span = tracing::info_span!(parent: None, "drain_node", %node_id); + tokio::task::spawn({ let service = self.clone(); let cancel = cancel.clone(); @@ -5005,21 +5007,21 @@ impl Service { } } - tracing::info!(%node_id, "Drain background operation starting"); + tracing::info!("Drain background operation starting"); let res = service.drain_node(node_id, cancel).await; match res { Ok(()) => { - tracing::info!(%node_id, "Drain background operation completed successfully"); + tracing::info!("Drain background operation completed successfully"); } Err(OperationError::Cancelled) => { - tracing::info!(%node_id, "Drain background operation was cancelled"); + tracing::info!("Drain background operation was cancelled"); } Err(err) => { - tracing::error!(%node_id, "Drain background operation encountered: {err}") + tracing::error!("Drain background operation encountered: {err}") } } } - }); + }.instrument(span)); } NodeSchedulingPolicy::Draining => { return Err(ApiError::Conflict(format!( @@ -5118,6 +5120,8 @@ impl Service { cancel: cancel.clone(), }); + let span = tracing::info_span!(parent: None, "fill_node", %node_id); + tokio::task::spawn({ let service = self.clone(); let cancel = cancel.clone(); @@ -5134,21 +5138,21 @@ impl Service { } } - tracing::info!(%node_id, "Fill background operation starting"); + tracing::info!("Fill background operation starting"); let res = service.fill_node(node_id, cancel).await; match res { Ok(()) => { - tracing::info!(%node_id, "Fill background operation completed successfully"); + tracing::info!("Fill background operation completed successfully"); } Err(OperationError::Cancelled) => { - tracing::info!(%node_id, "Fill background operation was cancelled"); + tracing::info!("Fill background operation was cancelled"); } Err(err) => { - tracing::error!(%node_id, "Fill background operation encountered: {err}") + tracing::error!("Fill background operation encountered: {err}") } } } - }); + }.instrument(span)); } NodeSchedulingPolicy::Filling => { return Err(ApiError::Conflict(format!( From 24d347f50b15bb8ba44f0b25589e180e6482e1a8 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 15 Aug 2024 16:27:07 +0300 Subject: [PATCH 05/14] storcon: use tracing for logging panics (#8734) this gives spans for panics, and does not globber loki output by writing to stderr while all of the other logging is to stdout. See: #3475 --- storage_controller/src/main.rs | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 5a68799141..7387d36690 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -196,14 +196,26 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> { } fn main() -> anyhow::Result<()> { - let default_panic = std::panic::take_hook(); - std::panic::set_hook(Box::new(move |info| { - default_panic(info); - std::process::exit(1); - })); + logging::init( + LogFormat::Plain, + logging::TracingErrorLayerEnablement::Disabled, + logging::Output::Stdout, + )?; + + // log using tracing so we don't get confused output by default hook writing to stderr + utils::logging::replace_panic_hook_with_tracing_panic_hook().forget(); let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); + let hook = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + // let sentry send a message (and flush) + // and trace the error + hook(info); + + std::process::exit(1); + })); + tokio::runtime::Builder::new_current_thread() // We use spawn_blocking for database operations, so require approximately // as many blocking threads as we will open database connections. @@ -217,12 +229,6 @@ fn main() -> anyhow::Result<()> { async fn async_main() -> anyhow::Result<()> { let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate())); - logging::init( - LogFormat::Plain, - logging::TracingErrorLayerEnablement::Disabled, - logging::Output::Stdout, - )?; - preinitialize_metrics(); let args = Cli::parse(); From f087423a0111d4fb5ac1e12007447c56b2a1c2a6 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 15 Aug 2024 16:28:25 +0300 Subject: [PATCH 06/14] Handle reload config file request in LR monitor (#8732) ## Problem Logical replication BGW checking replication lag is not reloading config ## Summary of changes Add handling of reload config request ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. ## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist Co-authored-by: Konstantin Knizhnik --- pgxn/neon/neon.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 784d0f1da3..fe8e276d1c 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -192,6 +192,13 @@ LogicalSlotsMonitorMain(Datum main_arg) { XLogRecPtr cutoff_lsn; + /* In case of a SIGHUP, just reload the configuration. */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + /* * If there are too many .snap files, just drop all logical slots to * prevent aux files bloat. From 4e58fd93216c5274e49488de161dc9ce12abd82d Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Thu, 15 Aug 2024 18:37:15 +0100 Subject: [PATCH 07/14] CI(label-for-external-users): use CI_ACCESS_TOKEN (#8738) ## Problem `secrets.GITHUB_TOKEN` (with any permissions) is not enough to get a user's membership info if they decide to hide it. ## Summary of changes - Use `secrets.CI_ACCESS_TOKEN` for `gh api` call - Use `pull_request_target` instead of `pull_request` event to get access to secrets --- .github/workflows/label-for-external-users.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/label-for-external-users.yml b/.github/workflows/label-for-external-users.yml index 7cf5ee254c..585d118dfb 100644 --- a/.github/workflows/label-for-external-users.yml +++ b/.github/workflows/label-for-external-users.yml @@ -4,7 +4,7 @@ on: issues: types: - opened - pull_request: + pull_request_target: types: - opened @@ -25,7 +25,7 @@ jobs: - name: Check whether `${{ github.actor }}` is a member of `${{ github.repository_owner }}` id: check-user env: - GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }} run: | if gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${GITHUB_ACTOR}"; then is_member=true @@ -45,10 +45,10 @@ jobs: issues: write # for `gh issue edit` steps: - - name: Label new ${{ github.event_name }} + - name: Add `${{ env.LABEL }}` label env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} - ITEM_NUMBER: ${{ github.event[github.event_name == 'pull_request' && 'pull_request' || 'issue'].number }} - GH_CLI_COMMAND: ${{ github.event_name == 'pull_request' && 'pr' || 'issue' }} + ITEM_NUMBER: ${{ github.event[github.event_name == 'pull_request_target' && 'pull_request' || 'issue'].number }} + GH_CLI_COMMAND: ${{ github.event_name == 'pull_request_target' && 'pr' || 'issue' }} run: | gh ${GH_CLI_COMMAND} --repo ${GITHUB_REPOSITORY} edit --add-label=${LABEL} ${ITEM_NUMBER} From 69cb1ee479ecdc99dd117fe4149b59dd54676fea Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Thu, 15 Aug 2024 22:41:58 +0100 Subject: [PATCH 08/14] CI(replication-tests): store test results & change notification channel (#8687) ## Problem We want to store Nightly Replication test results in the database and notify the relevant Slack channel about failures ## Summary of changes - Store test results in the database - Notify `on-call-compute-staging-stream` about failures --- .github/workflows/benchmarking.yml | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml index f99a037489..a4a597acde 100644 --- a/.github/workflows/benchmarking.yml +++ b/.github/workflows/benchmarking.yml @@ -222,13 +222,20 @@ jobs: id: create-allure-report if: ${{ !cancelled() }} uses: ./.github/actions/allure-report-generate + with: + store-test-results-into-db: true + env: + REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }} - name: Post to a Slack channel if: ${{ github.event.schedule && failure() }} uses: slackapi/slack-github-action@v1 with: - channel-id: "C033QLM5P7D" # dev-staging-stream - slack-message: "Periodic replication testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" + channel-id: "C06T9AMNDQQ" # on-call-compute-staging-stream + slack-message: | + Periodic replication testing: ${{ job.status }} + <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run> + <${{ steps.create-allure-report.outputs.report-url }}|Allure report> env: SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} @@ -330,7 +337,7 @@ jobs: prepare_AWS_RDS_databases: uses: ./.github/workflows/_benchmarking_preparation.yml secrets: inherit - + pgbench-compare: if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }} needs: [ generate-matrices, prepare_AWS_RDS_databases ] From df086cd139ee5ecc82bf096fc3fc6ee4397ac983 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Thu, 15 Aug 2024 15:34:45 -0700 Subject: [PATCH 09/14] Add logical replication test to exercise snapfiles (#8364) --- .../performance/test_logical_replication.py | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/test_runner/performance/test_logical_replication.py b/test_runner/performance/test_logical_replication.py index 4b4ffc1fee..c4e42a7834 100644 --- a/test_runner/performance/test_logical_replication.py +++ b/test_runner/performance/test_logical_replication.py @@ -262,3 +262,85 @@ def test_publisher_restart( sub_workload.terminate() finally: pub_workload.terminate() + + +@pytest.mark.remote_cluster +@pytest.mark.timeout(2 * 60 * 60) +def test_snap_files( + pg_bin: PgBin, + benchmark_project_pub: NeonApiEndpoint, + zenbenchmark: NeonBenchmarker, +): + """ + Creates a node with a replication slot. Generates pgbench into the replication slot, + then runs pgbench inserts while generating large numbers of snapfiles. Then restarts + the node and tries to peek the replication changes. + """ + test_duration_min = 60 + test_interval_min = 5 + pgbench_duration = f"-T{test_duration_min * 60 * 2}" + + env = benchmark_project_pub.pgbench_env + connstr = benchmark_project_pub.connstr + pg_bin.run_capture(["pgbench", "-i", "-s100"], env=env) + + with psycopg2.connect(connstr) as conn: + conn.autocommit = True + with conn.cursor() as cur: + cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'") + is_super = cur.fetchall()[0] + assert is_super, "This benchmark won't work if we don't have superuser" + + conn = psycopg2.connect(connstr) + conn.autocommit = True + cur = conn.cursor() + cur.execute("ALTER SYSTEM SET neon.logical_replication_max_snap_files = -1") + + with psycopg2.connect(connstr) as conn: + conn.autocommit = True + with conn.cursor() as cur: + cur.execute("SELECT pg_reload_conf()") + + with psycopg2.connect(connstr) as conn: + conn.autocommit = True + with conn.cursor() as cur: + cur.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 + FROM pg_replication_slots + WHERE slot_name = 'slotter' + ) THEN + PERFORM pg_drop_replication_slot('slotter'); + END IF; + END $$; + """ + ) + cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')") + + workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env) + try: + start = time.time() + prev_measurement = time.time() + while time.time() - start < test_duration_min * 60: + with psycopg2.connect(connstr) as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s" + ) + check_pgbench_still_running(workload) + cur.execute( + "SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())" + ) + + # Measure storage + if time.time() - prev_measurement > test_interval_min * 60: + storage = benchmark_project_pub.get_synthetic_storage_size() + zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER) + prev_measurement = time.time() + time.sleep(test_interval_min * 60 / 3) + + finally: + workload.terminate() From 4763a960d103a27250eadd6892368ae77a3d66c4 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Fri, 16 Aug 2024 08:10:05 +0300 Subject: [PATCH 10/14] chore: log if we have an open layer or any frozen on shutdown (#8740) Some benchmarks are failing with a "long" flushing, which might be because there is a queue of in-memory layers, or something else. Add logging to narrow it down. Private slack DM ref: https://neondb.slack.com/archives/D049K7HJ9JM/p1723727305238099 --- pageserver/src/tenant/timeline.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b4d908b130..01e77fa1b1 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1645,6 +1645,20 @@ impl Timeline { self.last_record_lsn.shutdown(); if try_freeze_and_flush { + if let Some((open, frozen)) = self + .layers + .read() + .await + .layer_map() + .map(|lm| (lm.open_layer.is_some(), lm.frozen_layers.len())) + .ok() + .filter(|(open, frozen)| *open || *frozen > 0) + { + tracing::info!(?open, frozen, "flushing and freezing on shutdown"); + } else { + // this is double-shutdown, ignore it + } + // we shut down walreceiver above, so, we won't add anything more // to the InMemoryLayer; freeze it and wait for all frozen layers // to reach the disk & upload queue, then shut the upload queue and From 7fdc3ea16296ae7ac6f74ed2843ecee454391276 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 16 Aug 2024 13:30:53 +0300 Subject: [PATCH 11/14] Add retroactive RFC about physical replication (#8546) We've had physical replication support for a long time, but we never created an RFC for the feature. This RFC does that after the fact. Even though we've already implemented the feature, let's have a design discussion as if it hadn't done that. It can still be quite insightful. This is written from a pretty compute-centric viewpoint, not much on how it works in the control plane. --- docs/rfcs/036-physical-replication.md | 265 ++++++++++++++++++++++++++ 1 file changed, 265 insertions(+) create mode 100644 docs/rfcs/036-physical-replication.md diff --git a/docs/rfcs/036-physical-replication.md b/docs/rfcs/036-physical-replication.md new file mode 100644 index 0000000000..41aced0545 --- /dev/null +++ b/docs/rfcs/036-physical-replication.md @@ -0,0 +1,265 @@ +# Physical Replication + +This RFC is a bit special in that we have already implemented physical +replication a long time ago. However, we never properly wrote down all +the decisions and assumptions, and in the last months when more users +have started to use the feature, numerous issues have surfaced. + +This RFC documents the design decisions that have been made. + +## Summary + +PostgreSQL has a feature called streaming replication, where a replica +streams WAL from the primary and continuously applies it. It is also +known as "physical replication", to distinguish it from logical +replication. In PostgreSQL, a replica is initialized by taking a +physical backup of the primary. In Neon, the replica is initialized +from a slim "base backup" from the pageserver, just like a primary, +and the primary and the replicas connect to the same pageserver, +sharing the storage. + +There are two kinds of read-only replicas in Neon: +- replicas that follow the primary, and +- "static" replicas that are pinned at a particular LSN. + +A static replica is useful e.g. for performing time-travel queries and +running one-off slow queries without affecting the primary. A replica +that follows the primary can be used e.g. to scale out read-only +workloads. + +## Motivation + +Read-only replicas allow offloading read-only queries. It's useful for +isolation, if you want to make sure that read-only queries don't +affect the primary, and it's also an easy way to provide guaranteed +read-only access to an application, without having to mess with access +controls. + +## Non Goals (if relevant) + +This RFC is all about WAL-based *physical* replication. Logical +replication is a different feature. + +Neon also has the capability to launch "static" read-only nodes which +do not follow the primary, but are pinned to a particular LSN. They +can be used for long-running one-off queries, or for Point-in-time +queries. They work similarly to read replicas that follow the primary, +but some things are simpler: there are no concerns about cache +invalidation when the data changes on the primary, or worrying about +transactions that are in-progress on the primary. + +## Impacted components (e.g. pageserver, safekeeper, console, etc) + +- Control plane launches the replica +- Replica Postgres instance connects to the safekeepers, to stream the WAL +- The primary does not know about the standby, except for the hot standby feedback +- The primary and replicas all connect to the same pageservers + + +# Context + +Some useful things to know about hot standby and replicas in +PostgreSQL. + +## PostgreSQL startup sequence + +"Running" and "start up" terms are little imprecise. PostgreSQL +replica startup goes through several stages: + +1. First, the process is started up, and various initialization steps + are performed, like initializing shared memory. If you try to + connect to the server in this stage, you get an error: ERROR: the + database system is starting up. This stage happens very quickly, no + +2. Then the server reads the checpoint record from the WAL and starts + the WAL replay starting from the checkpoint. This works differently + in Neon: we start the WAL replay at the basebackup LSN, not from a + checkpoint! If you connect to the server in this state, you get an + error: ERROR: the database system is not yet accepting + connections. We proceed to the next stage, when the WAL replay sees + a running-xacts record. Or in Neon, the "CLOG scanning" mechanism + can allow us to move directly to next stage, with all the caveats + listed in this RFC. + +3. When the running-xacts information is established, the server + starts to accept connections normally. + +From PostgreSQL's point of view, the server is already running in +stage 2, even though it's not accepting connections yet. Our +`compute_ctl` does not consider it as running until stage 3. If the +transition from stage 2 to 3 doesn't happen fast enough, the control +plane will mark the start operation as failed. + + +## Decisions, Issues + +### Cache invalidation in replica + +When a read replica follows the primary in PostgreSQL, it needs to +stream all the WAL from the primary and apply all the records, to keep +the local copy of the data consistent with the primary. In Neon, the +replica can fetch the updated page versions from the pageserver, so +it's not necessary to apply all the WAL. However, it needs to ensure +that any pages that are currently in the Postgres buffer cache, or the +Local File Cache, are either updated, or thrown away so that the next +read of the page will fetch the latest version. + +We choose to apply the WAL records for pages that are already in the +buffer cache, and skip records for other pages. Somewhat arbitrarily, +we also apply records affecting catalog relations, fetching the old +page version from the pageserver if necessary first. See +`neon_redo_read_buffer_filter()` function. + +The replica wouldn't necessarily need to see all the WAL records, only +the records that apply to cached pages. For simplicity, we do stream +all the WAL to the replica, and the replica simply ignores WAL records +that require no action. + +Like in PostgreSQL, the read replica maintains a "replay LSN", which +is the LSN up to which the replica has received and replayed the +WAL. The replica can lag behind the primary, if it cannot quite keep +up with the primary, or if a long-running query conflicts with changes +that are about to be applied, or even intentionally if the user wishes +to see delayed data (see recovery_min_apply_delay). It's important +that the replica sees a consistent view of the whole cluster at the +replay LSN, when it's lagging behind. + +In Neon, the replica connects to a safekeeper to get the WAL +stream. That means that the safekeepers must be able to regurgitate +the original WAL as far back as the replay LSN of any running read +replica. (A static read-only node that does not follow the primary +does not require a WAL stream however). The primary does not need to +be running, and when it is, the replicas don't incur any extra +overhead to the primary (see hot standby feedback though). + +### In-progress transactions + +In PostgreSQL, when a hot standby server starts up, it cannot +immediately open up for queries (see [PostgreSQL startup +sequence]). It first needs to establish a complete list of in-progress +transactions, including subtransactions, that are running at the +primary, at the current replay LSN. Normally that happens quickly, +when the replica sees a "running-xacts" WAL record, because the +primary writes a running-xacts WAL record at every checkpoint, and in +PostgreSQL the replica always starts the WAL replay from a checkpoint +REDO point. (A shutdown checkpoint WAL record also implies that all +the non-prepared transactions have ended.) If there are a lot of +subtransactions in progress, however, the standby might need to wait +for old transactions to complete before it can open up for queries. + +In Neon that problem is worse: a replica can start at any LSN, so +there's no guarantee that it will see a running-xacts record any time +soon. In particular, if the primary is not running when the replica is +started, it might never see a running-xacts record. + +To make things worse, we initially missed this issue, and always +started accepting queries at replica startup, even if it didn't have +the transaction information. That could lead to incorrect query +results and data corruption later. However, as we fixed that, we +introduced a new problem compared to what we had before: previously +the replica would always start up, but after fixing that bug, it might +not. In a superficial way, the old behavior was better (but could lead +to serious issues later!). That made fixing that bug was very hard, +because as we fixed it, we made things (superficially) worse for +others. + +See https://github.com/neondatabase/neon/pull/7288 which fixed the +bug, and follow-up PRs https://github.com/neondatabase/neon/pull/8323 +and https://github.com/neondatabase/neon/pull/8484 to try to claw back +the cases that started to cause trouble as fixing it. As of this +writing, there are still cases where a replica might not immediately +start up, causing the control plane operation to fail, the remaining +issues are tracked in https://github.com/neondatabase/neon/issues/6211. + +One long-term fix for this is to switch to using so-called CSN +snapshots in read replica. That would make it unnecessary to have the +full in-progress transaction list in the replica at startup time. See +https://commitfest.postgresql.org/48/4912/ for a work-in-progress +patch to upstream to implement that. + +Another thing we could do is to teach the control plane about that +distinction between "starting up" and "running but haven't received +running-xacts information yet", so that we could keep the replica +waiting longer in that stage, and also give any client connections the +same `ERROR: the database system is not yet accepting connections` +error that you get in standalone PostgreSQL in that state. + + +### Recovery conflicts and Hot standby feedback + +It's possible that a tuple version is vacuumed away in the primary, +even though it is still needed by a running transactions in the +replica. This is called a "recovery conflict", and PostgreSQL provides +various options for dealing with it. By default, the WAL replay will +wait up to 30 s for the conflicting query to finish. After that, it +will kill the running query, so that the WAL replay can proceed. + +Another way to avoid the situation is to enable the +[`hot_standby_feedback`](https://www.postgresql.org/docs/current/runtime-config-replication.html#GUC-HOT-STANDBY-FEEDBACK) +option. When it is enabled, the primary will refrain from vacuuming +tuples that are still needed in the primary. That means potentially +bloating the primary, which violates the usual rule that read replicas +don't affect the operations on the primary, which is why it's off by +default. We leave it to users to decide if they want to turn it on, +same as PostgreSQL. + +Neon supports `hot_standby_feedback` by passing the feedback messages +from the replica to the safekeepers, and from safekeepers to the +primary. + +### Relationship of settings between primary and replica + +In order to enter hot standby mode, some configuration options need to +be set to the same or larger values in the standby, compared to the +primary. See [explanation in the PostgreSQL +docs](https://www.postgresql.org/docs/current/hot-standby.html#HOT-STANDBY-ADMIN) + +In Neon, we have this problem too. To prevent customers from hitting +it, the control plane automatically adjusts the settings of a replica, +so that they match or exceed the primary's settings (see +https://github.com/neondatabase/cloud/issues/14903). However, you +can still hit the issue if the primary is restarted with larger +settings, while the replica is running. + + +### Interaction with Pageserver GC + +The read replica can lag behind the primary. If there are recovery +conflicts or the replica cannot keep up for some reason, the lag can +in principle grow indefinitely. The replica will issue all GetPage +requests to the pageservers at the current replay LSN, and needs to +see the old page versions. + +If the retention period in the pageserver is set to be small, it may +have already garbage collected away the old page versions. That will +cause read errors in the compute, and can mean that the replica cannot +make progress with the replication anymore. + +There is a mechanism for replica to pass information about its replay +LSN to the pageserver, so that the pageserver refrains from GC'ing +data that is still needed by the standby. It's called +'standby_horizon' in the pageserver code, see +https://github.com/neondatabase/neon/pull/7368. A separate "lease" +mechanism also is in the works, where the replica could hold a lease +on the old LSN, preventing the pageserver from advancing the GC +horizon past that point. The difference is that the standby_horizon +mechanism relies on a feedback message from replica to safekeeper, +while the least API is exposed directly from the pageserver. A static +read-only node is not connected to safekeepers, so it cannot use the +standby_horizon mechanism. + + +### Synchronous replication + +We haven't put any effort into synchronous replication yet. + +PostgreSQL provides multiple levels of synchronicity. In the weaker +levels, a transaction is not acknowledged as committed to the client +in the primary until the WAL has been streamed to a replica or flushed +to disk there. Those modes don't make senses in Neon, because the +safekeepers handle durability. + +`synchronous_commit=remote_apply` mode would make sense. In that mode, +the commit is not acknowledged to the client until it has been +replayed in the replica. That ensures that after commit, you can see +the commit in the replica too (aka. read-your-write consistency). From 3f91ea28d997a23b899ef0c3ce237e7ae85f2916 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 16 Aug 2024 13:05:04 +0100 Subject: [PATCH 12/14] tests: add infra and test for storcon leadership transfer (#8587) ## Problem https://github.com/neondatabase/neon/pull/8588 implemented the mechanism for storage controller leadership transfers. However, there's no tests that exercise the behaviour. ## Summary of changes 1. Teach `neon_local` how to handle multiple storage controller instances. Each storage controller instance gets its own subdirectory (`storage_controller_1, ...`). `storage_controller start|stop` subcommands have also been extended to optionally accept an instance id. 2. Add a storage controller proxy test fixture. It's a basic HTTP server that forwards requests from pageserver and test env to the currently configured storage controller. 3. Add a test which exercises storage controller leadership transfer. 4. Finally fix a couple bugs that the test surfaced --- control_plane/src/background_process.rs | 2 +- control_plane/src/bin/neon_local.rs | 86 +++- control_plane/src/local_env.rs | 37 ++ control_plane/src/storage_controller.rs | 396 ++++++++++++------ storage_controller/src/http.rs | 16 + storage_controller/src/peer_client.rs | 4 +- storage_controller/src/service.rs | 114 ++--- test_runner/conftest.py | 1 + test_runner/fixtures/neon_fixtures.py | 232 +++++++--- .../fixtures/storage_controller_proxy.py | 73 ++++ test_runner/fixtures/utils.py | 2 +- .../regress/test_storage_controller.py | 129 ++++++ 12 files changed, 841 insertions(+), 251 deletions(-) create mode 100644 test_runner/fixtures/storage_controller_proxy.py diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index bf8a27e550..619c5bce3e 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -379,7 +379,7 @@ where } } -fn process_has_stopped(pid: Pid) -> anyhow::Result { +pub(crate) fn process_has_stopped(pid: Pid) -> anyhow::Result { match kill(pid, None) { // Process exists, keep waiting Ok(_) => Ok(false), diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 51e9a51a57..edd88dc71c 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -15,7 +15,9 @@ use control_plane::local_env::{ }; use control_plane::pageserver::PageServerNode; use control_plane::safekeeper::SafekeeperNode; -use control_plane::storage_controller::StorageController; +use control_plane::storage_controller::{ + NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController, +}; use control_plane::{broker, local_env}; use pageserver_api::config::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, @@ -1052,6 +1054,36 @@ fn get_start_timeout(args: &ArgMatches) -> &Duration { humantime_duration.as_ref() } +fn storage_controller_start_args(args: &ArgMatches) -> NeonStorageControllerStartArgs { + let maybe_instance_id = args.get_one::("instance-id"); + + let base_port = args.get_one::("base-port"); + + if maybe_instance_id.is_some() && base_port.is_none() { + panic!("storage-controller start specificied instance-id but did not provide base-port"); + } + + let start_timeout = args + .get_one::("start-timeout") + .expect("invalid value for start-timeout"); + + NeonStorageControllerStartArgs { + instance_id: maybe_instance_id.copied().unwrap_or(1), + base_port: base_port.copied(), + start_timeout: *start_timeout, + } +} + +fn storage_controller_stop_args(args: &ArgMatches) -> NeonStorageControllerStopArgs { + let maybe_instance_id = args.get_one::("instance-id"); + let immediate = args.get_one::("stop-mode").map(|s| s.as_str()) == Some("immediate"); + + NeonStorageControllerStopArgs { + instance_id: maybe_instance_id.copied().unwrap_or(1), + immediate, + } +} + async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { match sub_match.subcommand() { Some(("start", subcommand_args)) => { @@ -1113,19 +1145,14 @@ async fn handle_storage_controller( let svc = StorageController::from_env(env); match sub_match.subcommand() { Some(("start", start_match)) => { - if let Err(e) = svc.start(get_start_timeout(start_match)).await { + if let Err(e) = svc.start(storage_controller_start_args(start_match)).await { eprintln!("start failed: {e}"); exit(1); } } Some(("stop", stop_match)) => { - let immediate = stop_match - .get_one::("stop-mode") - .map(|s| s.as_str()) - == Some("immediate"); - - if let Err(e) = svc.stop(immediate).await { + if let Err(e) = svc.stop(storage_controller_stop_args(stop_match)).await { eprintln!("stop failed: {}", e); exit(1); } @@ -1228,7 +1255,12 @@ async fn handle_start_all( // Only start the storage controller if the pageserver is configured to need it if env.control_plane_api.is_some() { let storage_controller = StorageController::from_env(env); - if let Err(e) = storage_controller.start(retry_timeout).await { + if let Err(e) = storage_controller + .start(NeonStorageControllerStartArgs::with_default_instance_id( + (*retry_timeout).into(), + )) + .await + { eprintln!("storage_controller start failed: {:#}", e); try_stop_all(env, true).await; exit(1); @@ -1358,10 +1390,21 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { eprintln!("neon broker stop failed: {e:#}"); } - if env.control_plane_api.is_some() { + // Stop all storage controller instances. In the most common case there's only one, + // but iterate though the base data directory in order to discover the instances. + let storcon_instances = env + .storage_controller_instances() + .await + .expect("Must inspect data dir"); + for (instance_id, _instance_dir_path) in storcon_instances { let storage_controller = StorageController::from_env(env); - if let Err(e) = storage_controller.stop(immediate).await { - eprintln!("storage controller stop failed: {e:#}"); + let stop_args = NeonStorageControllerStopArgs { + instance_id, + immediate, + }; + + if let Err(e) = storage_controller.stop(stop_args).await { + eprintln!("Storage controller instance {instance_id} stop failed: {e:#}"); } } } @@ -1501,6 +1544,18 @@ fn cli() -> Command { .action(ArgAction::SetTrue) .required(false); + let instance_id = Arg::new("instance-id") + .long("instance-id") + .help("Identifier used to distinguish storage controller instances (default 1)") + .value_parser(value_parser!(u8)) + .required(false); + + let base_port = Arg::new("base-port") + .long("base-port") + .help("Base port for the storage controller instance idenfified by instance-id (defaults to pagserver cplane api)") + .value_parser(value_parser!(u16)) + .required(false); + Command::new("Neon CLI") .arg_required_else_help(true) .version(GIT_VERSION) @@ -1609,9 +1664,12 @@ fn cli() -> Command { .arg_required_else_help(true) .about("Manage storage_controller") .subcommand(Command::new("start").about("Start storage controller") - .arg(timeout_arg.clone())) + .arg(timeout_arg.clone()) + .arg(instance_id.clone()) + .arg(base_port)) .subcommand(Command::new("stop").about("Stop storage controller") - .arg(stop_mode_arg.clone())) + .arg(stop_mode_arg.clone()) + .arg(instance_id)) ) .subcommand( Command::new("safekeeper") diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 15bbac702f..807519c88d 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -156,6 +156,11 @@ pub struct NeonStorageControllerConf { #[serde(with = "humantime_serde")] pub max_warming_up: Duration, + pub start_as_candidate: bool, + + /// Database url used when running multiple storage controller instances + pub database_url: Option, + /// Threshold for auto-splitting a tenant into shards pub split_threshold: Option, @@ -174,6 +179,8 @@ impl Default for NeonStorageControllerConf { Self { max_offline: Self::DEFAULT_MAX_OFFLINE_INTERVAL, max_warming_up: Self::DEFAULT_MAX_WARMING_UP_INTERVAL, + start_as_candidate: false, + database_url: None, split_threshold: None, max_secondary_lag_bytes: None, } @@ -392,6 +399,36 @@ impl LocalEnv { } } + /// Inspect the base data directory and extract the instance id and instance directory path + /// for all storage controller instances + pub async fn storage_controller_instances(&self) -> std::io::Result> { + let mut instances = Vec::default(); + + let dir = std::fs::read_dir(self.base_data_dir.clone())?; + for dentry in dir { + let dentry = dentry?; + let is_dir = dentry.metadata()?.is_dir(); + let filename = dentry.file_name().into_string().unwrap(); + let parsed_instance_id = match filename.strip_prefix("storage_controller_") { + Some(suffix) => suffix.parse::().ok(), + None => None, + }; + + let is_instance_dir = is_dir && parsed_instance_id.is_some(); + + if !is_instance_dir { + continue; + } + + instances.push(( + parsed_instance_id.expect("Checked previously"), + dentry.path(), + )); + } + + Ok(instances) + } + pub fn register_branch_mapping( &mut self, branch_name: String, diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index f180e922e8..2c077595a1 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -3,6 +3,8 @@ use crate::{ local_env::{LocalEnv, NeonStorageControllerConf}, }; use camino::{Utf8Path, Utf8PathBuf}; +use hyper::Uri; +use nix::unistd::Pid; use pageserver_api::{ controller_api::{ NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest, @@ -18,7 +20,7 @@ use pageserver_client::mgmt_api::ResponseErrorMessageExt; use postgres_backend::AuthType; use reqwest::Method; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{fs, str::FromStr, time::Duration}; +use std::{fs, net::SocketAddr, path::PathBuf, str::FromStr, sync::OnceLock}; use tokio::process::Command; use tracing::instrument; use url::Url; @@ -29,12 +31,14 @@ use utils::{ pub struct StorageController { env: LocalEnv, - listen: String, private_key: Option>, public_key: Option, - postgres_port: u16, client: reqwest::Client, config: NeonStorageControllerConf, + + // The listen addresses is learned when starting the storage controller, + // hence the use of OnceLock to init it at the right time. + listen: OnceLock, } const COMMAND: &str = "storage_controller"; @@ -43,6 +47,36 @@ const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16; const DB_NAME: &str = "storage_controller"; +pub struct NeonStorageControllerStartArgs { + pub instance_id: u8, + pub base_port: Option, + pub start_timeout: humantime::Duration, +} + +impl NeonStorageControllerStartArgs { + pub fn with_default_instance_id(start_timeout: humantime::Duration) -> Self { + Self { + instance_id: 1, + base_port: None, + start_timeout, + } + } +} + +pub struct NeonStorageControllerStopArgs { + pub instance_id: u8, + pub immediate: bool, +} + +impl NeonStorageControllerStopArgs { + pub fn with_default_instance_id(immediate: bool) -> Self { + Self { + instance_id: 1, + immediate, + } + } +} + #[derive(Serialize, Deserialize)] pub struct AttachHookRequest { pub tenant_shard_id: TenantShardId, @@ -67,23 +101,6 @@ pub struct InspectResponse { impl StorageController { pub fn from_env(env: &LocalEnv) -> Self { - // Makes no sense to construct this if pageservers aren't going to use it: assume - // pageservers have control plane API set - let listen_url = env.control_plane_api.clone().unwrap(); - - let listen = format!( - "{}:{}", - listen_url.host_str().unwrap(), - listen_url.port().unwrap() - ); - - // Convention: NeonEnv in python tests reserves the next port after the control_plane_api - // port, for use by our captive postgres. - let postgres_port = listen_url - .port() - .expect("Control plane API setting should always have a port") - + 1; - // Assume all pageservers have symmetric auth configuration: this service // expects to use one JWT token to talk to all of them. let ps_conf = env @@ -126,20 +143,28 @@ impl StorageController { Self { env: env.clone(), - listen, private_key, public_key, - postgres_port, client: reqwest::ClientBuilder::new() .build() .expect("Failed to construct http client"), config: env.storage_controller.clone(), + listen: OnceLock::default(), } } - fn pid_file(&self) -> Utf8PathBuf { - Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_controller.pid")) - .expect("non-Unicode path") + fn storage_controller_instance_dir(&self, instance_id: u8) -> PathBuf { + self.env + .base_data_dir + .join(format!("storage_controller_{}", instance_id)) + } + + fn pid_file(&self, instance_id: u8) -> Utf8PathBuf { + Utf8PathBuf::from_path_buf( + self.storage_controller_instance_dir(instance_id) + .join("storage_controller.pid"), + ) + .expect("non-Unicode path") } /// PIDFile for the postgres instance used to store storage controller state @@ -184,9 +209,9 @@ impl StorageController { } /// Readiness check for our postgres process - async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result { + async fn pg_isready(&self, pg_bin_dir: &Utf8Path, postgres_port: u16) -> anyhow::Result { let bin_path = pg_bin_dir.join("pg_isready"); - let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)]; + let args = ["-h", "localhost", "-p", &format!("{}", postgres_port)]; let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?; Ok(exitcode.success()) @@ -199,8 +224,8 @@ impl StorageController { /// who just want to run `cargo neon_local` without knowing about diesel. /// /// Returns the database url - pub async fn setup_database(&self) -> anyhow::Result { - let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port); + pub async fn setup_database(&self, postgres_port: u16) -> anyhow::Result { + let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port); let pg_bin_dir = self.get_pg_bin_dir().await?; let createdb_path = pg_bin_dir.join("createdb"); @@ -209,7 +234,7 @@ impl StorageController { "-h", "localhost", "-p", - &format!("{}", self.postgres_port), + &format!("{}", postgres_port), DB_NAME, ]) .output() @@ -230,13 +255,14 @@ impl StorageController { pub async fn connect_to_database( &self, + postgres_port: u16, ) -> anyhow::Result<( tokio_postgres::Client, tokio_postgres::Connection, )> { tokio_postgres::Config::new() .host("localhost") - .port(self.postgres_port) + .port(postgres_port) // The user is the ambient operating system user name. // That is an impurity which we want to fix in => TODO https://github.com/neondatabase/neon/issues/8400 // @@ -252,72 +278,115 @@ impl StorageController { .map_err(anyhow::Error::new) } - pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> { - // Start a vanilla Postgres process used by the storage controller for persistence. - let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone()) - .unwrap() - .join("storage_controller_db"); - let pg_bin_dir = self.get_pg_bin_dir().await?; - let pg_lib_dir = self.get_pg_lib_dir().await?; - let pg_log_path = pg_data_path.join("postgres.log"); + pub async fn start(&self, start_args: NeonStorageControllerStartArgs) -> anyhow::Result<()> { + let instance_dir = self.storage_controller_instance_dir(start_args.instance_id); + if let Err(err) = tokio::fs::create_dir(&instance_dir).await { + if err.kind() != std::io::ErrorKind::AlreadyExists { + panic!("Failed to create instance dir {instance_dir:?}"); + } + } - if !tokio::fs::try_exists(&pg_data_path).await? { - // Initialize empty database - let initdb_path = pg_bin_dir.join("initdb"); - let mut child = Command::new(&initdb_path) - .envs(vec![ - ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ]) - .args(["-D", pg_data_path.as_ref()]) - .spawn() - .expect("Failed to spawn initdb"); - let status = child.wait().await?; - if !status.success() { - anyhow::bail!("initdb failed with status {status}"); + let (listen, postgres_port) = { + if let Some(base_port) = start_args.base_port { + ( + format!("127.0.0.1:{base_port}"), + self.config + .database_url + .expect("--base-port requires NeonStorageControllerConf::database_url") + .port(), + ) + } else { + let listen_url = self.env.control_plane_api.clone().unwrap(); + + let listen = format!( + "{}:{}", + listen_url.host_str().unwrap(), + listen_url.port().unwrap() + ); + + (listen, listen_url.port().unwrap() + 1) } }; - // Write a minimal config file: - // - Specify the port, since this is chosen dynamically - // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing - // the storage controller we don't want a slow local disk to interfere with that. - // - // NB: it's important that we rewrite this file on each start command so we propagate changes - // from `LocalEnv`'s config file (`.neon/config`). - tokio::fs::write( - &pg_data_path.join("postgresql.conf"), - format!("port = {}\nfsync=off\n", self.postgres_port), - ) - .await?; + let socket_addr = listen + .parse() + .expect("listen address is a valid socket address"); + self.listen + .set(socket_addr) + .expect("StorageController::listen is only set here"); - println!("Starting storage controller database..."); - let db_start_args = [ - "-w", - "-D", - pg_data_path.as_ref(), - "-l", - pg_log_path.as_ref(), - "start", - ]; + // Do we remove the pid file on stop? + let pg_started = self.is_postgres_running().await?; + let pg_lib_dir = self.get_pg_lib_dir().await?; - background_process::start_process( - "storage_controller_db", - &self.env.base_data_dir, - pg_bin_dir.join("pg_ctl").as_std_path(), - db_start_args, - vec![ - ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ], - background_process::InitialPidFile::Create(self.postgres_pid_file()), - retry_timeout, - || self.pg_isready(&pg_bin_dir), - ) - .await?; + if !pg_started { + // Start a vanilla Postgres process used by the storage controller for persistence. + let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone()) + .unwrap() + .join("storage_controller_db"); + let pg_bin_dir = self.get_pg_bin_dir().await?; + let pg_log_path = pg_data_path.join("postgres.log"); - // Run migrations on every startup, in case something changed. - let database_url = self.setup_database().await?; + if !tokio::fs::try_exists(&pg_data_path).await? { + // Initialize empty database + let initdb_path = pg_bin_dir.join("initdb"); + let mut child = Command::new(&initdb_path) + .envs(vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ]) + .args(["-D", pg_data_path.as_ref()]) + .spawn() + .expect("Failed to spawn initdb"); + let status = child.wait().await?; + if !status.success() { + anyhow::bail!("initdb failed with status {status}"); + } + }; + + // Write a minimal config file: + // - Specify the port, since this is chosen dynamically + // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing + // the storage controller we don't want a slow local disk to interfere with that. + // + // NB: it's important that we rewrite this file on each start command so we propagate changes + // from `LocalEnv`'s config file (`.neon/config`). + tokio::fs::write( + &pg_data_path.join("postgresql.conf"), + format!("port = {}\nfsync=off\n", postgres_port), + ) + .await?; + + println!("Starting storage controller database..."); + let db_start_args = [ + "-w", + "-D", + pg_data_path.as_ref(), + "-l", + pg_log_path.as_ref(), + "start", + ]; + + background_process::start_process( + "storage_controller_db", + &self.env.base_data_dir, + pg_bin_dir.join("pg_ctl").as_std_path(), + db_start_args, + vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ], + background_process::InitialPidFile::Create(self.postgres_pid_file()), + &start_args.start_timeout, + || self.pg_isready(&pg_bin_dir, postgres_port), + ) + .await?; + + // Run migrations on every startup, in case something changed. + self.setup_database(postgres_port).await?; + } + + let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port); // We support running a startup SQL script to fiddle with the database before we launch storcon. // This is used by the test suite. @@ -339,7 +408,7 @@ impl StorageController { } } }; - let (mut client, conn) = self.connect_to_database().await?; + let (mut client, conn) = self.connect_to_database(postgres_port).await?; let conn = tokio::spawn(conn); let tx = client.build_transaction(); let tx = tx.start().await?; @@ -348,9 +417,20 @@ impl StorageController { drop(client); conn.await??; + let listen = self + .listen + .get() + .expect("cell is set earlier in this function"); + let address_for_peers = Uri::builder() + .scheme("http") + .authority(format!("{}:{}", listen.ip(), listen.port())) + .path_and_query("") + .build() + .unwrap(); + let mut args = vec![ "-l", - &self.listen, + &listen.to_string(), "--dev", "--database-url", &database_url, @@ -358,10 +438,17 @@ impl StorageController { &humantime::Duration::from(self.config.max_offline).to_string(), "--max-warming-up-interval", &humantime::Duration::from(self.config.max_warming_up).to_string(), + "--address-for-peers", + &address_for_peers.to_string(), ] .into_iter() .map(|s| s.to_string()) .collect::>(); + + if self.config.start_as_candidate { + args.push("--start-as-candidate".to_string()); + } + if let Some(private_key) = &self.private_key { let claims = Claims::new(None, Scope::PageServerApi); let jwt_token = @@ -394,15 +481,15 @@ impl StorageController { background_process::start_process( COMMAND, - &self.env.base_data_dir, + &instance_dir, &self.env.storage_controller_bin(), args, vec![ ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), ], - background_process::InitialPidFile::Create(self.pid_file()), - retry_timeout, + background_process::InitialPidFile::Create(self.pid_file(start_args.instance_id)), + &start_args.start_timeout, || async { match self.ready().await { Ok(_) => Ok(true), @@ -415,8 +502,35 @@ impl StorageController { Ok(()) } - pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> { - background_process::stop_process(immediate, COMMAND, &self.pid_file())?; + pub async fn stop(&self, stop_args: NeonStorageControllerStopArgs) -> anyhow::Result<()> { + background_process::stop_process( + stop_args.immediate, + COMMAND, + &self.pid_file(stop_args.instance_id), + )?; + + let storcon_instances = self.env.storage_controller_instances().await?; + for (instance_id, instanced_dir_path) in storcon_instances { + if instance_id == stop_args.instance_id { + continue; + } + + let pid_file = instanced_dir_path.join("storage_controller.pid"); + let pid = tokio::fs::read_to_string(&pid_file) + .await + .map_err(|err| { + anyhow::anyhow!("Failed to read storcon pid file at {pid_file:?}: {err}") + })? + .parse::() + .expect("pid is valid i32"); + + let other_proc_alive = !background_process::process_has_stopped(Pid::from_raw(pid))?; + if other_proc_alive { + // There is another storage controller instance running, so we return + // and leave the database running. + return Ok(()); + } + } let pg_data_path = self.env.base_data_dir.join("storage_controller_db"); let pg_bin_dir = self.get_pg_bin_dir().await?; @@ -429,27 +543,51 @@ impl StorageController { .wait() .await?; if !stop_status.success() { - let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"]; - let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl")) - .args(pg_status_args) - .spawn()? - .wait() - .await?; - - // pg_ctl status returns this exit code if postgres is not running: in this case it is - // fine that stop failed. Otherwise it is an error that stop failed. - const PG_STATUS_NOT_RUNNING: i32 = 3; - if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() { - println!("Storage controller database is already stopped"); - return Ok(()); - } else { - anyhow::bail!("Failed to stop storage controller database: {stop_status}") + match self.is_postgres_running().await { + Ok(false) => { + println!("Storage controller database is already stopped"); + return Ok(()); + } + Ok(true) => { + anyhow::bail!("Failed to stop storage controller database"); + } + Err(err) => { + anyhow::bail!("Failed to stop storage controller database: {err}"); + } } } Ok(()) } + async fn is_postgres_running(&self) -> anyhow::Result { + let pg_data_path = self.env.base_data_dir.join("storage_controller_db"); + let pg_bin_dir = self.get_pg_bin_dir().await?; + + let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"]; + let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl")) + .args(pg_status_args) + .spawn()? + .wait() + .await?; + + // pg_ctl status returns this exit code if postgres is not running: in this case it is + // fine that stop failed. Otherwise it is an error that stop failed. + const PG_STATUS_NOT_RUNNING: i32 = 3; + const PG_NO_DATA_DIR: i32 = 4; + const PG_STATUS_RUNNING: i32 = 0; + match status_exitcode.code() { + Some(PG_STATUS_NOT_RUNNING) => Ok(false), + Some(PG_NO_DATA_DIR) => Ok(false), + Some(PG_STATUS_RUNNING) => Ok(true), + Some(code) => Err(anyhow::anyhow!( + "pg_ctl status returned unexpected status code: {:?}", + code + )), + None => Err(anyhow::anyhow!("pg_ctl status returned no status code")), + } + } + fn get_claims_for_path(path: &str) -> anyhow::Result> { let category = match path.find('/') { Some(idx) => &path[..idx], @@ -475,15 +613,31 @@ impl StorageController { RQ: Serialize + Sized, RS: DeserializeOwned + Sized, { - // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out - // for general purpose API access. - let listen_url = self.env.control_plane_api.clone().unwrap(); - let url = Url::from_str(&format!( - "http://{}:{}/{path}", - listen_url.host_str().unwrap(), - listen_url.port().unwrap() - )) - .unwrap(); + // In the special case of the `storage_controller start` subcommand, we wish + // to use the API endpoint of the newly started storage controller in order + // to pass the readiness check. In this scenario [`Self::listen`] will be set + // (see [`Self::start`]). + // + // Otherwise, we infer the storage controller api endpoint from the configured + // control plane API. + let url = if let Some(socket_addr) = self.listen.get() { + Url::from_str(&format!( + "http://{}:{}/{path}", + socket_addr.ip().to_canonical(), + socket_addr.port() + )) + .unwrap() + } else { + // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out + // for general purpose API access. + let listen_url = self.env.control_plane_api.clone().unwrap(); + Url::from_str(&format!( + "http://{}:{}/{path}", + listen_url.host_str().unwrap(), + listen_url.port().unwrap() + )) + .unwrap() + }; let mut builder = self.client.request(method, url); if let Some(body) = body { diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index e755aaed19..7bbd1541cf 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -520,6 +520,19 @@ async fn handle_node_status(req: Request) -> Result, ApiErr json_response(StatusCode::OK, node_status) } +async fn handle_get_leader(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let state = get_state(&req); + let leader = state.service.get_leader().await.map_err(|err| { + ApiError::InternalServerError(anyhow::anyhow!( + "Failed to read leader from database: {err}" + )) + })?; + + json_response(StatusCode::OK, leader) +} + async fn handle_node_drain(req: Request) -> Result, ApiError> { check_permissions(&req, Scope::Admin)?; @@ -1016,6 +1029,9 @@ pub fn make_router( .get("/control/v1/node/:node_id", |r| { named_request_span(r, handle_node_status, RequestName("control_v1_node_status")) }) + .get("/control/v1/leader", |r| { + named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader")) + }) .put("/control/v1/node/:node_id/drain", |r| { named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain")) }) diff --git a/storage_controller/src/peer_client.rs b/storage_controller/src/peer_client.rs index ebb59a1720..3f8520fe55 100644 --- a/storage_controller/src/peer_client.rs +++ b/storage_controller/src/peer_client.rs @@ -1,7 +1,7 @@ use crate::tenant_shard::ObservedState; use pageserver_api::shard::TenantShardId; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use tokio_util::sync::CancellationToken; use hyper::Uri; @@ -69,6 +69,8 @@ impl PeerClient { req }; + let req = req.timeout(Duration::from_secs(2)); + let res = req .send() .await diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 84db088a42..3459b44774 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -20,7 +20,8 @@ use crate::{ metrics, peer_client::{GlobalObservedState, PeerClient}, persistence::{ - AbortShardSplitStatus, ControllerPersistence, MetadataHealthPersistence, TenantFilter, + AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence, + TenantFilter, }, reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder}, scheduler::{MaySchedule, ScheduleContext, ScheduleMode}, @@ -489,11 +490,6 @@ pub(crate) enum ReconcileResultRequest { Stop, } -struct LeaderStepDownState { - observed: GlobalObservedState, - leader: ControllerPersistence, -} - impl Service { pub fn get_config(&self) -> &Config { &self.config @@ -504,7 +500,8 @@ impl Service { #[instrument(skip_all)] async fn startup_reconcile( self: &Arc, - leader_step_down_state: Option, + current_leader: Option, + leader_step_down_state: Option, bg_compute_notify_result_tx: tokio::sync::mpsc::Sender< Result<(), (TenantShardId, NotifyError)>, >, @@ -522,17 +519,15 @@ impl Service { .checked_add(STARTUP_RECONCILE_TIMEOUT / 2) .expect("Reconcile timeout is a modest constant"); - let (observed, current_leader) = if let Some(state) = leader_step_down_state { + let observed = if let Some(state) = leader_step_down_state { tracing::info!( "Using observed state received from leader at {}", - state.leader.address, + current_leader.as_ref().unwrap().address ); - (state.observed, Some(state.leader)) + + state } else { - ( - self.build_global_observed_state(node_scan_deadline).await, - None, - ) + self.build_global_observed_state(node_scan_deadline).await }; // Accumulate a list of any tenant locations that ought to be detached @@ -1382,13 +1377,32 @@ impl Service { }; let leadership_status = this.inner.read().unwrap().get_leadership_status(); - let peer_observed_state = match leadership_status { - LeadershipStatus::Candidate => this.request_step_down().await, + let leader = match this.get_leader().await { + Ok(ok) => ok, + Err(err) => { + tracing::error!( + "Failed to query database for current leader: {err}. Aborting start-up ..." + ); + std::process::exit(1); + } + }; + + let leader_step_down_state = match leadership_status { + LeadershipStatus::Candidate => { + if let Some(ref leader) = leader { + this.request_step_down(leader).await + } else { + tracing::info!( + "No leader found to request step down from. Will build observed state." + ); + None + } + } LeadershipStatus::Leader => None, LeadershipStatus::SteppedDown => unreachable!(), }; - this.startup_reconcile(peer_observed_state, bg_compute_notify_result_tx) + this.startup_reconcile(leader, leader_step_down_state, bg_compute_notify_result_tx) .await; drop(startup_completion); @@ -4650,6 +4664,10 @@ impl Service { )) } + pub(crate) async fn get_leader(&self) -> DatabaseResult> { + self.persistence.get_leader().await + } + pub(crate) async fn node_register( &self, register_req: NodeRegisterRequest, @@ -6342,6 +6360,7 @@ impl Service { pub(crate) async fn step_down(&self) -> GlobalObservedState { tracing::info!("Received step down request from peer"); + failpoint_support::sleep_millis_async!("sleep-on-step-down-handling"); self.inner.write().unwrap().step_down(); // TODO: would it make sense to have a time-out for this? @@ -6367,50 +6386,31 @@ impl Service { /// /// On failures to query the database or step down error responses the process is killed /// and we rely on k8s to retry. - async fn request_step_down(&self) -> Option { - let leader = match self.persistence.get_leader().await { - Ok(leader) => leader, + async fn request_step_down( + &self, + leader: &ControllerPersistence, + ) -> Option { + tracing::info!("Sending step down request to {leader:?}"); + + // TODO: jwt token + let client = PeerClient::new( + Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"), + self.config.jwt_token.clone(), + ); + let state = client.step_down(&self.cancel).await; + match state { + Ok(state) => Some(state), Err(err) => { + // TODO: Make leaders periodically update a timestamp field in the + // database and, if the leader is not reachable from the current instance, + // but inferred as alive from the timestamp, abort start-up. This avoids + // a potential scenario in which we have two controllers acting as leaders. tracing::error!( - "Failed to query database for current leader: {err}. Aborting start-up ..." + "Leader ({}) did not respond to step-down request: {}", + leader.address, + err ); - std::process::exit(1); - } - }; - match leader { - Some(leader) => { - tracing::info!("Sending step down request to {leader:?}"); - - // TODO: jwt token - let client = PeerClient::new( - Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"), - self.config.jwt_token.clone(), - ); - let state = client.step_down(&self.cancel).await; - match state { - Ok(state) => Some(LeaderStepDownState { - observed: state, - leader: leader.clone(), - }), - Err(err) => { - // TODO: Make leaders periodically update a timestamp field in the - // database and, if the leader is not reachable from the current instance, - // but inferred as alive from the timestamp, abort start-up. This avoids - // a potential scenario in which we have two controllers acting as leaders. - tracing::error!( - "Leader ({}) did not respond to step-down request: {}", - leader.address, - err - ); - None - } - } - } - None => { - tracing::info!( - "No leader found to request step down from. Will build observed state." - ); None } } diff --git a/test_runner/conftest.py b/test_runner/conftest.py index 4b0c9ac71d..996ca4d652 100644 --- a/test_runner/conftest.py +++ b/test_runner/conftest.py @@ -3,6 +3,7 @@ pytest_plugins = ( "fixtures.parametrize", "fixtures.httpserver", "fixtures.compute_reconfigure", + "fixtures.storage_controller_proxy", "fixtures.neon_fixtures", "fixtures.benchmark_fixture", "fixtures.pg_stats", diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b76432127d..ec5a83601e 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -497,6 +497,7 @@ class NeonEnvBuilder: pageserver_aux_file_policy: Optional[AuxFileStore] = None, pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = None, safekeeper_extra_opts: Optional[list[str]] = None, + storage_controller_port_override: Optional[int] = None, ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override @@ -549,6 +550,8 @@ class NeonEnvBuilder: self.safekeeper_extra_opts = safekeeper_extra_opts + self.storage_controller_port_override = storage_controller_port_override + assert test_name.startswith( "test_" ), "Unexpectedly instantiated from outside a test function" @@ -1054,6 +1057,7 @@ class NeonEnv: """ BASE_PAGESERVER_ID = 1 + storage_controller: NeonStorageController | NeonProxiedStorageController def __init__(self, config: NeonEnvBuilder): self.repo_dir = config.repo_dir @@ -1084,27 +1088,41 @@ class NeonEnv: self.initial_tenant = config.initial_tenant self.initial_timeline = config.initial_timeline - # Find two adjacent ports for storage controller and its postgres DB. This - # loop would eventually throw from get_port() if we run out of ports (extremely - # unlikely): usually we find two adjacent free ports on the first iteration. - while True: - self.storage_controller_port = self.port_distributor.get_port() - storage_controller_pg_port = self.port_distributor.get_port() - if storage_controller_pg_port == self.storage_controller_port + 1: - break - # The URL for the pageserver to use as its control_plane_api config - self.control_plane_api: str = f"http://127.0.0.1:{self.storage_controller_port}/upcall/v1" - # The base URL of the storage controller - self.storage_controller_api: str = f"http://127.0.0.1:{self.storage_controller_port}" + if config.storage_controller_port_override is not None: + log.info( + f"Using storage controller api override {config.storage_controller_port_override}" + ) + + self.storage_controller_port = config.storage_controller_port_override + self.storage_controller = NeonProxiedStorageController( + self, config.storage_controller_port_override, config.auth_enabled + ) + else: + # Find two adjacent ports for storage controller and its postgres DB. This + # loop would eventually throw from get_port() if we run out of ports (extremely + # unlikely): usually we find two adjacent free ports on the first iteration. + while True: + storage_controller_port = self.port_distributor.get_port() + storage_controller_pg_port = self.port_distributor.get_port() + if storage_controller_pg_port == storage_controller_port + 1: + break + + self.storage_controller_port = storage_controller_port + self.storage_controller = NeonStorageController( + self, storage_controller_port, config.auth_enabled + ) + + log.info( + f"Using generated control_plane_api: {self.storage_controller.upcall_api_endpoint()}" + ) + + self.storage_controller_api: str = self.storage_controller.api_root() + self.control_plane_api: str = self.storage_controller.upcall_api_endpoint() # For testing this with a fake HTTP server, enable passing through a URL from config self.control_plane_compute_hook_api = config.control_plane_compute_hook_api - self.storage_controller: NeonStorageController = NeonStorageController( - self, config.auth_enabled - ) - self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine self.pageserver_aux_file_policy = config.pageserver_aux_file_policy @@ -1869,16 +1887,24 @@ class NeonCli(AbstractNeonCli): def storage_controller_start( self, timeout_in_seconds: Optional[int] = None, + instance_id: Optional[int] = None, + base_port: Optional[int] = None, ): cmd = ["storage_controller", "start"] if timeout_in_seconds is not None: cmd.append(f"--start-timeout={timeout_in_seconds}s") + if instance_id is not None: + cmd.append(f"--instance-id={instance_id}") + if base_port is not None: + cmd.append(f"--base-port={base_port}") return self.raw_cli(cmd) - def storage_controller_stop(self, immediate: bool): + def storage_controller_stop(self, immediate: bool, instance_id: Optional[int] = None): cmd = ["storage_controller", "stop"] if immediate: cmd.extend(["-m", "immediate"]) + if instance_id is not None: + cmd.append(f"--instance-id={instance_id}") return self.raw_cli(cmd) def pageserver_start( @@ -2189,17 +2215,30 @@ class PageserverSchedulingPolicy(str, Enum): PAUSE_FOR_RESTART = "PauseForRestart" +class StorageControllerLeadershipStatus(str, Enum): + LEADER = "leader" + STEPPED_DOWN = "stepped_down" + CANDIDATE = "candidate" + + class NeonStorageController(MetricsGetter, LogUtils): - def __init__(self, env: NeonEnv, auth_enabled: bool): + def __init__(self, env: NeonEnv, port: int, auth_enabled: bool): self.env = env + self.port: int = port + self.api: str = f"http://127.0.0.1:{port}" self.running = False self.auth_enabled = auth_enabled self.allowed_errors: list[str] = DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS - self.logfile = self.workdir / "storage_controller.log" + self.logfile = self.env.repo_dir / "storage_controller_1" / "storage_controller.log" - def start(self, timeout_in_seconds: Optional[int] = None): + def start( + self, + timeout_in_seconds: Optional[int] = None, + instance_id: Optional[int] = None, + base_port: Optional[int] = None, + ): assert not self.running - self.env.neon_cli.storage_controller_start(timeout_in_seconds) + self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port) self.running = True return self @@ -2209,6 +2248,12 @@ class NeonStorageController(MetricsGetter, LogUtils): self.running = False return self + def upcall_api_endpoint(self) -> str: + return f"{self.api}/upcall/v1" + + def api_root(self) -> str: + return self.api + @staticmethod def retryable_node_operation(op, ps_id, max_attempts, backoff): while max_attempts > 0: @@ -2237,7 +2282,9 @@ class NeonStorageController(MetricsGetter, LogUtils): def assert_no_errors(self): assert_no_errors( - self.env.repo_dir / "storage_controller.log", "storage_controller", self.allowed_errors + self.logfile, + "storage_controller", + self.allowed_errors, ) def pageserver_api(self) -> PageserverHttpClient: @@ -2249,7 +2296,7 @@ class NeonStorageController(MetricsGetter, LogUtils): auth_token = None if self.auth_enabled: auth_token = self.env.auth_keys.generate_token(scope=TokenScope.PAGE_SERVER_API) - return PageserverHttpClient(self.env.storage_controller_port, lambda: True, auth_token) + return PageserverHttpClient(self.port, lambda: True, auth_token) def request(self, method, *args, **kwargs) -> requests.Response: resp = requests.request(method, *args, **kwargs) @@ -2266,13 +2313,13 @@ class NeonStorageController(MetricsGetter, LogUtils): return headers def get_metrics(self) -> Metrics: - res = self.request("GET", f"{self.env.storage_controller_api}/metrics") + res = self.request("GET", f"{self.api}/metrics") return parse_metrics(res.text) def ready(self) -> bool: status = None try: - resp = self.request("GET", f"{self.env.storage_controller_api}/ready") + resp = self.request("GET", f"{self.api}/ready") status = resp.status_code except StorageControllerApiException as e: status = e.status_code @@ -2305,7 +2352,7 @@ class NeonStorageController(MetricsGetter, LogUtils): response = self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/attach-hook", + f"{self.api}/debug/v1/attach-hook", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2316,7 +2363,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def attach_hook_drop(self, tenant_shard_id: Union[TenantId, TenantShardId]): self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/attach-hook", + f"{self.api}/debug/v1/attach-hook", json={"tenant_shard_id": str(tenant_shard_id), "node_id": None}, headers=self.headers(TokenScope.ADMIN), ) @@ -2327,7 +2374,7 @@ class NeonStorageController(MetricsGetter, LogUtils): """ response = self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/inspect", + f"{self.api}/debug/v1/inspect", json={"tenant_shard_id": str(tenant_shard_id)}, headers=self.headers(TokenScope.ADMIN), ) @@ -2350,7 +2397,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"node_register({body})") self.request( "POST", - f"{self.env.storage_controller_api}/control/v1/node", + f"{self.api}/control/v1/node", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2359,7 +2406,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"node_delete({node_id})") self.request( "DELETE", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}", + f"{self.api}/control/v1/node/{node_id}", headers=self.headers(TokenScope.ADMIN), ) @@ -2367,7 +2414,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"node_drain({node_id})") self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain", + f"{self.api}/control/v1/node/{node_id}/drain", headers=self.headers(TokenScope.ADMIN), ) @@ -2375,7 +2422,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"cancel_node_drain({node_id})") self.request( "DELETE", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain", + f"{self.api}/control/v1/node/{node_id}/drain", headers=self.headers(TokenScope.ADMIN), ) @@ -2383,7 +2430,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"node_fill({node_id})") self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill", + f"{self.api}/control/v1/node/{node_id}/fill", headers=self.headers(TokenScope.ADMIN), ) @@ -2391,14 +2438,22 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"cancel_node_fill({node_id})") self.request( "DELETE", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill", + f"{self.api}/control/v1/node/{node_id}/fill", headers=self.headers(TokenScope.ADMIN), ) def node_status(self, node_id): response = self.request( "GET", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}", + f"{self.api}/control/v1/node/{node_id}", + headers=self.headers(TokenScope.ADMIN), + ) + return response.json() + + def get_leader(self): + response = self.request( + "GET", + f"{self.api}/control/v1/leader", headers=self.headers(TokenScope.ADMIN), ) return response.json() @@ -2406,7 +2461,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def node_list(self): response = self.request( "GET", - f"{self.env.storage_controller_api}/control/v1/node", + f"{self.api}/control/v1/node", headers=self.headers(TokenScope.ADMIN), ) return response.json() @@ -2414,7 +2469,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def tenant_list(self): response = self.request( "GET", - f"{self.env.storage_controller_api}/debug/v1/tenant", + f"{self.api}/debug/v1/tenant", headers=self.headers(TokenScope.ADMIN), ) return response.json() @@ -2424,7 +2479,7 @@ class NeonStorageController(MetricsGetter, LogUtils): body["node_id"] = node_id self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/config", + f"{self.api}/control/v1/node/{node_id}/config", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2459,7 +2514,7 @@ class NeonStorageController(MetricsGetter, LogUtils): response = self.request( "POST", - f"{self.env.storage_controller_api}/v1/tenant", + f"{self.api}/v1/tenant", json=body, headers=self.headers(TokenScope.PAGE_SERVER_API), ) @@ -2472,7 +2527,7 @@ class NeonStorageController(MetricsGetter, LogUtils): """ response = self.request( "GET", - f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/locate", + f"{self.api}/debug/v1/tenant/{tenant_id}/locate", headers=self.headers(TokenScope.ADMIN), ) body = response.json() @@ -2485,7 +2540,7 @@ class NeonStorageController(MetricsGetter, LogUtils): """ response = self.request( "GET", - f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}", + f"{self.api}/control/v1/tenant/{tenant_id}", headers=self.headers(TokenScope.ADMIN), ) response.raise_for_status() @@ -2496,7 +2551,7 @@ class NeonStorageController(MetricsGetter, LogUtils): ) -> list[TenantShardId]: response = self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/shard_split", + f"{self.api}/control/v1/tenant/{tenant_id}/shard_split", json={"new_shard_count": shard_count, "new_stripe_size": shard_stripe_size}, headers=self.headers(TokenScope.ADMIN), ) @@ -2508,7 +2563,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int): self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_shard_id}/migrate", + f"{self.api}/control/v1/tenant/{tenant_shard_id}/migrate", json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id}, headers=self.headers(TokenScope.ADMIN), ) @@ -2519,7 +2574,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"tenant_policy_update({tenant_id}, {body})") self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/policy", + f"{self.api}/control/v1/tenant/{tenant_id}/policy", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2527,14 +2582,14 @@ class NeonStorageController(MetricsGetter, LogUtils): def tenant_import(self, tenant_id: TenantId): self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/import", + f"{self.api}/debug/v1/tenant/{tenant_id}/import", headers=self.headers(TokenScope.ADMIN), ) def reconcile_all(self): r = self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/reconcile_all", + f"{self.api}/debug/v1/reconcile_all", headers=self.headers(TokenScope.ADMIN), ) r.raise_for_status() @@ -2567,7 +2622,7 @@ class NeonStorageController(MetricsGetter, LogUtils): """ self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/consistency_check", + f"{self.api}/debug/v1/consistency_check", headers=self.headers(TokenScope.ADMIN), ) log.info("storage controller passed consistency check") @@ -2640,7 +2695,7 @@ class NeonStorageController(MetricsGetter, LogUtils): self.request( "POST", - f"{self.env.storage_controller_api}/control/v1/metadata_health/update", + f"{self.api}/control/v1/metadata_health/update", json=body, headers=self.headers(TokenScope.SCRUBBER), ) @@ -2648,7 +2703,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def metadata_health_list_unhealthy(self): response = self.request( "GET", - f"{self.env.storage_controller_api}/control/v1/metadata_health/unhealthy", + f"{self.api}/control/v1/metadata_health/unhealthy", headers=self.headers(TokenScope.ADMIN), ) return response.json() @@ -2658,7 +2713,7 @@ class NeonStorageController(MetricsGetter, LogUtils): response = self.request( "POST", - f"{self.env.storage_controller_api}/control/v1/metadata_health/outdated", + f"{self.api}/control/v1/metadata_health/outdated", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2681,7 +2736,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info("Asking storage controller to step down") response = self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/step_down", + f"{self.api}/control/v1/step_down", headers=self.headers(TokenScope.ADMIN), ) @@ -2698,7 +2753,7 @@ class NeonStorageController(MetricsGetter, LogUtils): res = self.request( "PUT", - f"{self.env.storage_controller_api}/debug/v1/failpoints", + f"{self.api}/debug/v1/failpoints", json=[{"name": name, "actions": actions} for name, actions in pairs], headers=self.headers(TokenScope.ADMIN), ) @@ -2768,9 +2823,21 @@ class NeonStorageController(MetricsGetter, LogUtils): parsed_tid, wait_ms=250 ) - @property - def workdir(self) -> Path: - return self.env.repo_dir + def get_leadership_status(self) -> StorageControllerLeadershipStatus: + metric_values = {} + for status in StorageControllerLeadershipStatus: + metric_value = self.get_metric_value( + "storage_controller_leadership_status", filter={"status": status} + ) + metric_values[status] = metric_value + + assert list(metric_values.values()).count(1) == 1 + + for status, metric_value in metric_values.items(): + if metric_value == 1: + return status + + raise AssertionError("unreachable") def __enter__(self) -> "NeonStorageController": return self @@ -2784,6 +2851,59 @@ class NeonStorageController(MetricsGetter, LogUtils): self.stop(immediate=True) +class NeonProxiedStorageController(NeonStorageController): + def __init__(self, env: NeonEnv, proxy_port: int, auth_enabled: bool): + super(NeonProxiedStorageController, self).__init__(env, proxy_port, auth_enabled) + self.instances: dict[int, dict[str, Any]] = {} + + def start( + self, + timeout_in_seconds: Optional[int] = None, + instance_id: Optional[int] = None, + base_port: Optional[int] = None, + ): + assert instance_id is not None and base_port is not None + + self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port) + self.instances[instance_id] = {"running": True} + + self.running = True + return self + + def stop_instance( + self, immediate: bool = False, instance_id: Optional[int] = None + ) -> "NeonStorageController": + assert instance_id in self.instances + if self.instances[instance_id]["running"]: + self.env.neon_cli.storage_controller_stop(immediate, instance_id) + self.instances[instance_id]["running"] = False + + self.running = any(meta["running"] for meta in self.instances.values()) + return self + + def stop(self, immediate: bool = False) -> "NeonStorageController": + for iid, details in self.instances.items(): + if details["running"]: + self.env.neon_cli.storage_controller_stop(immediate, iid) + self.instances[iid]["running"] = False + + self.running = False + return self + + def assert_no_errors(self): + for instance_id in self.instances.keys(): + assert_no_errors( + self.env.repo_dir / f"storage_controller_{instance_id}" / "storage_controller.log", + "storage_controller", + self.allowed_errors, + ) + + def log_contains( + self, pattern: str, offset: None | LogCursor = None + ) -> Optional[Tuple[str, LogCursor]]: + raise NotImplementedError() + + @dataclass class LogCursor: _line_no: int @@ -4520,7 +4640,7 @@ class StorageScrubber: base_args = [ str(self.env.neon_binpath / "storage_scrubber"), - f"--controller-api={self.env.storage_controller_api}", + f"--controller-api={self.env.storage_controller.api_root()}", ] args = base_args + args diff --git a/test_runner/fixtures/storage_controller_proxy.py b/test_runner/fixtures/storage_controller_proxy.py new file mode 100644 index 0000000000..3477f8b1f2 --- /dev/null +++ b/test_runner/fixtures/storage_controller_proxy.py @@ -0,0 +1,73 @@ +import re +from typing import Any, Optional + +import pytest +import requests +from pytest_httpserver import HTTPServer +from werkzeug.datastructures import Headers +from werkzeug.wrappers.request import Request +from werkzeug.wrappers.response import Response + +from fixtures.log_helper import log + + +class StorageControllerProxy: + def __init__(self, server: HTTPServer): + self.server: HTTPServer = server + self.listen: str = f"http://{server.host}:{server.port}" + self.routing_to: Optional[str] = None + + def route_to(self, storage_controller_api: str): + self.routing_to = storage_controller_api + + def port(self) -> int: + return self.server.port + + def upcall_api_endpoint(self) -> str: + return f"{self.listen}/upcall/v1" + + +def proxy_request(method: str, url: str, **kwargs) -> requests.Response: + return requests.request(method, url, **kwargs) + + +@pytest.fixture(scope="function") +def storage_controller_proxy(make_httpserver): + """ + Proxies requests into the storage controller to the currently + selected storage controller instance via `StorageControllerProxy.route_to`. + + This fixture is intended for tests that need to run multiple instances + of the storage controller at the same time. + """ + server = make_httpserver + + self = StorageControllerProxy(server) + + log.info(f"Storage controller proxy listening on {self.listen}") + + def handler(request: Request): + if self.route_to is None: + log.info(f"Storage controller proxy has no routing configured for {request.url}") + return Response("Routing not configured", status=503) + + route_to_url = f"{self.routing_to}{request.path}" + + log.info(f"Routing {request.url} to {route_to_url}") + + args: dict[str, Any] = {"headers": request.headers} + if request.is_json: + args["json"] = request.json + + response = proxy_request(request.method, route_to_url, **args) + + headers = Headers() + for key, value in response.headers.items(): + headers.add(key, value) + + return Response(response.content, headers=headers, status=response.status_code) + + self.server.expect_request(re.compile(".*")).respond_with_handler(handler) + + yield self + server.clear() diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 4dc9f7caae..80f1c9e4e3 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -403,7 +403,7 @@ def wait_until( try: res = func() except Exception as e: - log.info("waiting for %s iteration %s failed", func, i + 1) + log.info("waiting for %s iteration %s failed: %s", func, i + 1, e) last_exception = e if show_intermediate_error: log.info(e) diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 7d98ff2923..95c35e9641 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -1,3 +1,4 @@ +import concurrent.futures import json import threading import time @@ -16,6 +17,7 @@ from fixtures.neon_fixtures import ( PageserverSchedulingPolicy, PgBin, StorageControllerApiException, + StorageControllerLeadershipStatus, TokenScope, last_flush_lsn_upload, ) @@ -30,7 +32,9 @@ from fixtures.pageserver.utils import ( timeline_delete_wait_completed, ) from fixtures.pg_version import PgVersion +from fixtures.port_distributor import PortDistributor from fixtures.remote_storage import RemoteStorageKind, s3_storage +from fixtures.storage_controller_proxy import StorageControllerProxy from fixtures.utils import run_pg_bench_small, subprocess_capture, wait_until from fixtures.workload import Workload from mypy_boto3_s3.type_defs import ( @@ -2093,6 +2097,131 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder): ) +# This is a copy of NeonEnv.start which injects the instance id and port +# into the call to NeonStorageController.start +def start_env(env: NeonEnv, storage_controller_port: int): + timeout_in_seconds = 30 + + # Storage controller starts first, so that pageserver /re-attach calls don't + # bounce through retries on startup + env.storage_controller.start(timeout_in_seconds, 1, storage_controller_port) + + # Wait for storage controller readiness to prevent unnecessary post start-up + # reconcile. + env.storage_controller.wait_until_ready() + + # Start up broker, pageserver and all safekeepers + futs = [] + with concurrent.futures.ThreadPoolExecutor( + max_workers=2 + len(env.pageservers) + len(env.safekeepers) + ) as executor: + futs.append( + executor.submit(lambda: env.broker.try_start() or None) + ) # The `or None` is for the linter + + for pageserver in env.pageservers: + futs.append( + executor.submit( + lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds) + ) + ) + + for safekeeper in env.safekeepers: + futs.append( + executor.submit( + lambda sk=safekeeper: sk.start(timeout_in_seconds=timeout_in_seconds) + ) + ) + + for f in futs: + f.result() + + +@pytest.mark.parametrize("step_down_times_out", [False, True]) +def test_storage_controller_leadership_transfer( + neon_env_builder: NeonEnvBuilder, + storage_controller_proxy: StorageControllerProxy, + port_distributor: PortDistributor, + step_down_times_out: bool, +): + neon_env_builder.num_pageservers = 3 + + neon_env_builder.storage_controller_config = { + "database_url": f"127.0.0.1:{port_distributor.get_port()}", + "start_as_candidate": True, + } + + neon_env_builder.storage_controller_port_override = storage_controller_proxy.port() + + storage_controller_1_port = port_distributor.get_port() + storage_controller_2_port = port_distributor.get_port() + + storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}") + + env = neon_env_builder.init_configs() + start_env(env, storage_controller_1_port) + + assert ( + env.storage_controller.get_leadership_status() == StorageControllerLeadershipStatus.LEADER + ) + leader = env.storage_controller.get_leader() + assert leader["address"] == f"http://127.0.0.1:{storage_controller_1_port}/" + + if step_down_times_out: + env.storage_controller.configure_failpoints( + ("sleep-on-step-down-handling", "return(10000)") + ) + env.storage_controller.allowed_errors.append(".*request was dropped before completing.*") + + tenant_count = 2 + shard_count = 4 + tenants = set(TenantId.generate() for _ in range(0, tenant_count)) + + for tid in tenants: + env.storage_controller.tenant_create( + tid, shard_count=shard_count, placement_policy={"Attached": 1} + ) + env.storage_controller.reconcile_until_idle() + + env.storage_controller.start( + timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port + ) + + if not step_down_times_out: + + def previous_stepped_down(): + assert ( + env.storage_controller.get_leadership_status() + == StorageControllerLeadershipStatus.STEPPED_DOWN + ) + + wait_until(5, 1, previous_stepped_down) + + storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}") + + def new_becomes_leader(): + assert ( + env.storage_controller.get_leadership_status() + == StorageControllerLeadershipStatus.LEADER + ) + + wait_until(15, 1, new_becomes_leader) + leader = env.storage_controller.get_leader() + assert leader["address"] == f"http://127.0.0.1:{storage_controller_2_port}/" + + env.storage_controller.wait_until_ready() + env.storage_controller.consistency_check() + + if step_down_times_out: + env.storage_controller.allowed_errors.extend( + [ + ".*Leader.*did not respond to step-down request.*", + ".*Send step down request failed.*", + ".*Send step down request still failed.*", + ] + ) + + def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder): # single unsharded tenant, two locations neon_env_builder.num_pageservers = 2 From 25e7d321f474e5cbc5ac53ed42de697a48db50db Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 6 Aug 2024 12:51:05 +0300 Subject: [PATCH 13/14] safekeeper: cross check divergence point in ProposerElected handling. Previously, we protected from multiple ProposerElected messages from the same walproposer with the following condition: msg.term == self.get_last_log_term() && self.flush_lsn() > msg.start_streaming_at It is not exhaustive, i.e. we could still proceed to truncating WAL even though safekeeper inserted something since the divergence point has been calculated. While it was most likely safe because walproposer can't use safekeeper position to commit WAL until last_log_term reaches the current walproposer term, let's be more careful and properly calculate the divergence point like walproposer does. --- safekeeper/src/safekeeper.rs | 62 +++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 33ec39b852..0814d9ba67 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -92,7 +92,7 @@ impl TermHistory { } /// Find point of divergence between leader (walproposer) term history and - /// safekeeper. Arguments are not symmetrics as proposer history ends at + /// safekeeper. Arguments are not symmetric as proposer history ends at /// +infinity while safekeeper at flush_lsn. /// C version is at walproposer SendProposerElected. pub fn find_highest_common_point( @@ -701,7 +701,13 @@ where .with_label_values(&["handle_elected"]) .start_timer(); - info!("received ProposerElected {:?}", msg); + info!( + "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}", + msg, + self.state.acceptor_state.term, + self.get_last_log_term(), + self.flush_lsn() + ); if self.state.acceptor_state.term < msg.term { let mut state = self.state.start_change(); state.acceptor_state.term = msg.term; @@ -713,22 +719,43 @@ where return Ok(None); } - // This might happen in a rare race when another (old) connection from - // the same walproposer writes + flushes WAL after this connection - // already sent flush_lsn in VoteRequest. It is generally safe to - // proceed, but to prevent commit_lsn surprisingly going down we should - // either refuse the session (simpler) or skip the part we already have - // from the stream (can be implemented). - if msg.term == self.get_last_log_term() && self.flush_lsn() > msg.start_streaming_at { - bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help", - msg.term, self.flush_lsn(), msg.start_streaming_at) + // Before truncating WAL check-cross the check divergence point received + // from the walproposer. + let sk_th = self.get_term_history(); + let last_common_point = match TermHistory::find_highest_common_point( + &msg.term_history, + &sk_th, + self.flush_lsn(), + ) { + // No common point. Expect streaming from the beginning of the + // history like walproposer while we don't have proper init. + None => *msg.term_history.0.first().ok_or(anyhow::anyhow!( + "empty walproposer term history {:?}", + msg.term_history + ))?, + Some(lcp) => lcp, + }; + // This is expected to happen in a rare race when another connection + // from the same walproposer writes + flushes WAL after this connection + // sent flush_lsn in VoteRequest; for instance, very late + // ProposerElected message delivery after another connection was + // established and wrote WAL. In such cases error is transient; + // reconnection makes safekeeper send newest term history and flush_lsn + // and walproposer recalculates the streaming point. OTOH repeating + // error indicates a serious bug. + if last_common_point.lsn != msg.start_streaming_at { + bail!("refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}", + last_common_point, msg.start_streaming_at, + self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history, + ); } - // Otherwise we must never attempt to truncate committed data. + + // We are also expected to never attempt to truncate committed data. assert!( msg.start_streaming_at >= self.state.inmem.commit_lsn, - "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}", - msg.start_streaming_at, - self.state.inmem.commit_lsn + "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}", + msg.start_streaming_at, self.state.inmem.commit_lsn, + self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history, ); // Before first WAL write initialize its segment. It makes first segment @@ -743,9 +770,6 @@ where .await?; } - // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to - // intersection of our history and history from msg - // truncate wal, update the LSNs self.wal_store.truncate_wal(msg.start_streaming_at).await?; @@ -1069,7 +1093,7 @@ mod tests { let pem = ProposerElected { term: 1, - start_streaming_at: Lsn(1), + start_streaming_at: Lsn(3), term_history: TermHistory(vec![TermLsn { term: 1, lsn: Lsn(3), From e2d89f7991bc9cea88661e50722a02346b7b6485 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 16 Aug 2024 13:35:02 +0100 Subject: [PATCH 14/14] pageserver: prioritize secondary downloads to get most recent layers first, except l0s (#8729) ## Problem When a secondary location is trying to catch up while a tenant is receiving new writes, it can become quite wasteful: - Downloading L0s which are soon destroyed by compaction to L1s - Downloading older layer files which are soon made irrelevant when covered by image layers. ## Summary of changes Sort the layer files in the heatmap: - L0 layers are the lowest priority - Other layers are sorted to download the highest LSNs first. --- pageserver/src/tenant/secondary/heatmap.rs | 8 +- pageserver/src/tenant/timeline.rs | 134 +++++++++++++++++++-- 2 files changed, 130 insertions(+), 12 deletions(-) diff --git a/pageserver/src/tenant/secondary/heatmap.rs b/pageserver/src/tenant/secondary/heatmap.rs index 166483ba5d..4a8e66d38a 100644 --- a/pageserver/src/tenant/secondary/heatmap.rs +++ b/pageserver/src/tenant/secondary/heatmap.rs @@ -29,16 +29,16 @@ pub(super) struct HeatMapTenant { #[derive(Serialize, Deserialize)] pub(crate) struct HeatMapTimeline { #[serde_as(as = "DisplayFromStr")] - pub(super) timeline_id: TimelineId, + pub(crate) timeline_id: TimelineId, - pub(super) layers: Vec, + pub(crate) layers: Vec, } #[serde_as] #[derive(Serialize, Deserialize)] pub(crate) struct HeatMapLayer { - pub(super) name: LayerName, - pub(super) metadata: LayerFileMetadata, + pub(crate) name: LayerName, + pub(crate) metadata: LayerFileMetadata, #[serde_as(as = "TimestampSeconds")] pub(super) access_time: SystemTime, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 01e77fa1b1..26dc87c373 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2977,11 +2977,7 @@ impl Timeline { LayerVisibilityHint::Visible => { // Layer is visible to one or more read LSNs: elegible for inclusion in layer map let last_activity_ts = layer.latest_activity(); - Some(HeatMapLayer::new( - layer.layer_desc().layer_name(), - layer.metadata(), - last_activity_ts, - )) + Some((layer.layer_desc(), layer.metadata(), last_activity_ts)) } LayerVisibilityHint::Covered => { // Layer is resident but unlikely to be read: not elegible for inclusion in heatmap. @@ -2990,7 +2986,23 @@ impl Timeline { } }); - let layers = resident.collect(); + let mut layers = resident.collect::>(); + + // Sort layers in order of which to download first. For a large set of layers to download, we + // want to prioritize those layers which are most likely to still be in the resident many minutes + // or hours later: + // - Download L0s last, because they churn the fastest: L0s on a fast-writing tenant might + // only exist for a few minutes before being compacted into L1s. + // - For L1 & image layers, download most recent LSNs first: the older the LSN, the sooner + // the layer is likely to be covered by an image layer during compaction. + layers.sort_by_key(|(desc, _meta, _atime)| { + std::cmp::Reverse((!LayerMap::is_l0(&desc.key_range), desc.lsn_range.end)) + }); + + let layers = layers + .into_iter() + .map(|(desc, meta, atime)| HeatMapLayer::new(desc.layer_name(), meta, atime)) + .collect(); Some(HeatMapTimeline::new(self.timeline_id, layers)) } @@ -4516,6 +4528,7 @@ impl DurationRecorder { /// the layer descriptor requires the user to provide the ranges, which should cover all /// keys specified in the `data` field. #[cfg(test)] +#[derive(Clone)] pub struct DeltaLayerTestDesc { pub lsn_range: Range, pub key_range: Range, @@ -4545,6 +4558,13 @@ impl DeltaLayerTestDesc { data, } } + + pub(crate) fn layer_name(&self) -> LayerName { + LayerName::Delta(super::storage_layer::DeltaLayerName { + key_range: self.key_range.clone(), + lsn_range: self.lsn_range.clone(), + }) + } } impl Timeline { @@ -5768,12 +5788,110 @@ fn is_send() { #[cfg(test)] mod tests { + use pageserver_api::key::Key; use utils::{id::TimelineId, lsn::Lsn}; - use crate::tenant::{ - harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline, + use crate::{ + repository::Value, + tenant::{ + harness::{test_img, TenantHarness}, + layer_map::LayerMap, + storage_layer::{Layer, LayerName}, + timeline::{DeltaLayerTestDesc, EvictionError}, + Timeline, + }, }; + #[tokio::test] + async fn test_heatmap_generation() { + let harness = TenantHarness::create("heatmap_generation").await.unwrap(); + + let covered_delta = DeltaLayerTestDesc::new_with_inferred_key_range( + Lsn(0x10)..Lsn(0x20), + vec![( + Key::from_hex("620000000033333333444444445500000000").unwrap(), + Lsn(0x11), + Value::Image(test_img("foo")), + )], + ); + let visible_delta = DeltaLayerTestDesc::new_with_inferred_key_range( + Lsn(0x10)..Lsn(0x20), + vec![( + Key::from_hex("720000000033333333444444445500000000").unwrap(), + Lsn(0x11), + Value::Image(test_img("foo")), + )], + ); + let l0_delta = DeltaLayerTestDesc::new( + Lsn(0x20)..Lsn(0x30), + Key::from_hex("000000000000000000000000000000000000").unwrap() + ..Key::from_hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF").unwrap(), + vec![( + Key::from_hex("720000000033333333444444445500000000").unwrap(), + Lsn(0x25), + Value::Image(test_img("foo")), + )], + ); + let delta_layers = vec![ + covered_delta.clone(), + visible_delta.clone(), + l0_delta.clone(), + ]; + + let image_layer = ( + Lsn(0x40), + vec![( + Key::from_hex("620000000033333333444444445500000000").unwrap(), + test_img("bar"), + )], + ); + let image_layers = vec![image_layer]; + + let (tenant, ctx) = harness.load().await; + let timeline = tenant + .create_test_timeline_with_layers( + TimelineId::generate(), + Lsn(0x10), + 14, + &ctx, + delta_layers, + image_layers, + Lsn(0x100), + ) + .await + .unwrap(); + + // Layer visibility is an input to heatmap generation, so refresh it first + timeline.update_layer_visibility().await.unwrap(); + + let heatmap = timeline + .generate_heatmap() + .await + .expect("Infallible while timeline is not shut down"); + + assert_eq!(heatmap.timeline_id, timeline.timeline_id); + + // L0 should come last + assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name()); + + let mut last_lsn = Lsn::MAX; + for layer in heatmap.layers { + // Covered layer should be omitted + assert!(layer.name != covered_delta.layer_name()); + + let layer_lsn = match &layer.name { + LayerName::Delta(d) => d.lsn_range.end, + LayerName::Image(i) => i.lsn, + }; + + // Apart from L0s, newest Layers should come first + if !LayerMap::is_l0(layer.name.key_range()) { + assert!(layer_lsn <= last_lsn); + last_lsn = layer_lsn; + } + } + } + #[tokio::test] async fn two_layer_eviction_attempts_at_the_same_time() { let harness = TenantHarness::create("two_layer_eviction_attempts_at_the_same_time")