From 20f4febce1221e1d619e5f4eef63720ef89d05da Mon Sep 17 00:00:00 2001 From: Suhas Thalanki <54014218+thesuhas@users.noreply.github.com> Date: Wed, 18 Jun 2025 12:31:22 -0700 Subject: [PATCH 1/9] fix: additional changes to terminate pgbouncer on compute suspend (#12153) (#12284) Addressed [retrospective comments ](https://github.com/neondatabase/neon/pull/12153#discussion_r2154197503) to https://github.com/neondatabase/neon/pull/12153 --- compute_tools/src/compute.rs | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index d42e3cc860..7a7f2dfedc 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -2320,8 +2320,6 @@ pub fn forward_termination_signal(dev_mode: bool) { } if !dev_mode { - info!("not in dev mode, terminating pgbouncer"); - // Terminate pgbouncer with SIGKILL match pid_file::read(PGBOUNCER_PIDFILE.into()) { Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => { @@ -2353,25 +2351,27 @@ pub fn forward_termination_signal(dev_mode: bool) { error!("error reading pgbouncer pid file: {}", e); } } - } - // Terminate local_proxy - match pid_file::read("/etc/local_proxy/pid".into()) { - Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => { - info!("sending SIGTERM to local_proxy process pid: {}", pid); - if let Err(e) = kill(pid, Signal::SIGTERM) { - error!("failed to terminate local_proxy: {}", e); + // Terminate local_proxy + match pid_file::read("/etc/local_proxy/pid".into()) { + Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => { + info!("sending SIGTERM to local_proxy process pid: {}", pid); + if let Err(e) = kill(pid, Signal::SIGTERM) { + error!("failed to terminate local_proxy: {}", e); + } + } + Ok(pid_file::PidFileRead::NotHeldByAnyProcess(_)) => { + info!("local_proxy PID file exists but process not running"); + } + Ok(pid_file::PidFileRead::NotExist) => { + info!("local_proxy PID file not found, process may not be running"); + } + Err(e) => { + error!("error reading local_proxy PID file: {}", e); } } - Ok(pid_file::PidFileRead::NotHeldByAnyProcess(_)) => { - info!("local_proxy PID file exists but process not running"); - } - Ok(pid_file::PidFileRead::NotExist) => { - info!("local_proxy PID file not found, process may not be running"); - } - Err(e) => { - error!("error reading local_proxy PID file: {}", e); - } + } else { + info!("Skipping pgbouncer and local_proxy termination because in dev mode"); } let pg_pid = PG_PID.load(Ordering::SeqCst); From 3f676df3d519107ee09b54772652be38734608c1 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 19 Jun 2025 12:53:18 +0300 Subject: [PATCH 2/9] pageserver: fix initial layer visibility calculation (#12206) ## Problem GC info is an input to updating layer visibility. Currently, gc info is updated on timeline activation and visibility is computed on tenant attach, so we ignore branch points and compute visibility by taking all layers into account. Side note: gc info is also updated when timelines are created and dropped. That doesn't help because we create the timelines in topological order from the root. Hence the root timeline goes first, without context of where the branch points are. The impact of this in prod is that shards need to rehydrate layers after live migration since the non-visible ones were excluded from the heatmap. ## Summary of Changes Move the visibility calculation into tenant attachment instead of activation. 
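For intuition, here is a minimal, self-contained sketch of why this ordering matters. It uses toy types, not the pageserver's actual `update_layer_visibility` algorithm, and it ignores key ranges: a layer stays visible only if some LSN we must still serve reads at (the branch tip, a branch point, or the GC cutoff) can reach it without a newer image layer shadowing it. Computing visibility before the branch points are in `GcInfo` therefore marks everything below the newest image layer as non-visible and drops it from the heatmap.

```rust
// Hypothetical toy model: real layers also span key ranges, which this ignores.
struct Layer {
    lsn_start: u64,
    lsn_end: u64, // exclusive end of the layer's LSN range
    is_image: bool,
}

/// A layer is visible if some read LSN we must serve can still see it: the
/// layer starts at or below that LSN and no newer image layer at or below
/// that LSN shadows it.
fn compute_visibility(layers: &[Layer], read_lsns: &[u64]) -> Vec<bool> {
    layers
        .iter()
        .map(|layer| {
            read_lsns.iter().any(|&lsn| {
                layer.lsn_start <= lsn
                    && !layers.iter().any(|newer| {
                        newer.is_image && newer.lsn_end <= lsn && newer.lsn_end > layer.lsn_end
                    })
            })
        })
        .collect()
}

fn main() {
    let layers = [
        Layer { lsn_start: 5, lsn_end: 6, is_image: true },    // old image
        Layer { lsn_start: 10, lsn_end: 20, is_image: false }, // deltas on top of it
        Layer { lsn_start: 30, lsn_end: 31, is_image: true },  // newer image
    ];
    // Without branch points (only the tip at LSN 40), the newer image shadows
    // everything older: [false, false, true].
    println!("{:?}", compute_visibility(&layers, &[40]));
    // With a branch point at LSN 15 known from GcInfo, the older layers must
    // stay visible: [true, true, true].
    println!("{:?}", compute_visibility(&layers, &[40, 15]));
}
```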
--- pageserver/src/tenant.rs | 30 +++++++++++++++++++++++------- pageserver/src/tenant/timeline.rs | 6 +----- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index cfecf5561c..dc6926f299 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1859,6 +1859,29 @@ impl TenantShard { } } + // At this point we've initialized all timelines and are tracking them. + // Now compute the layer visibility for all (not offloaded) timelines. + let compute_visibility_for = { + let timelines_accessor = self.timelines.lock().unwrap(); + let mut timelines_offloaded_accessor = self.timelines_offloaded.lock().unwrap(); + + timelines_offloaded_accessor.extend(offloaded_timelines_list.into_iter()); + + // Before activation, populate each Timeline's GcInfo with information about its children + self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None); + + timelines_accessor.values().cloned().collect::<Vec<_>>() + }; + + for tl in compute_visibility_for { + tl.update_layer_visibility().await.with_context(|| { + format!( + "failed initial timeline visibility computation {} for tenant {}", + tl.timeline_id, self.tenant_shard_id + ) + })?; + } + // Walk through deleted timelines, resume deletion for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions { remote_timeline_client @@ -1878,10 +1901,6 @@ impl TenantShard { .context("resume_deletion") .map_err(LoadLocalTimelineError::ResumeDeletion)?; } - { - let mut offloaded_timelines_accessor = self.timelines_offloaded.lock().unwrap(); - offloaded_timelines_accessor.extend(offloaded_timelines_list.into_iter()); - } // Stash the preloaded tenant manifest, and upload a new manifest if changed. // @@ -3449,9 +3468,6 @@ impl TenantShard { .values() .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping())); - // Before activation, populate each Timeline's GcInfo with information about its children - self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None); - // Spawn gc and compaction loops. The loops will shut themselves // down when they notice that the tenant is inactive. tasks::start_background_loops(self, background_jobs_can_start); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a1969ecae6..b8bfc4f936 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3422,10 +3422,6 @@ impl Timeline { // TenantShard::create_timeline will wait for these uploads to happen before returning, or // on retry. - // Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan) drop(guard); // drop write lock, update_layer_visibility will take a read lock. 
- self.update_layer_visibility().await?; - info!( "loaded layer map with {} layers at {}, total physical size: {}", num_layers, disk_consistent_lsn, total_physical_size @@ -5939,7 +5935,7 @@ impl Drop for Timeline { if let Ok(mut gc_info) = ancestor.gc_info.write() { if !gc_info.remove_child_not_offloaded(self.timeline_id) { tracing::error!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id, - "Couldn't remove retain_lsn entry from offloaded timeline's parent: already removed"); + "Couldn't remove retain_lsn entry from timeline's parent on drop: already removed"); } } } From 6f4ffdb48b029bb56f02abb5b15551790083eb01 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Thu, 19 Jun 2025 11:54:34 +0200 Subject: [PATCH 3/9] pageserver: add gRPC compression (#12280) ## Problem The gRPC page service should support compression. Requires #12111. Touches #11728. Touches https://github.com/neondatabase/cloud/issues/25679. ## Summary of changes Add support for gzip and zstd compression in the server, and a client parameter to enable compression. This will need further benchmarking under realistic network conditions. --- Cargo.lock | 2 ++ Cargo.toml | 2 +- pageserver/page_api/src/client.rs | 11 ++++++++++- pageserver/src/page_service.rs | 10 ++++++++-- 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e6b8399b5e..b996f9d384 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7561,6 +7561,7 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", + "flate2", "h2 0.4.4", "http 1.1.0", "http-body 1.0.0", @@ -7580,6 +7581,7 @@ dependencies = [ "tower-layer", "tower-service", "tracing", + "zstd", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2f4fcbc249..076d8d0b60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -199,7 +199,7 @@ tokio-tar = "0.3" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.8" toml_edit = "0.22" -tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] } +tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "gzip", "prost", "router", "server", "tls-ring", "tls-native-roots", "zstd"] } tonic-reflection = { version = "0.13.1", features = ["server"] } tower = { version = "0.5.2", default-features = false } tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] } diff --git a/pageserver/page_api/src/client.rs b/pageserver/page_api/src/client.rs index 057a1d4ad6..274f036f3d 100644 --- a/pageserver/page_api/src/client.rs +++ b/pageserver/page_api/src/client.rs @@ -83,6 +83,7 @@ impl Client { timeline_id: TimelineId, shard_id: ShardIndex, auth_header: Option<String>, + compression: Option<tonic::codec::CompressionEncoding>, ) -> anyhow::Result<Self> { let endpoint: tonic::transport::Endpoint = into_endpoint .try_into() .map_err(|e| anyhow::anyhow!(e.to_string()))?; let channel = endpoint.connect().await?; let auth = AuthInterceptor::new(tenant_id, timeline_id, auth_header, shard_id) .map_err(|e| anyhow::anyhow!(e.to_string()))?; - let client = proto::PageServiceClient::with_interceptor(channel, auth); + let mut client = proto::PageServiceClient::with_interceptor(channel, auth); + + if let Some(compression) = compression { + // TODO: benchmark this (including network latency). + // TODO: consider enabling compression by default. 
+ client = client + .accept_compressed(compression) + .send_compressed(compression); + } Ok(Self { client }) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 57087dc6c3..800b47f235 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -3286,7 +3286,14 @@ impl GrpcPageServiceHandler { Ok(req) })) // Run the page service. - .service(proto::PageServiceServer::new(page_service_handler)); + .service( + proto::PageServiceServer::new(page_service_handler) + // Support both gzip and zstd compression. The client decides what to use. + .accept_compressed(tonic::codec::CompressionEncoding::Gzip) + .accept_compressed(tonic::codec::CompressionEncoding::Zstd) + .send_compressed(tonic::codec::CompressionEncoding::Gzip) + .send_compressed(tonic::codec::CompressionEncoding::Zstd), + ); let server = server.add_service(page_service); // Reflection service for use with e.g. grpcurl. @@ -3532,7 +3539,6 @@ impl proto::PageService for GrpcPageServiceHandler { Ok(tonic::Response::new(resp.into())) } - // TODO: ensure clients use gzip compression for the stream. #[instrument(skip_all, fields(lsn))] async fn get_base_backup( &self, From fa954671b2876301188fc6249a90f81e384ec6d3 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 19 Jun 2025 13:00:01 +0300 Subject: [PATCH 4/9] Remove unnecessary Postgres libs from the storage docker image (#12286) Since commit 87ad50c925, storage_controller has used diesel_async, which in turn uses tokio-postgres as the Postgres client, which doesn't require libpq. Thus we no longer need libpq in the storage image. --- Dockerfile | 8 -------- 1 file changed, 8 deletions(-) diff --git a/Dockerfile b/Dockerfile index 0b7ef491fd..f72d7d9bbc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,8 +5,6 @@ ARG REPOSITORY=ghcr.io/neondatabase ARG IMAGE=build-tools ARG TAG=pinned -ARG DEFAULT_PG_VERSION=17 -ARG STABLE_PG_VERSION=16 ARG DEBIAN_VERSION=bookworm ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim @@ -63,14 +61,11 @@ FROM $REPOSITORY/$IMAGE:$TAG AS build WORKDIR /home/nonroot ARG GIT_VERSION=local ARG BUILD_TAG -ARG STABLE_PG_VERSION COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server COPY --from=pg-build /home/nonroot/pg_install/v17/include/postgresql/server pg_install/v17/include/postgresql/server -COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib -COPY --from=pg-build /home/nonroot/pg_install/v17/lib pg_install/v17/lib COPY --from=plan /home/nonroot/recipe.json recipe.json ARG ADDITIONAL_RUSTFLAGS="" @@ -97,7 +92,6 @@ RUN set -e \ # Build final image # FROM $BASE_IMAGE_SHA -ARG DEFAULT_PG_VERSION WORKDIR /data RUN set -e \ @@ -107,8 +101,6 @@ RUN set -e \ libreadline-dev \ libseccomp-dev \ ca-certificates \ - # System postgres for use with client libraries (e.g. in storage controller) - postgresql-15 \ openssl \ unzip \ curl \ From 2ca6665f4add906114abed2ffa8db36565277414 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 19 Jun 2025 13:24:09 +0300 Subject: [PATCH 5/9] Remove outdated 'clean' Makefile targets (#12288) We have been bad at keeping them up to date; several contrib modules and neon extensions were missing from the clean rules. Give up trying, and remove the targets altogether. 
In practice, it's straightforward to just do `rm -rf pg_install/build`, so the clean-targets are hardly worth the maintenance effort. I kept `make distclean` though. The rule for that is simple enough. --- Makefile | 48 ------------------------------------------------ 1 file changed, 48 deletions(-) diff --git a/Makefile b/Makefile index 0911465fb8..5130e17e59 100644 --- a/Makefile +++ b/Makefile @@ -167,13 +167,6 @@ postgres-%: postgres-configure-% \ +@echo "Compiling test_decoding $*" $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/test_decoding install -.PHONY: postgres-clean-% -postgres-clean-%: - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 clean - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache clean - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect clean - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq clean - .PHONY: postgres-check-% postgres-check-%: postgres-% $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 check @@ -206,21 +199,6 @@ neon-pg-ext-%: postgres-% -C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install -.PHONY: neon-pg-clean-ext-% -neon-pg-clean-ext-%: - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-$* \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile clean - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile clean - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile clean - # Build walproposer as a static library. walproposer source code is located # in the pgxn/neon directory. # @@ -253,12 +231,6 @@ ifeq ($(UNAME_S),Linux) pg_crc32c.o endif -.PHONY: walproposer-lib-clean -walproposer-lib-clean: - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config \ - -C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean - .PHONY: neon-pg-ext neon-pg-ext: \ neon-pg-ext-v14 \ @@ -266,13 +238,6 @@ neon-pg-ext: \ neon-pg-ext-v16 \ neon-pg-ext-v17 -.PHONY: neon-pg-clean-ext -neon-pg-clean-ext: \ - neon-pg-clean-ext-v14 \ - neon-pg-clean-ext-v15 \ - neon-pg-clean-ext-v16 \ - neon-pg-clean-ext-v17 - # shorthand to build all Postgres versions .PHONY: postgres postgres: \ @@ -288,13 +253,6 @@ postgres-headers: \ postgres-headers-v16 \ postgres-headers-v17 -.PHONY: postgres-clean -postgres-clean: \ - postgres-clean-v14 \ - postgres-clean-v15 \ - postgres-clean-v16 \ - postgres-clean-v17 - .PHONY: postgres-check postgres-check: \ postgres-check-v14 \ @@ -302,12 +260,6 @@ postgres-check: \ postgres-check-v16 \ postgres-check-v17 -# This doesn't remove the effects of 'configure'. 
.PHONY: clean -clean: postgres-clean neon-pg-clean-ext - $(MAKE) -C compute clean - $(CARGO_CMD_PREFIX) cargo clean - # This removes everything .PHONY: distclean distclean: From 1950ccfe33d895541217cc9784d12a941ecd571a Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 19 Jun 2025 13:31:27 +0300 Subject: [PATCH 6/9] Eliminate dependency from pageserver_api to postgres_ffi (#12273) Introduce a separate `postgres_ffi_types` crate which contains a few types and functions that were used in the API. `postgres_ffi_types` is a much smaller crate than `postgres_ffi`, and it doesn't depend on bindgen or the Postgres C headers. Move the NeonWalRecord and Value types to the wal_decoder crate. They are only used in the pageserver-safekeeper "ingest" API. The rest of the ingest API types are defined in wal_decoder, so move these there as well. --- Cargo.lock | 13 ++++- Cargo.toml | 2 + libs/pageserver_api/Cargo.toml | 2 +- libs/pageserver_api/src/key.rs | 6 +- libs/pageserver_api/src/keyspace.rs | 11 ++-- libs/pageserver_api/src/lib.rs | 2 - libs/pageserver_api/src/pagestream_api.rs | 14 +++-- libs/pageserver_api/src/reltag.rs | 6 +- libs/pageserver_api/src/shard.rs | 2 +- libs/postgres_ffi/Cargo.toml | 1 + libs/postgres_ffi/src/pg_constants.rs | 6 +- libs/postgres_ffi/src/relfile_utils.rs | 55 +++---------------- libs/postgres_ffi_types/Cargo.toml | 11 ++++ libs/postgres_ffi_types/src/constants.rs | 8 +++ libs/postgres_ffi_types/src/forknum.rs | 36 ++++++++++++ libs/postgres_ffi_types/src/lib.rs | 13 +++++ libs/wal_decoder/Cargo.toml | 1 + libs/wal_decoder/src/decoder.rs | 2 +- libs/wal_decoder/src/models.rs | 3 + .../src => wal_decoder/src/models}/record.rs | 0 .../src => wal_decoder/src/models}/value.rs | 2 +- libs/wal_decoder/src/serialized_batch.rs | 6 +- pageserver/Cargo.toml | 1 + pageserver/benches/bench_ingest.rs | 2 +- pageserver/benches/bench_walredo.rs | 2 +- pageserver/src/basebackup.rs | 8 +-- pageserver/src/import_datadir.rs | 4 +- pageserver/src/page_service.rs | 2 +- pageserver/src/pgdatadir_mapping.rs | 11 ++-- pageserver/src/tenant.rs | 10 ++-- pageserver/src/tenant/storage_layer.rs | 4 +- .../storage_layer/batch_split_writer.rs | 2 +- .../src/tenant/storage_layer/delta_layer.rs | 9 ++- .../tenant/storage_layer/filter_iterator.rs | 3 +- .../src/tenant/storage_layer/image_layer.rs | 4 +- .../src/tenant/storage_layer/layer/tests.rs | 2 +- .../tenant/storage_layer/merge_iterator.rs | 9 +-- pageserver/src/tenant/timeline.rs | 12 ++-- pageserver/src/tenant/timeline/compaction.rs | 4 +- .../src/tenant/timeline/import_pgdata/flow.rs | 6 +- pageserver/src/walingest.rs | 4 +- pageserver/src/walredo.rs | 4 +- pageserver/src/walredo/apply_neon.rs | 4 +- pageserver/src/walredo/process.rs | 2 +- 44 files changed, 183 insertions(+), 128 deletions(-) create mode 100644 libs/postgres_ffi_types/Cargo.toml create mode 100644 libs/postgres_ffi_types/src/constants.rs create mode 100644 libs/postgres_ffi_types/src/forknum.rs create mode 100644 libs/postgres_ffi_types/src/lib.rs rename libs/{pageserver_api/src => wal_decoder/src/models}/record.rs (100%) rename libs/{pageserver_api/src => wal_decoder/src/models}/value.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index b996f9d384..5ab26b02fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4334,6 +4334,7 @@ dependencies = [ "postgres_backend", "postgres_connection", "postgres_ffi", + "postgres_ffi_types", "postgres_initdb", "posthog_client_lite", "pprof", @@ -4403,7 +4404,7 @@ dependencies = [ "nix 0.30.1", "once_cell", "postgres_backend", - "postgres_ffi", 
"postgres_ffi_types", "rand 0.8.5", "remote_storage", "reqwest", @@ -4892,6 +4893,7 @@ dependencies = [ "memoffset 0.9.0", "once_cell", "postgres", + "postgres_ffi_types", "pprof", "regex", "serde", @@ -4900,6 +4902,14 @@ dependencies = [ "utils", ] +[[package]] +name = "postgres_ffi_types" +version = "0.1.0" +dependencies = [ + "thiserror 1.0.69", + "workspace_hack", +] + [[package]] name = "postgres_initdb" version = "0.1.0" @@ -8161,6 +8171,7 @@ dependencies = [ "futures", "pageserver_api", "postgres_ffi", + "postgres_ffi_types", "pprof", "prost 0.13.5", "remote_storage", diff --git a/Cargo.toml b/Cargo.toml index 076d8d0b60..2a6acc132e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "libs/http-utils", "libs/pageserver_api", "libs/postgres_ffi", + "libs/postgres_ffi_types", "libs/safekeeper_api", "libs/desim", "libs/neon-shmem", @@ -259,6 +260,7 @@ pageserver_page_api = { path = "./pageserver/page_api" } postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" } postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" } postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" } +postgres_ffi_types = { version = "0.1", path = "./libs/postgres_ffi_types/" } postgres_initdb = { path = "./libs/postgres_initdb" } posthog_client_lite = { version = "0.1", path = "./libs/posthog_client_lite" } pq_proto = { version = "0.1", path = "./libs/pq_proto/" } diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index 25f29b8ecd..5a9a74b93d 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -17,7 +17,7 @@ anyhow.workspace = true bytes.workspace = true byteorder.workspace = true utils.workspace = true -postgres_ffi.workspace = true +postgres_ffi_types.workspace = true enum-map.workspace = true strum.workspace = true strum_macros.workspace = true diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index c14975167b..102bbee879 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -4,8 +4,8 @@ use std::ops::Range; use anyhow::{Result, bail}; use byteorder::{BE, ByteOrder}; use bytes::Bytes; -use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; -use postgres_ffi::{Oid, RepOriginId}; +use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; +use postgres_ffi_types::{Oid, RepOriginId}; use serde::{Deserialize, Serialize}; use utils::const_assert; @@ -194,7 +194,7 @@ impl Key { /// will be rejected on the write path. #[allow(dead_code)] pub fn is_valid_key_on_write_path_strong(&self) -> bool { - use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID}; + use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID}; if !self.is_i128_representable() { return false; } diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs index 79e3ef553b..1b48d3c462 100644 --- a/libs/pageserver_api/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -1,7 +1,6 @@ use std::ops::Range; use itertools::Itertools; -use postgres_ffi::BLCKSZ; use crate::key::Key; use crate::shard::{ShardCount, ShardIdentity}; @@ -269,9 +268,13 @@ impl KeySpace { /// Partition a key space into roughly chunks of roughly 'target_size' bytes /// in each partition. /// - pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning { - // Assume that each value is 8k in size. 
- let target_nblocks = (target_size / BLCKSZ as u64) as u32; + pub fn partition( + &self, + shard_identity: &ShardIdentity, + target_size: u64, + block_size: u64, + ) -> KeyPartitioning { + let target_nblocks = (target_size / block_size) as u32; let mut parts = Vec::new(); let mut current_part = Vec::new(); diff --git a/libs/pageserver_api/src/lib.rs b/libs/pageserver_api/src/lib.rs index 6c91d61508..52aed7a2c2 100644 --- a/libs/pageserver_api/src/lib.rs +++ b/libs/pageserver_api/src/lib.rs @@ -6,11 +6,9 @@ pub mod key; pub mod keyspace; pub mod models; pub mod pagestream_api; -pub mod record; pub mod reltag; pub mod shard; /// Public API types pub mod upcall_api; -pub mod value; pub mod config; diff --git a/libs/pageserver_api/src/pagestream_api.rs b/libs/pageserver_api/src/pagestream_api.rs index fba64c82d9..862da8268a 100644 --- a/libs/pageserver_api/src/pagestream_api.rs +++ b/libs/pageserver_api/src/pagestream_api.rs @@ -8,9 +8,15 @@ use crate::reltag::RelTag; use byteorder::{BigEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use postgres_ffi::BLCKSZ; use utils::lsn::Lsn; +/// Block size. +/// +/// XXX: We assume 8k block size in the SLRU fetch API. It's not great to hardcode +/// that in the protocol, because Postgres supports different block sizes as a compile +/// time option. +const BLCKSZ: usize = 8192; + // Wrapped in libpq CopyData #[derive(PartialEq, Eq, Debug)] pub enum PagestreamFeMessage { @@ -443,7 +449,7 @@ impl PagestreamBeMessage { Self::GetSlruSegment(resp) => { bytes.put_u8(Tag::GetSlruSegment as u8); - bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); + bytes.put_u32((resp.segment.len() / BLCKSZ) as u32); bytes.put(&resp.segment[..]); } @@ -520,7 +526,7 @@ impl PagestreamBeMessage { bytes.put_u64(resp.req.hdr.not_modified_since.0); bytes.put_u8(resp.req.kind); bytes.put_u32(resp.req.segno); - bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); + bytes.put_u32((resp.segment.len() / BLCKSZ) as u32); bytes.put(&resp.segment[..]); } @@ -662,7 +668,7 @@ impl PagestreamBeMessage { let kind = buf.read_u8()?; let segno = buf.read_u32::<BigEndian>()?; let n_blocks = buf.read_u32::<BigEndian>()?; - let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize]; + let mut segment = vec![0; n_blocks as usize * BLCKSZ]; buf.read_exact(&mut segment)?; Self::GetSlruSegment(PagestreamGetSlruSegmentResponse { req: PagestreamGetSlruSegmentRequest { diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs index e0dd4fdfe8..d0e37dffae 100644 --- a/libs/pageserver_api/src/reltag.rs +++ b/libs/pageserver_api/src/reltag.rs @@ -1,9 +1,9 @@ use std::cmp::Ordering; use std::fmt; -use postgres_ffi::Oid; -use postgres_ffi::pg_constants::GLOBALTABLESPACE_OID; -use postgres_ffi::relfile_utils::{MAIN_FORKNUM, forkname_to_number, forknumber_to_name}; +use postgres_ffi_types::Oid; +use postgres_ffi_types::constants::GLOBALTABLESPACE_OID; +use postgres_ffi_types::forknum::{MAIN_FORKNUM, forkname_to_number, forknumber_to_name}; use serde::{Deserialize, Serialize}; /// diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs index feb59f5070..9c16be93e8 100644 --- a/libs/pageserver_api/src/shard.rs +++ b/libs/pageserver_api/src/shard.rs @@ -35,7 +35,7 @@ use std::hash::{Hash, Hasher}; #[doc(inline)] pub use ::utils::shard::*; -use postgres_ffi::relfile_utils::INIT_FORKNUM; +use postgres_ffi_types::forknum::INIT_FORKNUM; use serde::{Deserialize, Serialize}; use crate::key::Key; diff --git a/libs/postgres_ffi/Cargo.toml 
b/libs/postgres_ffi/Cargo.toml index b7a376841d..67adfdd3c3 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -16,6 +16,7 @@ memoffset.workspace = true pprof.workspace = true thiserror.workspace = true serde.workspace = true +postgres_ffi_types.workspace = true utils.workspace = true tracing.workspace = true diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index b0bdd8a8da..f61b9a71c2 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -11,11 +11,7 @@ use crate::{BLCKSZ, PageHeaderData}; -// -// From pg_tablespace_d.h -// -pub const DEFAULTTABLESPACE_OID: u32 = 1663; -pub const GLOBALTABLESPACE_OID: u32 = 1664; +// Note: There are a few more widely-used constants in the postgres_ffi_types::constants crate. // From storage_xlog.h pub const XLOG_SMGR_CREATE: u8 = 0x10; diff --git a/libs/postgres_ffi/src/relfile_utils.rs b/libs/postgres_ffi/src/relfile_utils.rs index aa0e625b47..38f94b7221 100644 --- a/libs/postgres_ffi/src/relfile_utils.rs +++ b/libs/postgres_ffi/src/relfile_utils.rs @@ -4,50 +4,7 @@ use once_cell::sync::OnceCell; use regex::Regex; -// -// Fork numbers, from relpath.h -// -pub const MAIN_FORKNUM: u8 = 0; -pub const FSM_FORKNUM: u8 = 1; -pub const VISIBILITYMAP_FORKNUM: u8 = 2; -pub const INIT_FORKNUM: u8 = 3; - -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] -pub enum FilePathError { - #[error("invalid relation fork name")] - InvalidForkName, - #[error("invalid relation data file name")] - InvalidFileName, -} - -impl From<core::num::ParseIntError> for FilePathError { - fn from(_e: core::num::ParseIntError) -> Self { - FilePathError::InvalidFileName - } -} - -/// Convert Postgres relation file's fork suffix to fork number. -pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> { - match forkname { - // "main" is not in filenames, it's implicit if the fork name is not present - None => Ok(MAIN_FORKNUM), - Some("fsm") => Ok(FSM_FORKNUM), - Some("vm") => Ok(VISIBILITYMAP_FORKNUM), - Some("init") => Ok(INIT_FORKNUM), - Some(_) => Err(FilePathError::InvalidForkName), - } -} - -/// Convert Postgres fork number to the right suffix of the relation data file. -pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> { - match forknum { - MAIN_FORKNUM => None, - FSM_FORKNUM => Some("fsm"), - VISIBILITYMAP_FORKNUM => Some("vm"), - INIT_FORKNUM => Some("init"), - _ => Some("UNKNOWN FORKNUM"), - } -} +use postgres_ffi_types::forknum::*; /// Parse a filename of a relation file. Returns (relfilenode, forknum, segno) tuple. /// @@ -75,7 +32,9 @@ pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> { .ok_or(FilePathError::InvalidFileName)?; let relnode_str = caps.name("relnode").unwrap().as_str(); - let relnode = relnode_str.parse::<u32>()?; + let relnode = relnode_str + .parse::<u32>() + .map_err(|_e| FilePathError::InvalidFileName)?; let forkname = caps.name("forkname").map(|f| f.as_str()); let forknum = forkname_to_number(forkname)?; @@ -84,7 +43,11 @@ pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> { let segno = if segno_match.is_none() { 0 } else { - segno_match.unwrap().as_str().parse::<u32>()? + segno_match + .unwrap() + .as_str() + .parse::<u32>() + .map_err(|_e| FilePathError::InvalidFileName)? 
}; Ok((relnode, forknum, segno)) diff --git a/libs/postgres_ffi_types/Cargo.toml b/libs/postgres_ffi_types/Cargo.toml new file mode 100644 index 0000000000..50c6fc7874 --- /dev/null +++ b/libs/postgres_ffi_types/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "postgres_ffi_types" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +thiserror.workspace = true +workspace_hack = { version = "0.1", path = "../../workspace_hack" } + +[dev-dependencies] diff --git a/libs/postgres_ffi_types/src/constants.rs b/libs/postgres_ffi_types/src/constants.rs new file mode 100644 index 0000000000..c1a004c5ab --- /dev/null +++ b/libs/postgres_ffi_types/src/constants.rs @@ -0,0 +1,8 @@ +//! Misc constants, copied from PostgreSQL headers. +//! +//! Any constants included here must be the same in all PostgreSQL versions and unlikely to change +//! in the future either! + +// From pg_tablespace_d.h +pub const DEFAULTTABLESPACE_OID: u32 = 1663; +pub const GLOBALTABLESPACE_OID: u32 = 1664; diff --git a/libs/postgres_ffi_types/src/forknum.rs b/libs/postgres_ffi_types/src/forknum.rs new file mode 100644 index 0000000000..9b225d8ce5 --- /dev/null +++ b/libs/postgres_ffi_types/src/forknum.rs @@ -0,0 +1,36 @@ +// Fork numbers, from relpath.h +pub const MAIN_FORKNUM: u8 = 0; +pub const FSM_FORKNUM: u8 = 1; +pub const VISIBILITYMAP_FORKNUM: u8 = 2; +pub const INIT_FORKNUM: u8 = 3; + +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +pub enum FilePathError { + #[error("invalid relation fork name")] + InvalidForkName, + #[error("invalid relation data file name")] + InvalidFileName, +} + +/// Convert Postgres relation file's fork suffix to fork number. +pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> { + match forkname { + // "main" is not in filenames, it's implicit if the fork name is not present + None => Ok(MAIN_FORKNUM), + Some("fsm") => Ok(FSM_FORKNUM), + Some("vm") => Ok(VISIBILITYMAP_FORKNUM), + Some("init") => Ok(INIT_FORKNUM), + Some(_) => Err(FilePathError::InvalidForkName), + } +} + +/// Convert Postgres fork number to the right suffix of the relation data file. +pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> { + match forknum { + MAIN_FORKNUM => None, + FSM_FORKNUM => Some("fsm"), + VISIBILITYMAP_FORKNUM => Some("vm"), + INIT_FORKNUM => Some("init"), + _ => Some("UNKNOWN FORKNUM"), + } +} diff --git a/libs/postgres_ffi_types/src/lib.rs b/libs/postgres_ffi_types/src/lib.rs new file mode 100644 index 0000000000..84ef499b9f --- /dev/null +++ b/libs/postgres_ffi_types/src/lib.rs @@ -0,0 +1,13 @@ +//! This package contains some PostgreSQL constants and datatypes that are the same in all versions +//! of PostgreSQL and unlikely to change in the future either. These could be derived from the +//! PostgreSQL headers with 'bindgen', but in order to avoid proliferating the dependency to bindgen +//! and the PostgreSQL C headers to all services, we prefer to have this small stand-alone crate for +//! them instead.
//! +//! Be mindful in what you add here, as these types are deeply ingrained in the APIs. 
+ +pub mod constants; +pub mod forknum; + +pub type Oid = u32; +pub type RepOriginId = u16; diff --git a/libs/wal_decoder/Cargo.toml b/libs/wal_decoder/Cargo.toml index cb0ef4b00d..600ef091f5 100644 --- a/libs/wal_decoder/Cargo.toml +++ b/libs/wal_decoder/Cargo.toml @@ -14,6 +14,7 @@ bytes.workspace = true pageserver_api.workspace = true prost.workspace = true postgres_ffi.workspace = true +postgres_ffi_types.workspace = true serde.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["io-util"] } diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs index cb0835e894..9980a1f369 100644 --- a/libs/wal_decoder/src/decoder.rs +++ b/libs/wal_decoder/src/decoder.rs @@ -8,8 +8,8 @@ use pageserver_api::key::rel_block_to_key; use pageserver_api::reltag::{RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; use postgres_ffi::pg_constants; -use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; use postgres_ffi::walrecord::*; +use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM; use utils::lsn::Lsn; use crate::models::*; diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 7e1934c6c3..94a00c0e53 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -25,6 +25,9 @@ //! | //! |--> write to KV store within the pageserver +pub mod record; +pub mod value; + use bytes::Bytes; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::walrecord::{ diff --git a/libs/pageserver_api/src/record.rs b/libs/wal_decoder/src/models/record.rs similarity index 100% rename from libs/pageserver_api/src/record.rs rename to libs/wal_decoder/src/models/record.rs diff --git a/libs/pageserver_api/src/value.rs b/libs/wal_decoder/src/models/value.rs similarity index 99% rename from libs/pageserver_api/src/value.rs rename to libs/wal_decoder/src/models/value.rs index e9000939c3..3b4f896a45 100644 --- a/libs/pageserver_api/src/value.rs +++ b/libs/wal_decoder/src/models/value.rs @@ -10,7 +10,7 @@ use bytes::Bytes; use serde::{Deserialize, Serialize}; -use crate::record::NeonWalRecord; +use crate::models::record::NeonWalRecord; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum Value { diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs index b451d6d8e0..4123f7d0ac 100644 --- a/libs/wal_decoder/src/serialized_batch.rs +++ b/libs/wal_decoder/src/serialized_batch.rs @@ -1,4 +1,4 @@ -//! This module implements batch type for serialized [`pageserver_api::value::Value`] +//! This module implements batch type for serialized [`crate::models::value::Value`] //! instances. Each batch contains a raw buffer (serialized values) //! and a list of metadata for each (key, LSN) tuple present in the batch. //! 
@@ -10,10 +10,8 @@ use std::collections::{BTreeSet, HashMap}; use bytes::{Bytes, BytesMut}; use pageserver_api::key::{CompactKey, Key, rel_block_to_key}; use pageserver_api::keyspace::KeySpace; -use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::RelTag; use pageserver_api::shard::ShardIdentity; -use pageserver_api::value::Value; use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord}; use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn, pg_constants}; use serde::{Deserialize, Serialize}; @@ -21,6 +19,8 @@ use utils::bin_ser::BeSer; use utils::lsn::Lsn; use crate::models::InterpretedWalRecord; +use crate::models::record::NeonWalRecord; +use crate::models::value::Value; static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 9591c729e8..606ba9ad8c 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -56,6 +56,7 @@ pin-project-lite.workspace = true postgres_backend.workspace = true postgres_connection.workspace = true postgres_ffi.workspace = true +postgres_ffi_types.workspace = true postgres_initdb.workspace = true postgres-protocol.workspace = true postgres-types.workspace = true diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index eaadfe14ae..681d135e09 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -13,11 +13,11 @@ use pageserver::{page_cache, virtual_file}; use pageserver_api::key::Key; use pageserver_api::models::virtual_file::IoMode; use pageserver_api::shard::TenantShardId; -use pageserver_api::value::Value; use strum::IntoEnumIterator; use tokio_util::sync::CancellationToken; use utils::bin_ser::BeSer; use utils::id::{TenantId, TimelineId}; +use wal_decoder::models::value::Value; use wal_decoder::serialized_batch::SerializedValueBatch; // A very cheap hash for generating non-sequential keys. diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index 215682d90c..36d0d9c974 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -67,12 +67,12 @@ use once_cell::sync::Lazy; use pageserver::config::PageServerConf; use pageserver::walredo::{PostgresRedoManager, RedoAttemptType}; use pageserver_api::key::Key; -use pageserver_api::record::NeonWalRecord; use pageserver_api::shard::TenantShardId; use tokio::sync::Barrier; use tokio::task::JoinSet; use utils::id::TenantId; use utils::lsn::Lsn; +use wal_decoder::models::record::NeonWalRecord; fn bench(c: &mut Criterion) { macro_rules! 
bench_group { diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 2a0548b811..fe136b8bbd 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -18,13 +18,12 @@ use bytes::{BufMut, Bytes, BytesMut}; use fail::fail_point; use pageserver_api::key::{Key, rel_block_to_key}; use pageserver_api::reltag::{RelTag, SlruKind}; -use postgres_ffi::pg_constants::{ - DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID, PG_HBA, PGDATA_SPECIAL_FILES, -}; -use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM}; +use postgres_ffi::pg_constants::{PG_HBA, PGDATA_SPECIAL_FILES}; use postgres_ffi::{ BLCKSZ, PG_TLI, RELSEG_SIZE, WAL_SEGMENT_SIZE, XLogFileName, dispatch_pgversion, pg_constants, }; +use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID}; +use postgres_ffi_types::forknum::{INIT_FORKNUM, MAIN_FORKNUM}; use tokio::io; use tokio::io::AsyncWrite; use tokio_tar::{Builder, EntryType, Header}; @@ -372,6 +371,7 @@ where .partition( self.timeline.get_shard_identity(), self.timeline.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64, + BLCKSZ as u64, ); let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar); diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 911449c7c5..96fe0c1078 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -520,7 +520,7 @@ async fn import_file( } if file_path.starts_with("global") { - let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID; + let spcnode = postgres_ffi_types::constants::GLOBALTABLESPACE_OID; let dbnode = 0; match file_name.as_ref() { @@ -553,7 +553,7 @@ async fn import_file( } } } else if file_path.starts_with("base") { - let spcnode = pg_constants::DEFAULTTABLESPACE_OID; + let spcnode = postgres_ffi_types::constants::DEFAULTTABLESPACE_OID; let dbnode: u32 = file_path .iter() .nth(1) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 800b47f235..642b447e5f 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -41,7 +41,7 @@ use postgres_backend::{ AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error, }; use postgres_ffi::BLCKSZ; -use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; +use postgres_ffi_types::constants::DEFAULTTABLESPACE_OID; use pq_proto::framed::ConnectionError; use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor}; use smallvec::{SmallVec, smallvec}; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 633d62210d..58af2548ee 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -23,12 +23,11 @@ use pageserver_api::key::{ }; use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace}; use pageserver_api::models::RelSizeMigration; -use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; -use pageserver_api::value::Value; -use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; -use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId}; +use postgres_ffi::{BLCKSZ, TimestampTz, TransactionId}; +use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; +use postgres_ffi_types::{Oid, RepOriginId}; use serde::{Deserialize, Serialize}; use strum::IntoEnumIterator; use tokio_util::sync::CancellationToken; @@ -36,6 +35,8 @@ use tracing::{debug, info, info_span, trace, 
warn}; use utils::bin_ser::{BeSer, DeserializeError}; use utils::lsn::Lsn; use utils::pausable_failpoint; +use wal_decoder::models::record::NeonWalRecord; +use wal_decoder::models::value::Value; use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta}; use super::tenant::{PageReconstructError, Timeline}; @@ -720,6 +721,7 @@ impl Timeline { let batches = keyspace.partition( self.get_shard_identity(), self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64, + BLCKSZ as u64, ); let io_concurrency = IoConcurrency::spawn_from_conf( @@ -960,6 +962,7 @@ impl Timeline { let batches = keyspace.partition( self.get_shard_identity(), self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64, + BLCKSZ as u64, ); let io_concurrency = IoConcurrency::spawn_from_conf( diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index dc6926f299..d2c2fdef93 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -496,7 +496,7 @@ impl WalRedoManager { key: pageserver_api::key::Key, lsn: Lsn, base_img: Option<(Lsn, bytes::Bytes)>, - records: Vec<(Lsn, pageserver_api::record::NeonWalRecord)>, + records: Vec<(Lsn, wal_decoder::models::record::NeonWalRecord)>, pg_version: u32, redo_attempt_type: RedoAttemptType, ) -> Result { @@ -5852,10 +5852,10 @@ pub(crate) mod harness { use once_cell::sync::OnceCell; use pageserver_api::key::Key; use pageserver_api::models::ShardParameters; - use pageserver_api::record::NeonWalRecord; use pageserver_api::shard::ShardIndex; use utils::id::TenantId; use utils::logging; + use wal_decoder::models::record::NeonWalRecord; use super::*; use crate::deletion_queue::mock::MockDeletionQueue; @@ -6110,9 +6110,6 @@ mod tests { #[cfg(feature = "testing")] use pageserver_api::keyspace::KeySpaceRandomAccum; use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings}; - #[cfg(feature = "testing")] - use pageserver_api::record::NeonWalRecord; - use pageserver_api::value::Value; use pageserver_compaction::helpers::overlaps_with; #[cfg(feature = "testing")] use rand::SeedableRng; @@ -6133,6 +6130,9 @@ mod tests { use timeline::{CompactOptions, DeltaLayerTestDesc, VersionedKeySpaceQuery}; use utils::id::TenantId; use utils::shard::{ShardCount, ShardNumber}; + #[cfg(feature = "testing")] + use wal_decoder::models::record::NeonWalRecord; + use wal_decoder::models::value::Value; use super::*; use crate::DEFAULT_PG_VERSION; diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 9d15e7c4de..e65d444f76 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -34,11 +34,11 @@ pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName}; use pageserver_api::config::GetVectoredConcurrentIo; use pageserver_api::key::Key; use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; -use pageserver_api::record::NeonWalRecord; -use pageserver_api::value::Value; use tracing::{Instrument, info_span, trace}; use utils::lsn::Lsn; use utils::sync::gate::GateGuard; +use wal_decoder::models::record::NeonWalRecord; +use wal_decoder::models::value::Value; use self::inmemory_layer::InMemoryLayerFileId; use super::PageReconstructError; diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs index 51f2e909a2..1d50a5f3a0 100644 --- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs +++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs @@ -4,11 +4,11 @@ use std::sync::Arc; use 
bytes::Bytes; use pageserver_api::key::{KEY_SIZE, Key}; -use pageserver_api::value::Value; use tokio_util::sync::CancellationToken; use utils::id::TimelineId; use utils::lsn::Lsn; use utils::shard::TenantShardId; +use wal_decoder::models::value::Value; use super::errors::PutError; use super::layer::S3_UPLOAD_LIMIT; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index e82a28bb4c..ba763d4c3f 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -44,7 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key}; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::ImageCompressionAlgorithm; use pageserver_api::shard::TenantShardId; -use pageserver_api::value::Value; use serde::{Deserialize, Serialize}; use tokio::sync::OnceCell; use tokio_epoll_uring::IoBuf; @@ -54,6 +53,7 @@ use utils::bin_ser::BeSer; use utils::bin_ser::SerializeError; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; +use wal_decoder::models::value::Value; use super::errors::PutError; use super::{ @@ -1306,7 +1306,7 @@ impl DeltaLayerInner { // is it an image or will_init walrecord? // FIXME: this could be handled by threading the BlobRef to the // VectoredReadBuilder - let will_init = pageserver_api::value::ValueBytes::will_init(&data) + let will_init = wal_decoder::models::value::ValueBytes::will_init(&data) .inspect_err(|_e| { #[cfg(feature = "testing")] tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value"); @@ -1369,7 +1369,7 @@ impl DeltaLayerInner { format!(" img {} bytes", img.len()) } Value::WalRecord(rec) => { - let wal_desc = pageserver_api::record::describe_wal_record(&rec)?; + let wal_desc = wal_decoder::models::record::describe_wal_record(&rec)?; format!( " rec {} bytes will_init: {} {}", buf.len(), @@ -1624,7 +1624,6 @@ pub(crate) mod test { use bytes::Bytes; use itertools::MinMaxResult; - use pageserver_api::value::Value; use rand::prelude::{SeedableRng, SliceRandom, StdRng}; use rand::{Rng, RngCore}; @@ -1988,7 +1987,7 @@ pub(crate) mod test { #[tokio::test] async fn copy_delta_prefix_smoke() { use bytes::Bytes; - use pageserver_api::record::NeonWalRecord; + use wal_decoder::models::record::NeonWalRecord; let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke") .await diff --git a/pageserver/src/tenant/storage_layer/filter_iterator.rs b/pageserver/src/tenant/storage_layer/filter_iterator.rs index 1a330ecfc2..d345195446 100644 --- a/pageserver/src/tenant/storage_layer/filter_iterator.rs +++ b/pageserver/src/tenant/storage_layer/filter_iterator.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use anyhow::bail; use pageserver_api::key::Key; use pageserver_api::keyspace::{KeySpace, SparseKeySpace}; -use pageserver_api::value::Value; use utils::lsn::Lsn; +use wal_decoder::models::value::Value; use super::PersistentLayerKey; use super::merge_iterator::{MergeIterator, MergeIteratorItem}; @@ -126,7 +126,6 @@ mod tests { #[tokio::test] async fn filter_keyspace_iterator() { use bytes::Bytes; - use pageserver_api::value::Value; let harness = TenantHarness::create("filter_iterator_filter_keyspace_iterator") .await diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 740f53f928..d6f5f48a6e 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -42,7 
+42,6 @@ use pageserver_api::config::MaxVectoredReadBytes; use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key}; use pageserver_api::keyspace::KeySpace; use pageserver_api::shard::{ShardIdentity, TenantShardId}; -use pageserver_api::value::Value; use serde::{Deserialize, Serialize}; use tokio::sync::OnceCell; use tokio_stream::StreamExt; @@ -52,6 +51,7 @@ use utils::bin_ser::BeSer; use utils::bin_ser::SerializeError; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; +use wal_decoder::models::value::Value; use super::errors::PutError; use super::layer_name::ImageLayerName; @@ -1232,10 +1232,10 @@ mod test { use itertools::Itertools; use pageserver_api::key::Key; use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize}; - use pageserver_api::value::Value; use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; + use wal_decoder::models::value::Value; use super::{ImageLayerIterator, ImageLayerWriter}; use crate::DEFAULT_PG_VERSION; diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index 2f2ff0f273..313c133fa2 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -824,7 +824,7 @@ async fn evict_and_wait_does_not_wait_for_download() { #[tokio::test(start_paused = true)] async fn eviction_cancellation_on_drop() { use bytes::Bytes; - use pageserver_api::value::Value; + use wal_decoder::models::value::Value; // this is the runtime on which Layer spawns the blocking tasks on let handle = tokio::runtime::Handle::current(); diff --git a/pageserver/src/tenant/storage_layer/merge_iterator.rs b/pageserver/src/tenant/storage_layer/merge_iterator.rs index ea3dea50c3..c15abcdf3f 100644 --- a/pageserver/src/tenant/storage_layer/merge_iterator.rs +++ b/pageserver/src/tenant/storage_layer/merge_iterator.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use anyhow::bail; use pageserver_api::key::Key; -use pageserver_api::value::Value; use utils::lsn::Lsn; +use wal_decoder::models::value::Value; use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator}; use super::image_layer::{ImageLayerInner, ImageLayerIterator}; @@ -402,9 +402,9 @@ impl<'a> MergeIterator<'a> { mod tests { use itertools::Itertools; use pageserver_api::key::Key; - #[cfg(feature = "testing")] - use pageserver_api::record::NeonWalRecord; use utils::lsn::Lsn; + #[cfg(feature = "testing")] + use wal_decoder::models::record::NeonWalRecord; use super::*; use crate::DEFAULT_PG_VERSION; @@ -436,7 +436,6 @@ mod tests { #[tokio::test] async fn merge_in_between() { use bytes::Bytes; - use pageserver_api::value::Value; let harness = TenantHarness::create("merge_iterator_merge_in_between") .await @@ -501,7 +500,6 @@ mod tests { #[tokio::test] async fn delta_merge() { use bytes::Bytes; - use pageserver_api::value::Value; let harness = TenantHarness::create("merge_iterator_delta_merge") .await @@ -578,7 +576,6 @@ mod tests { #[tokio::test] async fn delta_image_mixed_merge() { use bytes::Bytes; - use pageserver_api::value::Value; let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge") .await diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b8bfc4f936..a0e9d8f06a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -56,8 +56,6 @@ use pageserver_api::models::{ }; use pageserver_api::reltag::{BlockNumber, RelTag}; use pageserver_api::shard::{ShardIdentity, 
ShardIndex, ShardNumber, TenantShardId}; -#[cfg(test)] -use pageserver_api::value::Value; use postgres_connection::PgConnectionConfig; use postgres_ffi::v14::xlog_utils; use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp}; @@ -81,6 +79,8 @@ use utils::seqwait::SeqWait; use utils::simple_rcu::{Rcu, RcuReadGuard}; use utils::sync::gate::{Gate, GateGuard}; use utils::{completion, critical, fs_ext, pausable_failpoint}; +#[cfg(test)] +use wal_decoder::models::value::Value; use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta}; use self::delete::DeleteTimelineFlow; @@ -5207,7 +5207,11 @@ impl Timeline { } let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?; - let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size); + let dense_partitioning = dense_ks.partition( + &self.shard_identity, + partition_size, + postgres_ffi::BLCKSZ as u64, + ); let sparse_partitioning = SparseKeyPartitioning { parts: vec![sparse_ks], }; // no partitioning for metadata keys for now @@ -7590,11 +7594,11 @@ mod tests { use std::sync::Arc; use pageserver_api::key::Key; - use pageserver_api::value::Value; use std::iter::Iterator; use tracing::Instrument; use utils::id::TimelineId; use utils::lsn::Lsn; + use wal_decoder::models::value::Value; use super::HeatMapTimeline; use crate::context::RequestContextBuilder; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 5307d3836f..6039c002f7 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -29,9 +29,7 @@ use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE; use pageserver_api::key::{KEY_SIZE, Key}; use pageserver_api::keyspace::{KeySpace, ShardedRange}; use pageserver_api::models::{CompactInfoResponse, CompactKeyRange}; -use pageserver_api::record::NeonWalRecord; use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId}; -use pageserver_api::value::Value; use pageserver_compaction::helpers::{fully_contains, overlaps_with}; use pageserver_compaction::interface::*; use serde::Serialize; @@ -41,6 +39,8 @@ use tracing::{Instrument, debug, error, info, info_span, trace, warn}; use utils::critical; use utils::id::TimelineId; use utils::lsn::Lsn; +use wal_decoder::models::record::NeonWalRecord; +use wal_decoder::models::value::Value; use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}; use crate::page_cache; diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs index ed679a9bdc..d471e9fc69 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -36,8 +36,8 @@ use pageserver_api::keyspace::{ShardedRange, singleton_range}; use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus}; use pageserver_api::reltag::{RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; +use postgres_ffi::BLCKSZ; use postgres_ffi::relfile_utils::parse_relfilename; -use postgres_ffi::{BLCKSZ, pg_constants}; use remote_storage::RemotePath; use tokio::sync::Semaphore; use tokio_stream::StreamExt; @@ -558,7 +558,7 @@ impl PgDataDir { PgDataDirDb::new( storage, &basedir.join(dboid.to_string()), - pg_constants::DEFAULTTABLESPACE_OID, + postgres_ffi_types::constants::DEFAULTTABLESPACE_OID, dboid, &datadir_path, ) @@ -571,7 +571,7 @@ impl PgDataDir { PgDataDirDb::new( storage, 
&datadir_path.join("global"), - postgres_ffi::pg_constants::GLOBALTABLESPACE_OID, + postgres_ffi_types::constants::GLOBALTABLESPACE_OID, 0, &datadir_path, ) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index c1a3b79915..ebffaf70e2 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -28,20 +28,20 @@ use std::time::{Duration, Instant, SystemTime}; use bytes::{Buf, Bytes}; use pageserver_api::key::{Key, rel_block_to_key}; -use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; -use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::walrecord::*; use postgres_ffi::{ TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, fsm_logical_to_physical, pg_constants, }; +use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM}; use tracing::*; use utils::bin_ser::{DeserializeError, SerializeError}; use utils::lsn::Lsn; use utils::rate_limit::RateLimit; use utils::{critical, failpoint_support}; +use wal_decoder::models::record::NeonWalRecord; use wal_decoder::models::*; use crate::ZERO_PAGE; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index ed8a954369..1498f3c83d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -32,12 +32,12 @@ use anyhow::Context; use bytes::{Bytes, BytesMut}; use pageserver_api::key::Key; use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus}; -use pageserver_api::record::NeonWalRecord; use pageserver_api::shard::TenantShardId; use tracing::*; use utils::lsn::Lsn; use utils::sync::gate::GateError; use utils::sync::heavier_once_cell; +use wal_decoder::models::record::NeonWalRecord; use crate::config::PageServerConf; use crate::metrics::{ @@ -571,11 +571,11 @@ mod tests { use bytes::Bytes; use pageserver_api::key::Key; - use pageserver_api::record::NeonWalRecord; use pageserver_api::shard::TenantShardId; use tracing::Instrument; use utils::id::TenantId; use utils::lsn::Lsn; + use wal_decoder::models::record::NeonWalRecord; use super::PostgresRedoManager; use crate::config::PageServerConf; diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs index a3840f1f6f..0783c77622 100644 --- a/pageserver/src/walredo/apply_neon.rs +++ b/pageserver/src/walredo/apply_neon.rs @@ -2,16 +2,16 @@ use anyhow::Context; use byteorder::{ByteOrder, LittleEndian}; use bytes::BytesMut; use pageserver_api::key::Key; -use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::SlruKind; -use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; use postgres_ffi::v14::nonrelfile_utils::{ mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset, transaction_id_set_status, }; use postgres_ffi::{BLCKSZ, pg_constants}; +use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM; use tracing::*; use utils::lsn::Lsn; +use wal_decoder::models::record::NeonWalRecord; /// Can this request be served by neon redo functions /// or we need to pass it to wal-redo postgres process? 
diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs index 6d4a38d4ff..3dec0593bf 100644 --- a/pageserver/src/walredo/process.rs +++ b/pageserver/src/walredo/process.rs @@ -10,7 +10,6 @@ use std::time::Duration; use anyhow::Context; use bytes::Bytes; -use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::RelTag; use pageserver_api::shard::TenantShardId; use postgres_ffi::BLCKSZ; @@ -18,6 +17,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::{Instrument, debug, error, instrument}; use utils::lsn::Lsn; use utils::poison::Poison; +use wal_decoder::models::record::NeonWalRecord; use self::no_leak_child::NoLeakChild; use crate::config::PageServerConf; From ec1452a5597bdb1284b0183444f5f6b437aa16ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 19 Jun 2025 13:17:01 +0200 Subject: [PATCH 7/9] Switch on --timelines-onto-safekeepers in integration tests (#11712) Switch on the `--timelines-onto-safekeepers` param in integration tests. Some changes that were needed to enable this but which I put into other PRs to not clutter up this one: * #11786 * #11854 * #12129 * #12138 Further fixes that were needed for this: * https://github.com/neondatabase/neon/pull/11801 * https://github.com/neondatabase/neon/pull/12143 * https://github.com/neondatabase/neon/pull/12204 Not strictly needed, but helpful: * https://github.com/neondatabase/neon/pull/12155 Part of #11670 Closes #11424 --- control_plane/src/local_env.rs | 2 +- test_runner/fixtures/neon_fixtures.py | 7 +- test_runner/regress/test_branching.py | 14 ++++ test_runner/regress/test_normal_work.py | 5 ++ test_runner/regress/test_ondemand_download.py | 6 ++ .../regress/test_storage_controller.py | 21 ++++-- test_runner/regress/test_storage_scrubber.py | 5 ++ .../regress/test_timeline_detach_ancestor.py | 19 +++++- .../regress/test_timeline_gc_blocking.py | 2 + test_runner/regress/test_wal_acceptor.py | 66 +++++++++++++++---- .../regress/test_wal_acceptor_async.py | 12 ++++ test_runner/regress/test_wal_receiver.py | 7 ++ 12 files changed, 141 insertions(+), 25 deletions(-) diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 47b77f0720..1b231151ce 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -236,7 +236,7 @@ impl Default for NeonStorageControllerConf { heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL, long_reconcile_threshold: None, use_https_pageserver_api: false, - timelines_onto_safekeepers: false, + timelines_onto_safekeepers: true, use_https_safekeeper_api: false, use_local_compute_notifications: true, } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 8cf1020adb..050d61055e 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -489,7 +489,9 @@ class NeonEnvBuilder: self.config_init_force: str | None = None self.top_output_dir = top_output_dir self.control_plane_hooks_api: str | None = None - self.storage_controller_config: dict[Any, Any] | None = None + self.storage_controller_config: dict[Any, Any] | None = { + "timelines_onto_safekeepers": True, + } # Flag to enable https listener in pageserver, generate local ssl certs, # and force storage controller to use https for pageserver api. 
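With the default flipped to `true`, every integration test now gets its timelines created on the safekeepers by the storage controller; tests that still exercise the legacy flow opt out explicitly, as the hunks below show. A minimal sketch of the opt-out pattern (the test name here is hypothetical):

```python
def test_something_legacy(neon_env_builder: NeonEnvBuilder):
    # Pin the old behavior: the storage controller will not create
    # timelines on the safekeepers for this environment.
    neon_env_builder.storage_controller_config = {
        "timelines_onto_safekeepers": False,
    }
    env = neon_env_builder.init_start()
```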
@@ -4909,6 +4911,9 @@ class Safekeeper(LogUtils):
         log.info(f"finished pulling timeline from {src_ids} to {self.id}")
         return res
 
+    def safekeeper_id(self) -> SafekeeperId:
+        return SafekeeperId(self.id, "localhost", self.port.pg_tenant_only)
+
     @property
     def data_dir(self) -> Path:
         return self.env.repo_dir / "safekeepers" / f"sk{self.id}"
diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py
index 9ce618b2ad..fa5c9aa693 100644
--- a/test_runner/regress/test_branching.py
+++ b/test_runner/regress/test_branching.py
@@ -11,6 +11,7 @@ from fixtures.common_types import Lsn, TimelineId
 from fixtures.log_helper import log
 from fixtures.pageserver.http import PageserverApiException
 from fixtures.pageserver.utils import wait_until_tenant_active
+from fixtures.safekeeper.http import MembershipConfiguration, TimelineCreateRequest
 from fixtures.utils import query_scalar
 from performance.test_perf_pgbench import get_scales_matrix
 from requests import RequestException
@@ -164,6 +165,19 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
     ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
 
     env.pageserver.tenant_create(env.initial_tenant)
+    sk = env.safekeepers[0]
+    assert sk
+    sk.http_client().timeline_create(
+        TimelineCreateRequest(
+            env.initial_tenant,
+            env.initial_timeline,
+            MembershipConfiguration(generation=1, members=[sk.safekeeper_id()], new_members=None),
+            int(env.pg_version),
+            Lsn(0),
+            None,
+        )
+    )
+
     initial_branch = "initial_branch"
 
     def start_creating_timeline():
diff --git a/test_runner/regress/test_normal_work.py b/test_runner/regress/test_normal_work.py
index 44590ea4b9..3335cf686c 100644
--- a/test_runner/regress/test_normal_work.py
+++ b/test_runner/regress/test_normal_work.py
@@ -64,6 +64,11 @@ def test_normal_work(
     """
     neon_env_builder.num_safekeepers = num_safekeepers
+
+    if safekeeper_proto_version == 2:
+        neon_env_builder.storage_controller_config = {
+            "timelines_onto_safekeepers": False,
+        }
     env = neon_env_builder.init_start()
 
     pageserver_http = env.pageserver.http_client()
diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py
index 2590a3fe9d..2b71662669 100644
--- a/test_runner/regress/test_ondemand_download.py
+++ b/test_runner/regress/test_ondemand_download.py
@@ -671,6 +671,12 @@ def test_layer_download_cancelled_by_config_location(neon_env_builder: NeonEnvBu
     """
     neon_env_builder.enable_pageserver_remote_storage(s3_storage())
 
+    # In the new mode, the test runs into a cancellation issue: the walproposer can't shut down
+    # because it hangs waiting on the timeline_checkpoint call in WalIngest::new.
+ neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + # turn off background tasks so that they don't interfere with the downloads env = neon_env_builder.init_start( initial_tenant_conf={ diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 8f3aa010e3..74ba74645e 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -88,6 +88,12 @@ def test_storage_controller_smoke( neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api env = neon_env_builder.init_configs() + # These bubble up from safekeepers + for ps in env.pageservers: + ps.allowed_errors.extend( + [".*Timeline.* has been deleted.*", ".*Timeline.*was cancelled and cannot be used"] + ) + # Start services by hand so that we can skip a pageserver (this will start + register later) env.broker.start() env.storage_controller.start() @@ -3455,7 +3461,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): assert target.get_safekeeper(fake_id) is None - assert len(target.get_safekeepers()) == 0 + start_sks = target.get_safekeepers() sk_0 = env.safekeepers[0] @@ -3477,7 +3483,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): inserted = target.get_safekeeper(fake_id) assert inserted is not None - assert target.get_safekeepers() == [inserted] + assert target.get_safekeepers() == start_sks + [inserted] assert eq_safekeeper_records(body, inserted) # error out if pk is changed (unexpected) @@ -3489,7 +3495,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): assert exc.value.status_code == 400 inserted_again = target.get_safekeeper(fake_id) - assert target.get_safekeepers() == [inserted_again] + assert target.get_safekeepers() == start_sks + [inserted_again] assert inserted_again is not None assert eq_safekeeper_records(inserted, inserted_again) @@ -3498,7 +3504,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): body["version"] += 1 target.on_safekeeper_deploy(fake_id, body) inserted_now = target.get_safekeeper(fake_id) - assert target.get_safekeepers() == [inserted_now] + assert target.get_safekeepers() == start_sks + [inserted_now] assert inserted_now is not None assert eq_safekeeper_records(body, inserted_now) @@ -3507,7 +3513,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): body["https_port"] = 123 target.on_safekeeper_deploy(fake_id, body) inserted_now = target.get_safekeeper(fake_id) - assert target.get_safekeepers() == [inserted_now] + assert target.get_safekeepers() == start_sks + [inserted_now] assert inserted_now is not None assert eq_safekeeper_records(body, inserted_now) env.storage_controller.consistency_check() @@ -3516,7 +3522,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): body["https_port"] = None target.on_safekeeper_deploy(fake_id, body) inserted_now = target.get_safekeeper(fake_id) - assert target.get_safekeepers() == [inserted_now] + assert target.get_safekeepers() == start_sks + [inserted_now] assert inserted_now is not None assert eq_safekeeper_records(body, inserted_now) env.storage_controller.consistency_check() @@ -3635,6 +3641,9 @@ def test_timeline_delete_mid_live_migration(neon_env_builder: NeonEnvBuilder, mi env = neon_env_builder.init_configs() env.start() + for ps in env.pageservers: + ps.allowed_errors.append(".*Timeline.* has 
been deleted.*") + tenant_id = TenantId.generate() timeline_id = TimelineId.generate() env.storage_controller.tenant_create(tenant_id, placement_policy={"Attached": 1}) diff --git a/test_runner/regress/test_storage_scrubber.py b/test_runner/regress/test_storage_scrubber.py index 03cd133ccb..e29cb801d5 100644 --- a/test_runner/regress/test_storage_scrubber.py +++ b/test_runner/regress/test_storage_scrubber.py @@ -341,6 +341,11 @@ def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder env = neon_env_builder.init_configs() env.start() + for ps in env.pageservers: + ps.allowed_errors.extend( + [".*Timeline.* has been deleted.*", ".*Timeline.*was cancelled and cannot be used"] + ) + tenant_id = TenantId.generate() timeline_id = TimelineId.generate() env.create_tenant( diff --git a/test_runner/regress/test_timeline_detach_ancestor.py b/test_runner/regress/test_timeline_detach_ancestor.py index f0810270b1..c58f78aeb1 100644 --- a/test_runner/regress/test_timeline_detach_ancestor.py +++ b/test_runner/regress/test_timeline_detach_ancestor.py @@ -21,7 +21,10 @@ from fixtures.neon_fixtures import ( last_flush_lsn_upload, wait_for_last_flush_lsn, ) -from fixtures.pageserver.http import HistoricLayerInfo, PageserverApiException +from fixtures.pageserver.http import ( + HistoricLayerInfo, + PageserverApiException, +) from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_timeline_detail_404 from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind from fixtures.utils import assert_pageserver_backups_equal, skip_in_debug_build, wait_until @@ -413,6 +416,7 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots "read_only": True, }, ) + sk = env.safekeepers[0] assert sk with pytest.raises(requests.exceptions.HTTPError, match="Not Found"): @@ -504,8 +508,15 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots assert len(lineage.get("original_ancestor", [])) == 0 assert len(lineage.get("reparenting_history", [])) == 0 - for name, _, _, rows, starts in expected_result: - with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep: + for branch_name, queried_timeline, _, rows, starts in expected_result: + details = client.timeline_detail(env.initial_tenant, queried_timeline) + log.info(f"reading data from branch {branch_name}") + # specifying the lsn makes the endpoint read-only and not connect to safekeepers + with env.endpoints.create( + branch_name, + lsn=Lsn(details["last_record_lsn"]), + ) as ep: + ep.start(safekeeper_generation=1) assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows assert ep.safe_psql(f"SELECT count(*) FROM audit WHERE starts = {starts}")[0][0] == 1 @@ -1088,6 +1099,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion( for ps in env.pageservers: ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) + ps.allowed_errors.append(".*Timeline.* has been deleted.*") pageservers = dict((int(p.id), p) for p in env.pageservers) @@ -1209,6 +1221,7 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv for ps in env.pageservers: ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) + ps.allowed_errors.append(".*Timeline.* has been deleted.*") pageservers = dict((int(p.id), p) for p in env.pageservers) diff --git a/test_runner/regress/test_timeline_gc_blocking.py b/test_runner/regress/test_timeline_gc_blocking.py index 9a710f5b80..8ef64a0742 100644 --- a/test_runner/regress/test_timeline_gc_blocking.py +++ 
b/test_runner/regress/test_timeline_gc_blocking.py @@ -24,6 +24,8 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"}, initial_tenant_shard_count=2 if sharded else None, ) + for ps in env.pageservers: + ps.allowed_errors.append(".*Timeline.* has been deleted.*") if sharded: http = env.storage_controller.pageserver_api() diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index b9183286af..ea120c1814 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -229,7 +229,7 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder): # Test timeline_list endpoint. http_cli = env.safekeepers[0].http_client() - assert len(http_cli.timeline_list()) == 3 + assert len(http_cli.timeline_list()) == 4 # Check that dead minority doesn't prevent the commits: execute insert n_inserts @@ -740,8 +740,8 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): env = neon_env_builder.init_start() tenant_id = env.initial_tenant - timeline_id = env.create_branch("test_timeline_status") - endpoint = env.endpoints.create_start("test_timeline_status") + timeline_id = env.initial_timeline + endpoint = env.endpoints.create_start("main") wa = env.safekeepers[0] @@ -1292,6 +1292,12 @@ def test_lagging_sk(neon_env_builder: NeonEnvBuilder): # it works without compute at all. def test_peer_recovery(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 3 + + # timelines should be created the old way + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + env = neon_env_builder.init_start() tenant_id = env.initial_tenant @@ -1532,6 +1538,11 @@ def test_safekeeper_without_pageserver( def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder): + # timelines should be created the old way manually until we have migration support + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + def execute_payload(endpoint: Endpoint): with closing(endpoint.connect()) as conn: with conn.cursor() as cur: @@ -1661,6 +1672,15 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool): res = env.safekeepers[3].pull_timeline( [env.safekeepers[0], env.safekeepers[2]], tenant_id, timeline_id ) + sk_id_1 = env.safekeepers[0].safekeeper_id() + sk_id_3 = env.safekeepers[2].safekeeper_id() + sk_id_4 = env.safekeepers[3].safekeeper_id() + new_conf = MembershipConfiguration( + generation=2, members=[sk_id_1, sk_id_3, sk_id_4], new_members=None + ) + for i in [0, 2, 3]: + env.safekeepers[i].http_client().membership_switch(tenant_id, timeline_id, new_conf) + log.info("Finished pulling timeline") log.info(res) @@ -1705,13 +1725,15 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 3 neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) env = neon_env_builder.init_start() - tenant_id = env.initial_tenant - timeline_id = env.initial_timeline (src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2]) + dst_sk.stop() + + [tenant_id, timeline_id] = env.create_tenant() + log.info("use only first 2 safekeepers, 3rd will be seeded") - endpoint = env.endpoints.create("main") + endpoint = env.endpoints.create("main", tenant_id=tenant_id) endpoint.active_safekeepers = [1, 2] endpoint.start() endpoint.safe_psql("create table t(key int, value text)") @@ -1723,6 +1745,7 
@@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder): src_http = src_sk.http_client() # run pull_timeline which will halt before downloading files src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause")) + dst_sk.start() pt_handle = PropagatingThread( target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id) ) @@ -1782,23 +1805,27 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder): neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) env = neon_env_builder.init_start() tenant_id = env.initial_tenant - timeline_id = env.initial_timeline (src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2]) + dst_sk.stop() + src_http = src_sk.http_client() + src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause")) + + timeline_id = env.create_branch("pull_timeline_term_changes") + + # run pull_timeline which will halt before downloading files log.info("use only first 2 safekeepers, 3rd will be seeded") - ep = env.endpoints.create("main") + ep = env.endpoints.create("pull_timeline_term_changes") ep.active_safekeepers = [1, 2] ep.start() ep.safe_psql("create table t(key int, value text)") ep.safe_psql("insert into t select generate_series(1, 1000), 'pear'") - src_http = src_sk.http_client() - # run pull_timeline which will halt before downloading files - src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause")) pt_handle = PropagatingThread( target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id) ) + dst_sk.start() pt_handle.start() src_sk.wait_until_paused("sk-snapshot-after-list-pausable") @@ -1807,7 +1834,7 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder): # restart compute to bump term ep.stop() - ep = env.endpoints.create("main") + ep = env.endpoints.create("pull_timeline_term_changes") ep.active_safekeepers = [1, 2] ep.start() ep.safe_psql("insert into t select generate_series(1, 100), 'pear'") @@ -1929,6 +1956,11 @@ def test_pull_timeline_while_evicted(neon_env_builder: NeonEnvBuilder): @run_only_on_default_postgres("tests only safekeeper API") def test_membership_api(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 1 + # timelines should be created the old way + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + env = neon_env_builder.init_start() # These are expected after timeline deletion on safekeepers. @@ -2009,6 +2041,12 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder): created manually, later storcon will do that. 
""" neon_env_builder.num_safekeepers = 3 + + # timelines should be created the old way manually + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + env = neon_env_builder.init_start() tenant_id = env.initial_tenant @@ -2064,7 +2102,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_start() tenant_id = env.initial_tenant - timeline_id = env.create_branch("test_idle_reconnections") + timeline_id = env.initial_timeline def collect_stats() -> dict[str, float]: # we need to collect safekeeper_pg_queries_received_total metric from all safekeepers @@ -2095,7 +2133,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder): collect_stats() - endpoint = env.endpoints.create_start("test_idle_reconnections") + endpoint = env.endpoints.create_start("main") # just write something to the timeline endpoint.safe_psql("create table t(i int)") collect_stats() diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index d8a7dc2a2b..1bad387a90 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -590,6 +590,13 @@ async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int): @pytest.mark.parametrize("safekeeper_proto_version", [2, 3]) def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_version: int): neon_env_builder.num_safekeepers = 3 + if safekeeper_proto_version == 2: + # On the legacy protocol, we don't support generations, which are part of + # `timelines_onto_safekeepers` + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + env = neon_env_builder.init_start() asyncio.run(run_wal_truncation(env, safekeeper_proto_version)) @@ -713,6 +720,11 @@ async def run_quorum_sanity(env: NeonEnv): # we don't. def test_quorum_sanity(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 4 + + # The test fails basically always on the new mode. + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } env = neon_env_builder.init_start() asyncio.run(run_quorum_sanity(env)) diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py index 0252b590cc..d281c055b0 100644 --- a/test_runner/regress/test_wal_receiver.py +++ b/test_runner/regress/test_wal_receiver.py @@ -16,6 +16,13 @@ if TYPE_CHECKING: # Checks that pageserver's walreceiver state is printed in the logs during WAL wait timeout. # Ensures that walreceiver does not run without any data inserted and only starts after the insertion. def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder): + # we assert below that the walreceiver is not active before data writes. + # with manually created timelines, it is active. 
+ # FIXME: remove this test once we remove timelines_onto_safekeepers + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + # Trigger WAL wait timeout faster neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'" env = neon_env_builder.init_start() From a6d4de25cd9ac335e08f9e89cd2b2b49b09ef774 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 19 Jun 2025 14:20:02 +0000 Subject: [PATCH 8/9] build(deps): bump urllib3 from 1.26.19 to 2.5.0 in the pip group across 1 directory (#12289) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- poetry.lock | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/poetry.lock b/poetry.lock index f9b6f83366..1bc5077eb7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -746,23 +746,23 @@ xray = ["mypy-boto3-xray (>=1.26.0,<1.27.0)"] [[package]] name = "botocore" -version = "1.34.11" +version = "1.34.162" description = "Low-level, data-driven core of boto 3." optional = false -python-versions = ">= 3.8" +python-versions = ">=3.8" groups = ["main"] files = [ - {file = "botocore-1.34.11-py3-none-any.whl", hash = "sha256:1ff1398b6ea670e1c01ac67a33af3da854f8e700d3528289c04f319c330d8250"}, - {file = "botocore-1.34.11.tar.gz", hash = "sha256:51905c3d623c60df5dc5794387de7caf886d350180a01a3dfa762e903edb45a9"}, + {file = "botocore-1.34.162-py3-none-any.whl", hash = "sha256:2d918b02db88d27a75b48275e6fb2506e9adaaddbec1ffa6a8a0898b34e769be"}, + {file = "botocore-1.34.162.tar.gz", hash = "sha256:adc23be4fb99ad31961236342b7cbf3c0bfc62532cd02852196032e8c0d682f3"}, ] [package.dependencies] jmespath = ">=0.7.1,<2.0.0" python-dateutil = ">=2.1,<3.0.0" -urllib3 = {version = ">=1.25.4,<2.1", markers = "python_version >= \"3.10\""} +urllib3 = {version = ">=1.25.4,<2.2.0 || >2.2.0,<3", markers = "python_version >= \"3.10\""} [package.extras] -crt = ["awscrt (==0.19.19)"] +crt = ["awscrt (==0.21.2)"] [[package]] name = "botocore-stubs" @@ -3422,20 +3422,21 @@ files = [ [[package]] name = "urllib3" -version = "1.26.19" +version = "2.5.0" description = "HTTP library with thread-safe connection pooling, file post, and more." 
optional = false -python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" +python-versions = ">=3.9" groups = ["main"] files = [ - {file = "urllib3-1.26.19-py2.py3-none-any.whl", hash = "sha256:37a0344459b199fce0e80b0d3569837ec6b6937435c5244e7fd73fa6006830f3"}, - {file = "urllib3-1.26.19.tar.gz", hash = "sha256:3e3d753a8618b86d7de333b4223005f68720bcd6a7d2bcb9fbd2229ec7c1e429"}, + {file = "urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc"}, + {file = "urllib3-2.5.0.tar.gz", hash = "sha256:3fc47733c7e419d4bc3f6b3dc2b4f890bb743906a30d56ba4a5bfa4bbff92760"}, ] [package.extras] -brotli = ["brotli (==1.0.9) ; os_name != \"nt\" and python_version < \"3\" and platform_python_implementation == \"CPython\"", "brotli (>=1.0.9) ; python_version >= \"3\" and platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; (os_name != \"nt\" or python_version >= \"3\") and platform_python_implementation != \"CPython\"", "brotlipy (>=0.6.0) ; os_name == \"nt\" and python_version < \"3\""] -secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress ; python_version == \"2.7\"", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"] -socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] +brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""] +h2 = ["h2 (>=4,<5)"] +socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] +zstd = ["zstandard (>=0.18.0)"] [[package]] name = "websockets" From dc1625cd8e5c5fcc478a017261e28702ae9052e4 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Thu, 19 Jun 2025 17:40:57 +0200 Subject: [PATCH 9/9] pagebench: add `basebackup` gRPC support (#12250) ## Problem Pagebench does not support gRPC for `basebackup` benchmarks. Requires #12243. Touches #11728. ## Summary of changes Add gRPC support via gRPC connstrings, e.g. `pagebench basebackup --page-service-connstring grpc://localhost:51051`. Also change `--gzip-probability` to `--no-compression`, since this must be specified per-client for gRPC. 
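For illustration, the two transports would be exercised like so (connstrings are the defaults assumed by the benchmark's arguments; compression stays on unless `--no-compression` is passed):

```shell
# libpq transport (gzip-compressed basebackup by default)
pagebench basebackup --page-service-connstring postgresql://postgres@localhost:64000

# gRPC transport (zstd-compressed), with compression disabled
pagebench basebackup --page-service-connstring grpc://localhost:51051 --no-compression
```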
---
 pageserver/page_api/src/client.rs          |   2 +-
 pageserver/pagebench/src/cmd/basebackup.rs | 179 +++++++++++++++------
 2 files changed, 131 insertions(+), 50 deletions(-)

diff --git a/pageserver/page_api/src/client.rs b/pageserver/page_api/src/client.rs
index 274f036f3d..aa4774c056 100644
--- a/pageserver/page_api/src/client.rs
+++ b/pageserver/page_api/src/client.rs
@@ -121,7 +121,7 @@ impl Client {
     pub async fn get_base_backup(
         &mut self,
         req: model::GetBaseBackupRequest,
-    ) -> Result<impl Stream<Item = Result<Bytes, tonic::Status>>, tonic::Status> {
+    ) -> Result<impl Stream<Item = Result<Bytes, tonic::Status>> + 'static, tonic::Status> {
         let proto_req = proto::GetBaseBackupRequest::from(req);
 
         let response_stream: Streaming<proto::GetBaseBackupResponseChunk> =
diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs
index 43ad92980c..8015db528d 100644
--- a/pageserver/pagebench/src/cmd/basebackup.rs
+++ b/pageserver/pagebench/src/cmd/basebackup.rs
@@ -1,20 +1,29 @@
 use std::collections::HashMap;
 use std::num::NonZeroUsize;
 use std::ops::Range;
-use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+use std::pin::Pin;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::Instant;
 
-use anyhow::Context;
+use anyhow::anyhow;
+use futures::TryStreamExt as _;
 use pageserver_api::shard::TenantShardId;
 use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
 use pageserver_client::page_service::BasebackupRequest;
+use pageserver_page_api as page_api;
 use rand::prelude::*;
+use reqwest::Url;
+use tokio::io::AsyncRead;
 use tokio::sync::Barrier;
 use tokio::task::JoinSet;
+use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _};
+use tokio_util::io::StreamReader;
+use tonic::async_trait;
 use tracing::{info, instrument};
 use utils::id::TenantTimelineId;
 use utils::lsn::Lsn;
+use utils::shard::ShardIndex;
 
 use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
 use crate::util::{request_stats, tokio_thread_local_stats};
@@ -24,14 +33,15 @@ pub(crate) struct Args {
     #[clap(long, default_value = "http://localhost:9898")]
     mgmt_api_endpoint: String,
-    #[clap(long, default_value = "postgres://postgres@localhost:64000")]
+    /// The Pageserver to connect to. Use postgresql:// for libpq, or grpc:// for gRPC.
+    #[clap(long, default_value = "postgresql://postgres@localhost:64000")]
     page_service_connstring: String,
     #[clap(long)]
     pageserver_jwt: Option<String>,
     #[clap(long, default_value = "1")]
     num_clients: NonZeroUsize,
-    #[clap(long, default_value = "1.0")]
-    gzip_probability: f64,
+    #[clap(long)]
+    no_compression: bool,
     #[clap(long)]
     runtime: Option<humantime::Duration>,
     #[clap(long)]
@@ -146,12 +156,23 @@ async fn main_impl(
     let mut work_senders = HashMap::new();
     let mut tasks = Vec::new();
-    for tl in &timelines {
+    let connurl = Url::parse(&args.page_service_connstring)?;
+    for &tl in &timelines {
         let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
         work_senders.insert(tl, sender);
-        tasks.push(tokio::spawn(client(
-            args,
-            *tl,
+
+        let client: Box<dyn Client> = match connurl.scheme() {
+            "postgresql" | "postgres" => Box::new(
+                LibpqClient::new(&args.page_service_connstring, tl, !args.no_compression).await?,
+            ),
+            "grpc" => Box::new(
+                GrpcClient::new(&args.page_service_connstring, tl, !args.no_compression).await?,
+            ),
+            scheme => return Err(anyhow!("invalid scheme {scheme}")),
+        };
+
+        tasks.push(tokio::spawn(run_worker(
+            client,
             Arc::clone(&start_work_barrier),
             receiver,
             Arc::clone(&all_work_done_barrier),
@@ -166,13 +187,7 @@ async fn main_impl(
             let mut rng = rand::thread_rng();
             let target = all_targets.choose(&mut rng).unwrap();
             let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
-            (
-                target.timeline,
-                Work {
-                    lsn,
-                    gzip: rng.gen_bool(args.gzip_probability),
-                },
-            )
+            (target.timeline, Work { lsn })
         };
         let sender = work_senders.get(&timeline).unwrap();
         // TODO: what if this blocks?
@@ -216,13 +231,11 @@ async fn main_impl(
 #[derive(Copy, Clone)]
 struct Work {
     lsn: Option<Lsn>,
-    gzip: bool,
 }
 
 #[instrument(skip_all)]
-async fn client(
-    args: &'static Args,
-    timeline: TenantTimelineId,
+async fn run_worker(
+    mut client: Box<dyn Client>,
     start_work_barrier: Arc<Barrier>,
     mut work: tokio::sync::mpsc::Receiver<Work>,
     all_work_done_barrier: Arc<Barrier>,
 ) {
     start_work_barrier.wait().await;
 
-    let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
-        .await
-        .unwrap();
-
-    while let Some(Work { lsn, gzip }) = work.recv().await {
+    while let Some(Work { lsn }) = work.recv().await {
         let start = Instant::now();
-        let copy_out_stream = client
-            .basebackup(&BasebackupRequest {
-                tenant_id: timeline.tenant_id,
-                timeline_id: timeline.timeline_id,
-                lsn,
-                gzip,
-            })
-            .await
-            .with_context(|| format!("start basebackup for {timeline}"))
-            .unwrap();
+        let stream = client.basebackup(lsn).await.unwrap();
 
-        use futures::StreamExt;
-        let size = Arc::new(AtomicUsize::new(0));
-        copy_out_stream
-            .for_each({
-                |r| {
-                    let size = Arc::clone(&size);
-                    async move {
-                        let size = Arc::clone(&size);
-                        size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
-                    }
-                }
-            })
-            .await;
-        info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
+        let size = futures::io::copy(stream.compat(), &mut tokio::io::sink().compat_write())
+            .await
+            .unwrap();
+        info!("basebackup size is {size} bytes");
 
         let elapsed = start.elapsed();
         live_stats.inc();
         STATS.with(|stats| {
@@ -270,3 +260,94 @@
 
     all_work_done_barrier.wait().await;
 }
+
+/// A basebackup client. This allows switching out the client protocol implementation.
+#[async_trait]
+trait Client: Send {
+    async fn basebackup(
+        &mut self,
+        lsn: Option<Lsn>,
+    ) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send>>>;
+}
+
+/// A libpq-based Pageserver client.
+struct LibpqClient {
+    inner: pageserver_client::page_service::Client,
+    ttid: TenantTimelineId,
+    compression: bool,
+}
+
+impl LibpqClient {
+    async fn new(
+        connstring: &str,
+        ttid: TenantTimelineId,
+        compression: bool,
+    ) -> anyhow::Result<Self> {
+        Ok(Self {
+            inner: pageserver_client::page_service::Client::new(connstring.to_string()).await?,
+            ttid,
+            compression,
+        })
+    }
+}
+
+#[async_trait]
+impl Client for LibpqClient {
+    async fn basebackup(
+        &mut self,
+        lsn: Option<Lsn>,
+    ) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send>>> {
+        let req = BasebackupRequest {
+            tenant_id: self.ttid.tenant_id,
+            timeline_id: self.ttid.timeline_id,
+            lsn,
+            gzip: self.compression,
+        };
+        let stream = self.inner.basebackup(&req).await?;
+        Ok(Box::pin(StreamReader::new(
+            stream.map_err(std::io::Error::other),
+        )))
+    }
+}
+
+/// A gRPC Pageserver client.
+struct GrpcClient {
+    inner: page_api::Client,
+}
+
+impl GrpcClient {
+    async fn new(
+        connstring: &str,
+        ttid: TenantTimelineId,
+        compression: bool,
+    ) -> anyhow::Result<Self> {
+        let inner = page_api::Client::new(
+            connstring.to_string(),
+            ttid.tenant_id,
+            ttid.timeline_id,
+            ShardIndex::unsharded(),
+            None,
+            compression.then_some(tonic::codec::CompressionEncoding::Zstd),
+        )
+        .await?;
+        Ok(Self { inner })
+    }
+}
+
+#[async_trait]
+impl Client for GrpcClient {
+    async fn basebackup(
+        &mut self,
+        lsn: Option<Lsn>,
+    ) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send>>> {
+        let req = page_api::GetBaseBackupRequest {
+            lsn,
+            replica: false,
+            full: false,
+        };
+        let stream = self.inner.get_base_backup(req).await?;
+        Ok(Box::pin(StreamReader::new(
+            stream.map_err(std::io::Error::other),
+        )))
+    }
+}
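The `Client` trait deliberately erases the transport behind `Pin<Box<dyn AsyncRead + Send>>`, so `run_worker` can drain either protocol with a single `futures::io::copy`. As a sketch of what a third backend would need (illustrative only, not part of the patch):

```rust
/// Hypothetical in-memory client; anything that can hand back an
/// `AsyncRead` satisfies the trait. Sketch only, not part of this patch.
struct MockClient {
    payload: Vec<u8>,
}

#[async_trait]
impl Client for MockClient {
    async fn basebackup(
        &mut self,
        _lsn: Option<Lsn>,
    ) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send>>> {
        // tokio implements `AsyncRead` for `std::io::Cursor<T: AsRef<[u8]>>`,
        // so an in-memory buffer can stand in for a real basebackup stream.
        Ok(Box::pin(std::io::Cursor::new(self.payload.clone())))
    }
}
```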