Merge branch 'main' into thesuhas/pg_stat_rollout

This commit is contained in:
Suhas Thalanki
2025-06-19 09:42:07 -07:00
62 changed files with 532 additions and 306 deletions

15
Cargo.lock generated
View File

@@ -4334,6 +4334,7 @@ dependencies = [
"postgres_backend",
"postgres_connection",
"postgres_ffi",
"postgres_ffi_types",
"postgres_initdb",
"posthog_client_lite",
"pprof",
@@ -4403,7 +4404,7 @@ dependencies = [
"nix 0.30.1",
"once_cell",
"postgres_backend",
"postgres_ffi",
"postgres_ffi_types",
"rand 0.8.5",
"remote_storage",
"reqwest",
@@ -4892,6 +4893,7 @@ dependencies = [
"memoffset 0.9.0",
"once_cell",
"postgres",
"postgres_ffi_types",
"pprof",
"regex",
"serde",
@@ -4900,6 +4902,14 @@ dependencies = [
"utils",
]
[[package]]
name = "postgres_ffi_types"
version = "0.1.0"
dependencies = [
"thiserror 1.0.69",
"workspace_hack",
]
[[package]]
name = "postgres_initdb"
version = "0.1.0"
@@ -7561,6 +7571,7 @@ dependencies = [
"axum",
"base64 0.22.1",
"bytes",
"flate2",
"h2 0.4.4",
"http 1.1.0",
"http-body 1.0.0",
@@ -7580,6 +7591,7 @@ dependencies = [
"tower-layer",
"tower-service",
"tracing",
"zstd",
]
[[package]]
@@ -8159,6 +8171,7 @@ dependencies = [
"futures",
"pageserver_api",
"postgres_ffi",
"postgres_ffi_types",
"pprof",
"prost 0.13.5",
"remote_storage",

View File

@@ -22,6 +22,7 @@ members = [
"libs/http-utils",
"libs/pageserver_api",
"libs/postgres_ffi",
"libs/postgres_ffi_types",
"libs/safekeeper_api",
"libs/desim",
"libs/neon-shmem",
@@ -199,7 +200,7 @@ tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.8"
toml_edit = "0.22"
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "gzip", "prost", "router", "server", "tls-ring", "tls-native-roots", "zstd"] }
tonic-reflection = { version = "0.13.1", features = ["server"] }
tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
@@ -259,6 +260,7 @@ pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
postgres_ffi_types = { version = "0.1", path = "./libs/postgres_ffi_types/" }
postgres_initdb = { path = "./libs/postgres_initdb" }
posthog_client_lite = { version = "0.1", path = "./libs/posthog_client_lite" }
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }

View File

@@ -5,8 +5,6 @@
ARG REPOSITORY=ghcr.io/neondatabase
ARG IMAGE=build-tools
ARG TAG=pinned
ARG DEFAULT_PG_VERSION=17
ARG STABLE_PG_VERSION=16
ARG DEBIAN_VERSION=bookworm
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
@@ -63,14 +61,11 @@ FROM $REPOSITORY/$IMAGE:$TAG AS build
WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
ARG STABLE_PG_VERSION
COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v17/include/postgresql/server pg_install/v17/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib
COPY --from=pg-build /home/nonroot/pg_install/v17/lib pg_install/v17/lib
COPY --from=plan /home/nonroot/recipe.json recipe.json
ARG ADDITIONAL_RUSTFLAGS=""
@@ -97,7 +92,6 @@ RUN set -e \
# Build final image
#
FROM $BASE_IMAGE_SHA
ARG DEFAULT_PG_VERSION
WORKDIR /data
RUN set -e \
@@ -107,8 +101,6 @@ RUN set -e \
libreadline-dev \
libseccomp-dev \
ca-certificates \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
openssl \
unzip \
curl \

View File

@@ -167,13 +167,6 @@ postgres-%: postgres-configure-% \
+@echo "Compiling test_decoding $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/test_decoding install
.PHONY: postgres-clean-%
postgres-clean-%:
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq clean
.PHONY: postgres-check-%
postgres-check-%: postgres-%
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 check
@@ -206,21 +199,6 @@ neon-pg-ext-%: postgres-%
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install
.PHONY: neon-pg-clean-ext-%
neon-pg-clean-ext-%:
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile clean
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile clean
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile clean
# Build walproposer as a static library. walproposer source code is located
# in the pgxn/neon directory.
#
@@ -253,12 +231,6 @@ ifeq ($(UNAME_S),Linux)
pg_crc32c.o
endif
.PHONY: walproposer-lib-clean
walproposer-lib-clean:
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean
.PHONY: neon-pg-ext
neon-pg-ext: \
neon-pg-ext-v14 \
@@ -266,13 +238,6 @@ neon-pg-ext: \
neon-pg-ext-v16 \
neon-pg-ext-v17
.PHONY: neon-pg-clean-ext
neon-pg-clean-ext: \
neon-pg-clean-ext-v14 \
neon-pg-clean-ext-v15 \
neon-pg-clean-ext-v16 \
neon-pg-clean-ext-v17
# shorthand to build all Postgres versions
.PHONY: postgres
postgres: \
@@ -288,13 +253,6 @@ postgres-headers: \
postgres-headers-v16 \
postgres-headers-v17
.PHONY: postgres-clean
postgres-clean: \
postgres-clean-v14 \
postgres-clean-v15 \
postgres-clean-v16 \
postgres-clean-v17
.PHONY: postgres-check
postgres-check: \
postgres-check-v14 \
@@ -302,12 +260,6 @@ postgres-check: \
postgres-check-v16 \
postgres-check-v17
# This doesn't remove the effects of 'configure'.
.PHONY: clean
clean: postgres-clean neon-pg-clean-ext
$(MAKE) -C compute clean
$(CARGO_CMD_PREFIX) cargo clean
# This removes everything
.PHONY: distclean
distclean:

View File

@@ -2320,8 +2320,6 @@ pub fn forward_termination_signal(dev_mode: bool) {
}
if !dev_mode {
info!("not in dev mode, terminating pgbouncer");
// Terminate pgbouncer with SIGKILL
match pid_file::read(PGBOUNCER_PIDFILE.into()) {
Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => {
@@ -2353,25 +2351,27 @@ pub fn forward_termination_signal(dev_mode: bool) {
error!("error reading pgbouncer pid file: {}", e);
}
}
}
// Terminate local_proxy
match pid_file::read("/etc/local_proxy/pid".into()) {
Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => {
info!("sending SIGTERM to local_proxy process pid: {}", pid);
if let Err(e) = kill(pid, Signal::SIGTERM) {
error!("failed to terminate local_proxy: {}", e);
// Terminate local_proxy
match pid_file::read("/etc/local_proxy/pid".into()) {
Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => {
info!("sending SIGTERM to local_proxy process pid: {}", pid);
if let Err(e) = kill(pid, Signal::SIGTERM) {
error!("failed to terminate local_proxy: {}", e);
}
}
Ok(pid_file::PidFileRead::NotHeldByAnyProcess(_)) => {
info!("local_proxy PID file exists but process not running");
}
Ok(pid_file::PidFileRead::NotExist) => {
info!("local_proxy PID file not found, process may not be running");
}
Err(e) => {
error!("error reading local_proxy PID file: {}", e);
}
}
Ok(pid_file::PidFileRead::NotHeldByAnyProcess(_)) => {
info!("local_proxy PID file exists but process not running");
}
Ok(pid_file::PidFileRead::NotExist) => {
info!("local_proxy PID file not found, process may not be running");
}
Err(e) => {
error!("error reading local_proxy PID file: {}", e);
}
} else {
info!("Skipping pgbouncer and local_proxy termination because in dev mode");
}
let pg_pid = PG_PID.load(Ordering::SeqCst);

View File

@@ -236,7 +236,7 @@ impl Default for NeonStorageControllerConf {
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
long_reconcile_threshold: None,
use_https_pageserver_api: false,
timelines_onto_safekeepers: false,
timelines_onto_safekeepers: true,
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
}

View File

@@ -17,7 +17,7 @@ anyhow.workspace = true
bytes.workspace = true
byteorder.workspace = true
utils.workspace = true
postgres_ffi.workspace = true
postgres_ffi_types.workspace = true
enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true

View File

@@ -4,8 +4,8 @@ use std::ops::Range;
use anyhow::{Result, bail};
use byteorder::{BE, ByteOrder};
use bytes::Bytes;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::{Oid, RepOriginId};
use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi_types::{Oid, RepOriginId};
use serde::{Deserialize, Serialize};
use utils::const_assert;
@@ -194,7 +194,7 @@ impl Key {
/// will be rejected on the write path.
#[allow(dead_code)]
pub fn is_valid_key_on_write_path_strong(&self) -> bool {
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
if !self.is_i128_representable() {
return false;
}

View File

@@ -1,7 +1,6 @@
use std::ops::Range;
use itertools::Itertools;
use postgres_ffi::BLCKSZ;
use crate::key::Key;
use crate::shard::{ShardCount, ShardIdentity};
@@ -269,9 +268,13 @@ impl KeySpace {
/// Partition a key space into roughly chunks of roughly 'target_size' bytes
/// in each partition.
///
pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
// Assume that each value is 8k in size.
let target_nblocks = (target_size / BLCKSZ as u64) as u32;
pub fn partition(
&self,
shard_identity: &ShardIdentity,
target_size: u64,
block_size: u64,
) -> KeyPartitioning {
let target_nblocks = (target_size / block_size) as u32;
let mut parts = Vec::new();
let mut current_part = Vec::new();

View File

@@ -6,11 +6,9 @@ pub mod key;
pub mod keyspace;
pub mod models;
pub mod pagestream_api;
pub mod record;
pub mod reltag;
pub mod shard;
/// Public API types
pub mod upcall_api;
pub mod value;
pub mod config;

View File

@@ -8,9 +8,15 @@ use crate::reltag::RelTag;
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
/// Block size.
///
/// XXX: We assume 8k block size in the SLRU fetch API. It's not great to hardcode
/// that in the protocol, because Postgres supports different block sizes as a compile
/// time option.
const BLCKSZ: usize = 8192;
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {
@@ -443,7 +449,7 @@ impl PagestreamBeMessage {
Self::GetSlruSegment(resp) => {
bytes.put_u8(Tag::GetSlruSegment as u8);
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put_u32((resp.segment.len() / BLCKSZ) as u32);
bytes.put(&resp.segment[..]);
}
@@ -520,7 +526,7 @@ impl PagestreamBeMessage {
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u8(resp.req.kind);
bytes.put_u32(resp.req.segno);
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put_u32((resp.segment.len() / BLCKSZ) as u32);
bytes.put(&resp.segment[..]);
}
@@ -662,7 +668,7 @@ impl PagestreamBeMessage {
let kind = buf.read_u8()?;
let segno = buf.read_u32::<BigEndian>()?;
let n_blocks = buf.read_u32::<BigEndian>()?;
let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
let mut segment = vec![0; n_blocks as usize * BLCKSZ];
buf.read_exact(&mut segment)?;
Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
req: PagestreamGetSlruSegmentRequest {

View File

@@ -1,9 +1,9 @@
use std::cmp::Ordering;
use std::fmt;
use postgres_ffi::Oid;
use postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
use postgres_ffi::relfile_utils::{MAIN_FORKNUM, forkname_to_number, forknumber_to_name};
use postgres_ffi_types::Oid;
use postgres_ffi_types::constants::GLOBALTABLESPACE_OID;
use postgres_ffi_types::forknum::{MAIN_FORKNUM, forkname_to_number, forknumber_to_name};
use serde::{Deserialize, Serialize};
///

View File

@@ -35,7 +35,7 @@ use std::hash::{Hash, Hasher};
#[doc(inline)]
pub use ::utils::shard::*;
use postgres_ffi::relfile_utils::INIT_FORKNUM;
use postgres_ffi_types::forknum::INIT_FORKNUM;
use serde::{Deserialize, Serialize};
use crate::key::Key;

View File

@@ -16,6 +16,7 @@ memoffset.workspace = true
pprof.workspace = true
thiserror.workspace = true
serde.workspace = true
postgres_ffi_types.workspace = true
utils.workspace = true
tracing.workspace = true

View File

@@ -11,11 +11,7 @@
use crate::{BLCKSZ, PageHeaderData};
//
// From pg_tablespace_d.h
//
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
pub const GLOBALTABLESPACE_OID: u32 = 1664;
// Note: There are a few more widely-used constants in the postgres_ffi_types::constants crate.
// From storage_xlog.h
pub const XLOG_SMGR_CREATE: u8 = 0x10;

View File

@@ -4,50 +4,7 @@
use once_cell::sync::OnceCell;
use regex::Regex;
//
// Fork numbers, from relpath.h
//
pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum FilePathError {
#[error("invalid relation fork name")]
InvalidForkName,
#[error("invalid relation data file name")]
InvalidFileName,
}
impl From<core::num::ParseIntError> for FilePathError {
fn from(_e: core::num::ParseIntError) -> Self {
FilePathError::InvalidFileName
}
}
/// Convert Postgres relation file's fork suffix to fork number.
pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present
None => Ok(MAIN_FORKNUM),
Some("fsm") => Ok(FSM_FORKNUM),
Some("vm") => Ok(VISIBILITYMAP_FORKNUM),
Some("init") => Ok(INIT_FORKNUM),
Some(_) => Err(FilePathError::InvalidForkName),
}
}
/// Convert Postgres fork number to the right suffix of the relation data file.
pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
match forknum {
MAIN_FORKNUM => None,
FSM_FORKNUM => Some("fsm"),
VISIBILITYMAP_FORKNUM => Some("vm"),
INIT_FORKNUM => Some("init"),
_ => Some("UNKNOWN FORKNUM"),
}
}
use postgres_ffi_types::forknum::*;
/// Parse a filename of a relation file. Returns (relfilenode, forknum, segno) tuple.
///
@@ -75,7 +32,9 @@ pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
.ok_or(FilePathError::InvalidFileName)?;
let relnode_str = caps.name("relnode").unwrap().as_str();
let relnode = relnode_str.parse::<u32>()?;
let relnode = relnode_str
.parse::<u32>()
.map_err(|_e| FilePathError::InvalidFileName)?;
let forkname = caps.name("forkname").map(|f| f.as_str());
let forknum = forkname_to_number(forkname)?;
@@ -84,7 +43,11 @@ pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
let segno = if segno_match.is_none() {
0
} else {
segno_match.unwrap().as_str().parse::<u32>()?
segno_match
.unwrap()
.as_str()
.parse::<u32>()
.map_err(|_e| FilePathError::InvalidFileName)?
};
Ok((relnode, forknum, segno))

View File

@@ -0,0 +1,11 @@
[package]
name = "postgres_ffi_types"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
thiserror.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[dev-dependencies]

View File

@@ -0,0 +1,8 @@
//! Misc constants, copied from PostgreSQL headers.
//!
//! Any constants included here must be the same in all PostgreSQL versions and unlikely to change
//! in the future either!
// From pg_tablespace_d.h
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
pub const GLOBALTABLESPACE_OID: u32 = 1664;

View File

@@ -0,0 +1,36 @@
// Fork numbers, from relpath.h
pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum FilePathError {
#[error("invalid relation fork name")]
InvalidForkName,
#[error("invalid relation data file name")]
InvalidFileName,
}
/// Convert Postgres relation file's fork suffix to fork number.
pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present
None => Ok(MAIN_FORKNUM),
Some("fsm") => Ok(FSM_FORKNUM),
Some("vm") => Ok(VISIBILITYMAP_FORKNUM),
Some("init") => Ok(INIT_FORKNUM),
Some(_) => Err(FilePathError::InvalidForkName),
}
}
/// Convert Postgres fork number to the right suffix of the relation data file.
pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
match forknum {
MAIN_FORKNUM => None,
FSM_FORKNUM => Some("fsm"),
VISIBILITYMAP_FORKNUM => Some("vm"),
INIT_FORKNUM => Some("init"),
_ => Some("UNKNOWN FORKNUM"),
}
}

View File

@@ -0,0 +1,13 @@
//! This package contains some PostgreSQL constants and datatypes that are the same in all versions
//! of PostgreSQL and unlikely to change in the future either. These could be derived from the
//! PostgreSQL headers with 'bindgen', but in order to avoid proliferating the dependency to bindgen
//! and the PostgreSQL C headers to all services, we prefer to have this small stand-alone crate for
//! them instead.
//!
//! Be mindful in what you add here, as these types are deeply ingrained in the APIs.
pub mod constants;
pub mod forknum;
pub type Oid = u32;
pub type RepOriginId = u16;

View File

@@ -14,6 +14,7 @@ bytes.workspace = true
pageserver_api.workspace = true
prost.workspace = true
postgres_ffi.workspace = true
postgres_ffi_types.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["io-util"] }

View File

@@ -8,8 +8,8 @@ use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::walrecord::*;
use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM;
use utils::lsn::Lsn;
use crate::models::*;

View File

@@ -25,6 +25,9 @@
//! |
//! |--> write to KV store within the pageserver
pub mod record;
pub mod value;
use bytes::Bytes;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::walrecord::{

View File

@@ -10,7 +10,7 @@
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use crate::record::NeonWalRecord;
use crate::models::record::NeonWalRecord;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum Value {

View File

@@ -1,4 +1,4 @@
//! This module implements batch type for serialized [`pageserver_api::value::Value`]
//! This module implements batch type for serialized [`crate::models::value::Value`]
//! instances. Each batch contains a raw buffer (serialized values)
//! and a list of metadata for each (key, LSN) tuple present in the batch.
//!
@@ -10,10 +10,8 @@ use std::collections::{BTreeSet, HashMap};
use bytes::{Bytes, BytesMut};
use pageserver_api::key::{CompactKey, Key, rel_block_to_key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::value::Value;
use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn, pg_constants};
use serde::{Deserialize, Serialize};
@@ -21,6 +19,8 @@ use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
use crate::models::InterpretedWalRecord;
use crate::models::record::NeonWalRecord;
use crate::models::value::Value;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);

View File

@@ -56,6 +56,7 @@ pin-project-lite.workspace = true
postgres_backend.workspace = true
postgres_connection.workspace = true
postgres_ffi.workspace = true
postgres_ffi_types.workspace = true
postgres_initdb.workspace = true
postgres-protocol.workspace = true
postgres-types.workspace = true

View File

@@ -13,11 +13,11 @@ use pageserver::{page_cache, virtual_file};
use pageserver_api::key::Key;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
use wal_decoder::models::value::Value;
use wal_decoder::serialized_batch::SerializedValueBatch;
// A very cheap hash for generating non-sequential keys.

View File

@@ -67,12 +67,12 @@ use once_cell::sync::Lazy;
use pageserver::config::PageServerConf;
use pageserver::walredo::{PostgresRedoManager, RedoAttemptType};
use pageserver_api::key::Key;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::TenantShardId;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use utils::id::TenantId;
use utils::lsn::Lsn;
use wal_decoder::models::record::NeonWalRecord;
fn bench(c: &mut Criterion) {
macro_rules! bench_group {

View File

@@ -83,6 +83,7 @@ impl Client {
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_header: Option<String>,
compression: Option<tonic::codec::CompressionEncoding>,
) -> anyhow::Result<Self> {
let endpoint: tonic::transport::Endpoint = into_endpoint
.try_into()
@@ -90,7 +91,15 @@ impl Client {
let channel = endpoint.connect().await?;
let auth = AuthInterceptor::new(tenant_id, timeline_id, auth_header, shard_id)
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
let client = proto::PageServiceClient::with_interceptor(channel, auth);
let mut client = proto::PageServiceClient::with_interceptor(channel, auth);
if let Some(compression) = compression {
// TODO: benchmark this (including network latency).
// TODO: consider enabling compression by default.
client = client
.accept_compressed(compression)
.send_compressed(compression);
}
Ok(Self { client })
}
@@ -112,7 +121,7 @@ impl Client {
pub async fn get_base_backup(
&mut self,
req: model::GetBaseBackupRequest,
) -> Result<impl Stream<Item = Result<Bytes, tonic::Status>>, tonic::Status> {
) -> Result<impl Stream<Item = Result<Bytes, tonic::Status>> + 'static, tonic::Status> {
let proto_req = proto::GetBaseBackupRequest::from(req);
let response_stream: Streaming<proto::GetBaseBackupResponseChunk> =

View File

@@ -1,20 +1,29 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use anyhow::Context;
use anyhow::anyhow;
use futures::TryStreamExt as _;
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
use pageserver_client::page_service::BasebackupRequest;
use pageserver_page_api as page_api;
use rand::prelude::*;
use reqwest::Url;
use tokio::io::AsyncRead;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _};
use tokio_util::io::StreamReader;
use tonic::async_trait;
use tracing::{info, instrument};
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::shard::ShardIndex;
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
use crate::util::{request_stats, tokio_thread_local_stats};
@@ -24,14 +33,15 @@ use crate::util::{request_stats, tokio_thread_local_stats};
pub(crate) struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
/// The Pageserver to connect to. Use postgresql:// for libpq, or grpc:// for gRPC.
#[clap(long, default_value = "postgresql://postgres@localhost:64000")]
page_service_connstring: String,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")]
num_clients: NonZeroUsize,
#[clap(long, default_value = "1.0")]
gzip_probability: f64,
#[clap(long)]
no_compression: bool,
#[clap(long)]
runtime: Option<humantime::Duration>,
#[clap(long)]
@@ -146,12 +156,23 @@ async fn main_impl(
let mut work_senders = HashMap::new();
let mut tasks = Vec::new();
for tl in &timelines {
let connurl = Url::parse(&args.page_service_connstring)?;
for &tl in &timelines {
let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
work_senders.insert(tl, sender);
tasks.push(tokio::spawn(client(
args,
*tl,
let client: Box<dyn Client> = match connurl.scheme() {
"postgresql" | "postgres" => Box::new(
LibpqClient::new(&args.page_service_connstring, tl, !args.no_compression).await?,
),
"grpc" => Box::new(
GrpcClient::new(&args.page_service_connstring, tl, !args.no_compression).await?,
),
scheme => return Err(anyhow!("invalid scheme {scheme}")),
};
tasks.push(tokio::spawn(run_worker(
client,
Arc::clone(&start_work_barrier),
receiver,
Arc::clone(&all_work_done_barrier),
@@ -166,13 +187,7 @@ async fn main_impl(
let mut rng = rand::thread_rng();
let target = all_targets.choose(&mut rng).unwrap();
let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
(
target.timeline,
Work {
lsn,
gzip: rng.gen_bool(args.gzip_probability),
},
)
(target.timeline, Work { lsn })
};
let sender = work_senders.get(&timeline).unwrap();
// TODO: what if this blocks?
@@ -216,13 +231,11 @@ async fn main_impl(
#[derive(Copy, Clone)]
struct Work {
lsn: Option<Lsn>,
gzip: bool,
}
#[instrument(skip_all)]
async fn client(
args: &'static Args,
timeline: TenantTimelineId,
async fn run_worker(
mut client: Box<dyn Client>,
start_work_barrier: Arc<Barrier>,
mut work: tokio::sync::mpsc::Receiver<Work>,
all_work_done_barrier: Arc<Barrier>,
@@ -230,37 +243,14 @@ async fn client(
) {
start_work_barrier.wait().await;
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
while let Some(Work { lsn, gzip }) = work.recv().await {
while let Some(Work { lsn }) = work.recv().await {
let start = Instant::now();
let copy_out_stream = client
.basebackup(&BasebackupRequest {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
lsn,
gzip,
})
.await
.with_context(|| format!("start basebackup for {timeline}"))
.unwrap();
let stream = client.basebackup(lsn).await.unwrap();
use futures::StreamExt;
let size = Arc::new(AtomicUsize::new(0));
copy_out_stream
.for_each({
|r| {
let size = Arc::clone(&size);
async move {
let size = Arc::clone(&size);
size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
}
}
})
.await;
info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
let size = futures::io::copy(stream.compat(), &mut tokio::io::sink().compat_write())
.await
.unwrap();
info!("basebackup size is {size} bytes");
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {
@@ -270,3 +260,94 @@ async fn client(
all_work_done_barrier.wait().await;
}
/// A basebackup client. This allows switching out the client protocol implementation.
#[async_trait]
trait Client: Send {
async fn basebackup(
&mut self,
lsn: Option<Lsn>,
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send>>>;
}
/// A libpq-based Pageserver client.
struct LibpqClient {
inner: pageserver_client::page_service::Client,
ttid: TenantTimelineId,
compression: bool,
}
impl LibpqClient {
async fn new(
connstring: &str,
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
Ok(Self {
inner: pageserver_client::page_service::Client::new(connstring.to_string()).await?,
ttid,
compression,
})
}
}
#[async_trait]
impl Client for LibpqClient {
async fn basebackup(
&mut self,
lsn: Option<Lsn>,
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send + 'static>>> {
let req = BasebackupRequest {
tenant_id: self.ttid.tenant_id,
timeline_id: self.ttid.timeline_id,
lsn,
gzip: self.compression,
};
let stream = self.inner.basebackup(&req).await?;
Ok(Box::pin(StreamReader::new(
stream.map_err(std::io::Error::other),
)))
}
}
/// A gRPC Pageserver client.
struct GrpcClient {
inner: page_api::Client,
}
impl GrpcClient {
async fn new(
connstring: &str,
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
let inner = page_api::Client::new(
connstring.to_string(),
ttid.tenant_id,
ttid.timeline_id,
ShardIndex::unsharded(),
None,
compression.then_some(tonic::codec::CompressionEncoding::Zstd),
)
.await?;
Ok(Self { inner })
}
}
#[async_trait]
impl Client for GrpcClient {
async fn basebackup(
&mut self,
lsn: Option<Lsn>,
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send + 'static>>> {
let req = page_api::GetBaseBackupRequest {
lsn,
replica: false,
full: false,
};
let stream = self.inner.get_base_backup(req).await?;
Ok(Box::pin(StreamReader::new(
stream.map_err(std::io::Error::other),
)))
}
}

View File

@@ -18,13 +18,12 @@ use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{
DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID, PG_HBA, PGDATA_SPECIAL_FILES,
};
use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
use postgres_ffi::pg_constants::{PG_HBA, PGDATA_SPECIAL_FILES};
use postgres_ffi::{
BLCKSZ, PG_TLI, RELSEG_SIZE, WAL_SEGMENT_SIZE, XLogFileName, dispatch_pgversion, pg_constants,
};
use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi_types::forknum::{INIT_FORKNUM, MAIN_FORKNUM};
use tokio::io;
use tokio::io::AsyncWrite;
use tokio_tar::{Builder, EntryType, Header};
@@ -372,6 +371,7 @@ where
.partition(
self.timeline.get_shard_identity(),
self.timeline.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
BLCKSZ as u64,
);
let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);

View File

@@ -520,7 +520,7 @@ async fn import_file(
}
if file_path.starts_with("global") {
let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
let spcnode = postgres_ffi_types::constants::GLOBALTABLESPACE_OID;
let dbnode = 0;
match file_name.as_ref() {
@@ -553,7 +553,7 @@ async fn import_file(
}
}
} else if file_path.starts_with("base") {
let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
let spcnode = postgres_ffi_types::constants::DEFAULTTABLESPACE_OID;
let dbnode: u32 = file_path
.iter()
.nth(1)

View File

@@ -41,7 +41,7 @@ use postgres_backend::{
AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
};
use postgres_ffi::BLCKSZ;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi_types::constants::DEFAULTTABLESPACE_OID;
use pq_proto::framed::ConnectionError;
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
use smallvec::{SmallVec, smallvec};
@@ -3286,7 +3286,14 @@ impl GrpcPageServiceHandler {
Ok(req)
}))
// Run the page service.
.service(proto::PageServiceServer::new(page_service_handler));
.service(
proto::PageServiceServer::new(page_service_handler)
// Support both gzip and zstd compression. The client decides what to use.
.accept_compressed(tonic::codec::CompressionEncoding::Gzip)
.accept_compressed(tonic::codec::CompressionEncoding::Zstd)
.send_compressed(tonic::codec::CompressionEncoding::Gzip)
.send_compressed(tonic::codec::CompressionEncoding::Zstd),
);
let server = server.add_service(page_service);
// Reflection service for use with e.g. grpcurl.
@@ -3532,7 +3539,6 @@ impl proto::PageService for GrpcPageServiceHandler {
Ok(tonic::Response::new(resp.into()))
}
// TODO: ensure clients use gzip compression for the stream.
#[instrument(skip_all, fields(lsn))]
async fn get_base_backup(
&self,

View File

@@ -23,12 +23,11 @@ use pageserver_api::key::{
};
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
use pageserver_api::models::RelSizeMigration;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use pageserver_api::value::Value;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
use postgres_ffi::{BLCKSZ, TimestampTz, TransactionId};
use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi_types::{Oid, RepOriginId};
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
@@ -36,6 +35,8 @@ use tracing::{debug, info, info_span, trace, warn};
use utils::bin_ser::{BeSer, DeserializeError};
use utils::lsn::Lsn;
use utils::pausable_failpoint;
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::value::Value;
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use super::tenant::{PageReconstructError, Timeline};
@@ -720,6 +721,7 @@ impl Timeline {
let batches = keyspace.partition(
self.get_shard_identity(),
self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
BLCKSZ as u64,
);
let io_concurrency = IoConcurrency::spawn_from_conf(
@@ -960,6 +962,7 @@ impl Timeline {
let batches = keyspace.partition(
self.get_shard_identity(),
self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
BLCKSZ as u64,
);
let io_concurrency = IoConcurrency::spawn_from_conf(

View File

@@ -496,7 +496,7 @@ impl WalRedoManager {
key: pageserver_api::key::Key,
lsn: Lsn,
base_img: Option<(Lsn, bytes::Bytes)>,
records: Vec<(Lsn, pageserver_api::record::NeonWalRecord)>,
records: Vec<(Lsn, wal_decoder::models::record::NeonWalRecord)>,
pg_version: u32,
redo_attempt_type: RedoAttemptType,
) -> Result<bytes::Bytes, walredo::Error> {
@@ -1859,6 +1859,29 @@ impl TenantShard {
}
}
// At this point we've initialized all timelines and are tracking them.
// Now compute the layer visibility for all (not offloaded) timelines.
let compute_visiblity_for = {
let timelines_accessor = self.timelines.lock().unwrap();
let mut timelines_offloaded_accessor = self.timelines_offloaded.lock().unwrap();
timelines_offloaded_accessor.extend(offloaded_timelines_list.into_iter());
// Before activation, populate each Timeline's GcInfo with information about its children
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
timelines_accessor.values().cloned().collect::<Vec<_>>()
};
for tl in compute_visiblity_for {
tl.update_layer_visibility().await.with_context(|| {
format!(
"failed initial timeline visibility computation {} for tenant {}",
tl.timeline_id, self.tenant_shard_id
)
})?;
}
// Walk through deleted timelines, resume deletion
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
remote_timeline_client
@@ -1878,10 +1901,6 @@ impl TenantShard {
.context("resume_deletion")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
}
{
let mut offloaded_timelines_accessor = self.timelines_offloaded.lock().unwrap();
offloaded_timelines_accessor.extend(offloaded_timelines_list.into_iter());
}
// Stash the preloaded tenant manifest, and upload a new manifest if changed.
//
@@ -3449,9 +3468,6 @@ impl TenantShard {
.values()
.filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
// Before activation, populate each Timeline's GcInfo with information about its children
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self, background_jobs_can_start);
@@ -5836,10 +5852,10 @@ pub(crate) mod harness {
use once_cell::sync::OnceCell;
use pageserver_api::key::Key;
use pageserver_api::models::ShardParameters;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::ShardIndex;
use utils::id::TenantId;
use utils::logging;
use wal_decoder::models::record::NeonWalRecord;
use super::*;
use crate::deletion_queue::mock::MockDeletionQueue;
@@ -6094,9 +6110,6 @@ mod tests {
#[cfg(feature = "testing")]
use pageserver_api::keyspace::KeySpaceRandomAccum;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
#[cfg(feature = "testing")]
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;
use pageserver_compaction::helpers::overlaps_with;
#[cfg(feature = "testing")]
use rand::SeedableRng;
@@ -6117,6 +6130,9 @@ mod tests {
use timeline::{CompactOptions, DeltaLayerTestDesc, VersionedKeySpaceQuery};
use utils::id::TenantId;
use utils::shard::{ShardCount, ShardNumber};
#[cfg(feature = "testing")]
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::value::Value;
use super::*;
use crate::DEFAULT_PG_VERSION;

View File

@@ -34,11 +34,11 @@ pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
use pageserver_api::config::GetVectoredConcurrentIo;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;
use tracing::{Instrument, info_span, trace};
use utils::lsn::Lsn;
use utils::sync::gate::GateGuard;
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::value::Value;
use self::inmemory_layer::InMemoryLayerFileId;
use super::PageReconstructError;

View File

@@ -4,11 +4,11 @@ use std::sync::Arc;
use bytes::Bytes;
use pageserver_api::key::{KEY_SIZE, Key};
use pageserver_api::value::Value;
use tokio_util::sync::CancellationToken;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use wal_decoder::models::value::Value;
use super::errors::PutError;
use super::layer::S3_UPLOAD_LIMIT;

View File

@@ -44,7 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf;
@@ -54,6 +53,7 @@ use utils::bin_ser::BeSer;
use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;
use super::errors::PutError;
use super::{
@@ -1306,7 +1306,7 @@ impl DeltaLayerInner {
// is it an image or will_init walrecord?
// FIXME: this could be handled by threading the BlobRef to the
// VectoredReadBuilder
let will_init = pageserver_api::value::ValueBytes::will_init(&data)
let will_init = wal_decoder::models::value::ValueBytes::will_init(&data)
.inspect_err(|_e| {
#[cfg(feature = "testing")]
tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
@@ -1369,7 +1369,7 @@ impl DeltaLayerInner {
format!(" img {} bytes", img.len())
}
Value::WalRecord(rec) => {
let wal_desc = pageserver_api::record::describe_wal_record(&rec)?;
let wal_desc = wal_decoder::models::record::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
buf.len(),
@@ -1624,7 +1624,6 @@ pub(crate) mod test {
use bytes::Bytes;
use itertools::MinMaxResult;
use pageserver_api::value::Value;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::{Rng, RngCore};
@@ -1988,7 +1987,7 @@ pub(crate) mod test {
#[tokio::test]
async fn copy_delta_prefix_smoke() {
use bytes::Bytes;
use pageserver_api::record::NeonWalRecord;
use wal_decoder::models::record::NeonWalRecord;
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
.await

View File

@@ -4,8 +4,8 @@ use std::sync::Arc;
use anyhow::bail;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, SparseKeySpace};
use pageserver_api::value::Value;
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;
use super::PersistentLayerKey;
use super::merge_iterator::{MergeIterator, MergeIteratorItem};
@@ -126,7 +126,6 @@ mod tests {
#[tokio::test]
async fn filter_keyspace_iterator() {
use bytes::Bytes;
use pageserver_api::value::Value;
let harness = TenantHarness::create("filter_iterator_filter_keyspace_iterator")
.await

View File

@@ -42,7 +42,6 @@ use pageserver_api::config::MaxVectoredReadBytes;
use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
@@ -52,6 +51,7 @@ use utils::bin_ser::BeSer;
use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;
use super::errors::PutError;
use super::layer_name::ImageLayerName;
@@ -1232,10 +1232,10 @@ mod test {
use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
use pageserver_api::value::Value;
use utils::generation::Generation;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;
use super::{ImageLayerIterator, ImageLayerWriter};
use crate::DEFAULT_PG_VERSION;

View File

@@ -824,7 +824,7 @@ async fn evict_and_wait_does_not_wait_for_download() {
#[tokio::test(start_paused = true)]
async fn eviction_cancellation_on_drop() {
use bytes::Bytes;
use pageserver_api::value::Value;
use wal_decoder::models::value::Value;
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();

View File

@@ -4,8 +4,8 @@ use std::sync::Arc;
use anyhow::bail;
use pageserver_api::key::Key;
use pageserver_api::value::Value;
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;
use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
use super::image_layer::{ImageLayerInner, ImageLayerIterator};
@@ -402,9 +402,9 @@ impl<'a> MergeIterator<'a> {
mod tests {
use itertools::Itertools;
use pageserver_api::key::Key;
#[cfg(feature = "testing")]
use pageserver_api::record::NeonWalRecord;
use utils::lsn::Lsn;
#[cfg(feature = "testing")]
use wal_decoder::models::record::NeonWalRecord;
use super::*;
use crate::DEFAULT_PG_VERSION;
@@ -436,7 +436,6 @@ mod tests {
#[tokio::test]
async fn merge_in_between() {
use bytes::Bytes;
use pageserver_api::value::Value;
let harness = TenantHarness::create("merge_iterator_merge_in_between")
.await
@@ -501,7 +500,6 @@ mod tests {
#[tokio::test]
async fn delta_merge() {
use bytes::Bytes;
use pageserver_api::value::Value;
let harness = TenantHarness::create("merge_iterator_delta_merge")
.await
@@ -578,7 +576,6 @@ mod tests {
#[tokio::test]
async fn delta_image_mixed_merge() {
use bytes::Bytes;
use pageserver_api::value::Value;
let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
.await

View File

@@ -56,8 +56,6 @@ use pageserver_api::models::{
};
use pageserver_api::reltag::{BlockNumber, RelTag};
use pageserver_api::shard::{ShardIdentity, ShardIndex, ShardNumber, TenantShardId};
#[cfg(test)]
use pageserver_api::value::Value;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::v14::xlog_utils;
use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp};
@@ -81,6 +79,8 @@ use utils::seqwait::SeqWait;
use utils::simple_rcu::{Rcu, RcuReadGuard};
use utils::sync::gate::{Gate, GateGuard};
use utils::{completion, critical, fs_ext, pausable_failpoint};
#[cfg(test)]
use wal_decoder::models::value::Value;
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use self::delete::DeleteTimelineFlow;
@@ -3422,10 +3422,6 @@ impl Timeline {
// TenantShard::create_timeline will wait for these uploads to happen before returning, or
// on retry.
// Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan)
drop(guard); // drop write lock, update_layer_visibility will take a read lock.
self.update_layer_visibility().await?;
info!(
"loaded layer map with {} layers at {}, total physical size: {}",
num_layers, disk_consistent_lsn, total_physical_size
@@ -5211,7 +5207,11 @@ impl Timeline {
}
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size);
let dense_partitioning = dense_ks.partition(
&self.shard_identity,
partition_size,
postgres_ffi::BLCKSZ as u64,
);
let sparse_partitioning = SparseKeyPartitioning {
parts: vec![sparse_ks],
}; // no partitioning for metadata keys for now
@@ -5939,7 +5939,7 @@ impl Drop for Timeline {
if let Ok(mut gc_info) = ancestor.gc_info.write() {
if !gc_info.remove_child_not_offloaded(self.timeline_id) {
tracing::error!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id,
"Couldn't remove retain_lsn entry from offloaded timeline's parent: already removed");
"Couldn't remove retain_lsn entry from timeline's parent on drop: already removed");
}
}
}
@@ -7594,11 +7594,11 @@ mod tests {
use std::sync::Arc;
use pageserver_api::key::Key;
use pageserver_api::value::Value;
use std::iter::Iterator;
use tracing::Instrument;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;
use super::HeatMapTimeline;
use crate::context::RequestContextBuilder;

View File

@@ -29,9 +29,7 @@ use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
use pageserver_api::key::{KEY_SIZE, Key};
use pageserver_api::keyspace::{KeySpace, ShardedRange};
use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
use pageserver_compaction::helpers::{fully_contains, overlaps_with};
use pageserver_compaction::interface::*;
use serde::Serialize;
@@ -41,6 +39,8 @@ use tracing::{Instrument, debug, error, info, info_span, trace, warn};
use utils::critical;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::value::Value;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;

View File

@@ -36,8 +36,8 @@ use pageserver_api::keyspace::{ShardedRange, singleton_range};
use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus};
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::BLCKSZ;
use postgres_ffi::relfile_utils::parse_relfilename;
use postgres_ffi::{BLCKSZ, pg_constants};
use remote_storage::RemotePath;
use tokio::sync::Semaphore;
use tokio_stream::StreamExt;
@@ -558,7 +558,7 @@ impl PgDataDir {
PgDataDirDb::new(
storage,
&basedir.join(dboid.to_string()),
pg_constants::DEFAULTTABLESPACE_OID,
postgres_ffi_types::constants::DEFAULTTABLESPACE_OID,
dboid,
&datadir_path,
)
@@ -571,7 +571,7 @@ impl PgDataDir {
PgDataDirDb::new(
storage,
&datadir_path.join("global"),
postgres_ffi::pg_constants::GLOBALTABLESPACE_OID,
postgres_ffi_types::constants::GLOBALTABLESPACE_OID,
0,
&datadir_path,
)

View File

@@ -28,20 +28,20 @@ use std::time::{Duration, Instant, SystemTime};
use bytes::{Buf, Bytes};
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::walrecord::*;
use postgres_ffi::{
TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
fsm_logical_to_physical, pg_constants,
};
use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use tracing::*;
use utils::bin_ser::{DeserializeError, SerializeError};
use utils::lsn::Lsn;
use utils::rate_limit::RateLimit;
use utils::{critical, failpoint_support};
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::*;
use crate::ZERO_PAGE;

View File

@@ -32,12 +32,12 @@ use anyhow::Context;
use bytes::{Bytes, BytesMut};
use pageserver_api::key::Key;
use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::TenantShardId;
use tracing::*;
use utils::lsn::Lsn;
use utils::sync::gate::GateError;
use utils::sync::heavier_once_cell;
use wal_decoder::models::record::NeonWalRecord;
use crate::config::PageServerConf;
use crate::metrics::{
@@ -571,11 +571,11 @@ mod tests {
use bytes::Bytes;
use pageserver_api::key::Key;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::TenantShardId;
use tracing::Instrument;
use utils::id::TenantId;
use utils::lsn::Lsn;
use wal_decoder::models::record::NeonWalRecord;
use super::PostgresRedoManager;
use crate::config::PageServerConf;

View File

@@ -2,16 +2,16 @@ use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::BytesMut;
use pageserver_api::key::Key;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::SlruKind;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::v14::nonrelfile_utils::{
mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
transaction_id_set_status,
};
use postgres_ffi::{BLCKSZ, pg_constants};
use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM;
use tracing::*;
use utils::lsn::Lsn;
use wal_decoder::models::record::NeonWalRecord;
/// Can this request be served by neon redo functions
/// or we need to pass it to wal-redo postgres process?

View File

@@ -10,7 +10,6 @@ use std::time::Duration;
use anyhow::Context;
use bytes::Bytes;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::BLCKSZ;
@@ -18,6 +17,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{Instrument, debug, error, instrument};
use utils::lsn::Lsn;
use utils::poison::Poison;
use wal_decoder::models::record::NeonWalRecord;
use self::no_leak_child::NoLeakChild;
use crate::config::PageServerConf;

27
poetry.lock generated
View File

@@ -746,23 +746,23 @@ xray = ["mypy-boto3-xray (>=1.26.0,<1.27.0)"]
[[package]]
name = "botocore"
version = "1.34.11"
version = "1.34.162"
description = "Low-level, data-driven core of boto 3."
optional = false
python-versions = ">= 3.8"
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "botocore-1.34.11-py3-none-any.whl", hash = "sha256:1ff1398b6ea670e1c01ac67a33af3da854f8e700d3528289c04f319c330d8250"},
{file = "botocore-1.34.11.tar.gz", hash = "sha256:51905c3d623c60df5dc5794387de7caf886d350180a01a3dfa762e903edb45a9"},
{file = "botocore-1.34.162-py3-none-any.whl", hash = "sha256:2d918b02db88d27a75b48275e6fb2506e9adaaddbec1ffa6a8a0898b34e769be"},
{file = "botocore-1.34.162.tar.gz", hash = "sha256:adc23be4fb99ad31961236342b7cbf3c0bfc62532cd02852196032e8c0d682f3"},
]
[package.dependencies]
jmespath = ">=0.7.1,<2.0.0"
python-dateutil = ">=2.1,<3.0.0"
urllib3 = {version = ">=1.25.4,<2.1", markers = "python_version >= \"3.10\""}
urllib3 = {version = ">=1.25.4,<2.2.0 || >2.2.0,<3", markers = "python_version >= \"3.10\""}
[package.extras]
crt = ["awscrt (==0.19.19)"]
crt = ["awscrt (==0.21.2)"]
[[package]]
name = "botocore-stubs"
@@ -3422,20 +3422,21 @@ files = [
[[package]]
name = "urllib3"
version = "1.26.19"
version = "2.5.0"
description = "HTTP library with thread-safe connection pooling, file post, and more."
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7"
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "urllib3-1.26.19-py2.py3-none-any.whl", hash = "sha256:37a0344459b199fce0e80b0d3569837ec6b6937435c5244e7fd73fa6006830f3"},
{file = "urllib3-1.26.19.tar.gz", hash = "sha256:3e3d753a8618b86d7de333b4223005f68720bcd6a7d2bcb9fbd2229ec7c1e429"},
{file = "urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc"},
{file = "urllib3-2.5.0.tar.gz", hash = "sha256:3fc47733c7e419d4bc3f6b3dc2b4f890bb743906a30d56ba4a5bfa4bbff92760"},
]
[package.extras]
brotli = ["brotli (==1.0.9) ; os_name != \"nt\" and python_version < \"3\" and platform_python_implementation == \"CPython\"", "brotli (>=1.0.9) ; python_version >= \"3\" and platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; (os_name != \"nt\" or python_version >= \"3\") and platform_python_implementation != \"CPython\"", "brotlipy (>=0.6.0) ; os_name == \"nt\" and python_version < \"3\""]
secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress ; python_version == \"2.7\"", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"]
socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""]
h2 = ["h2 (>=4,<5)"]
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
zstd = ["zstandard (>=0.18.0)"]
[[package]]
name = "websockets"

View File

@@ -489,7 +489,9 @@ class NeonEnvBuilder:
self.config_init_force: str | None = None
self.top_output_dir = top_output_dir
self.control_plane_hooks_api: str | None = None
self.storage_controller_config: dict[Any, Any] | None = None
self.storage_controller_config: dict[Any, Any] | None = {
"timelines_onto_safekeepers": True,
}
# Flag to enable https listener in pageserver, generate local ssl certs,
# and force storage controller to use https for pageserver api.
@@ -4909,6 +4911,9 @@ class Safekeeper(LogUtils):
log.info(f"finished pulling timeline from {src_ids} to {self.id}")
return res
def safekeeper_id(self) -> SafekeeperId:
return SafekeeperId(self.id, "localhost", self.port.pg_tenant_only)
@property
def data_dir(self) -> Path:
return self.env.repo_dir / "safekeepers" / f"sk{self.id}"

View File

@@ -11,6 +11,7 @@ from fixtures.common_types import Lsn, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.safekeeper.http import MembershipConfiguration, TimelineCreateRequest
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
@@ -164,6 +165,19 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
env.pageserver.tenant_create(env.initial_tenant)
sk = env.safekeepers[0]
assert sk
sk.http_client().timeline_create(
TimelineCreateRequest(
env.initial_tenant,
env.initial_timeline,
MembershipConfiguration(generation=1, members=[sk.safekeeper_id()], new_members=None),
int(env.pg_version),
Lsn(0),
None,
)
)
initial_branch = "initial_branch"
def start_creating_timeline():

View File

@@ -64,6 +64,11 @@ def test_normal_work(
"""
neon_env_builder.num_safekeepers = num_safekeepers
if safekeeper_proto_version == 2:
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()

View File

@@ -671,6 +671,12 @@ def test_layer_download_cancelled_by_config_location(neon_env_builder: NeonEnvBu
"""
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
# On the new mode, the test runs into a cancellation issue, i.e. the walproposer can't shut down
# as it is hang-waiting on the timeline_checkpoint call in WalIngest::new.
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
# turn off background tasks so that they don't interfere with the downloads
env = neon_env_builder.init_start(
initial_tenant_conf={

View File

@@ -88,6 +88,12 @@ def test_storage_controller_smoke(
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
# These bubble up from safekeepers
for ps in env.pageservers:
ps.allowed_errors.extend(
[".*Timeline.* has been deleted.*", ".*Timeline.*was cancelled and cannot be used"]
)
# Start services by hand so that we can skip a pageserver (this will start + register later)
env.broker.start()
env.storage_controller.start()
@@ -3455,7 +3461,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
assert target.get_safekeeper(fake_id) is None
assert len(target.get_safekeepers()) == 0
start_sks = target.get_safekeepers()
sk_0 = env.safekeepers[0]
@@ -3477,7 +3483,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
inserted = target.get_safekeeper(fake_id)
assert inserted is not None
assert target.get_safekeepers() == [inserted]
assert target.get_safekeepers() == start_sks + [inserted]
assert eq_safekeeper_records(body, inserted)
# error out if pk is changed (unexpected)
@@ -3489,7 +3495,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
assert exc.value.status_code == 400
inserted_again = target.get_safekeeper(fake_id)
assert target.get_safekeepers() == [inserted_again]
assert target.get_safekeepers() == start_sks + [inserted_again]
assert inserted_again is not None
assert eq_safekeeper_records(inserted, inserted_again)
@@ -3498,7 +3504,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
body["version"] += 1
target.on_safekeeper_deploy(fake_id, body)
inserted_now = target.get_safekeeper(fake_id)
assert target.get_safekeepers() == [inserted_now]
assert target.get_safekeepers() == start_sks + [inserted_now]
assert inserted_now is not None
assert eq_safekeeper_records(body, inserted_now)
@@ -3507,7 +3513,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
body["https_port"] = 123
target.on_safekeeper_deploy(fake_id, body)
inserted_now = target.get_safekeeper(fake_id)
assert target.get_safekeepers() == [inserted_now]
assert target.get_safekeepers() == start_sks + [inserted_now]
assert inserted_now is not None
assert eq_safekeeper_records(body, inserted_now)
env.storage_controller.consistency_check()
@@ -3516,7 +3522,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
body["https_port"] = None
target.on_safekeeper_deploy(fake_id, body)
inserted_now = target.get_safekeeper(fake_id)
assert target.get_safekeepers() == [inserted_now]
assert target.get_safekeepers() == start_sks + [inserted_now]
assert inserted_now is not None
assert eq_safekeeper_records(body, inserted_now)
env.storage_controller.consistency_check()
@@ -3635,6 +3641,9 @@ def test_timeline_delete_mid_live_migration(neon_env_builder: NeonEnvBuilder, mi
env = neon_env_builder.init_configs()
env.start()
for ps in env.pageservers:
ps.allowed_errors.append(".*Timeline.* has been deleted.*")
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.storage_controller.tenant_create(tenant_id, placement_policy={"Attached": 1})

View File

@@ -341,6 +341,11 @@ def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder
env = neon_env_builder.init_configs()
env.start()
for ps in env.pageservers:
ps.allowed_errors.extend(
[".*Timeline.* has been deleted.*", ".*Timeline.*was cancelled and cannot be used"]
)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.create_tenant(

View File

@@ -21,7 +21,10 @@ from fixtures.neon_fixtures import (
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import HistoricLayerInfo, PageserverApiException
from fixtures.pageserver.http import (
HistoricLayerInfo,
PageserverApiException,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_timeline_detail_404
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.utils import assert_pageserver_backups_equal, skip_in_debug_build, wait_until
@@ -413,6 +416,7 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots
"read_only": True,
},
)
sk = env.safekeepers[0]
assert sk
with pytest.raises(requests.exceptions.HTTPError, match="Not Found"):
@@ -504,8 +508,15 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots
assert len(lineage.get("original_ancestor", [])) == 0
assert len(lineage.get("reparenting_history", [])) == 0
for name, _, _, rows, starts in expected_result:
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
for branch_name, queried_timeline, _, rows, starts in expected_result:
details = client.timeline_detail(env.initial_tenant, queried_timeline)
log.info(f"reading data from branch {branch_name}")
# specifying the lsn makes the endpoint read-only and not connect to safekeepers
with env.endpoints.create(
branch_name,
lsn=Lsn(details["last_record_lsn"]),
) as ep:
ep.start(safekeeper_generation=1)
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
assert ep.safe_psql(f"SELECT count(*) FROM audit WHERE starts = {starts}")[0][0] == 1
@@ -1088,6 +1099,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
ps.allowed_errors.append(".*Timeline.* has been deleted.*")
pageservers = dict((int(p.id), p) for p in env.pageservers)
@@ -1209,6 +1221,7 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
ps.allowed_errors.append(".*Timeline.* has been deleted.*")
pageservers = dict((int(p.id), p) for p in env.pageservers)

View File

@@ -24,6 +24,8 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool
initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"},
initial_tenant_shard_count=2 if sharded else None,
)
for ps in env.pageservers:
ps.allowed_errors.append(".*Timeline.* has been deleted.*")
if sharded:
http = env.storage_controller.pageserver_api()

View File

@@ -229,7 +229,7 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder):
# Test timeline_list endpoint.
http_cli = env.safekeepers[0].http_client()
assert len(http_cli.timeline_list()) == 3
assert len(http_cli.timeline_list()) == 4
# Check that dead minority doesn't prevent the commits: execute insert n_inserts
@@ -740,8 +740,8 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_timeline_status")
endpoint = env.endpoints.create_start("test_timeline_status")
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
wa = env.safekeepers[0]
@@ -1292,6 +1292,12 @@ def test_lagging_sk(neon_env_builder: NeonEnvBuilder):
# it works without compute at all.
def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
# timelines should be created the old way
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
@@ -1532,6 +1538,11 @@ def test_safekeeper_without_pageserver(
def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
# timelines should be created the old way manually until we have migration support
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
def execute_payload(endpoint: Endpoint):
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
@@ -1661,6 +1672,15 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
res = env.safekeepers[3].pull_timeline(
[env.safekeepers[0], env.safekeepers[2]], tenant_id, timeline_id
)
sk_id_1 = env.safekeepers[0].safekeeper_id()
sk_id_3 = env.safekeepers[2].safekeeper_id()
sk_id_4 = env.safekeepers[3].safekeeper_id()
new_conf = MembershipConfiguration(
generation=2, members=[sk_id_1, sk_id_3, sk_id_4], new_members=None
)
for i in [0, 2, 3]:
env.safekeepers[i].http_client().membership_switch(tenant_id, timeline_id, new_conf)
log.info("Finished pulling timeline")
log.info(res)
@@ -1705,13 +1725,15 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
dst_sk.stop()
[tenant_id, timeline_id] = env.create_tenant()
log.info("use only first 2 safekeepers, 3rd will be seeded")
endpoint = env.endpoints.create("main")
endpoint = env.endpoints.create("main", tenant_id=tenant_id)
endpoint.active_safekeepers = [1, 2]
endpoint.start()
endpoint.safe_psql("create table t(key int, value text)")
@@ -1723,6 +1745,7 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
src_http = src_sk.http_client()
# run pull_timeline which will halt before downloading files
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
dst_sk.start()
pt_handle = PropagatingThread(
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
)
@@ -1782,23 +1805,27 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
dst_sk.stop()
src_http = src_sk.http_client()
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
timeline_id = env.create_branch("pull_timeline_term_changes")
# run pull_timeline which will halt before downloading files
log.info("use only first 2 safekeepers, 3rd will be seeded")
ep = env.endpoints.create("main")
ep = env.endpoints.create("pull_timeline_term_changes")
ep.active_safekeepers = [1, 2]
ep.start()
ep.safe_psql("create table t(key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
src_http = src_sk.http_client()
# run pull_timeline which will halt before downloading files
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
pt_handle = PropagatingThread(
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
)
dst_sk.start()
pt_handle.start()
src_sk.wait_until_paused("sk-snapshot-after-list-pausable")
@@ -1807,7 +1834,7 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
# restart compute to bump term
ep.stop()
ep = env.endpoints.create("main")
ep = env.endpoints.create("pull_timeline_term_changes")
ep.active_safekeepers = [1, 2]
ep.start()
ep.safe_psql("insert into t select generate_series(1, 100), 'pear'")
@@ -1929,6 +1956,11 @@ def test_pull_timeline_while_evicted(neon_env_builder: NeonEnvBuilder):
@run_only_on_default_postgres("tests only safekeeper API")
def test_membership_api(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
# timelines should be created the old way
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
# These are expected after timeline deletion on safekeepers.
@@ -2009,6 +2041,12 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
created manually, later storcon will do that.
"""
neon_env_builder.num_safekeepers = 3
# timelines should be created the old way manually
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
@@ -2064,7 +2102,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_idle_reconnections")
timeline_id = env.initial_timeline
def collect_stats() -> dict[str, float]:
# we need to collect safekeeper_pg_queries_received_total metric from all safekeepers
@@ -2095,7 +2133,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
collect_stats()
endpoint = env.endpoints.create_start("test_idle_reconnections")
endpoint = env.endpoints.create_start("main")
# just write something to the timeline
endpoint.safe_psql("create table t(i int)")
collect_stats()

View File

@@ -590,6 +590,13 @@ async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int):
@pytest.mark.parametrize("safekeeper_proto_version", [2, 3])
def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_version: int):
neon_env_builder.num_safekeepers = 3
if safekeeper_proto_version == 2:
# On the legacy protocol, we don't support generations, which are part of
# `timelines_onto_safekeepers`
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
asyncio.run(run_wal_truncation(env, safekeeper_proto_version))
@@ -713,6 +720,11 @@ async def run_quorum_sanity(env: NeonEnv):
# we don't.
def test_quorum_sanity(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 4
# The test fails basically always on the new mode.
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
asyncio.run(run_quorum_sanity(env))

View File

@@ -16,6 +16,13 @@ if TYPE_CHECKING:
# Checks that pageserver's walreceiver state is printed in the logs during WAL wait timeout.
# Ensures that walreceiver does not run without any data inserted and only starts after the insertion.
def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder):
# we assert below that the walreceiver is not active before data writes.
# with manually created timelines, it is active.
# FIXME: remove this test once we remove timelines_onto_safekeepers
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
# Trigger WAL wait timeout faster
neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'"
env = neon_env_builder.init_start()