diff --git a/Cargo.lock b/Cargo.lock index e6b8399b5e..5ab26b02fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4334,6 +4334,7 @@ dependencies = [ "postgres_backend", "postgres_connection", "postgres_ffi", + "postgres_ffi_types", "postgres_initdb", "posthog_client_lite", "pprof", @@ -4403,7 +4404,7 @@ dependencies = [ "nix 0.30.1", "once_cell", "postgres_backend", - "postgres_ffi", + "postgres_ffi_types", "rand 0.8.5", "remote_storage", "reqwest", @@ -4892,6 +4893,7 @@ dependencies = [ "memoffset 0.9.0", "once_cell", "postgres", + "postgres_ffi_types", "pprof", "regex", "serde", @@ -4900,6 +4902,14 @@ dependencies = [ "utils", ] +[[package]] +name = "postgres_ffi_types" +version = "0.1.0" +dependencies = [ + "thiserror 1.0.69", + "workspace_hack", +] + [[package]] name = "postgres_initdb" version = "0.1.0" @@ -7561,6 +7571,7 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", + "flate2", "h2 0.4.4", "http 1.1.0", "http-body 1.0.0", @@ -7580,6 +7591,7 @@ dependencies = [ "tower-layer", "tower-service", "tracing", + "zstd", ] [[package]] @@ -8159,6 +8171,7 @@ dependencies = [ "futures", "pageserver_api", "postgres_ffi", + "postgres_ffi_types", "pprof", "prost 0.13.5", "remote_storage", diff --git a/Cargo.toml b/Cargo.toml index 2f4fcbc249..2a6acc132e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "libs/http-utils", "libs/pageserver_api", "libs/postgres_ffi", + "libs/postgres_ffi_types", "libs/safekeeper_api", "libs/desim", "libs/neon-shmem", @@ -199,7 +200,7 @@ tokio-tar = "0.3" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.8" toml_edit = "0.22" -tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] } +tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "gzip", "prost", "router", "server", "tls-ring", "tls-native-roots", "zstd"] } tonic-reflection = { version = "0.13.1", features = ["server"] } tower = { version = "0.5.2", default-features = false } tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] } @@ -259,6 +260,7 @@ pageserver_page_api = { path = "./pageserver/page_api" } postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" } postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" } postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" } +postgres_ffi_types = { version = "0.1", path = "./libs/postgres_ffi_types/" } postgres_initdb = { path = "./libs/postgres_initdb" } posthog_client_lite = { version = "0.1", path = "./libs/posthog_client_lite" } pq_proto = { version = "0.1", path = "./libs/pq_proto/" } diff --git a/Dockerfile b/Dockerfile index 0b7ef491fd..f72d7d9bbc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,8 +5,6 @@ ARG REPOSITORY=ghcr.io/neondatabase ARG IMAGE=build-tools ARG TAG=pinned -ARG DEFAULT_PG_VERSION=17 -ARG STABLE_PG_VERSION=16 ARG DEBIAN_VERSION=bookworm ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim @@ -63,14 +61,11 @@ FROM $REPOSITORY/$IMAGE:$TAG AS build WORKDIR /home/nonroot ARG GIT_VERSION=local ARG BUILD_TAG -ARG STABLE_PG_VERSION COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server COPY --from=pg-build /home/nonroot/pg_install/v17/include/postgresql/server pg_install/v17/include/postgresql/server -COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib -COPY --from=pg-build /home/nonroot/pg_install/v17/lib pg_install/v17/lib COPY --from=plan /home/nonroot/recipe.json recipe.json ARG ADDITIONAL_RUSTFLAGS="" @@ -97,7 +92,6 @@ RUN set -e \ # Build final image # FROM $BASE_IMAGE_SHA -ARG DEFAULT_PG_VERSION WORKDIR /data RUN set -e \ @@ -107,8 +101,6 @@ RUN set -e \ libreadline-dev \ libseccomp-dev \ ca-certificates \ - # System postgres for use with client libraries (e.g. in storage controller) - postgresql-15 \ openssl \ unzip \ curl \ diff --git a/Makefile b/Makefile index 0911465fb8..5130e17e59 100644 --- a/Makefile +++ b/Makefile @@ -167,13 +167,6 @@ postgres-%: postgres-configure-% \ +@echo "Compiling test_decoding $*" $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/test_decoding install -.PHONY: postgres-clean-% -postgres-clean-%: - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 clean - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache clean - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect clean - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq clean - .PHONY: postgres-check-% postgres-check-%: postgres-% $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 check @@ -206,21 +199,6 @@ neon-pg-ext-%: postgres-% -C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install -.PHONY: neon-pg-clean-ext-% -neon-pg-clean-ext-%: - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-$* \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile clean - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile clean - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile clean - # Build walproposer as a static library. walproposer source code is located # in the pgxn/neon directory. # @@ -253,12 +231,6 @@ ifeq ($(UNAME_S),Linux) pg_crc32c.o endif -.PHONY: walproposer-lib-clean -walproposer-lib-clean: - $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config \ - -C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \ - -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean - .PHONY: neon-pg-ext neon-pg-ext: \ neon-pg-ext-v14 \ @@ -266,13 +238,6 @@ neon-pg-ext: \ neon-pg-ext-v16 \ neon-pg-ext-v17 -.PHONY: neon-pg-clean-ext -neon-pg-clean-ext: \ - neon-pg-clean-ext-v14 \ - neon-pg-clean-ext-v15 \ - neon-pg-clean-ext-v16 \ - neon-pg-clean-ext-v17 - # shorthand to build all Postgres versions .PHONY: postgres postgres: \ @@ -288,13 +253,6 @@ postgres-headers: \ postgres-headers-v16 \ postgres-headers-v17 -.PHONY: postgres-clean -postgres-clean: \ - postgres-clean-v14 \ - postgres-clean-v15 \ - postgres-clean-v16 \ - postgres-clean-v17 - .PHONY: postgres-check postgres-check: \ postgres-check-v14 \ @@ -302,12 +260,6 @@ postgres-check: \ postgres-check-v16 \ postgres-check-v17 -# This doesn't remove the effects of 'configure'. -.PHONY: clean -clean: postgres-clean neon-pg-clean-ext - $(MAKE) -C compute clean - $(CARGO_CMD_PREFIX) cargo clean - # This removes everything .PHONY: distclean distclean: diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index d42e3cc860..7a7f2dfedc 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -2320,8 +2320,6 @@ pub fn forward_termination_signal(dev_mode: bool) { } if !dev_mode { - info!("not in dev mode, terminating pgbouncer"); - // Terminate pgbouncer with SIGKILL match pid_file::read(PGBOUNCER_PIDFILE.into()) { Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => { @@ -2353,25 +2351,27 @@ pub fn forward_termination_signal(dev_mode: bool) { error!("error reading pgbouncer pid file: {}", e); } } - } - // Terminate local_proxy - match pid_file::read("/etc/local_proxy/pid".into()) { - Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => { - info!("sending SIGTERM to local_proxy process pid: {}", pid); - if let Err(e) = kill(pid, Signal::SIGTERM) { - error!("failed to terminate local_proxy: {}", e); + // Terminate local_proxy + match pid_file::read("/etc/local_proxy/pid".into()) { + Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => { + info!("sending SIGTERM to local_proxy process pid: {}", pid); + if let Err(e) = kill(pid, Signal::SIGTERM) { + error!("failed to terminate local_proxy: {}", e); + } + } + Ok(pid_file::PidFileRead::NotHeldByAnyProcess(_)) => { + info!("local_proxy PID file exists but process not running"); + } + Ok(pid_file::PidFileRead::NotExist) => { + info!("local_proxy PID file not found, process may not be running"); + } + Err(e) => { + error!("error reading local_proxy PID file: {}", e); } } - Ok(pid_file::PidFileRead::NotHeldByAnyProcess(_)) => { - info!("local_proxy PID file exists but process not running"); - } - Ok(pid_file::PidFileRead::NotExist) => { - info!("local_proxy PID file not found, process may not be running"); - } - Err(e) => { - error!("error reading local_proxy PID file: {}", e); - } + } else { + info!("Skipping pgbouncer and local_proxy termination because in dev mode"); } let pg_pid = PG_PID.load(Ordering::SeqCst); diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 47b77f0720..1b231151ce 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -236,7 +236,7 @@ impl Default for NeonStorageControllerConf { heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL, long_reconcile_threshold: None, use_https_pageserver_api: false, - timelines_onto_safekeepers: false, + timelines_onto_safekeepers: true, use_https_safekeeper_api: false, use_local_compute_notifications: true, } diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index 25f29b8ecd..5a9a74b93d 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -17,7 +17,7 @@ anyhow.workspace = true bytes.workspace = true byteorder.workspace = true utils.workspace = true -postgres_ffi.workspace = true +postgres_ffi_types.workspace = true enum-map.workspace = true strum.workspace = true strum_macros.workspace = true diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index c14975167b..102bbee879 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -4,8 +4,8 @@ use std::ops::Range; use anyhow::{Result, bail}; use byteorder::{BE, ByteOrder}; use bytes::Bytes; -use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; -use postgres_ffi::{Oid, RepOriginId}; +use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; +use postgres_ffi_types::{Oid, RepOriginId}; use serde::{Deserialize, Serialize}; use utils::const_assert; @@ -194,7 +194,7 @@ impl Key { /// will be rejected on the write path. #[allow(dead_code)] pub fn is_valid_key_on_write_path_strong(&self) -> bool { - use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID}; + use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID}; if !self.is_i128_representable() { return false; } diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs index 79e3ef553b..1b48d3c462 100644 --- a/libs/pageserver_api/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -1,7 +1,6 @@ use std::ops::Range; use itertools::Itertools; -use postgres_ffi::BLCKSZ; use crate::key::Key; use crate::shard::{ShardCount, ShardIdentity}; @@ -269,9 +268,13 @@ impl KeySpace { /// Partition a key space into roughly chunks of roughly 'target_size' bytes /// in each partition. /// - pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning { - // Assume that each value is 8k in size. - let target_nblocks = (target_size / BLCKSZ as u64) as u32; + pub fn partition( + &self, + shard_identity: &ShardIdentity, + target_size: u64, + block_size: u64, + ) -> KeyPartitioning { + let target_nblocks = (target_size / block_size) as u32; let mut parts = Vec::new(); let mut current_part = Vec::new(); diff --git a/libs/pageserver_api/src/lib.rs b/libs/pageserver_api/src/lib.rs index 6c91d61508..52aed7a2c2 100644 --- a/libs/pageserver_api/src/lib.rs +++ b/libs/pageserver_api/src/lib.rs @@ -6,11 +6,9 @@ pub mod key; pub mod keyspace; pub mod models; pub mod pagestream_api; -pub mod record; pub mod reltag; pub mod shard; /// Public API types pub mod upcall_api; -pub mod value; pub mod config; diff --git a/libs/pageserver_api/src/pagestream_api.rs b/libs/pageserver_api/src/pagestream_api.rs index fba64c82d9..862da8268a 100644 --- a/libs/pageserver_api/src/pagestream_api.rs +++ b/libs/pageserver_api/src/pagestream_api.rs @@ -8,9 +8,15 @@ use crate::reltag::RelTag; use byteorder::{BigEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use postgres_ffi::BLCKSZ; use utils::lsn::Lsn; +/// Block size. +/// +/// XXX: We assume 8k block size in the SLRU fetch API. It's not great to hardcode +/// that in the protocol, because Postgres supports different block sizes as a compile +/// time option. +const BLCKSZ: usize = 8192; + // Wrapped in libpq CopyData #[derive(PartialEq, Eq, Debug)] pub enum PagestreamFeMessage { @@ -443,7 +449,7 @@ impl PagestreamBeMessage { Self::GetSlruSegment(resp) => { bytes.put_u8(Tag::GetSlruSegment as u8); - bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); + bytes.put_u32((resp.segment.len() / BLCKSZ) as u32); bytes.put(&resp.segment[..]); } @@ -520,7 +526,7 @@ impl PagestreamBeMessage { bytes.put_u64(resp.req.hdr.not_modified_since.0); bytes.put_u8(resp.req.kind); bytes.put_u32(resp.req.segno); - bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); + bytes.put_u32((resp.segment.len() / BLCKSZ) as u32); bytes.put(&resp.segment[..]); } @@ -662,7 +668,7 @@ impl PagestreamBeMessage { let kind = buf.read_u8()?; let segno = buf.read_u32::()?; let n_blocks = buf.read_u32::()?; - let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize]; + let mut segment = vec![0; n_blocks as usize * BLCKSZ]; buf.read_exact(&mut segment)?; Self::GetSlruSegment(PagestreamGetSlruSegmentResponse { req: PagestreamGetSlruSegmentRequest { diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs index e0dd4fdfe8..d0e37dffae 100644 --- a/libs/pageserver_api/src/reltag.rs +++ b/libs/pageserver_api/src/reltag.rs @@ -1,9 +1,9 @@ use std::cmp::Ordering; use std::fmt; -use postgres_ffi::Oid; -use postgres_ffi::pg_constants::GLOBALTABLESPACE_OID; -use postgres_ffi::relfile_utils::{MAIN_FORKNUM, forkname_to_number, forknumber_to_name}; +use postgres_ffi_types::Oid; +use postgres_ffi_types::constants::GLOBALTABLESPACE_OID; +use postgres_ffi_types::forknum::{MAIN_FORKNUM, forkname_to_number, forknumber_to_name}; use serde::{Deserialize, Serialize}; /// diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs index feb59f5070..9c16be93e8 100644 --- a/libs/pageserver_api/src/shard.rs +++ b/libs/pageserver_api/src/shard.rs @@ -35,7 +35,7 @@ use std::hash::{Hash, Hasher}; #[doc(inline)] pub use ::utils::shard::*; -use postgres_ffi::relfile_utils::INIT_FORKNUM; +use postgres_ffi_types::forknum::INIT_FORKNUM; use serde::{Deserialize, Serialize}; use crate::key::Key; diff --git a/libs/postgres_ffi/Cargo.toml b/libs/postgres_ffi/Cargo.toml index b7a376841d..67adfdd3c3 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -16,6 +16,7 @@ memoffset.workspace = true pprof.workspace = true thiserror.workspace = true serde.workspace = true +postgres_ffi_types.workspace = true utils.workspace = true tracing.workspace = true diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index b0bdd8a8da..f61b9a71c2 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -11,11 +11,7 @@ use crate::{BLCKSZ, PageHeaderData}; -// -// From pg_tablespace_d.h -// -pub const DEFAULTTABLESPACE_OID: u32 = 1663; -pub const GLOBALTABLESPACE_OID: u32 = 1664; +// Note: There are a few more widely-used constants in the postgres_ffi_types::constants crate. // From storage_xlog.h pub const XLOG_SMGR_CREATE: u8 = 0x10; diff --git a/libs/postgres_ffi/src/relfile_utils.rs b/libs/postgres_ffi/src/relfile_utils.rs index aa0e625b47..38f94b7221 100644 --- a/libs/postgres_ffi/src/relfile_utils.rs +++ b/libs/postgres_ffi/src/relfile_utils.rs @@ -4,50 +4,7 @@ use once_cell::sync::OnceCell; use regex::Regex; -// -// Fork numbers, from relpath.h -// -pub const MAIN_FORKNUM: u8 = 0; -pub const FSM_FORKNUM: u8 = 1; -pub const VISIBILITYMAP_FORKNUM: u8 = 2; -pub const INIT_FORKNUM: u8 = 3; - -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] -pub enum FilePathError { - #[error("invalid relation fork name")] - InvalidForkName, - #[error("invalid relation data file name")] - InvalidFileName, -} - -impl From for FilePathError { - fn from(_e: core::num::ParseIntError) -> Self { - FilePathError::InvalidFileName - } -} - -/// Convert Postgres relation file's fork suffix to fork number. -pub fn forkname_to_number(forkname: Option<&str>) -> Result { - match forkname { - // "main" is not in filenames, it's implicit if the fork name is not present - None => Ok(MAIN_FORKNUM), - Some("fsm") => Ok(FSM_FORKNUM), - Some("vm") => Ok(VISIBILITYMAP_FORKNUM), - Some("init") => Ok(INIT_FORKNUM), - Some(_) => Err(FilePathError::InvalidForkName), - } -} - -/// Convert Postgres fork number to the right suffix of the relation data file. -pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> { - match forknum { - MAIN_FORKNUM => None, - FSM_FORKNUM => Some("fsm"), - VISIBILITYMAP_FORKNUM => Some("vm"), - INIT_FORKNUM => Some("init"), - _ => Some("UNKNOWN FORKNUM"), - } -} +use postgres_ffi_types::forknum::*; /// Parse a filename of a relation file. Returns (relfilenode, forknum, segno) tuple. /// @@ -75,7 +32,9 @@ pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> { .ok_or(FilePathError::InvalidFileName)?; let relnode_str = caps.name("relnode").unwrap().as_str(); - let relnode = relnode_str.parse::()?; + let relnode = relnode_str + .parse::() + .map_err(|_e| FilePathError::InvalidFileName)?; let forkname = caps.name("forkname").map(|f| f.as_str()); let forknum = forkname_to_number(forkname)?; @@ -84,7 +43,11 @@ pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> { let segno = if segno_match.is_none() { 0 } else { - segno_match.unwrap().as_str().parse::()? + segno_match + .unwrap() + .as_str() + .parse::() + .map_err(|_e| FilePathError::InvalidFileName)? }; Ok((relnode, forknum, segno)) diff --git a/libs/postgres_ffi_types/Cargo.toml b/libs/postgres_ffi_types/Cargo.toml new file mode 100644 index 0000000000..50c6fc7874 --- /dev/null +++ b/libs/postgres_ffi_types/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "postgres_ffi_types" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +thiserror.workspace = true +workspace_hack = { version = "0.1", path = "../../workspace_hack" } + +[dev-dependencies] diff --git a/libs/postgres_ffi_types/src/constants.rs b/libs/postgres_ffi_types/src/constants.rs new file mode 100644 index 0000000000..c1a004c5ab --- /dev/null +++ b/libs/postgres_ffi_types/src/constants.rs @@ -0,0 +1,8 @@ +//! Misc constants, copied from PostgreSQL headers. +//! +//! Any constants included here must be the same in all PostgreSQL versions and unlikely to change +//! in the future either! + +// From pg_tablespace_d.h +pub const DEFAULTTABLESPACE_OID: u32 = 1663; +pub const GLOBALTABLESPACE_OID: u32 = 1664; diff --git a/libs/postgres_ffi_types/src/forknum.rs b/libs/postgres_ffi_types/src/forknum.rs new file mode 100644 index 0000000000..9b225d8ce5 --- /dev/null +++ b/libs/postgres_ffi_types/src/forknum.rs @@ -0,0 +1,36 @@ +// Fork numbers, from relpath.h +pub const MAIN_FORKNUM: u8 = 0; +pub const FSM_FORKNUM: u8 = 1; +pub const VISIBILITYMAP_FORKNUM: u8 = 2; +pub const INIT_FORKNUM: u8 = 3; + +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +pub enum FilePathError { + #[error("invalid relation fork name")] + InvalidForkName, + #[error("invalid relation data file name")] + InvalidFileName, +} + +/// Convert Postgres relation file's fork suffix to fork number. +pub fn forkname_to_number(forkname: Option<&str>) -> Result { + match forkname { + // "main" is not in filenames, it's implicit if the fork name is not present + None => Ok(MAIN_FORKNUM), + Some("fsm") => Ok(FSM_FORKNUM), + Some("vm") => Ok(VISIBILITYMAP_FORKNUM), + Some("init") => Ok(INIT_FORKNUM), + Some(_) => Err(FilePathError::InvalidForkName), + } +} + +/// Convert Postgres fork number to the right suffix of the relation data file. +pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> { + match forknum { + MAIN_FORKNUM => None, + FSM_FORKNUM => Some("fsm"), + VISIBILITYMAP_FORKNUM => Some("vm"), + INIT_FORKNUM => Some("init"), + _ => Some("UNKNOWN FORKNUM"), + } +} diff --git a/libs/postgres_ffi_types/src/lib.rs b/libs/postgres_ffi_types/src/lib.rs new file mode 100644 index 0000000000..84ef499b9f --- /dev/null +++ b/libs/postgres_ffi_types/src/lib.rs @@ -0,0 +1,13 @@ +//! This package contains some PostgreSQL constants and datatypes that are the same in all versions +//! of PostgreSQL and unlikely to change in the future either. These could be derived from the +//! PostgreSQL headers with 'bindgen', but in order to avoid proliferating the dependency to bindgen +//! and the PostgreSQL C headers to all services, we prefer to have this small stand-alone crate for +//! them instead. +//! +//! Be mindful in what you add here, as these types are deeply ingrained in the APIs. + +pub mod constants; +pub mod forknum; + +pub type Oid = u32; +pub type RepOriginId = u16; diff --git a/libs/wal_decoder/Cargo.toml b/libs/wal_decoder/Cargo.toml index cb0ef4b00d..600ef091f5 100644 --- a/libs/wal_decoder/Cargo.toml +++ b/libs/wal_decoder/Cargo.toml @@ -14,6 +14,7 @@ bytes.workspace = true pageserver_api.workspace = true prost.workspace = true postgres_ffi.workspace = true +postgres_ffi_types.workspace = true serde.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["io-util"] } diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs index cb0835e894..9980a1f369 100644 --- a/libs/wal_decoder/src/decoder.rs +++ b/libs/wal_decoder/src/decoder.rs @@ -8,8 +8,8 @@ use pageserver_api::key::rel_block_to_key; use pageserver_api::reltag::{RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; use postgres_ffi::pg_constants; -use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; use postgres_ffi::walrecord::*; +use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM; use utils::lsn::Lsn; use crate::models::*; diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 7e1934c6c3..94a00c0e53 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -25,6 +25,9 @@ //! | //! |--> write to KV store within the pageserver +pub mod record; +pub mod value; + use bytes::Bytes; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::walrecord::{ diff --git a/libs/pageserver_api/src/record.rs b/libs/wal_decoder/src/models/record.rs similarity index 100% rename from libs/pageserver_api/src/record.rs rename to libs/wal_decoder/src/models/record.rs diff --git a/libs/pageserver_api/src/value.rs b/libs/wal_decoder/src/models/value.rs similarity index 99% rename from libs/pageserver_api/src/value.rs rename to libs/wal_decoder/src/models/value.rs index e9000939c3..3b4f896a45 100644 --- a/libs/pageserver_api/src/value.rs +++ b/libs/wal_decoder/src/models/value.rs @@ -10,7 +10,7 @@ use bytes::Bytes; use serde::{Deserialize, Serialize}; -use crate::record::NeonWalRecord; +use crate::models::record::NeonWalRecord; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum Value { diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs index b451d6d8e0..4123f7d0ac 100644 --- a/libs/wal_decoder/src/serialized_batch.rs +++ b/libs/wal_decoder/src/serialized_batch.rs @@ -1,4 +1,4 @@ -//! This module implements batch type for serialized [`pageserver_api::value::Value`] +//! This module implements batch type for serialized [`crate::models::value::Value`] //! instances. Each batch contains a raw buffer (serialized values) //! and a list of metadata for each (key, LSN) tuple present in the batch. //! @@ -10,10 +10,8 @@ use std::collections::{BTreeSet, HashMap}; use bytes::{Bytes, BytesMut}; use pageserver_api::key::{CompactKey, Key, rel_block_to_key}; use pageserver_api::keyspace::KeySpace; -use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::RelTag; use pageserver_api::shard::ShardIdentity; -use pageserver_api::value::Value; use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord}; use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn, pg_constants}; use serde::{Deserialize, Serialize}; @@ -21,6 +19,8 @@ use utils::bin_ser::BeSer; use utils::lsn::Lsn; use crate::models::InterpretedWalRecord; +use crate::models::record::NeonWalRecord; +use crate::models::value::Value; static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 9591c729e8..606ba9ad8c 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -56,6 +56,7 @@ pin-project-lite.workspace = true postgres_backend.workspace = true postgres_connection.workspace = true postgres_ffi.workspace = true +postgres_ffi_types.workspace = true postgres_initdb.workspace = true postgres-protocol.workspace = true postgres-types.workspace = true diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index eaadfe14ae..681d135e09 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -13,11 +13,11 @@ use pageserver::{page_cache, virtual_file}; use pageserver_api::key::Key; use pageserver_api::models::virtual_file::IoMode; use pageserver_api::shard::TenantShardId; -use pageserver_api::value::Value; use strum::IntoEnumIterator; use tokio_util::sync::CancellationToken; use utils::bin_ser::BeSer; use utils::id::{TenantId, TimelineId}; +use wal_decoder::models::value::Value; use wal_decoder::serialized_batch::SerializedValueBatch; // A very cheap hash for generating non-sequential keys. diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index 215682d90c..36d0d9c974 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -67,12 +67,12 @@ use once_cell::sync::Lazy; use pageserver::config::PageServerConf; use pageserver::walredo::{PostgresRedoManager, RedoAttemptType}; use pageserver_api::key::Key; -use pageserver_api::record::NeonWalRecord; use pageserver_api::shard::TenantShardId; use tokio::sync::Barrier; use tokio::task::JoinSet; use utils::id::TenantId; use utils::lsn::Lsn; +use wal_decoder::models::record::NeonWalRecord; fn bench(c: &mut Criterion) { macro_rules! bench_group { diff --git a/pageserver/page_api/src/client.rs b/pageserver/page_api/src/client.rs index 057a1d4ad6..aa4774c056 100644 --- a/pageserver/page_api/src/client.rs +++ b/pageserver/page_api/src/client.rs @@ -83,6 +83,7 @@ impl Client { timeline_id: TimelineId, shard_id: ShardIndex, auth_header: Option, + compression: Option, ) -> anyhow::Result { let endpoint: tonic::transport::Endpoint = into_endpoint .try_into() @@ -90,7 +91,15 @@ impl Client { let channel = endpoint.connect().await?; let auth = AuthInterceptor::new(tenant_id, timeline_id, auth_header, shard_id) .map_err(|e| anyhow::anyhow!(e.to_string()))?; - let client = proto::PageServiceClient::with_interceptor(channel, auth); + let mut client = proto::PageServiceClient::with_interceptor(channel, auth); + + if let Some(compression) = compression { + // TODO: benchmark this (including network latency). + // TODO: consider enabling compression by default. + client = client + .accept_compressed(compression) + .send_compressed(compression); + } Ok(Self { client }) } @@ -112,7 +121,7 @@ impl Client { pub async fn get_base_backup( &mut self, req: model::GetBaseBackupRequest, - ) -> Result>, tonic::Status> { + ) -> Result> + 'static, tonic::Status> { let proto_req = proto::GetBaseBackupRequest::from(req); let response_stream: Streaming = diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs index 43ad92980c..8015db528d 100644 --- a/pageserver/pagebench/src/cmd/basebackup.rs +++ b/pageserver/pagebench/src/cmd/basebackup.rs @@ -1,20 +1,29 @@ use std::collections::HashMap; use std::num::NonZeroUsize; use std::ops::Range; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Instant; -use anyhow::Context; +use anyhow::anyhow; +use futures::TryStreamExt as _; use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api::ForceAwaitLogicalSize; use pageserver_client::page_service::BasebackupRequest; +use pageserver_page_api as page_api; use rand::prelude::*; +use reqwest::Url; +use tokio::io::AsyncRead; use tokio::sync::Barrier; use tokio::task::JoinSet; +use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _}; +use tokio_util::io::StreamReader; +use tonic::async_trait; use tracing::{info, instrument}; use utils::id::TenantTimelineId; use utils::lsn::Lsn; +use utils::shard::ShardIndex; use crate::util::tokio_thread_local_stats::AllThreadLocalStats; use crate::util::{request_stats, tokio_thread_local_stats}; @@ -24,14 +33,15 @@ use crate::util::{request_stats, tokio_thread_local_stats}; pub(crate) struct Args { #[clap(long, default_value = "http://localhost:9898")] mgmt_api_endpoint: String, - #[clap(long, default_value = "postgres://postgres@localhost:64000")] + /// The Pageserver to connect to. Use postgresql:// for libpq, or grpc:// for gRPC. + #[clap(long, default_value = "postgresql://postgres@localhost:64000")] page_service_connstring: String, #[clap(long)] pageserver_jwt: Option, #[clap(long, default_value = "1")] num_clients: NonZeroUsize, - #[clap(long, default_value = "1.0")] - gzip_probability: f64, + #[clap(long)] + no_compression: bool, #[clap(long)] runtime: Option, #[clap(long)] @@ -146,12 +156,23 @@ async fn main_impl( let mut work_senders = HashMap::new(); let mut tasks = Vec::new(); - for tl in &timelines { + let connurl = Url::parse(&args.page_service_connstring)?; + for &tl in &timelines { let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are work_senders.insert(tl, sender); - tasks.push(tokio::spawn(client( - args, - *tl, + + let client: Box = match connurl.scheme() { + "postgresql" | "postgres" => Box::new( + LibpqClient::new(&args.page_service_connstring, tl, !args.no_compression).await?, + ), + "grpc" => Box::new( + GrpcClient::new(&args.page_service_connstring, tl, !args.no_compression).await?, + ), + scheme => return Err(anyhow!("invalid scheme {scheme}")), + }; + + tasks.push(tokio::spawn(run_worker( + client, Arc::clone(&start_work_barrier), receiver, Arc::clone(&all_work_done_barrier), @@ -166,13 +187,7 @@ async fn main_impl( let mut rng = rand::thread_rng(); let target = all_targets.choose(&mut rng).unwrap(); let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r)); - ( - target.timeline, - Work { - lsn, - gzip: rng.gen_bool(args.gzip_probability), - }, - ) + (target.timeline, Work { lsn }) }; let sender = work_senders.get(&timeline).unwrap(); // TODO: what if this blocks? @@ -216,13 +231,11 @@ async fn main_impl( #[derive(Copy, Clone)] struct Work { lsn: Option, - gzip: bool, } #[instrument(skip_all)] -async fn client( - args: &'static Args, - timeline: TenantTimelineId, +async fn run_worker( + mut client: Box, start_work_barrier: Arc, mut work: tokio::sync::mpsc::Receiver, all_work_done_barrier: Arc, @@ -230,37 +243,14 @@ async fn client( ) { start_work_barrier.wait().await; - let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) - .await - .unwrap(); - - while let Some(Work { lsn, gzip }) = work.recv().await { + while let Some(Work { lsn }) = work.recv().await { let start = Instant::now(); - let copy_out_stream = client - .basebackup(&BasebackupRequest { - tenant_id: timeline.tenant_id, - timeline_id: timeline.timeline_id, - lsn, - gzip, - }) - .await - .with_context(|| format!("start basebackup for {timeline}")) - .unwrap(); + let stream = client.basebackup(lsn).await.unwrap(); - use futures::StreamExt; - let size = Arc::new(AtomicUsize::new(0)); - copy_out_stream - .for_each({ - |r| { - let size = Arc::clone(&size); - async move { - let size = Arc::clone(&size); - size.fetch_add(r.unwrap().len(), Ordering::Relaxed); - } - } - }) - .await; - info!("basebackup size is {} bytes", size.load(Ordering::Relaxed)); + let size = futures::io::copy(stream.compat(), &mut tokio::io::sink().compat_write()) + .await + .unwrap(); + info!("basebackup size is {size} bytes"); let elapsed = start.elapsed(); live_stats.inc(); STATS.with(|stats| { @@ -270,3 +260,94 @@ async fn client( all_work_done_barrier.wait().await; } + +/// A basebackup client. This allows switching out the client protocol implementation. +#[async_trait] +trait Client: Send { + async fn basebackup( + &mut self, + lsn: Option, + ) -> anyhow::Result>>; +} + +/// A libpq-based Pageserver client. +struct LibpqClient { + inner: pageserver_client::page_service::Client, + ttid: TenantTimelineId, + compression: bool, +} + +impl LibpqClient { + async fn new( + connstring: &str, + ttid: TenantTimelineId, + compression: bool, + ) -> anyhow::Result { + Ok(Self { + inner: pageserver_client::page_service::Client::new(connstring.to_string()).await?, + ttid, + compression, + }) + } +} + +#[async_trait] +impl Client for LibpqClient { + async fn basebackup( + &mut self, + lsn: Option, + ) -> anyhow::Result>> { + let req = BasebackupRequest { + tenant_id: self.ttid.tenant_id, + timeline_id: self.ttid.timeline_id, + lsn, + gzip: self.compression, + }; + let stream = self.inner.basebackup(&req).await?; + Ok(Box::pin(StreamReader::new( + stream.map_err(std::io::Error::other), + ))) + } +} + +/// A gRPC Pageserver client. +struct GrpcClient { + inner: page_api::Client, +} + +impl GrpcClient { + async fn new( + connstring: &str, + ttid: TenantTimelineId, + compression: bool, + ) -> anyhow::Result { + let inner = page_api::Client::new( + connstring.to_string(), + ttid.tenant_id, + ttid.timeline_id, + ShardIndex::unsharded(), + None, + compression.then_some(tonic::codec::CompressionEncoding::Zstd), + ) + .await?; + Ok(Self { inner }) + } +} + +#[async_trait] +impl Client for GrpcClient { + async fn basebackup( + &mut self, + lsn: Option, + ) -> anyhow::Result>> { + let req = page_api::GetBaseBackupRequest { + lsn, + replica: false, + full: false, + }; + let stream = self.inner.get_base_backup(req).await?; + Ok(Box::pin(StreamReader::new( + stream.map_err(std::io::Error::other), + ))) + } +} diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 2a0548b811..fe136b8bbd 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -18,13 +18,12 @@ use bytes::{BufMut, Bytes, BytesMut}; use fail::fail_point; use pageserver_api::key::{Key, rel_block_to_key}; use pageserver_api::reltag::{RelTag, SlruKind}; -use postgres_ffi::pg_constants::{ - DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID, PG_HBA, PGDATA_SPECIAL_FILES, -}; -use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM}; +use postgres_ffi::pg_constants::{PG_HBA, PGDATA_SPECIAL_FILES}; use postgres_ffi::{ BLCKSZ, PG_TLI, RELSEG_SIZE, WAL_SEGMENT_SIZE, XLogFileName, dispatch_pgversion, pg_constants, }; +use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID}; +use postgres_ffi_types::forknum::{INIT_FORKNUM, MAIN_FORKNUM}; use tokio::io; use tokio::io::AsyncWrite; use tokio_tar::{Builder, EntryType, Header}; @@ -372,6 +371,7 @@ where .partition( self.timeline.get_shard_identity(), self.timeline.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64, + BLCKSZ as u64, ); let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar); diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 911449c7c5..96fe0c1078 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -520,7 +520,7 @@ async fn import_file( } if file_path.starts_with("global") { - let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID; + let spcnode = postgres_ffi_types::constants::GLOBALTABLESPACE_OID; let dbnode = 0; match file_name.as_ref() { @@ -553,7 +553,7 @@ async fn import_file( } } } else if file_path.starts_with("base") { - let spcnode = pg_constants::DEFAULTTABLESPACE_OID; + let spcnode = postgres_ffi_types::constants::DEFAULTTABLESPACE_OID; let dbnode: u32 = file_path .iter() .nth(1) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 57087dc6c3..642b447e5f 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -41,7 +41,7 @@ use postgres_backend::{ AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error, }; use postgres_ffi::BLCKSZ; -use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; +use postgres_ffi_types::constants::DEFAULTTABLESPACE_OID; use pq_proto::framed::ConnectionError; use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor}; use smallvec::{SmallVec, smallvec}; @@ -3286,7 +3286,14 @@ impl GrpcPageServiceHandler { Ok(req) })) // Run the page service. - .service(proto::PageServiceServer::new(page_service_handler)); + .service( + proto::PageServiceServer::new(page_service_handler) + // Support both gzip and zstd compression. The client decides what to use. + .accept_compressed(tonic::codec::CompressionEncoding::Gzip) + .accept_compressed(tonic::codec::CompressionEncoding::Zstd) + .send_compressed(tonic::codec::CompressionEncoding::Gzip) + .send_compressed(tonic::codec::CompressionEncoding::Zstd), + ); let server = server.add_service(page_service); // Reflection service for use with e.g. grpcurl. @@ -3532,7 +3539,6 @@ impl proto::PageService for GrpcPageServiceHandler { Ok(tonic::Response::new(resp.into())) } - // TODO: ensure clients use gzip compression for the stream. #[instrument(skip_all, fields(lsn))] async fn get_base_backup( &self, diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 633d62210d..58af2548ee 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -23,12 +23,11 @@ use pageserver_api::key::{ }; use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace}; use pageserver_api::models::RelSizeMigration; -use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; -use pageserver_api::value::Value; -use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; -use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId}; +use postgres_ffi::{BLCKSZ, TimestampTz, TransactionId}; +use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; +use postgres_ffi_types::{Oid, RepOriginId}; use serde::{Deserialize, Serialize}; use strum::IntoEnumIterator; use tokio_util::sync::CancellationToken; @@ -36,6 +35,8 @@ use tracing::{debug, info, info_span, trace, warn}; use utils::bin_ser::{BeSer, DeserializeError}; use utils::lsn::Lsn; use utils::pausable_failpoint; +use wal_decoder::models::record::NeonWalRecord; +use wal_decoder::models::value::Value; use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta}; use super::tenant::{PageReconstructError, Timeline}; @@ -720,6 +721,7 @@ impl Timeline { let batches = keyspace.partition( self.get_shard_identity(), self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64, + BLCKSZ as u64, ); let io_concurrency = IoConcurrency::spawn_from_conf( @@ -960,6 +962,7 @@ impl Timeline { let batches = keyspace.partition( self.get_shard_identity(), self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64, + BLCKSZ as u64, ); let io_concurrency = IoConcurrency::spawn_from_conf( diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index cfecf5561c..d2c2fdef93 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -496,7 +496,7 @@ impl WalRedoManager { key: pageserver_api::key::Key, lsn: Lsn, base_img: Option<(Lsn, bytes::Bytes)>, - records: Vec<(Lsn, pageserver_api::record::NeonWalRecord)>, + records: Vec<(Lsn, wal_decoder::models::record::NeonWalRecord)>, pg_version: u32, redo_attempt_type: RedoAttemptType, ) -> Result { @@ -1859,6 +1859,29 @@ impl TenantShard { } } + // At this point we've initialized all timelines and are tracking them. + // Now compute the layer visibility for all (not offloaded) timelines. + let compute_visiblity_for = { + let timelines_accessor = self.timelines.lock().unwrap(); + let mut timelines_offloaded_accessor = self.timelines_offloaded.lock().unwrap(); + + timelines_offloaded_accessor.extend(offloaded_timelines_list.into_iter()); + + // Before activation, populate each Timeline's GcInfo with information about its children + self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None); + + timelines_accessor.values().cloned().collect::>() + }; + + for tl in compute_visiblity_for { + tl.update_layer_visibility().await.with_context(|| { + format!( + "failed initial timeline visibility computation {} for tenant {}", + tl.timeline_id, self.tenant_shard_id + ) + })?; + } + // Walk through deleted timelines, resume deletion for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions { remote_timeline_client @@ -1878,10 +1901,6 @@ impl TenantShard { .context("resume_deletion") .map_err(LoadLocalTimelineError::ResumeDeletion)?; } - { - let mut offloaded_timelines_accessor = self.timelines_offloaded.lock().unwrap(); - offloaded_timelines_accessor.extend(offloaded_timelines_list.into_iter()); - } // Stash the preloaded tenant manifest, and upload a new manifest if changed. // @@ -3449,9 +3468,6 @@ impl TenantShard { .values() .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping())); - // Before activation, populate each Timeline's GcInfo with information about its children - self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None); - // Spawn gc and compaction loops. The loops will shut themselves // down when they notice that the tenant is inactive. tasks::start_background_loops(self, background_jobs_can_start); @@ -5836,10 +5852,10 @@ pub(crate) mod harness { use once_cell::sync::OnceCell; use pageserver_api::key::Key; use pageserver_api::models::ShardParameters; - use pageserver_api::record::NeonWalRecord; use pageserver_api::shard::ShardIndex; use utils::id::TenantId; use utils::logging; + use wal_decoder::models::record::NeonWalRecord; use super::*; use crate::deletion_queue::mock::MockDeletionQueue; @@ -6094,9 +6110,6 @@ mod tests { #[cfg(feature = "testing")] use pageserver_api::keyspace::KeySpaceRandomAccum; use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings}; - #[cfg(feature = "testing")] - use pageserver_api::record::NeonWalRecord; - use pageserver_api::value::Value; use pageserver_compaction::helpers::overlaps_with; #[cfg(feature = "testing")] use rand::SeedableRng; @@ -6117,6 +6130,9 @@ mod tests { use timeline::{CompactOptions, DeltaLayerTestDesc, VersionedKeySpaceQuery}; use utils::id::TenantId; use utils::shard::{ShardCount, ShardNumber}; + #[cfg(feature = "testing")] + use wal_decoder::models::record::NeonWalRecord; + use wal_decoder::models::value::Value; use super::*; use crate::DEFAULT_PG_VERSION; diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 9d15e7c4de..e65d444f76 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -34,11 +34,11 @@ pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName}; use pageserver_api::config::GetVectoredConcurrentIo; use pageserver_api::key::Key; use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; -use pageserver_api::record::NeonWalRecord; -use pageserver_api::value::Value; use tracing::{Instrument, info_span, trace}; use utils::lsn::Lsn; use utils::sync::gate::GateGuard; +use wal_decoder::models::record::NeonWalRecord; +use wal_decoder::models::value::Value; use self::inmemory_layer::InMemoryLayerFileId; use super::PageReconstructError; diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs index 51f2e909a2..1d50a5f3a0 100644 --- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs +++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs @@ -4,11 +4,11 @@ use std::sync::Arc; use bytes::Bytes; use pageserver_api::key::{KEY_SIZE, Key}; -use pageserver_api::value::Value; use tokio_util::sync::CancellationToken; use utils::id::TimelineId; use utils::lsn::Lsn; use utils::shard::TenantShardId; +use wal_decoder::models::value::Value; use super::errors::PutError; use super::layer::S3_UPLOAD_LIMIT; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index e82a28bb4c..ba763d4c3f 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -44,7 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key}; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::ImageCompressionAlgorithm; use pageserver_api::shard::TenantShardId; -use pageserver_api::value::Value; use serde::{Deserialize, Serialize}; use tokio::sync::OnceCell; use tokio_epoll_uring::IoBuf; @@ -54,6 +53,7 @@ use utils::bin_ser::BeSer; use utils::bin_ser::SerializeError; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; +use wal_decoder::models::value::Value; use super::errors::PutError; use super::{ @@ -1306,7 +1306,7 @@ impl DeltaLayerInner { // is it an image or will_init walrecord? // FIXME: this could be handled by threading the BlobRef to the // VectoredReadBuilder - let will_init = pageserver_api::value::ValueBytes::will_init(&data) + let will_init = wal_decoder::models::value::ValueBytes::will_init(&data) .inspect_err(|_e| { #[cfg(feature = "testing")] tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value"); @@ -1369,7 +1369,7 @@ impl DeltaLayerInner { format!(" img {} bytes", img.len()) } Value::WalRecord(rec) => { - let wal_desc = pageserver_api::record::describe_wal_record(&rec)?; + let wal_desc = wal_decoder::models::record::describe_wal_record(&rec)?; format!( " rec {} bytes will_init: {} {}", buf.len(), @@ -1624,7 +1624,6 @@ pub(crate) mod test { use bytes::Bytes; use itertools::MinMaxResult; - use pageserver_api::value::Value; use rand::prelude::{SeedableRng, SliceRandom, StdRng}; use rand::{Rng, RngCore}; @@ -1988,7 +1987,7 @@ pub(crate) mod test { #[tokio::test] async fn copy_delta_prefix_smoke() { use bytes::Bytes; - use pageserver_api::record::NeonWalRecord; + use wal_decoder::models::record::NeonWalRecord; let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke") .await diff --git a/pageserver/src/tenant/storage_layer/filter_iterator.rs b/pageserver/src/tenant/storage_layer/filter_iterator.rs index 1a330ecfc2..d345195446 100644 --- a/pageserver/src/tenant/storage_layer/filter_iterator.rs +++ b/pageserver/src/tenant/storage_layer/filter_iterator.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use anyhow::bail; use pageserver_api::key::Key; use pageserver_api::keyspace::{KeySpace, SparseKeySpace}; -use pageserver_api::value::Value; use utils::lsn::Lsn; +use wal_decoder::models::value::Value; use super::PersistentLayerKey; use super::merge_iterator::{MergeIterator, MergeIteratorItem}; @@ -126,7 +126,6 @@ mod tests { #[tokio::test] async fn filter_keyspace_iterator() { use bytes::Bytes; - use pageserver_api::value::Value; let harness = TenantHarness::create("filter_iterator_filter_keyspace_iterator") .await diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 740f53f928..d6f5f48a6e 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -42,7 +42,6 @@ use pageserver_api::config::MaxVectoredReadBytes; use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key}; use pageserver_api::keyspace::KeySpace; use pageserver_api::shard::{ShardIdentity, TenantShardId}; -use pageserver_api::value::Value; use serde::{Deserialize, Serialize}; use tokio::sync::OnceCell; use tokio_stream::StreamExt; @@ -52,6 +51,7 @@ use utils::bin_ser::BeSer; use utils::bin_ser::SerializeError; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; +use wal_decoder::models::value::Value; use super::errors::PutError; use super::layer_name::ImageLayerName; @@ -1232,10 +1232,10 @@ mod test { use itertools::Itertools; use pageserver_api::key::Key; use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize}; - use pageserver_api::value::Value; use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; + use wal_decoder::models::value::Value; use super::{ImageLayerIterator, ImageLayerWriter}; use crate::DEFAULT_PG_VERSION; diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index 2f2ff0f273..313c133fa2 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -824,7 +824,7 @@ async fn evict_and_wait_does_not_wait_for_download() { #[tokio::test(start_paused = true)] async fn eviction_cancellation_on_drop() { use bytes::Bytes; - use pageserver_api::value::Value; + use wal_decoder::models::value::Value; // this is the runtime on which Layer spawns the blocking tasks on let handle = tokio::runtime::Handle::current(); diff --git a/pageserver/src/tenant/storage_layer/merge_iterator.rs b/pageserver/src/tenant/storage_layer/merge_iterator.rs index ea3dea50c3..c15abcdf3f 100644 --- a/pageserver/src/tenant/storage_layer/merge_iterator.rs +++ b/pageserver/src/tenant/storage_layer/merge_iterator.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use anyhow::bail; use pageserver_api::key::Key; -use pageserver_api::value::Value; use utils::lsn::Lsn; +use wal_decoder::models::value::Value; use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator}; use super::image_layer::{ImageLayerInner, ImageLayerIterator}; @@ -402,9 +402,9 @@ impl<'a> MergeIterator<'a> { mod tests { use itertools::Itertools; use pageserver_api::key::Key; - #[cfg(feature = "testing")] - use pageserver_api::record::NeonWalRecord; use utils::lsn::Lsn; + #[cfg(feature = "testing")] + use wal_decoder::models::record::NeonWalRecord; use super::*; use crate::DEFAULT_PG_VERSION; @@ -436,7 +436,6 @@ mod tests { #[tokio::test] async fn merge_in_between() { use bytes::Bytes; - use pageserver_api::value::Value; let harness = TenantHarness::create("merge_iterator_merge_in_between") .await @@ -501,7 +500,6 @@ mod tests { #[tokio::test] async fn delta_merge() { use bytes::Bytes; - use pageserver_api::value::Value; let harness = TenantHarness::create("merge_iterator_delta_merge") .await @@ -578,7 +576,6 @@ mod tests { #[tokio::test] async fn delta_image_mixed_merge() { use bytes::Bytes; - use pageserver_api::value::Value; let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge") .await diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a1969ecae6..a0e9d8f06a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -56,8 +56,6 @@ use pageserver_api::models::{ }; use pageserver_api::reltag::{BlockNumber, RelTag}; use pageserver_api::shard::{ShardIdentity, ShardIndex, ShardNumber, TenantShardId}; -#[cfg(test)] -use pageserver_api::value::Value; use postgres_connection::PgConnectionConfig; use postgres_ffi::v14::xlog_utils; use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp}; @@ -81,6 +79,8 @@ use utils::seqwait::SeqWait; use utils::simple_rcu::{Rcu, RcuReadGuard}; use utils::sync::gate::{Gate, GateGuard}; use utils::{completion, critical, fs_ext, pausable_failpoint}; +#[cfg(test)] +use wal_decoder::models::value::Value; use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta}; use self::delete::DeleteTimelineFlow; @@ -3422,10 +3422,6 @@ impl Timeline { // TenantShard::create_timeline will wait for these uploads to happen before returning, or // on retry. - // Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan) - drop(guard); // drop write lock, update_layer_visibility will take a read lock. - self.update_layer_visibility().await?; - info!( "loaded layer map with {} layers at {}, total physical size: {}", num_layers, disk_consistent_lsn, total_physical_size @@ -5211,7 +5207,11 @@ impl Timeline { } let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?; - let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size); + let dense_partitioning = dense_ks.partition( + &self.shard_identity, + partition_size, + postgres_ffi::BLCKSZ as u64, + ); let sparse_partitioning = SparseKeyPartitioning { parts: vec![sparse_ks], }; // no partitioning for metadata keys for now @@ -5939,7 +5939,7 @@ impl Drop for Timeline { if let Ok(mut gc_info) = ancestor.gc_info.write() { if !gc_info.remove_child_not_offloaded(self.timeline_id) { tracing::error!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id, - "Couldn't remove retain_lsn entry from offloaded timeline's parent: already removed"); + "Couldn't remove retain_lsn entry from timeline's parent on drop: already removed"); } } } @@ -7594,11 +7594,11 @@ mod tests { use std::sync::Arc; use pageserver_api::key::Key; - use pageserver_api::value::Value; use std::iter::Iterator; use tracing::Instrument; use utils::id::TimelineId; use utils::lsn::Lsn; + use wal_decoder::models::value::Value; use super::HeatMapTimeline; use crate::context::RequestContextBuilder; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 5307d3836f..6039c002f7 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -29,9 +29,7 @@ use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE; use pageserver_api::key::{KEY_SIZE, Key}; use pageserver_api::keyspace::{KeySpace, ShardedRange}; use pageserver_api::models::{CompactInfoResponse, CompactKeyRange}; -use pageserver_api::record::NeonWalRecord; use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId}; -use pageserver_api::value::Value; use pageserver_compaction::helpers::{fully_contains, overlaps_with}; use pageserver_compaction::interface::*; use serde::Serialize; @@ -41,6 +39,8 @@ use tracing::{Instrument, debug, error, info, info_span, trace, warn}; use utils::critical; use utils::id::TimelineId; use utils::lsn::Lsn; +use wal_decoder::models::record::NeonWalRecord; +use wal_decoder::models::value::Value; use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}; use crate::page_cache; diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs index ed679a9bdc..d471e9fc69 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -36,8 +36,8 @@ use pageserver_api::keyspace::{ShardedRange, singleton_range}; use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus}; use pageserver_api::reltag::{RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; +use postgres_ffi::BLCKSZ; use postgres_ffi::relfile_utils::parse_relfilename; -use postgres_ffi::{BLCKSZ, pg_constants}; use remote_storage::RemotePath; use tokio::sync::Semaphore; use tokio_stream::StreamExt; @@ -558,7 +558,7 @@ impl PgDataDir { PgDataDirDb::new( storage, &basedir.join(dboid.to_string()), - pg_constants::DEFAULTTABLESPACE_OID, + postgres_ffi_types::constants::DEFAULTTABLESPACE_OID, dboid, &datadir_path, ) @@ -571,7 +571,7 @@ impl PgDataDir { PgDataDirDb::new( storage, &datadir_path.join("global"), - postgres_ffi::pg_constants::GLOBALTABLESPACE_OID, + postgres_ffi_types::constants::GLOBALTABLESPACE_OID, 0, &datadir_path, ) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index c1a3b79915..ebffaf70e2 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -28,20 +28,20 @@ use std::time::{Duration, Instant, SystemTime}; use bytes::{Buf, Bytes}; use pageserver_api::key::{Key, rel_block_to_key}; -use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; -use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::walrecord::*; use postgres_ffi::{ TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, fsm_logical_to_physical, pg_constants, }; +use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM}; use tracing::*; use utils::bin_ser::{DeserializeError, SerializeError}; use utils::lsn::Lsn; use utils::rate_limit::RateLimit; use utils::{critical, failpoint_support}; +use wal_decoder::models::record::NeonWalRecord; use wal_decoder::models::*; use crate::ZERO_PAGE; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index ed8a954369..1498f3c83d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -32,12 +32,12 @@ use anyhow::Context; use bytes::{Bytes, BytesMut}; use pageserver_api::key::Key; use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus}; -use pageserver_api::record::NeonWalRecord; use pageserver_api::shard::TenantShardId; use tracing::*; use utils::lsn::Lsn; use utils::sync::gate::GateError; use utils::sync::heavier_once_cell; +use wal_decoder::models::record::NeonWalRecord; use crate::config::PageServerConf; use crate::metrics::{ @@ -571,11 +571,11 @@ mod tests { use bytes::Bytes; use pageserver_api::key::Key; - use pageserver_api::record::NeonWalRecord; use pageserver_api::shard::TenantShardId; use tracing::Instrument; use utils::id::TenantId; use utils::lsn::Lsn; + use wal_decoder::models::record::NeonWalRecord; use super::PostgresRedoManager; use crate::config::PageServerConf; diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs index a3840f1f6f..0783c77622 100644 --- a/pageserver/src/walredo/apply_neon.rs +++ b/pageserver/src/walredo/apply_neon.rs @@ -2,16 +2,16 @@ use anyhow::Context; use byteorder::{ByteOrder, LittleEndian}; use bytes::BytesMut; use pageserver_api::key::Key; -use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::SlruKind; -use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; use postgres_ffi::v14::nonrelfile_utils::{ mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset, transaction_id_set_status, }; use postgres_ffi::{BLCKSZ, pg_constants}; +use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM; use tracing::*; use utils::lsn::Lsn; +use wal_decoder::models::record::NeonWalRecord; /// Can this request be served by neon redo functions /// or we need to pass it to wal-redo postgres process? diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs index 6d4a38d4ff..3dec0593bf 100644 --- a/pageserver/src/walredo/process.rs +++ b/pageserver/src/walredo/process.rs @@ -10,7 +10,6 @@ use std::time::Duration; use anyhow::Context; use bytes::Bytes; -use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::RelTag; use pageserver_api::shard::TenantShardId; use postgres_ffi::BLCKSZ; @@ -18,6 +17,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::{Instrument, debug, error, instrument}; use utils::lsn::Lsn; use utils::poison::Poison; +use wal_decoder::models::record::NeonWalRecord; use self::no_leak_child::NoLeakChild; use crate::config::PageServerConf; diff --git a/poetry.lock b/poetry.lock index f9b6f83366..1bc5077eb7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -746,23 +746,23 @@ xray = ["mypy-boto3-xray (>=1.26.0,<1.27.0)"] [[package]] name = "botocore" -version = "1.34.11" +version = "1.34.162" description = "Low-level, data-driven core of boto 3." optional = false -python-versions = ">= 3.8" +python-versions = ">=3.8" groups = ["main"] files = [ - {file = "botocore-1.34.11-py3-none-any.whl", hash = "sha256:1ff1398b6ea670e1c01ac67a33af3da854f8e700d3528289c04f319c330d8250"}, - {file = "botocore-1.34.11.tar.gz", hash = "sha256:51905c3d623c60df5dc5794387de7caf886d350180a01a3dfa762e903edb45a9"}, + {file = "botocore-1.34.162-py3-none-any.whl", hash = "sha256:2d918b02db88d27a75b48275e6fb2506e9adaaddbec1ffa6a8a0898b34e769be"}, + {file = "botocore-1.34.162.tar.gz", hash = "sha256:adc23be4fb99ad31961236342b7cbf3c0bfc62532cd02852196032e8c0d682f3"}, ] [package.dependencies] jmespath = ">=0.7.1,<2.0.0" python-dateutil = ">=2.1,<3.0.0" -urllib3 = {version = ">=1.25.4,<2.1", markers = "python_version >= \"3.10\""} +urllib3 = {version = ">=1.25.4,<2.2.0 || >2.2.0,<3", markers = "python_version >= \"3.10\""} [package.extras] -crt = ["awscrt (==0.19.19)"] +crt = ["awscrt (==0.21.2)"] [[package]] name = "botocore-stubs" @@ -3422,20 +3422,21 @@ files = [ [[package]] name = "urllib3" -version = "1.26.19" +version = "2.5.0" description = "HTTP library with thread-safe connection pooling, file post, and more." optional = false -python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" +python-versions = ">=3.9" groups = ["main"] files = [ - {file = "urllib3-1.26.19-py2.py3-none-any.whl", hash = "sha256:37a0344459b199fce0e80b0d3569837ec6b6937435c5244e7fd73fa6006830f3"}, - {file = "urllib3-1.26.19.tar.gz", hash = "sha256:3e3d753a8618b86d7de333b4223005f68720bcd6a7d2bcb9fbd2229ec7c1e429"}, + {file = "urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc"}, + {file = "urllib3-2.5.0.tar.gz", hash = "sha256:3fc47733c7e419d4bc3f6b3dc2b4f890bb743906a30d56ba4a5bfa4bbff92760"}, ] [package.extras] -brotli = ["brotli (==1.0.9) ; os_name != \"nt\" and python_version < \"3\" and platform_python_implementation == \"CPython\"", "brotli (>=1.0.9) ; python_version >= \"3\" and platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; (os_name != \"nt\" or python_version >= \"3\") and platform_python_implementation != \"CPython\"", "brotlipy (>=0.6.0) ; os_name == \"nt\" and python_version < \"3\""] -secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress ; python_version == \"2.7\"", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"] -socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] +brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""] +h2 = ["h2 (>=4,<5)"] +socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] +zstd = ["zstandard (>=0.18.0)"] [[package]] name = "websockets" diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 8cf1020adb..050d61055e 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -489,7 +489,9 @@ class NeonEnvBuilder: self.config_init_force: str | None = None self.top_output_dir = top_output_dir self.control_plane_hooks_api: str | None = None - self.storage_controller_config: dict[Any, Any] | None = None + self.storage_controller_config: dict[Any, Any] | None = { + "timelines_onto_safekeepers": True, + } # Flag to enable https listener in pageserver, generate local ssl certs, # and force storage controller to use https for pageserver api. @@ -4909,6 +4911,9 @@ class Safekeeper(LogUtils): log.info(f"finished pulling timeline from {src_ids} to {self.id}") return res + def safekeeper_id(self) -> SafekeeperId: + return SafekeeperId(self.id, "localhost", self.port.pg_tenant_only) + @property def data_dir(self) -> Path: return self.env.repo_dir / "safekeepers" / f"sk{self.id}" diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py index 9ce618b2ad..fa5c9aa693 100644 --- a/test_runner/regress/test_branching.py +++ b/test_runner/regress/test_branching.py @@ -11,6 +11,7 @@ from fixtures.common_types import Lsn, TimelineId from fixtures.log_helper import log from fixtures.pageserver.http import PageserverApiException from fixtures.pageserver.utils import wait_until_tenant_active +from fixtures.safekeeper.http import MembershipConfiguration, TimelineCreateRequest from fixtures.utils import query_scalar from performance.test_perf_pgbench import get_scales_matrix from requests import RequestException @@ -164,6 +165,19 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE ps_http.configure_failpoints(("before-upload-index-pausable", "pause")) env.pageserver.tenant_create(env.initial_tenant) + sk = env.safekeepers[0] + assert sk + sk.http_client().timeline_create( + TimelineCreateRequest( + env.initial_tenant, + env.initial_timeline, + MembershipConfiguration(generation=1, members=[sk.safekeeper_id()], new_members=None), + int(env.pg_version), + Lsn(0), + None, + ) + ) + initial_branch = "initial_branch" def start_creating_timeline(): diff --git a/test_runner/regress/test_normal_work.py b/test_runner/regress/test_normal_work.py index 44590ea4b9..3335cf686c 100644 --- a/test_runner/regress/test_normal_work.py +++ b/test_runner/regress/test_normal_work.py @@ -64,6 +64,11 @@ def test_normal_work( """ neon_env_builder.num_safekeepers = num_safekeepers + + if safekeeper_proto_version == 2: + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } env = neon_env_builder.init_start() pageserver_http = env.pageserver.http_client() diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index 2590a3fe9d..2b71662669 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -671,6 +671,12 @@ def test_layer_download_cancelled_by_config_location(neon_env_builder: NeonEnvBu """ neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + # On the new mode, the test runs into a cancellation issue, i.e. the walproposer can't shut down + # as it is hang-waiting on the timeline_checkpoint call in WalIngest::new. + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + # turn off background tasks so that they don't interfere with the downloads env = neon_env_builder.init_start( initial_tenant_conf={ diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 8f3aa010e3..74ba74645e 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -88,6 +88,12 @@ def test_storage_controller_smoke( neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api env = neon_env_builder.init_configs() + # These bubble up from safekeepers + for ps in env.pageservers: + ps.allowed_errors.extend( + [".*Timeline.* has been deleted.*", ".*Timeline.*was cancelled and cannot be used"] + ) + # Start services by hand so that we can skip a pageserver (this will start + register later) env.broker.start() env.storage_controller.start() @@ -3455,7 +3461,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): assert target.get_safekeeper(fake_id) is None - assert len(target.get_safekeepers()) == 0 + start_sks = target.get_safekeepers() sk_0 = env.safekeepers[0] @@ -3477,7 +3483,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): inserted = target.get_safekeeper(fake_id) assert inserted is not None - assert target.get_safekeepers() == [inserted] + assert target.get_safekeepers() == start_sks + [inserted] assert eq_safekeeper_records(body, inserted) # error out if pk is changed (unexpected) @@ -3489,7 +3495,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): assert exc.value.status_code == 400 inserted_again = target.get_safekeeper(fake_id) - assert target.get_safekeepers() == [inserted_again] + assert target.get_safekeepers() == start_sks + [inserted_again] assert inserted_again is not None assert eq_safekeeper_records(inserted, inserted_again) @@ -3498,7 +3504,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): body["version"] += 1 target.on_safekeeper_deploy(fake_id, body) inserted_now = target.get_safekeeper(fake_id) - assert target.get_safekeepers() == [inserted_now] + assert target.get_safekeepers() == start_sks + [inserted_now] assert inserted_now is not None assert eq_safekeeper_records(body, inserted_now) @@ -3507,7 +3513,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): body["https_port"] = 123 target.on_safekeeper_deploy(fake_id, body) inserted_now = target.get_safekeeper(fake_id) - assert target.get_safekeepers() == [inserted_now] + assert target.get_safekeepers() == start_sks + [inserted_now] assert inserted_now is not None assert eq_safekeeper_records(body, inserted_now) env.storage_controller.consistency_check() @@ -3516,7 +3522,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): body["https_port"] = None target.on_safekeeper_deploy(fake_id, body) inserted_now = target.get_safekeeper(fake_id) - assert target.get_safekeepers() == [inserted_now] + assert target.get_safekeepers() == start_sks + [inserted_now] assert inserted_now is not None assert eq_safekeeper_records(body, inserted_now) env.storage_controller.consistency_check() @@ -3635,6 +3641,9 @@ def test_timeline_delete_mid_live_migration(neon_env_builder: NeonEnvBuilder, mi env = neon_env_builder.init_configs() env.start() + for ps in env.pageservers: + ps.allowed_errors.append(".*Timeline.* has been deleted.*") + tenant_id = TenantId.generate() timeline_id = TimelineId.generate() env.storage_controller.tenant_create(tenant_id, placement_policy={"Attached": 1}) diff --git a/test_runner/regress/test_storage_scrubber.py b/test_runner/regress/test_storage_scrubber.py index 03cd133ccb..e29cb801d5 100644 --- a/test_runner/regress/test_storage_scrubber.py +++ b/test_runner/regress/test_storage_scrubber.py @@ -341,6 +341,11 @@ def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder env = neon_env_builder.init_configs() env.start() + for ps in env.pageservers: + ps.allowed_errors.extend( + [".*Timeline.* has been deleted.*", ".*Timeline.*was cancelled and cannot be used"] + ) + tenant_id = TenantId.generate() timeline_id = TimelineId.generate() env.create_tenant( diff --git a/test_runner/regress/test_timeline_detach_ancestor.py b/test_runner/regress/test_timeline_detach_ancestor.py index f0810270b1..c58f78aeb1 100644 --- a/test_runner/regress/test_timeline_detach_ancestor.py +++ b/test_runner/regress/test_timeline_detach_ancestor.py @@ -21,7 +21,10 @@ from fixtures.neon_fixtures import ( last_flush_lsn_upload, wait_for_last_flush_lsn, ) -from fixtures.pageserver.http import HistoricLayerInfo, PageserverApiException +from fixtures.pageserver.http import ( + HistoricLayerInfo, + PageserverApiException, +) from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_timeline_detail_404 from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind from fixtures.utils import assert_pageserver_backups_equal, skip_in_debug_build, wait_until @@ -413,6 +416,7 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots "read_only": True, }, ) + sk = env.safekeepers[0] assert sk with pytest.raises(requests.exceptions.HTTPError, match="Not Found"): @@ -504,8 +508,15 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots assert len(lineage.get("original_ancestor", [])) == 0 assert len(lineage.get("reparenting_history", [])) == 0 - for name, _, _, rows, starts in expected_result: - with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep: + for branch_name, queried_timeline, _, rows, starts in expected_result: + details = client.timeline_detail(env.initial_tenant, queried_timeline) + log.info(f"reading data from branch {branch_name}") + # specifying the lsn makes the endpoint read-only and not connect to safekeepers + with env.endpoints.create( + branch_name, + lsn=Lsn(details["last_record_lsn"]), + ) as ep: + ep.start(safekeeper_generation=1) assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows assert ep.safe_psql(f"SELECT count(*) FROM audit WHERE starts = {starts}")[0][0] == 1 @@ -1088,6 +1099,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion( for ps in env.pageservers: ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) + ps.allowed_errors.append(".*Timeline.* has been deleted.*") pageservers = dict((int(p.id), p) for p in env.pageservers) @@ -1209,6 +1221,7 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv for ps in env.pageservers: ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS) + ps.allowed_errors.append(".*Timeline.* has been deleted.*") pageservers = dict((int(p.id), p) for p in env.pageservers) diff --git a/test_runner/regress/test_timeline_gc_blocking.py b/test_runner/regress/test_timeline_gc_blocking.py index 9a710f5b80..8ef64a0742 100644 --- a/test_runner/regress/test_timeline_gc_blocking.py +++ b/test_runner/regress/test_timeline_gc_blocking.py @@ -24,6 +24,8 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"}, initial_tenant_shard_count=2 if sharded else None, ) + for ps in env.pageservers: + ps.allowed_errors.append(".*Timeline.* has been deleted.*") if sharded: http = env.storage_controller.pageserver_api() diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index b9183286af..ea120c1814 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -229,7 +229,7 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder): # Test timeline_list endpoint. http_cli = env.safekeepers[0].http_client() - assert len(http_cli.timeline_list()) == 3 + assert len(http_cli.timeline_list()) == 4 # Check that dead minority doesn't prevent the commits: execute insert n_inserts @@ -740,8 +740,8 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): env = neon_env_builder.init_start() tenant_id = env.initial_tenant - timeline_id = env.create_branch("test_timeline_status") - endpoint = env.endpoints.create_start("test_timeline_status") + timeline_id = env.initial_timeline + endpoint = env.endpoints.create_start("main") wa = env.safekeepers[0] @@ -1292,6 +1292,12 @@ def test_lagging_sk(neon_env_builder: NeonEnvBuilder): # it works without compute at all. def test_peer_recovery(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 3 + + # timelines should be created the old way + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + env = neon_env_builder.init_start() tenant_id = env.initial_tenant @@ -1532,6 +1538,11 @@ def test_safekeeper_without_pageserver( def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder): + # timelines should be created the old way manually until we have migration support + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + def execute_payload(endpoint: Endpoint): with closing(endpoint.connect()) as conn: with conn.cursor() as cur: @@ -1661,6 +1672,15 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool): res = env.safekeepers[3].pull_timeline( [env.safekeepers[0], env.safekeepers[2]], tenant_id, timeline_id ) + sk_id_1 = env.safekeepers[0].safekeeper_id() + sk_id_3 = env.safekeepers[2].safekeeper_id() + sk_id_4 = env.safekeepers[3].safekeeper_id() + new_conf = MembershipConfiguration( + generation=2, members=[sk_id_1, sk_id_3, sk_id_4], new_members=None + ) + for i in [0, 2, 3]: + env.safekeepers[i].http_client().membership_switch(tenant_id, timeline_id, new_conf) + log.info("Finished pulling timeline") log.info(res) @@ -1705,13 +1725,15 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 3 neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) env = neon_env_builder.init_start() - tenant_id = env.initial_tenant - timeline_id = env.initial_timeline (src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2]) + dst_sk.stop() + + [tenant_id, timeline_id] = env.create_tenant() + log.info("use only first 2 safekeepers, 3rd will be seeded") - endpoint = env.endpoints.create("main") + endpoint = env.endpoints.create("main", tenant_id=tenant_id) endpoint.active_safekeepers = [1, 2] endpoint.start() endpoint.safe_psql("create table t(key int, value text)") @@ -1723,6 +1745,7 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder): src_http = src_sk.http_client() # run pull_timeline which will halt before downloading files src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause")) + dst_sk.start() pt_handle = PropagatingThread( target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id) ) @@ -1782,23 +1805,27 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder): neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) env = neon_env_builder.init_start() tenant_id = env.initial_tenant - timeline_id = env.initial_timeline (src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2]) + dst_sk.stop() + src_http = src_sk.http_client() + src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause")) + + timeline_id = env.create_branch("pull_timeline_term_changes") + + # run pull_timeline which will halt before downloading files log.info("use only first 2 safekeepers, 3rd will be seeded") - ep = env.endpoints.create("main") + ep = env.endpoints.create("pull_timeline_term_changes") ep.active_safekeepers = [1, 2] ep.start() ep.safe_psql("create table t(key int, value text)") ep.safe_psql("insert into t select generate_series(1, 1000), 'pear'") - src_http = src_sk.http_client() - # run pull_timeline which will halt before downloading files - src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause")) pt_handle = PropagatingThread( target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id) ) + dst_sk.start() pt_handle.start() src_sk.wait_until_paused("sk-snapshot-after-list-pausable") @@ -1807,7 +1834,7 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder): # restart compute to bump term ep.stop() - ep = env.endpoints.create("main") + ep = env.endpoints.create("pull_timeline_term_changes") ep.active_safekeepers = [1, 2] ep.start() ep.safe_psql("insert into t select generate_series(1, 100), 'pear'") @@ -1929,6 +1956,11 @@ def test_pull_timeline_while_evicted(neon_env_builder: NeonEnvBuilder): @run_only_on_default_postgres("tests only safekeeper API") def test_membership_api(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 1 + # timelines should be created the old way + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + env = neon_env_builder.init_start() # These are expected after timeline deletion on safekeepers. @@ -2009,6 +2041,12 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder): created manually, later storcon will do that. """ neon_env_builder.num_safekeepers = 3 + + # timelines should be created the old way manually + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + env = neon_env_builder.init_start() tenant_id = env.initial_tenant @@ -2064,7 +2102,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_start() tenant_id = env.initial_tenant - timeline_id = env.create_branch("test_idle_reconnections") + timeline_id = env.initial_timeline def collect_stats() -> dict[str, float]: # we need to collect safekeeper_pg_queries_received_total metric from all safekeepers @@ -2095,7 +2133,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder): collect_stats() - endpoint = env.endpoints.create_start("test_idle_reconnections") + endpoint = env.endpoints.create_start("main") # just write something to the timeline endpoint.safe_psql("create table t(i int)") collect_stats() diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index d8a7dc2a2b..1bad387a90 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -590,6 +590,13 @@ async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int): @pytest.mark.parametrize("safekeeper_proto_version", [2, 3]) def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_version: int): neon_env_builder.num_safekeepers = 3 + if safekeeper_proto_version == 2: + # On the legacy protocol, we don't support generations, which are part of + # `timelines_onto_safekeepers` + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + env = neon_env_builder.init_start() asyncio.run(run_wal_truncation(env, safekeeper_proto_version)) @@ -713,6 +720,11 @@ async def run_quorum_sanity(env: NeonEnv): # we don't. def test_quorum_sanity(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 4 + + # The test fails basically always on the new mode. + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } env = neon_env_builder.init_start() asyncio.run(run_quorum_sanity(env)) diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py index 0252b590cc..d281c055b0 100644 --- a/test_runner/regress/test_wal_receiver.py +++ b/test_runner/regress/test_wal_receiver.py @@ -16,6 +16,13 @@ if TYPE_CHECKING: # Checks that pageserver's walreceiver state is printed in the logs during WAL wait timeout. # Ensures that walreceiver does not run without any data inserted and only starts after the insertion. def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder): + # we assert below that the walreceiver is not active before data writes. + # with manually created timelines, it is active. + # FIXME: remove this test once we remove timelines_onto_safekeepers + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": False, + } + # Trigger WAL wait timeout faster neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'" env = neon_env_builder.init_start()