Compare commits

..

12 Commits

Author SHA1 Message Date
Alex Chi Z
b7d7496221 fix storage tests
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-24 13:56:38 -04:00
Suhas Thalanki
5c9470a5cf Merge branch 'main' into thesuhas/pg_stat_rollout 2025-06-19 09:42:07 -07:00
Erik Grinaker
dc1625cd8e pagebench: add basebackup gRPC support (#12250)
## Problem

Pagebench does not support gRPC for `basebackup` benchmarks.

Requires #12243.
Touches #11728.

## Summary of changes

Add gRPC support via gRPC connstrings, e.g. `pagebench basebackup
--page-service-connstring grpc://localhost:51051`.

Also change `--gzip-probability` to `--no-compression`, since this must
be specified per-client for gRPC.
2025-06-19 15:40:57 +00:00
dependabot[bot]
a6d4de25cd build(deps): bump urllib3 from 1.26.19 to 2.5.0 in the pip group across 1 directory (#12289)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-06-19 14:20:02 +00:00
Arpad Müller
ec1452a559 Switch on --timelines-onto-safekeepers in integration tests (#11712)
Switch on the `--timelines-onto-safekeepers` param in integration tests.
Some changes that were needed to enable this but which I put into other
PRs to not clutter up this one:

* #11786
* #11854
* #12129
* #12138

Further fixes that were needed for this:

* https://github.com/neondatabase/neon/pull/11801
* https://github.com/neondatabase/neon/pull/12143
* https://github.com/neondatabase/neon/pull/12204

Not strictly needed, but helpful:

* https://github.com/neondatabase/neon/pull/12155

Part of #11670
Closes #11424
2025-06-19 11:17:01 +00:00
Heikki Linnakangas
1950ccfe33 Eliminate dependency from pageserver_api to postgres_ffi (#12273)
Introduce a separate `postgres_ffi_types` crate which contains a few
types and functions that were used in the API. `postgres_ffi_types` is a
much small crate than `postgres_ffi`, and it doesn't depend on bindgen
or the Postgres C headers.

Move NeonWalRecord and Value types to wal_decoder crate. They are only
used in the pageserver-safekeeper "ingest" API. The rest of the ingest
API types are defined in wal_decoder, so move these there as well.
2025-06-19 10:31:27 +00:00
Heikki Linnakangas
2ca6665f4a Remove outdated 'clean' Makefile targest (#12288)
We have been bad at keeping them up-to-date, several contrib modules and
neon extensions were missing from the clean rules. Give up trying, and
remove the targets altogether. In practice, it's straightforward to just
do `rm -rf pg_install/build`, so the clean-targets are hardly worth the
maintenance effort.

I kept `make distclean` though. The rule for that is simple enough.
2025-06-19 10:24:09 +00:00
Heikki Linnakangas
fa954671b2 Remove unnecessary Postgres libs from the storage docker image (#12286)
Since commit 87ad50c925, storage_controller has used diesel_async, which
in turn uses tokio-postgres as the Postgres client, which doesn't
require libpq. Thus we no longer need libpq in the storage image.
2025-06-19 10:00:01 +00:00
Erik Grinaker
6f4ffdb48b pageserver: add gRPC compression (#12280)
## Problem

The gRPC page service should support compression.

Requires #12111.
Touches #11728.
Touches https://github.com/neondatabase/cloud/issues/25679.

## Summary of changes

Add support for gzip and zstd compression in the server, and a client
parameter to enable compression.

This will need further benchmarking under realistic network conditions.
2025-06-19 09:54:34 +00:00
Vlad Lazar
3f676df3d5 pageserver: fix initial layer visibility calculation (#12206)
## Problem

GC info is an input to updating layer visibility.
Currently, gc info is updated on timeline activation and visibility is
computed on tenant attach, so we ignore branch points and compute
visibility by taking all layers into account.

Side note: gc info is also updated when timelines are created and
dropped. That doesn't help because we create the timelines in
topological order from the root. Hence the root timeline goes first,
without context of where the branch points are.

The impact of this in prod is that shards need to rehydrate layers after
live migration since the non-visible ones were excluded from the
heatmap.

## Summary of Changes

Move the visibility calculation into tenant attachment instead of
activation.
2025-06-19 09:53:18 +00:00
Suhas Thalanki
9678734b75 reverted some changes, added boot_val from global_settings in staging 2025-06-18 14:42:31 -07:00
Suhas Thalanki
ed3100a70e setting a non-zero value for pg_stat rollout test 2025-06-18 11:28:13 -07:00
69 changed files with 583 additions and 421 deletions

15
Cargo.lock generated
View File

@@ -4334,6 +4334,7 @@ dependencies = [
"postgres_backend",
"postgres_connection",
"postgres_ffi",
"postgres_ffi_types",
"postgres_initdb",
"posthog_client_lite",
"pprof",
@@ -4403,7 +4404,7 @@ dependencies = [
"nix 0.30.1",
"once_cell",
"postgres_backend",
"postgres_ffi",
"postgres_ffi_types",
"rand 0.8.5",
"remote_storage",
"reqwest",
@@ -4892,6 +4893,7 @@ dependencies = [
"memoffset 0.9.0",
"once_cell",
"postgres",
"postgres_ffi_types",
"pprof",
"regex",
"serde",
@@ -4900,6 +4902,14 @@ dependencies = [
"utils",
]
[[package]]
name = "postgres_ffi_types"
version = "0.1.0"
dependencies = [
"thiserror 1.0.69",
"workspace_hack",
]
[[package]]
name = "postgres_initdb"
version = "0.1.0"
@@ -7561,6 +7571,7 @@ dependencies = [
"axum",
"base64 0.22.1",
"bytes",
"flate2",
"h2 0.4.4",
"http 1.1.0",
"http-body 1.0.0",
@@ -7580,6 +7591,7 @@ dependencies = [
"tower-layer",
"tower-service",
"tracing",
"zstd",
]
[[package]]
@@ -8159,6 +8171,7 @@ dependencies = [
"futures",
"pageserver_api",
"postgres_ffi",
"postgres_ffi_types",
"pprof",
"prost 0.13.5",
"remote_storage",

View File

@@ -22,6 +22,7 @@ members = [
"libs/http-utils",
"libs/pageserver_api",
"libs/postgres_ffi",
"libs/postgres_ffi_types",
"libs/safekeeper_api",
"libs/desim",
"libs/neon-shmem",
@@ -199,7 +200,7 @@ tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.8"
toml_edit = "0.22"
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "gzip", "prost", "router", "server", "tls-ring", "tls-native-roots", "zstd"] }
tonic-reflection = { version = "0.13.1", features = ["server"] }
tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
@@ -259,6 +260,7 @@ pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
postgres_ffi_types = { version = "0.1", path = "./libs/postgres_ffi_types/" }
postgres_initdb = { path = "./libs/postgres_initdb" }
posthog_client_lite = { version = "0.1", path = "./libs/posthog_client_lite" }
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }

View File

@@ -5,8 +5,6 @@
ARG REPOSITORY=ghcr.io/neondatabase
ARG IMAGE=build-tools
ARG TAG=pinned
ARG DEFAULT_PG_VERSION=17
ARG STABLE_PG_VERSION=16
ARG DEBIAN_VERSION=bookworm
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
@@ -63,14 +61,11 @@ FROM $REPOSITORY/$IMAGE:$TAG AS build
WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
ARG STABLE_PG_VERSION
COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v17/include/postgresql/server pg_install/v17/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib
COPY --from=pg-build /home/nonroot/pg_install/v17/lib pg_install/v17/lib
COPY --from=plan /home/nonroot/recipe.json recipe.json
ARG ADDITIONAL_RUSTFLAGS=""
@@ -97,7 +92,6 @@ RUN set -e \
# Build final image
#
FROM $BASE_IMAGE_SHA
ARG DEFAULT_PG_VERSION
WORKDIR /data
RUN set -e \
@@ -107,8 +101,6 @@ RUN set -e \
libreadline-dev \
libseccomp-dev \
ca-certificates \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
openssl \
unzip \
curl \

View File

@@ -167,13 +167,6 @@ postgres-%: postgres-configure-% \
+@echo "Compiling test_decoding $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/test_decoding install
.PHONY: postgres-clean-%
postgres-clean-%:
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect clean
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq clean
.PHONY: postgres-check-%
postgres-check-%: postgres-%
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 check
@@ -206,21 +199,6 @@ neon-pg-ext-%: postgres-%
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install
.PHONY: neon-pg-clean-ext-%
neon-pg-clean-ext-%:
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile clean
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile clean
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile clean
# Build walproposer as a static library. walproposer source code is located
# in the pgxn/neon directory.
#
@@ -253,12 +231,6 @@ ifeq ($(UNAME_S),Linux)
pg_crc32c.o
endif
.PHONY: walproposer-lib-clean
walproposer-lib-clean:
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean
.PHONY: neon-pg-ext
neon-pg-ext: \
neon-pg-ext-v14 \
@@ -266,13 +238,6 @@ neon-pg-ext: \
neon-pg-ext-v16 \
neon-pg-ext-v17
.PHONY: neon-pg-clean-ext
neon-pg-clean-ext: \
neon-pg-clean-ext-v14 \
neon-pg-clean-ext-v15 \
neon-pg-clean-ext-v16 \
neon-pg-clean-ext-v17
# shorthand to build all Postgres versions
.PHONY: postgres
postgres: \
@@ -288,13 +253,6 @@ postgres-headers: \
postgres-headers-v16 \
postgres-headers-v17
.PHONY: postgres-clean
postgres-clean: \
postgres-clean-v14 \
postgres-clean-v15 \
postgres-clean-v16 \
postgres-clean-v17
.PHONY: postgres-check
postgres-check: \
postgres-check-v14 \
@@ -302,12 +260,6 @@ postgres-check: \
postgres-check-v16 \
postgres-check-v17
# This doesn't remove the effects of 'configure'.
.PHONY: clean
clean: postgres-clean neon-pg-clean-ext
$(MAKE) -C compute clean
$(CARGO_CMD_PREFIX) cargo clean
# This removes everything
.PHONY: distclean
distclean:

View File

@@ -1735,29 +1735,11 @@ FROM extensions-${EXTENSIONS} AS neon-pg-ext-build
# Compile the Neon-specific `compute_ctl`, `fast_import`, and `local_proxy` binaries
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools-plan
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
WORKDIR /home/nonroot
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN cargo chef prepare --recipe-path recipe.json
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
USER nonroot
COPY --from=compute-tools-plan /home/nonroot/recipe.json recipe.json
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \
--mount=type=cache,uid=1000,target=/home/nonroot/.cargo/git \
--mount=type=cache,uid=1000,target=/home/nonroot/target \
mold -run cargo chef cook --locked --profile release-line-debug-size-lto --recipe-path recipe.json
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \

View File

@@ -236,7 +236,7 @@ impl Default for NeonStorageControllerConf {
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
long_reconcile_threshold: None,
use_https_pageserver_api: false,
timelines_onto_safekeepers: false,
timelines_onto_safekeepers: true,
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
}

View File

@@ -17,7 +17,7 @@ anyhow.workspace = true
bytes.workspace = true
byteorder.workspace = true
utils.workspace = true
postgres_ffi.workspace = true
postgres_ffi_types.workspace = true
enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true

View File

@@ -4,8 +4,8 @@ use std::ops::Range;
use anyhow::{Result, bail};
use byteorder::{BE, ByteOrder};
use bytes::Bytes;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::{Oid, RepOriginId};
use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi_types::{Oid, RepOriginId};
use serde::{Deserialize, Serialize};
use utils::const_assert;
@@ -194,7 +194,7 @@ impl Key {
/// will be rejected on the write path.
#[allow(dead_code)]
pub fn is_valid_key_on_write_path_strong(&self) -> bool {
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
if !self.is_i128_representable() {
return false;
}

View File

@@ -1,7 +1,6 @@
use std::ops::Range;
use itertools::Itertools;
use postgres_ffi::BLCKSZ;
use crate::key::Key;
use crate::shard::{ShardCount, ShardIdentity};
@@ -269,9 +268,13 @@ impl KeySpace {
/// Partition a key space into roughly chunks of roughly 'target_size' bytes
/// in each partition.
///
pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
// Assume that each value is 8k in size.
let target_nblocks = (target_size / BLCKSZ as u64) as u32;
pub fn partition(
&self,
shard_identity: &ShardIdentity,
target_size: u64,
block_size: u64,
) -> KeyPartitioning {
let target_nblocks = (target_size / block_size) as u32;
let mut parts = Vec::new();
let mut current_part = Vec::new();

View File

@@ -6,11 +6,9 @@ pub mod key;
pub mod keyspace;
pub mod models;
pub mod pagestream_api;
pub mod record;
pub mod reltag;
pub mod shard;
/// Public API types
pub mod upcall_api;
pub mod value;
pub mod config;

View File

@@ -8,9 +8,15 @@ use crate::reltag::RelTag;
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
/// Block size.
///
/// XXX: We assume 8k block size in the SLRU fetch API. It's not great to hardcode
/// that in the protocol, because Postgres supports different block sizes as a compile
/// time option.
const BLCKSZ: usize = 8192;
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {
@@ -443,7 +449,7 @@ impl PagestreamBeMessage {
Self::GetSlruSegment(resp) => {
bytes.put_u8(Tag::GetSlruSegment as u8);
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put_u32((resp.segment.len() / BLCKSZ) as u32);
bytes.put(&resp.segment[..]);
}
@@ -520,7 +526,7 @@ impl PagestreamBeMessage {
bytes.put_u64(resp.req.hdr.not_modified_since.0);
bytes.put_u8(resp.req.kind);
bytes.put_u32(resp.req.segno);
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put_u32((resp.segment.len() / BLCKSZ) as u32);
bytes.put(&resp.segment[..]);
}
@@ -662,7 +668,7 @@ impl PagestreamBeMessage {
let kind = buf.read_u8()?;
let segno = buf.read_u32::<BigEndian>()?;
let n_blocks = buf.read_u32::<BigEndian>()?;
let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
let mut segment = vec![0; n_blocks as usize * BLCKSZ];
buf.read_exact(&mut segment)?;
Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
req: PagestreamGetSlruSegmentRequest {

View File

@@ -1,9 +1,9 @@
use std::cmp::Ordering;
use std::fmt;
use postgres_ffi::Oid;
use postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
use postgres_ffi::relfile_utils::{MAIN_FORKNUM, forkname_to_number, forknumber_to_name};
use postgres_ffi_types::Oid;
use postgres_ffi_types::constants::GLOBALTABLESPACE_OID;
use postgres_ffi_types::forknum::{MAIN_FORKNUM, forkname_to_number, forknumber_to_name};
use serde::{Deserialize, Serialize};
///

View File

@@ -35,7 +35,7 @@ use std::hash::{Hash, Hasher};
#[doc(inline)]
pub use ::utils::shard::*;
use postgres_ffi::relfile_utils::INIT_FORKNUM;
use postgres_ffi_types::forknum::INIT_FORKNUM;
use serde::{Deserialize, Serialize};
use crate::key::Key;

View File

@@ -16,6 +16,7 @@ memoffset.workspace = true
pprof.workspace = true
thiserror.workspace = true
serde.workspace = true
postgres_ffi_types.workspace = true
utils.workspace = true
tracing.workspace = true

View File

@@ -11,11 +11,7 @@
use crate::{BLCKSZ, PageHeaderData};
//
// From pg_tablespace_d.h
//
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
pub const GLOBALTABLESPACE_OID: u32 = 1664;
// Note: There are a few more widely-used constants in the postgres_ffi_types::constants crate.
// From storage_xlog.h
pub const XLOG_SMGR_CREATE: u8 = 0x10;

View File

@@ -4,50 +4,7 @@
use once_cell::sync::OnceCell;
use regex::Regex;
//
// Fork numbers, from relpath.h
//
pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum FilePathError {
#[error("invalid relation fork name")]
InvalidForkName,
#[error("invalid relation data file name")]
InvalidFileName,
}
impl From<core::num::ParseIntError> for FilePathError {
fn from(_e: core::num::ParseIntError) -> Self {
FilePathError::InvalidFileName
}
}
/// Convert Postgres relation file's fork suffix to fork number.
pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present
None => Ok(MAIN_FORKNUM),
Some("fsm") => Ok(FSM_FORKNUM),
Some("vm") => Ok(VISIBILITYMAP_FORKNUM),
Some("init") => Ok(INIT_FORKNUM),
Some(_) => Err(FilePathError::InvalidForkName),
}
}
/// Convert Postgres fork number to the right suffix of the relation data file.
pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
match forknum {
MAIN_FORKNUM => None,
FSM_FORKNUM => Some("fsm"),
VISIBILITYMAP_FORKNUM => Some("vm"),
INIT_FORKNUM => Some("init"),
_ => Some("UNKNOWN FORKNUM"),
}
}
use postgres_ffi_types::forknum::*;
/// Parse a filename of a relation file. Returns (relfilenode, forknum, segno) tuple.
///
@@ -75,7 +32,9 @@ pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
.ok_or(FilePathError::InvalidFileName)?;
let relnode_str = caps.name("relnode").unwrap().as_str();
let relnode = relnode_str.parse::<u32>()?;
let relnode = relnode_str
.parse::<u32>()
.map_err(|_e| FilePathError::InvalidFileName)?;
let forkname = caps.name("forkname").map(|f| f.as_str());
let forknum = forkname_to_number(forkname)?;
@@ -84,7 +43,11 @@ pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
let segno = if segno_match.is_none() {
0
} else {
segno_match.unwrap().as_str().parse::<u32>()?
segno_match
.unwrap()
.as_str()
.parse::<u32>()
.map_err(|_e| FilePathError::InvalidFileName)?
};
Ok((relnode, forknum, segno))

View File

@@ -0,0 +1,11 @@
[package]
name = "postgres_ffi_types"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
thiserror.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[dev-dependencies]

View File

@@ -0,0 +1,8 @@
//! Misc constants, copied from PostgreSQL headers.
//!
//! Any constants included here must be the same in all PostgreSQL versions and unlikely to change
//! in the future either!
// From pg_tablespace_d.h
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
pub const GLOBALTABLESPACE_OID: u32 = 1664;

View File

@@ -0,0 +1,36 @@
// Fork numbers, from relpath.h
pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum FilePathError {
#[error("invalid relation fork name")]
InvalidForkName,
#[error("invalid relation data file name")]
InvalidFileName,
}
/// Convert Postgres relation file's fork suffix to fork number.
pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present
None => Ok(MAIN_FORKNUM),
Some("fsm") => Ok(FSM_FORKNUM),
Some("vm") => Ok(VISIBILITYMAP_FORKNUM),
Some("init") => Ok(INIT_FORKNUM),
Some(_) => Err(FilePathError::InvalidForkName),
}
}
/// Convert Postgres fork number to the right suffix of the relation data file.
pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
match forknum {
MAIN_FORKNUM => None,
FSM_FORKNUM => Some("fsm"),
VISIBILITYMAP_FORKNUM => Some("vm"),
INIT_FORKNUM => Some("init"),
_ => Some("UNKNOWN FORKNUM"),
}
}

View File

@@ -0,0 +1,13 @@
//! This package contains some PostgreSQL constants and datatypes that are the same in all versions
//! of PostgreSQL and unlikely to change in the future either. These could be derived from the
//! PostgreSQL headers with 'bindgen', but in order to avoid proliferating the dependency to bindgen
//! and the PostgreSQL C headers to all services, we prefer to have this small stand-alone crate for
//! them instead.
//!
//! Be mindful in what you add here, as these types are deeply ingrained in the APIs.
pub mod constants;
pub mod forknum;
pub type Oid = u32;
pub type RepOriginId = u16;

View File

@@ -14,6 +14,7 @@ bytes.workspace = true
pageserver_api.workspace = true
prost.workspace = true
postgres_ffi.workspace = true
postgres_ffi_types.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["io-util"] }

View File

@@ -8,8 +8,8 @@ use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::walrecord::*;
use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM;
use utils::lsn::Lsn;
use crate::models::*;

View File

@@ -25,6 +25,9 @@
//! |
//! |--> write to KV store within the pageserver
pub mod record;
pub mod value;
use bytes::Bytes;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::walrecord::{

View File

@@ -10,7 +10,7 @@
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use crate::record::NeonWalRecord;
use crate::models::record::NeonWalRecord;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum Value {

View File

@@ -1,4 +1,4 @@
//! This module implements batch type for serialized [`pageserver_api::value::Value`]
//! This module implements batch type for serialized [`crate::models::value::Value`]
//! instances. Each batch contains a raw buffer (serialized values)
//! and a list of metadata for each (key, LSN) tuple present in the batch.
//!
@@ -10,10 +10,8 @@ use std::collections::{BTreeSet, HashMap};
use bytes::{Bytes, BytesMut};
use pageserver_api::key::{CompactKey, Key, rel_block_to_key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::value::Value;
use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn, pg_constants};
use serde::{Deserialize, Serialize};
@@ -21,6 +19,8 @@ use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
use crate::models::InterpretedWalRecord;
use crate::models::record::NeonWalRecord;
use crate::models::value::Value;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);

View File

@@ -1,11 +0,0 @@
{
"jwks": [
{
"id": "1",
"role_names": ["authenticated"],
"jwks_url": "https://adapted-gorilla-88.clerk.accounts.dev/.well-known/jwks.json",
"provider_name": "foo",
"jwt_audience": null
}
]
}

View File

@@ -56,6 +56,7 @@ pin-project-lite.workspace = true
postgres_backend.workspace = true
postgres_connection.workspace = true
postgres_ffi.workspace = true
postgres_ffi_types.workspace = true
postgres_initdb.workspace = true
postgres-protocol.workspace = true
postgres-types.workspace = true

View File

@@ -13,11 +13,11 @@ use pageserver::{page_cache, virtual_file};
use pageserver_api::key::Key;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
use wal_decoder::models::value::Value;
use wal_decoder::serialized_batch::SerializedValueBatch;
// A very cheap hash for generating non-sequential keys.

View File

@@ -67,12 +67,12 @@ use once_cell::sync::Lazy;
use pageserver::config::PageServerConf;
use pageserver::walredo::{PostgresRedoManager, RedoAttemptType};
use pageserver_api::key::Key;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::TenantShardId;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use utils::id::TenantId;
use utils::lsn::Lsn;
use wal_decoder::models::record::NeonWalRecord;
fn bench(c: &mut Criterion) {
macro_rules! bench_group {

View File

@@ -83,6 +83,7 @@ impl Client {
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_header: Option<String>,
compression: Option<tonic::codec::CompressionEncoding>,
) -> anyhow::Result<Self> {
let endpoint: tonic::transport::Endpoint = into_endpoint
.try_into()
@@ -90,7 +91,15 @@ impl Client {
let channel = endpoint.connect().await?;
let auth = AuthInterceptor::new(tenant_id, timeline_id, auth_header, shard_id)
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
let client = proto::PageServiceClient::with_interceptor(channel, auth);
let mut client = proto::PageServiceClient::with_interceptor(channel, auth);
if let Some(compression) = compression {
// TODO: benchmark this (including network latency).
// TODO: consider enabling compression by default.
client = client
.accept_compressed(compression)
.send_compressed(compression);
}
Ok(Self { client })
}
@@ -112,7 +121,7 @@ impl Client {
pub async fn get_base_backup(
&mut self,
req: model::GetBaseBackupRequest,
) -> Result<impl Stream<Item = Result<Bytes, tonic::Status>>, tonic::Status> {
) -> Result<impl Stream<Item = Result<Bytes, tonic::Status>> + 'static, tonic::Status> {
let proto_req = proto::GetBaseBackupRequest::from(req);
let response_stream: Streaming<proto::GetBaseBackupResponseChunk> =

View File

@@ -1,20 +1,29 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use anyhow::Context;
use anyhow::anyhow;
use futures::TryStreamExt as _;
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
use pageserver_client::page_service::BasebackupRequest;
use pageserver_page_api as page_api;
use rand::prelude::*;
use reqwest::Url;
use tokio::io::AsyncRead;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _};
use tokio_util::io::StreamReader;
use tonic::async_trait;
use tracing::{info, instrument};
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::shard::ShardIndex;
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
use crate::util::{request_stats, tokio_thread_local_stats};
@@ -24,14 +33,15 @@ use crate::util::{request_stats, tokio_thread_local_stats};
pub(crate) struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
/// The Pageserver to connect to. Use postgresql:// for libpq, or grpc:// for gRPC.
#[clap(long, default_value = "postgresql://postgres@localhost:64000")]
page_service_connstring: String,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")]
num_clients: NonZeroUsize,
#[clap(long, default_value = "1.0")]
gzip_probability: f64,
#[clap(long)]
no_compression: bool,
#[clap(long)]
runtime: Option<humantime::Duration>,
#[clap(long)]
@@ -146,12 +156,23 @@ async fn main_impl(
let mut work_senders = HashMap::new();
let mut tasks = Vec::new();
for tl in &timelines {
let connurl = Url::parse(&args.page_service_connstring)?;
for &tl in &timelines {
let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
work_senders.insert(tl, sender);
tasks.push(tokio::spawn(client(
args,
*tl,
let client: Box<dyn Client> = match connurl.scheme() {
"postgresql" | "postgres" => Box::new(
LibpqClient::new(&args.page_service_connstring, tl, !args.no_compression).await?,
),
"grpc" => Box::new(
GrpcClient::new(&args.page_service_connstring, tl, !args.no_compression).await?,
),
scheme => return Err(anyhow!("invalid scheme {scheme}")),
};
tasks.push(tokio::spawn(run_worker(
client,
Arc::clone(&start_work_barrier),
receiver,
Arc::clone(&all_work_done_barrier),
@@ -166,13 +187,7 @@ async fn main_impl(
let mut rng = rand::thread_rng();
let target = all_targets.choose(&mut rng).unwrap();
let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
(
target.timeline,
Work {
lsn,
gzip: rng.gen_bool(args.gzip_probability),
},
)
(target.timeline, Work { lsn })
};
let sender = work_senders.get(&timeline).unwrap();
// TODO: what if this blocks?
@@ -216,13 +231,11 @@ async fn main_impl(
#[derive(Copy, Clone)]
struct Work {
lsn: Option<Lsn>,
gzip: bool,
}
#[instrument(skip_all)]
async fn client(
args: &'static Args,
timeline: TenantTimelineId,
async fn run_worker(
mut client: Box<dyn Client>,
start_work_barrier: Arc<Barrier>,
mut work: tokio::sync::mpsc::Receiver<Work>,
all_work_done_barrier: Arc<Barrier>,
@@ -230,37 +243,14 @@ async fn client(
) {
start_work_barrier.wait().await;
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
while let Some(Work { lsn, gzip }) = work.recv().await {
while let Some(Work { lsn }) = work.recv().await {
let start = Instant::now();
let copy_out_stream = client
.basebackup(&BasebackupRequest {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
lsn,
gzip,
})
.await
.with_context(|| format!("start basebackup for {timeline}"))
.unwrap();
let stream = client.basebackup(lsn).await.unwrap();
use futures::StreamExt;
let size = Arc::new(AtomicUsize::new(0));
copy_out_stream
.for_each({
|r| {
let size = Arc::clone(&size);
async move {
let size = Arc::clone(&size);
size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
}
}
})
.await;
info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
let size = futures::io::copy(stream.compat(), &mut tokio::io::sink().compat_write())
.await
.unwrap();
info!("basebackup size is {size} bytes");
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {
@@ -270,3 +260,94 @@ async fn client(
all_work_done_barrier.wait().await;
}
/// A basebackup client. This allows switching out the client protocol implementation.
#[async_trait]
trait Client: Send {
async fn basebackup(
&mut self,
lsn: Option<Lsn>,
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send>>>;
}
/// A libpq-based Pageserver client.
struct LibpqClient {
inner: pageserver_client::page_service::Client,
ttid: TenantTimelineId,
compression: bool,
}
impl LibpqClient {
async fn new(
connstring: &str,
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
Ok(Self {
inner: pageserver_client::page_service::Client::new(connstring.to_string()).await?,
ttid,
compression,
})
}
}
#[async_trait]
impl Client for LibpqClient {
async fn basebackup(
&mut self,
lsn: Option<Lsn>,
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send + 'static>>> {
let req = BasebackupRequest {
tenant_id: self.ttid.tenant_id,
timeline_id: self.ttid.timeline_id,
lsn,
gzip: self.compression,
};
let stream = self.inner.basebackup(&req).await?;
Ok(Box::pin(StreamReader::new(
stream.map_err(std::io::Error::other),
)))
}
}
/// A gRPC Pageserver client.
struct GrpcClient {
inner: page_api::Client,
}
impl GrpcClient {
async fn new(
connstring: &str,
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
let inner = page_api::Client::new(
connstring.to_string(),
ttid.tenant_id,
ttid.timeline_id,
ShardIndex::unsharded(),
None,
compression.then_some(tonic::codec::CompressionEncoding::Zstd),
)
.await?;
Ok(Self { inner })
}
}
#[async_trait]
impl Client for GrpcClient {
async fn basebackup(
&mut self,
lsn: Option<Lsn>,
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send + 'static>>> {
let req = page_api::GetBaseBackupRequest {
lsn,
replica: false,
full: false,
};
let stream = self.inner.get_base_backup(req).await?;
Ok(Box::pin(StreamReader::new(
stream.map_err(std::io::Error::other),
)))
}
}

View File

@@ -18,13 +18,12 @@ use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{
DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID, PG_HBA, PGDATA_SPECIAL_FILES,
};
use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
use postgres_ffi::pg_constants::{PG_HBA, PGDATA_SPECIAL_FILES};
use postgres_ffi::{
BLCKSZ, PG_TLI, RELSEG_SIZE, WAL_SEGMENT_SIZE, XLogFileName, dispatch_pgversion, pg_constants,
};
use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi_types::forknum::{INIT_FORKNUM, MAIN_FORKNUM};
use tokio::io;
use tokio::io::AsyncWrite;
use tokio_tar::{Builder, EntryType, Header};
@@ -372,6 +371,7 @@ where
.partition(
self.timeline.get_shard_identity(),
self.timeline.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
BLCKSZ as u64,
);
let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);

View File

@@ -520,7 +520,7 @@ async fn import_file(
}
if file_path.starts_with("global") {
let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
let spcnode = postgres_ffi_types::constants::GLOBALTABLESPACE_OID;
let dbnode = 0;
match file_name.as_ref() {
@@ -553,7 +553,7 @@ async fn import_file(
}
}
} else if file_path.starts_with("base") {
let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
let spcnode = postgres_ffi_types::constants::DEFAULTTABLESPACE_OID;
let dbnode: u32 = file_path
.iter()
.nth(1)

View File

@@ -41,7 +41,7 @@ use postgres_backend::{
AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
};
use postgres_ffi::BLCKSZ;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi_types::constants::DEFAULTTABLESPACE_OID;
use pq_proto::framed::ConnectionError;
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
use smallvec::{SmallVec, smallvec};
@@ -3286,7 +3286,14 @@ impl GrpcPageServiceHandler {
Ok(req)
}))
// Run the page service.
.service(proto::PageServiceServer::new(page_service_handler));
.service(
proto::PageServiceServer::new(page_service_handler)
// Support both gzip and zstd compression. The client decides what to use.
.accept_compressed(tonic::codec::CompressionEncoding::Gzip)
.accept_compressed(tonic::codec::CompressionEncoding::Zstd)
.send_compressed(tonic::codec::CompressionEncoding::Gzip)
.send_compressed(tonic::codec::CompressionEncoding::Zstd),
);
let server = server.add_service(page_service);
// Reflection service for use with e.g. grpcurl.
@@ -3532,7 +3539,6 @@ impl proto::PageService for GrpcPageServiceHandler {
Ok(tonic::Response::new(resp.into()))
}
// TODO: ensure clients use gzip compression for the stream.
#[instrument(skip_all, fields(lsn))]
async fn get_base_backup(
&self,

View File

@@ -23,12 +23,11 @@ use pageserver_api::key::{
};
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
use pageserver_api::models::RelSizeMigration;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use pageserver_api::value::Value;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
use postgres_ffi::{BLCKSZ, TimestampTz, TransactionId};
use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi_types::{Oid, RepOriginId};
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
@@ -36,6 +35,8 @@ use tracing::{debug, info, info_span, trace, warn};
use utils::bin_ser::{BeSer, DeserializeError};
use utils::lsn::Lsn;
use utils::pausable_failpoint;
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::value::Value;
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use super::tenant::{PageReconstructError, Timeline};
@@ -720,6 +721,7 @@ impl Timeline {
let batches = keyspace.partition(
self.get_shard_identity(),
self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
BLCKSZ as u64,
);
let io_concurrency = IoConcurrency::spawn_from_conf(
@@ -960,6 +962,7 @@ impl Timeline {
let batches = keyspace.partition(
self.get_shard_identity(),
self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
BLCKSZ as u64,
);
let io_concurrency = IoConcurrency::spawn_from_conf(

View File

@@ -496,7 +496,7 @@ impl WalRedoManager {
key: pageserver_api::key::Key,
lsn: Lsn,
base_img: Option<(Lsn, bytes::Bytes)>,
records: Vec<(Lsn, pageserver_api::record::NeonWalRecord)>,
records: Vec<(Lsn, wal_decoder::models::record::NeonWalRecord)>,
pg_version: u32,
redo_attempt_type: RedoAttemptType,
) -> Result<bytes::Bytes, walredo::Error> {
@@ -1859,6 +1859,29 @@ impl TenantShard {
}
}
// At this point we've initialized all timelines and are tracking them.
// Now compute the layer visibility for all (not offloaded) timelines.
let compute_visiblity_for = {
let timelines_accessor = self.timelines.lock().unwrap();
let mut timelines_offloaded_accessor = self.timelines_offloaded.lock().unwrap();
timelines_offloaded_accessor.extend(offloaded_timelines_list.into_iter());
// Before activation, populate each Timeline's GcInfo with information about its children
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
timelines_accessor.values().cloned().collect::<Vec<_>>()
};
for tl in compute_visiblity_for {
tl.update_layer_visibility().await.with_context(|| {
format!(
"failed initial timeline visibility computation {} for tenant {}",
tl.timeline_id, self.tenant_shard_id
)
})?;
}
// Walk through deleted timelines, resume deletion
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
remote_timeline_client
@@ -1878,10 +1901,6 @@ impl TenantShard {
.context("resume_deletion")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
}
{
let mut offloaded_timelines_accessor = self.timelines_offloaded.lock().unwrap();
offloaded_timelines_accessor.extend(offloaded_timelines_list.into_iter());
}
// Stash the preloaded tenant manifest, and upload a new manifest if changed.
//
@@ -3449,9 +3468,6 @@ impl TenantShard {
.values()
.filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
// Before activation, populate each Timeline's GcInfo with information about its children
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self, background_jobs_can_start);
@@ -5836,10 +5852,10 @@ pub(crate) mod harness {
use once_cell::sync::OnceCell;
use pageserver_api::key::Key;
use pageserver_api::models::ShardParameters;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::ShardIndex;
use utils::id::TenantId;
use utils::logging;
use wal_decoder::models::record::NeonWalRecord;
use super::*;
use crate::deletion_queue::mock::MockDeletionQueue;
@@ -6094,9 +6110,6 @@ mod tests {
#[cfg(feature = "testing")]
use pageserver_api::keyspace::KeySpaceRandomAccum;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
#[cfg(feature = "testing")]
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;
use pageserver_compaction::helpers::overlaps_with;
#[cfg(feature = "testing")]
use rand::SeedableRng;
@@ -6117,6 +6130,9 @@ mod tests {
use timeline::{CompactOptions, DeltaLayerTestDesc, VersionedKeySpaceQuery};
use utils::id::TenantId;
use utils::shard::{ShardCount, ShardNumber};
#[cfg(feature = "testing")]
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::value::Value;
use super::*;
use crate::DEFAULT_PG_VERSION;

View File

@@ -34,11 +34,11 @@ pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
use pageserver_api::config::GetVectoredConcurrentIo;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;
use tracing::{Instrument, info_span, trace};
use utils::lsn::Lsn;
use utils::sync::gate::GateGuard;
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::value::Value;
use self::inmemory_layer::InMemoryLayerFileId;
use super::PageReconstructError;

View File

@@ -4,11 +4,11 @@ use std::sync::Arc;
use bytes::Bytes;
use pageserver_api::key::{KEY_SIZE, Key};
use pageserver_api::value::Value;
use tokio_util::sync::CancellationToken;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use wal_decoder::models::value::Value;
use super::errors::PutError;
use super::layer::S3_UPLOAD_LIMIT;

View File

@@ -44,7 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf;
@@ -54,6 +53,7 @@ use utils::bin_ser::BeSer;
use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;
use super::errors::PutError;
use super::{
@@ -1306,7 +1306,7 @@ impl DeltaLayerInner {
// is it an image or will_init walrecord?
// FIXME: this could be handled by threading the BlobRef to the
// VectoredReadBuilder
let will_init = pageserver_api::value::ValueBytes::will_init(&data)
let will_init = wal_decoder::models::value::ValueBytes::will_init(&data)
.inspect_err(|_e| {
#[cfg(feature = "testing")]
tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
@@ -1369,7 +1369,7 @@ impl DeltaLayerInner {
format!(" img {} bytes", img.len())
}
Value::WalRecord(rec) => {
let wal_desc = pageserver_api::record::describe_wal_record(&rec)?;
let wal_desc = wal_decoder::models::record::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
buf.len(),
@@ -1624,7 +1624,6 @@ pub(crate) mod test {
use bytes::Bytes;
use itertools::MinMaxResult;
use pageserver_api::value::Value;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::{Rng, RngCore};
@@ -1988,7 +1987,7 @@ pub(crate) mod test {
#[tokio::test]
async fn copy_delta_prefix_smoke() {
use bytes::Bytes;
use pageserver_api::record::NeonWalRecord;
use wal_decoder::models::record::NeonWalRecord;
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
.await

View File

@@ -4,8 +4,8 @@ use std::sync::Arc;
use anyhow::bail;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, SparseKeySpace};
use pageserver_api::value::Value;
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;
use super::PersistentLayerKey;
use super::merge_iterator::{MergeIterator, MergeIteratorItem};
@@ -126,7 +126,6 @@ mod tests {
#[tokio::test]
async fn filter_keyspace_iterator() {
use bytes::Bytes;
use pageserver_api::value::Value;
let harness = TenantHarness::create("filter_iterator_filter_keyspace_iterator")
.await

View File

@@ -42,7 +42,6 @@ use pageserver_api::config::MaxVectoredReadBytes;
use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
@@ -52,6 +51,7 @@ use utils::bin_ser::BeSer;
use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;
use super::errors::PutError;
use super::layer_name::ImageLayerName;
@@ -1232,10 +1232,10 @@ mod test {
use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
use pageserver_api::value::Value;
use utils::generation::Generation;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;
use super::{ImageLayerIterator, ImageLayerWriter};
use crate::DEFAULT_PG_VERSION;

View File

@@ -824,7 +824,7 @@ async fn evict_and_wait_does_not_wait_for_download() {
#[tokio::test(start_paused = true)]
async fn eviction_cancellation_on_drop() {
use bytes::Bytes;
use pageserver_api::value::Value;
use wal_decoder::models::value::Value;
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();

View File

@@ -4,8 +4,8 @@ use std::sync::Arc;
use anyhow::bail;
use pageserver_api::key::Key;
use pageserver_api::value::Value;
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;
use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
use super::image_layer::{ImageLayerInner, ImageLayerIterator};
@@ -402,9 +402,9 @@ impl<'a> MergeIterator<'a> {
mod tests {
use itertools::Itertools;
use pageserver_api::key::Key;
#[cfg(feature = "testing")]
use pageserver_api::record::NeonWalRecord;
use utils::lsn::Lsn;
#[cfg(feature = "testing")]
use wal_decoder::models::record::NeonWalRecord;
use super::*;
use crate::DEFAULT_PG_VERSION;
@@ -436,7 +436,6 @@ mod tests {
#[tokio::test]
async fn merge_in_between() {
use bytes::Bytes;
use pageserver_api::value::Value;
let harness = TenantHarness::create("merge_iterator_merge_in_between")
.await
@@ -501,7 +500,6 @@ mod tests {
#[tokio::test]
async fn delta_merge() {
use bytes::Bytes;
use pageserver_api::value::Value;
let harness = TenantHarness::create("merge_iterator_delta_merge")
.await
@@ -578,7 +576,6 @@ mod tests {
#[tokio::test]
async fn delta_image_mixed_merge() {
use bytes::Bytes;
use pageserver_api::value::Value;
let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
.await

View File

@@ -56,8 +56,6 @@ use pageserver_api::models::{
};
use pageserver_api::reltag::{BlockNumber, RelTag};
use pageserver_api::shard::{ShardIdentity, ShardIndex, ShardNumber, TenantShardId};
#[cfg(test)]
use pageserver_api::value::Value;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::v14::xlog_utils;
use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp};
@@ -81,6 +79,8 @@ use utils::seqwait::SeqWait;
use utils::simple_rcu::{Rcu, RcuReadGuard};
use utils::sync::gate::{Gate, GateGuard};
use utils::{completion, critical, fs_ext, pausable_failpoint};
#[cfg(test)]
use wal_decoder::models::value::Value;
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use self::delete::DeleteTimelineFlow;
@@ -3422,10 +3422,6 @@ impl Timeline {
// TenantShard::create_timeline will wait for these uploads to happen before returning, or
// on retry.
// Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan)
drop(guard); // drop write lock, update_layer_visibility will take a read lock.
self.update_layer_visibility().await?;
info!(
"loaded layer map with {} layers at {}, total physical size: {}",
num_layers, disk_consistent_lsn, total_physical_size
@@ -5211,7 +5207,11 @@ impl Timeline {
}
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size);
let dense_partitioning = dense_ks.partition(
&self.shard_identity,
partition_size,
postgres_ffi::BLCKSZ as u64,
);
let sparse_partitioning = SparseKeyPartitioning {
parts: vec![sparse_ks],
}; // no partitioning for metadata keys for now
@@ -5939,7 +5939,7 @@ impl Drop for Timeline {
if let Ok(mut gc_info) = ancestor.gc_info.write() {
if !gc_info.remove_child_not_offloaded(self.timeline_id) {
tracing::error!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id,
"Couldn't remove retain_lsn entry from offloaded timeline's parent: already removed");
"Couldn't remove retain_lsn entry from timeline's parent on drop: already removed");
}
}
}
@@ -7594,11 +7594,11 @@ mod tests {
use std::sync::Arc;
use pageserver_api::key::Key;
use pageserver_api::value::Value;
use std::iter::Iterator;
use tracing::Instrument;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;
use super::HeatMapTimeline;
use crate::context::RequestContextBuilder;

View File

@@ -29,9 +29,7 @@ use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
use pageserver_api::key::{KEY_SIZE, Key};
use pageserver_api::keyspace::{KeySpace, ShardedRange};
use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
use pageserver_compaction::helpers::{fully_contains, overlaps_with};
use pageserver_compaction::interface::*;
use serde::Serialize;
@@ -41,6 +39,8 @@ use tracing::{Instrument, debug, error, info, info_span, trace, warn};
use utils::critical;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::value::Value;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;

View File

@@ -36,8 +36,8 @@ use pageserver_api::keyspace::{ShardedRange, singleton_range};
use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus};
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::BLCKSZ;
use postgres_ffi::relfile_utils::parse_relfilename;
use postgres_ffi::{BLCKSZ, pg_constants};
use remote_storage::RemotePath;
use tokio::sync::Semaphore;
use tokio_stream::StreamExt;
@@ -558,7 +558,7 @@ impl PgDataDir {
PgDataDirDb::new(
storage,
&basedir.join(dboid.to_string()),
pg_constants::DEFAULTTABLESPACE_OID,
postgres_ffi_types::constants::DEFAULTTABLESPACE_OID,
dboid,
&datadir_path,
)
@@ -571,7 +571,7 @@ impl PgDataDir {
PgDataDirDb::new(
storage,
&datadir_path.join("global"),
postgres_ffi::pg_constants::GLOBALTABLESPACE_OID,
postgres_ffi_types::constants::GLOBALTABLESPACE_OID,
0,
&datadir_path,
)

View File

@@ -28,20 +28,20 @@ use std::time::{Duration, Instant, SystemTime};
use bytes::{Buf, Bytes};
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::walrecord::*;
use postgres_ffi::{
TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
fsm_logical_to_physical, pg_constants,
};
use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use tracing::*;
use utils::bin_ser::{DeserializeError, SerializeError};
use utils::lsn::Lsn;
use utils::rate_limit::RateLimit;
use utils::{critical, failpoint_support};
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::*;
use crate::ZERO_PAGE;

View File

@@ -32,12 +32,12 @@ use anyhow::Context;
use bytes::{Bytes, BytesMut};
use pageserver_api::key::Key;
use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::TenantShardId;
use tracing::*;
use utils::lsn::Lsn;
use utils::sync::gate::GateError;
use utils::sync::heavier_once_cell;
use wal_decoder::models::record::NeonWalRecord;
use crate::config::PageServerConf;
use crate::metrics::{
@@ -571,11 +571,11 @@ mod tests {
use bytes::Bytes;
use pageserver_api::key::Key;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::shard::TenantShardId;
use tracing::Instrument;
use utils::id::TenantId;
use utils::lsn::Lsn;
use wal_decoder::models::record::NeonWalRecord;
use super::PostgresRedoManager;
use crate::config::PageServerConf;

View File

@@ -2,16 +2,16 @@ use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::BytesMut;
use pageserver_api::key::Key;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::SlruKind;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::v14::nonrelfile_utils::{
mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
transaction_id_set_status,
};
use postgres_ffi::{BLCKSZ, pg_constants};
use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM;
use tracing::*;
use utils::lsn::Lsn;
use wal_decoder::models::record::NeonWalRecord;
/// Can this request be served by neon redo functions
/// or we need to pass it to wal-redo postgres process?

View File

@@ -10,7 +10,6 @@ use std::time::Duration;
use anyhow::Context;
use bytes::Bytes;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::BLCKSZ;
@@ -18,6 +17,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{Instrument, debug, error, instrument};
use utils::lsn::Lsn;
use utils::poison::Poison;
use wal_decoder::models::record::NeonWalRecord;
use self::no_leak_child::NoLeakChild;
use crate::config::PageServerConf;

View File

@@ -514,7 +514,7 @@ _PG_init(void)
"Maximal size of pgstat.stat file saved in Neon storage",
"Zero value disables persisting pgstat.stat file",
&neon_pgstat_file_size_limit,
0, 0, 1000000, /* disabled by default */
16384, 0, 1000000, /* disabled by default */
PGC_SIGHUP,
GUC_UNIT_KB,
NULL, NULL, NULL);

27
poetry.lock generated
View File

@@ -746,23 +746,23 @@ xray = ["mypy-boto3-xray (>=1.26.0,<1.27.0)"]
[[package]]
name = "botocore"
version = "1.34.11"
version = "1.34.162"
description = "Low-level, data-driven core of boto 3."
optional = false
python-versions = ">= 3.8"
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "botocore-1.34.11-py3-none-any.whl", hash = "sha256:1ff1398b6ea670e1c01ac67a33af3da854f8e700d3528289c04f319c330d8250"},
{file = "botocore-1.34.11.tar.gz", hash = "sha256:51905c3d623c60df5dc5794387de7caf886d350180a01a3dfa762e903edb45a9"},
{file = "botocore-1.34.162-py3-none-any.whl", hash = "sha256:2d918b02db88d27a75b48275e6fb2506e9adaaddbec1ffa6a8a0898b34e769be"},
{file = "botocore-1.34.162.tar.gz", hash = "sha256:adc23be4fb99ad31961236342b7cbf3c0bfc62532cd02852196032e8c0d682f3"},
]
[package.dependencies]
jmespath = ">=0.7.1,<2.0.0"
python-dateutil = ">=2.1,<3.0.0"
urllib3 = {version = ">=1.25.4,<2.1", markers = "python_version >= \"3.10\""}
urllib3 = {version = ">=1.25.4,<2.2.0 || >2.2.0,<3", markers = "python_version >= \"3.10\""}
[package.extras]
crt = ["awscrt (==0.19.19)"]
crt = ["awscrt (==0.21.2)"]
[[package]]
name = "botocore-stubs"
@@ -3422,20 +3422,21 @@ files = [
[[package]]
name = "urllib3"
version = "1.26.19"
version = "2.5.0"
description = "HTTP library with thread-safe connection pooling, file post, and more."
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7"
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "urllib3-1.26.19-py2.py3-none-any.whl", hash = "sha256:37a0344459b199fce0e80b0d3569837ec6b6937435c5244e7fd73fa6006830f3"},
{file = "urllib3-1.26.19.tar.gz", hash = "sha256:3e3d753a8618b86d7de333b4223005f68720bcd6a7d2bcb9fbd2229ec7c1e429"},
{file = "urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc"},
{file = "urllib3-2.5.0.tar.gz", hash = "sha256:3fc47733c7e419d4bc3f6b3dc2b4f890bb743906a30d56ba4a5bfa4bbff92760"},
]
[package.extras]
brotli = ["brotli (==1.0.9) ; os_name != \"nt\" and python_version < \"3\" and platform_python_implementation == \"CPython\"", "brotli (>=1.0.9) ; python_version >= \"3\" and platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; (os_name != \"nt\" or python_version >= \"3\") and platform_python_implementation != \"CPython\"", "brotlipy (>=0.6.0) ; os_name == \"nt\" and python_version < \"3\""]
secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress ; python_version == \"2.7\"", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"]
socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""]
h2 = ["h2 (>=4,<5)"]
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
zstd = ["zstandard (>=0.18.0)"]
[[package]]
name = "websockets"

View File

@@ -138,29 +138,3 @@ Now from client you can start a new session:
```sh
PGSSLROOTCERT=./server.crt psql "postgresql://proxy:password@endpoint.local.neon.build:4432/postgres?sslmode=verify-full"
```
## auth broker hacky setup:
```sh
docker run \
--detach \
--name proxy-postgres \
--env POSTGRES_HOST_AUTH_METHOD=trust \
--env POSTGRES_USER=authenticated \
--env POSTGRES_DB=database \
--publish 5432:5432 \
postgres:17-bookworm
```
```sh
cargo run --bin proxy -- --is-auth-broker true -c server.crt -k server.key --wss 0.0.0.0:8080 --http 0.0.0.0:7002 --auth-backend cplane-v1
```
```sh
cargo run --bin local_proxy -- --http 0.0.0.0:7432
```
```sh
export NEON_JWT="..."
curl -k "https://127.0.0.1:8080/sql" -H "Authorization: Bearer $NEON_JWT" -H "neon-connection-string: postgresql://authenticated@foo.local.neon.build/database" -d '{"query":"select 1","params":[]}'
```

View File

@@ -1,6 +1,6 @@
//! Production console backend.
use std::net::{IpAddr, Ipv4Addr};
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -17,21 +17,20 @@ use tracing::{Instrument, debug, info, info_span, warn};
use super::super::messages::{ControlPlaneErrorMessage, GetEndpointAccessControl, WakeCompute};
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::jwt::AuthRule;
use crate::compute::ConnectInfo;
use crate::context::RequestContext;
use crate::control_plane::caches::ApiCaches;
use crate::control_plane::errors::{
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
};
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo, Reason};
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason};
use crate::control_plane::{
AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo,
RoleAccessControl,
};
use crate::metrics::Metrics;
use crate::rate_limiter::WakeComputeRateLimiter;
use crate::types::{BranchId, EndpointCacheKey, EndpointId, ProjectId, RoleName};
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
use crate::{compute, http, scram};
pub(crate) const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
@@ -389,17 +388,6 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
ctx: &RequestContext,
endpoint: &EndpointId,
) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
if true {
return Ok(vec![AuthRule {
id: "1".into(),
jwks_url: "https://adapted-gorilla-88.clerk.accounts.dev/.well-known/jwks.json"
.parse()
.expect("url is valid"),
audience: None,
role_names: vec![(&RoleName::from("authenticated")).into()],
}]);
}
self.do_get_endpoint_jwks(ctx, endpoint).await
}
@@ -409,24 +397,6 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, WakeComputeError> {
if true {
return Ok(CachedNodeInfo::new_uncached(NodeInfo {
conn_info: ConnectInfo {
host_addr: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
host: "localhost".into(),
port: 7432,
ssl_mode: SslMode::Disable,
},
aux: MetricsAuxInfo {
endpoint_id: EndpointId::from("foo").into(),
project_id: ProjectId::from("foo").into(),
branch_id: BranchId::from("foo").into(),
compute_id: "foo".into(),
cold_start_info: ColdStartInfo::Warm,
},
}));
}
let key = user_info.endpoint_cache_key();
macro_rules! check_cache {

View File

@@ -278,24 +278,24 @@ impl PoolingBackend {
// check again for race
if !self.local_pool.initialized(&conn_info) {
// local_backend
// .compute_ctl
// .install_extension(&ExtensionInstallRequest {
// extension: EXT_NAME,
// database: conn_info.dbname.clone(),
// version: EXT_VERSION,
// })
// .await?;
local_backend
.compute_ctl
.install_extension(&ExtensionInstallRequest {
extension: EXT_NAME,
database: conn_info.dbname.clone(),
version: EXT_VERSION,
})
.await?;
// local_backend
// .compute_ctl
// .grant_role(&SetRoleGrantsRequest {
// schema: EXT_SCHEMA,
// privileges: vec![Privilege::Usage],
// database: conn_info.dbname.clone(),
// role: conn_info.user_info.user.clone(),
// })
// .await?;
local_backend
.compute_ctl
.grant_role(&SetRoleGrantsRequest {
schema: EXT_SCHEMA,
privileges: vec![Privilege::Usage],
database: conn_info.dbname.clone(),
role: conn_info.user_info.user.clone(),
})
.await?;
self.local_pool.set_initialized(&conn_info);
}
@@ -313,14 +313,14 @@ impl PoolingBackend {
.to_postgres_client_config();
config
.user(&conn_info.user_info.user)
.dbname(&conn_info.dbname);
// .set_param(
// "options",
// &format!(
// "-c pg_session_jwt.jwk={}",
// serde_json::to_string(&jwk).expect("serializing jwk to json should not fail")
// ),
// );
.dbname(&conn_info.dbname)
.set_param(
"options",
&format!(
"-c pg_session_jwt.jwk={}",
serde_json::to_string(&jwk).expect("serializing jwk to json should not fail")
),
);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (client, connection) = config.connect(&postgres_client::NoTls).await?;
@@ -344,11 +344,11 @@ impl PoolingBackend {
let (client, mut discard) = handle.inner();
debug!("setting up backend session state");
// // initiates the auth session
// if let Err(e) = client.batch_execute("select auth.init();").await {
// discard.discard();
// return Err(e.into());
// }
// initiates the auth session
if let Err(e) = client.batch_execute("select auth.init();").await {
discard.discard();
return Err(e.into());
}
info!("backend session state initialized");
}

View File

@@ -280,20 +280,20 @@ impl ClientInnerCommon<postgres_client::Client> {
pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), SqlOverHttpError> {
if let ClientDataEnum::Local(local_data) = &mut self.data {
local_data.jti += 1;
// let token = resign_jwt(&local_data.key, payload, local_data.jti)?;
let token = resign_jwt(&local_data.key, payload, local_data.jti)?;
self.inner
.discard_all()
.await
.map_err(SqlOverHttpError::InternalPostgres)?;
// // initiates the auth session
// // this is safe from query injections as the jwt format free of any escape characters.
// let query = format!("select auth.jwt_session_init('{token}')");
// self.inner
// .batch_execute(&query)
// .await
// .map_err(SqlOverHttpError::InternalPostgres)?;
// initiates the auth session
// this is safe from query injections as the jwt format free of any escape characters.
let query = format!("select auth.jwt_session_init('{token}')");
self.inner
.batch_execute(&query)
.await
.map_err(SqlOverHttpError::InternalPostgres)?;
let pid = self.inner.get_process_id();
info!(pid, jti = local_data.jti, "user session state init");

View File

@@ -489,7 +489,9 @@ class NeonEnvBuilder:
self.config_init_force: str | None = None
self.top_output_dir = top_output_dir
self.control_plane_hooks_api: str | None = None
self.storage_controller_config: dict[Any, Any] | None = None
self.storage_controller_config: dict[Any, Any] | None = {
"timelines_onto_safekeepers": True,
}
# Flag to enable https listener in pageserver, generate local ssl certs,
# and force storage controller to use https for pageserver api.
@@ -4909,6 +4911,9 @@ class Safekeeper(LogUtils):
log.info(f"finished pulling timeline from {src_ids} to {self.id}")
return res
def safekeeper_id(self) -> SafekeeperId:
return SafekeeperId(self.id, "localhost", self.port.pg_tenant_only)
@property
def data_dir(self) -> Path:
return self.env.repo_dir / "safekeepers" / f"sk{self.id}"
@@ -5801,6 +5806,7 @@ def generate_uploads_and_deletions(
data: str | None = None,
pageserver: NeonPageserver,
wait_until_uploaded: bool = True,
config_lines: list[str] | None = None,
):
"""
Using the environment's default tenant + timeline, generate a load pattern
@@ -5818,7 +5824,7 @@ def generate_uploads_and_deletions(
ps_http = pageserver.http_client()
with env.endpoints.create_start(
"main", tenant_id=tenant_id, pageserver_id=pageserver.id
"main", tenant_id=tenant_id, pageserver_id=pageserver.id, config_lines=config_lines
) as endpoint:
if init:
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")

View File

@@ -11,6 +11,7 @@ from fixtures.common_types import Lsn, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.safekeeper.http import MembershipConfiguration, TimelineCreateRequest
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
@@ -164,6 +165,19 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
env.pageserver.tenant_create(env.initial_tenant)
sk = env.safekeepers[0]
assert sk
sk.http_client().timeline_create(
TimelineCreateRequest(
env.initial_tenant,
env.initial_timeline,
MembershipConfiguration(generation=1, members=[sk.safekeeper_id()], new_members=None),
int(env.pg_version),
Lsn(0),
None,
)
)
initial_branch = "initial_branch"
def start_creating_timeline():

View File

@@ -64,6 +64,11 @@ def test_normal_work(
"""
neon_env_builder.num_safekeepers = num_safekeepers
if safekeeper_proto_version == 2:
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()

View File

@@ -671,6 +671,12 @@ def test_layer_download_cancelled_by_config_location(neon_env_builder: NeonEnvBu
"""
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
# On the new mode, the test runs into a cancellation issue, i.e. the walproposer can't shut down
# as it is hang-waiting on the timeline_checkpoint call in WalIngest::new.
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
# turn off background tasks so that they don't interfere with the downloads
env = neon_env_builder.init_start(
initial_tenant_conf={

View File

@@ -226,7 +226,14 @@ def test_deletion_queue_recovery(
ps_http.configure_failpoints(failpoints)
generate_uploads_and_deletions(env, pageserver=main_pageserver)
# As the compute will write a pg_stat aux file record when shutting down and we check the delta layers
# generated in this test case, we need to disable it to avoid the test case from failing + make things
# more deterministic.
disable_pg_stat_persistence_config_line = ["neon.pgstat_file_size_limit = 0"]
generate_uploads_and_deletions(
env, pageserver=main_pageserver, config_lines=disable_pg_stat_persistence_config_line
)
# There should be entries in the deletion queue
assert_deletion_queue(ps_http, lambda n: n > 0)

View File

@@ -88,6 +88,12 @@ def test_storage_controller_smoke(
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
# These bubble up from safekeepers
for ps in env.pageservers:
ps.allowed_errors.extend(
[".*Timeline.* has been deleted.*", ".*Timeline.*was cancelled and cannot be used"]
)
# Start services by hand so that we can skip a pageserver (this will start + register later)
env.broker.start()
env.storage_controller.start()
@@ -3455,7 +3461,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
assert target.get_safekeeper(fake_id) is None
assert len(target.get_safekeepers()) == 0
start_sks = target.get_safekeepers()
sk_0 = env.safekeepers[0]
@@ -3477,7 +3483,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
inserted = target.get_safekeeper(fake_id)
assert inserted is not None
assert target.get_safekeepers() == [inserted]
assert target.get_safekeepers() == start_sks + [inserted]
assert eq_safekeeper_records(body, inserted)
# error out if pk is changed (unexpected)
@@ -3489,7 +3495,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
assert exc.value.status_code == 400
inserted_again = target.get_safekeeper(fake_id)
assert target.get_safekeepers() == [inserted_again]
assert target.get_safekeepers() == start_sks + [inserted_again]
assert inserted_again is not None
assert eq_safekeeper_records(inserted, inserted_again)
@@ -3498,7 +3504,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
body["version"] += 1
target.on_safekeeper_deploy(fake_id, body)
inserted_now = target.get_safekeeper(fake_id)
assert target.get_safekeepers() == [inserted_now]
assert target.get_safekeepers() == start_sks + [inserted_now]
assert inserted_now is not None
assert eq_safekeeper_records(body, inserted_now)
@@ -3507,7 +3513,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
body["https_port"] = 123
target.on_safekeeper_deploy(fake_id, body)
inserted_now = target.get_safekeeper(fake_id)
assert target.get_safekeepers() == [inserted_now]
assert target.get_safekeepers() == start_sks + [inserted_now]
assert inserted_now is not None
assert eq_safekeeper_records(body, inserted_now)
env.storage_controller.consistency_check()
@@ -3516,7 +3522,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
body["https_port"] = None
target.on_safekeeper_deploy(fake_id, body)
inserted_now = target.get_safekeeper(fake_id)
assert target.get_safekeepers() == [inserted_now]
assert target.get_safekeepers() == start_sks + [inserted_now]
assert inserted_now is not None
assert eq_safekeeper_records(body, inserted_now)
env.storage_controller.consistency_check()
@@ -3635,6 +3641,9 @@ def test_timeline_delete_mid_live_migration(neon_env_builder: NeonEnvBuilder, mi
env = neon_env_builder.init_configs()
env.start()
for ps in env.pageservers:
ps.allowed_errors.append(".*Timeline.* has been deleted.*")
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.storage_controller.tenant_create(tenant_id, placement_policy={"Attached": 1})

View File

@@ -341,6 +341,11 @@ def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder
env = neon_env_builder.init_configs()
env.start()
for ps in env.pageservers:
ps.allowed_errors.extend(
[".*Timeline.* has been deleted.*", ".*Timeline.*was cancelled and cannot be used"]
)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.create_tenant(

View File

@@ -21,7 +21,10 @@ from fixtures.neon_fixtures import (
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import HistoricLayerInfo, PageserverApiException
from fixtures.pageserver.http import (
HistoricLayerInfo,
PageserverApiException,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_timeline_detail_404
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.utils import assert_pageserver_backups_equal, skip_in_debug_build, wait_until
@@ -90,7 +93,14 @@ def test_ancestor_detach_branched_from(
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
# As the compute will write a pg_stat aux file record when shutting down and we check the delta layers
# generated in this test case, we need to disable it to avoid the test case from failing + make things
# more deterministic.
disable_pg_stat_persistence_config_line = ["neon.pgstat_file_size_limit = 0"]
with env.endpoints.create_start(
"main", tenant_id=env.initial_tenant, config_lines=disable_pg_stat_persistence_config_line
) as ep:
ep.safe_psql("CREATE TABLE foo (i BIGINT);")
after_first_tx = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
@@ -148,7 +158,9 @@ def test_ancestor_detach_branched_from(
assert branch_at == recorded, "the test should not use unaligned lsns"
if write_to_branch_first:
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
with env.endpoints.create_start(
name, tenant_id=env.initial_tenant, config_lines=disable_pg_stat_persistence_config_line
) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
# make sure the ep is writable
# with BEFORE_L0, AFTER_L0 there will be a gap in Lsns caused by accurate end_lsn on straddling layers
@@ -177,10 +189,14 @@ def test_ancestor_detach_branched_from(
env.pageserver.stop()
env.pageserver.start()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
with env.endpoints.create_start(
"main", tenant_id=env.initial_tenant, config_lines=disable_pg_stat_persistence_config_line
) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == 16384
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
with env.endpoints.create_start(
name, tenant_id=env.initial_tenant, config_lines=disable_pg_stat_persistence_config_line
) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
old_main_info = client.layer_map_info(env.initial_tenant, env.initial_timeline)
@@ -413,6 +429,7 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots
"read_only": True,
},
)
sk = env.safekeepers[0]
assert sk
with pytest.raises(requests.exceptions.HTTPError, match="Not Found"):
@@ -504,8 +521,15 @@ def test_ancestor_detach_behavior_v2(neon_env_builder: NeonEnvBuilder, snapshots
assert len(lineage.get("original_ancestor", [])) == 0
assert len(lineage.get("reparenting_history", [])) == 0
for name, _, _, rows, starts in expected_result:
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
for branch_name, queried_timeline, _, rows, starts in expected_result:
details = client.timeline_detail(env.initial_tenant, queried_timeline)
log.info(f"reading data from branch {branch_name}")
# specifying the lsn makes the endpoint read-only and not connect to safekeepers
with env.endpoints.create(
branch_name,
lsn=Lsn(details["last_record_lsn"]),
) as ep:
ep.start(safekeeper_generation=1)
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
assert ep.safe_psql(f"SELECT count(*) FROM audit WHERE starts = {starts}")[0][0] == 1
@@ -1088,6 +1112,7 @@ def test_timeline_detach_ancestor_interrupted_by_deletion(
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
ps.allowed_errors.append(".*Timeline.* has been deleted.*")
pageservers = dict((int(p.id), p) for p in env.pageservers)
@@ -1209,6 +1234,7 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
ps.allowed_errors.append(".*Timeline.* has been deleted.*")
pageservers = dict((int(p.id), p) for p in env.pageservers)

View File

@@ -24,6 +24,8 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool
initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"},
initial_tenant_shard_count=2 if sharded else None,
)
for ps in env.pageservers:
ps.allowed_errors.append(".*Timeline.* has been deleted.*")
if sharded:
http = env.storage_controller.pageserver_api()

View File

@@ -229,7 +229,7 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder):
# Test timeline_list endpoint.
http_cli = env.safekeepers[0].http_client()
assert len(http_cli.timeline_list()) == 3
assert len(http_cli.timeline_list()) == 4
# Check that dead minority doesn't prevent the commits: execute insert n_inserts
@@ -740,8 +740,8 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_timeline_status")
endpoint = env.endpoints.create_start("test_timeline_status")
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
wa = env.safekeepers[0]
@@ -1292,6 +1292,12 @@ def test_lagging_sk(neon_env_builder: NeonEnvBuilder):
# it works without compute at all.
def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
# timelines should be created the old way
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
@@ -1532,6 +1538,11 @@ def test_safekeeper_without_pageserver(
def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
# timelines should be created the old way manually until we have migration support
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
def execute_payload(endpoint: Endpoint):
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
@@ -1661,6 +1672,15 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
res = env.safekeepers[3].pull_timeline(
[env.safekeepers[0], env.safekeepers[2]], tenant_id, timeline_id
)
sk_id_1 = env.safekeepers[0].safekeeper_id()
sk_id_3 = env.safekeepers[2].safekeeper_id()
sk_id_4 = env.safekeepers[3].safekeeper_id()
new_conf = MembershipConfiguration(
generation=2, members=[sk_id_1, sk_id_3, sk_id_4], new_members=None
)
for i in [0, 2, 3]:
env.safekeepers[i].http_client().membership_switch(tenant_id, timeline_id, new_conf)
log.info("Finished pulling timeline")
log.info(res)
@@ -1705,13 +1725,15 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
dst_sk.stop()
[tenant_id, timeline_id] = env.create_tenant()
log.info("use only first 2 safekeepers, 3rd will be seeded")
endpoint = env.endpoints.create("main")
endpoint = env.endpoints.create("main", tenant_id=tenant_id)
endpoint.active_safekeepers = [1, 2]
endpoint.start()
endpoint.safe_psql("create table t(key int, value text)")
@@ -1723,6 +1745,7 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
src_http = src_sk.http_client()
# run pull_timeline which will halt before downloading files
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
dst_sk.start()
pt_handle = PropagatingThread(
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
)
@@ -1782,23 +1805,27 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
dst_sk.stop()
src_http = src_sk.http_client()
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
timeline_id = env.create_branch("pull_timeline_term_changes")
# run pull_timeline which will halt before downloading files
log.info("use only first 2 safekeepers, 3rd will be seeded")
ep = env.endpoints.create("main")
ep = env.endpoints.create("pull_timeline_term_changes")
ep.active_safekeepers = [1, 2]
ep.start()
ep.safe_psql("create table t(key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
src_http = src_sk.http_client()
# run pull_timeline which will halt before downloading files
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause"))
pt_handle = PropagatingThread(
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
)
dst_sk.start()
pt_handle.start()
src_sk.wait_until_paused("sk-snapshot-after-list-pausable")
@@ -1807,7 +1834,7 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
# restart compute to bump term
ep.stop()
ep = env.endpoints.create("main")
ep = env.endpoints.create("pull_timeline_term_changes")
ep.active_safekeepers = [1, 2]
ep.start()
ep.safe_psql("insert into t select generate_series(1, 100), 'pear'")
@@ -1929,6 +1956,11 @@ def test_pull_timeline_while_evicted(neon_env_builder: NeonEnvBuilder):
@run_only_on_default_postgres("tests only safekeeper API")
def test_membership_api(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
# timelines should be created the old way
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
# These are expected after timeline deletion on safekeepers.
@@ -2009,6 +2041,12 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
created manually, later storcon will do that.
"""
neon_env_builder.num_safekeepers = 3
# timelines should be created the old way manually
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
@@ -2064,7 +2102,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_idle_reconnections")
timeline_id = env.initial_timeline
def collect_stats() -> dict[str, float]:
# we need to collect safekeeper_pg_queries_received_total metric from all safekeepers
@@ -2095,7 +2133,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
collect_stats()
endpoint = env.endpoints.create_start("test_idle_reconnections")
endpoint = env.endpoints.create_start("main")
# just write something to the timeline
endpoint.safe_psql("create table t(i int)")
collect_stats()

View File

@@ -590,6 +590,13 @@ async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int):
@pytest.mark.parametrize("safekeeper_proto_version", [2, 3])
def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_version: int):
neon_env_builder.num_safekeepers = 3
if safekeeper_proto_version == 2:
# On the legacy protocol, we don't support generations, which are part of
# `timelines_onto_safekeepers`
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
asyncio.run(run_wal_truncation(env, safekeeper_proto_version))
@@ -713,6 +720,11 @@ async def run_quorum_sanity(env: NeonEnv):
# we don't.
def test_quorum_sanity(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 4
# The test fails basically always on the new mode.
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
env = neon_env_builder.init_start()
asyncio.run(run_quorum_sanity(env))

View File

@@ -16,6 +16,13 @@ if TYPE_CHECKING:
# Checks that pageserver's walreceiver state is printed in the logs during WAL wait timeout.
# Ensures that walreceiver does not run without any data inserted and only starts after the insertion.
def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder):
# we assert below that the walreceiver is not active before data writes.
# with manually created timelines, it is active.
# FIXME: remove this test once we remove timelines_onto_safekeepers
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
# Trigger WAL wait timeout faster
neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'"
env = neon_env_builder.init_start()