Compare commits

...

12 Commits

Author SHA1 Message Date
John Spray
ff6143975a lint 2025-07-02 13:44:13 +01:00
Haoyu Huang
fbbf578c6b SK: re-elect leader when backup lag is high (#781)
We observe that the offloader fails to upload a segment due to race
conditions on XLOG SWITCH and PG start streaming WALs. wal_backup task
continously failing to upload a full segment while the segment remains
partial on the disk.

The consequence is that commit_lsn for all SKs move forward but
backup_lsn stays the same. Then, all SKs run out of disk space.

See go/sk-ood-xlog-switch for more details.

To mitigate this issue, we will re-elect a new offloader if the current
offloader is lagging behind too much.
Each SK makes the decision locally but they are aware of each other's
commit and backup lsns.

The new algorithm is
- determine_offloader will pick a SK. say SK-1.
- Each SK checks
-- if commit_lsn - back_lsn > threshold,
-- -- remove SK-1 from the candidate and call determine_offloader again.

SK-1 will step down and all SKs will elect the same leader again.
After the backup is caught up, the leader will become SK-1 again.

This also helps when SK-1 is slow to backup.

I'll set the reelect backup lag to 4 GB later. Setting to 128 MB in dev
to trigger the code more frequently.

DEV.

(cherry picked from commit 7286f79f9536380d321e2442318bd8a631269499)
2025-07-02 08:32:45 +01:00
Alex Chi Z.
6c77638ea1 feat(storcon): retrieve feature flag and pass to pageservers (#12324)
## Problem

part of https://github.com/neondatabase/neon/issues/11813

## Summary of changes

It costs $$$ to directly retrieve the feature flags from the pageserver.
Therefore, this patch adds new APIs to retrieve the spec from the
storcon and updates it via pageserver.

* Storcon retrieves the feature flag and send it to the pageservers.
* If the feature flag gets updated outside of the normal refresh loop of
the pageserver, pageserver won't fetch the flags on its own as long as
the last updated time <= refresh_period.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-25 14:58:18 +00:00
Conrad Ludgate
517a3d0d86 [proxy]: BatchQueue::call is not cancel safe - make it directly cancellation aware (#12345)
## Problem

https://github.com/neondatabase/cloud/issues/30539

If the current leader cancels the `call` function, then it has removed
the jobs from the queue, but will never finish sending the responses.
Because of this, it is not cancellation safe.

## Summary of changes

Document these functions as not cancellation safe. Move cancellation of
the queued jobs into the queue itself.

## Alternatives considered

1. We could spawn the task that runs the batch, since that won't get
cancelled.
* This requires `fn call(self: Arc<Self>)` or `fn call(&'static self)`.
2. We could add another scopeguard and return the requests back to the
queue.
* This requires that requests are always retry safe, and also requires
requests to be `Clone`.
2025-06-25 14:19:20 +00:00
Conrad Ludgate
27ca1e21be [console_redirect_proxy]: fix channel binding (#12238)
## Problem

While working more on TLS to compute, I realised that Console Redirect
-> pg-sni-router -> compute would break if channel binding was set to
prefer. This is because the channel binding data would differ between
Console Redirect -> pg-sni-router vs pg-sni-router -> compute.

I also noticed that I actually disabled channel binding in #12145, since
`connect_raw` would think that the connection didn't support TLS.

## Summary of changes

Make sure we specify the channel binding.
Make sure that `connect_raw` can see if we have TLS support.
2025-06-25 13:41:30 +00:00
Arpad Müller
1dc01c9bed Support cancellations of timelines with hanging ondemand downloads (#12330)
In `test_layer_download_cancelled_by_config_location`, we simulate hung
downloads via the `before-downloading-layer-stream-pausable` failpoint.
Then, we cancel a timeline via the `location_config` endpoint.

With the new default as of
https://github.com/neondatabase/neon/pull/11712, we would be creating
the timeline on safekeepers regardless if there have been writes or not,
and it turns out the test relied on the timeline not existing on
safekeepers, due to a cancellation bug:

* as established before, the test makes the read path hang
* the timeline cancellation function first cancels the walreceiver, and
only then cancels the timeline's token
* `WalIngest::new` is requesting a checkpoint, which hits the read path
* at cancellation time, we'd be hanging inside the read, not seeing the
cancellation of the walreceiver
* the test would time out due to the hang

This is probably also reproducible in the wild when there is S3
unavailabilies or bottlenecks. So we thought that it's worthwhile to fix
the hang issue. The approach chosen in the end involves the
`tokio::select` macro.

In PR 11712, we originally punted on the test due to the hang and opted
it out from the new default, but now we can use the new default.

Part of https://github.com/neondatabase/neon/issues/12299
2025-06-25 13:40:38 +00:00
Heikki Linnakangas
7c4c36f5ac Remove unnecessary separate installation of libpq (#12287)
`make install` compiles and installs libpq. Remove redundant separate
step to compile and install it.
2025-06-25 10:47:56 +00:00
Tristan Partin
a2d623696c Update pgaudit to latest versions (#12328)
These updates contain some bug fixes and are completely backwards
compatible with what we currently support in Neon.

Link: https://github.com/pgaudit/pgaudit/compare/1.6.2...1.6.3
Link: https://github.com/pgaudit/pgaudit/compare/1.7.0...1.7.1
Link: https://github.com/pgaudit/pgaudit/compare/16.0...16.1
Link: https://github.com/pgaudit/pgaudit/compare/17.0...17.1
Signed-off-by: Tristan Partin <tristan.partin@databricks.com>

Signed-off-by: Tristan Partin <tristan.partin@databricks.com>
2025-06-25 09:03:02 +00:00
Tristan Partin
aa75722010 Set pgaudit.log=none for monitoring connections (#12137)
pgaudit can spam logs due to all the monitoring that we do. Logs from
these connections are not necessary for HIPPA compliance, so we can stop
logging from those connections.

Part-of: https://github.com/neondatabase/cloud/issues/29574

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-06-24 17:42:23 +00:00
Matthias van de Meent
6c6de6382a Use enum-typed PG versions (#12317)
This makes it possible for the compiler to validate that a match block
matched all PostgreSQL versions we support.

## Problem
We did not have a complete picture about which places we had to test
against PG versions, and what format these versions were: The full PG
version ID format (Major/minor/bugfix `MMmmbb`) as transfered in
protocol messages, or only the Major release version (`MM`). This meant
type confusion was rampant.

With this change, it becomes easier to develop new version-dependent
features, by making type and niche confusion impossible.

## Summary of changes
Every use of `pg_version` is now typed as either `PgVersionId` (u32,
valued in decimal `MMmmbb`) or PgMajorVersion (an enum, with a value for
every major version we support, serialized and stored like a u32 with
the value of that major version)

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2025-06-24 17:25:31 +00:00
Dmitry Savelev
158d84ea30 Switch the billing metrics storage format to ndjson. (#12338)
## Problem

The billing team wants to change the billing events pipeline and use a
common events format in S3 buckets across different event producers.

## Summary of changes

Change the events storage format for billing events from JSON to NDJSON.

Resolves: https://github.com/neondatabase/cloud/issues/29994
2025-06-24 15:36:36 +00:00
Conrad Ludgate
4dd9ca7b04 [proxy]: authenticate to compute after connect_to_compute (#12335)
## Problem

PGLB will do the connect_to_compute logic, neonkeeper will do the
session establishment logic. We should split it.

## Summary of changes

Moves postgres authentication to compute to a separate routine that
happens after connect_to_compute.
2025-06-24 14:15:36 +00:00
100 changed files with 1424 additions and 649 deletions

31
Cargo.lock generated
View File

@@ -1318,6 +1318,7 @@ dependencies = [
"p256 0.13.2",
"postgres",
"postgres_initdb",
"postgres_versioninfo",
"regex",
"remote_storage",
"reqwest",
@@ -4406,6 +4407,7 @@ dependencies = [
"once_cell",
"postgres_backend",
"postgres_ffi_types",
"postgres_versioninfo",
"rand 0.8.5",
"remote_storage",
"reqwest",
@@ -4429,6 +4431,7 @@ dependencies = [
"futures",
"http-utils",
"pageserver_api",
"postgres_versioninfo",
"reqwest",
"serde",
"thiserror 1.0.69",
@@ -4897,6 +4900,7 @@ dependencies = [
"once_cell",
"postgres",
"postgres_ffi_types",
"postgres_versioninfo",
"pprof",
"regex",
"serde",
@@ -4919,11 +4923,23 @@ version = "0.1.0"
dependencies = [
"anyhow",
"camino",
"postgres_versioninfo",
"thiserror 1.0.69",
"tokio",
"workspace_hack",
]
[[package]]
name = "postgres_versioninfo"
version = "0.1.0"
dependencies = [
"anyhow",
"serde",
"serde_repr",
"thiserror 1.0.69",
"workspace_hack",
]
[[package]]
name = "posthog_client_lite"
version = "0.1.0"
@@ -6115,6 +6131,7 @@ dependencies = [
"postgres-protocol",
"postgres_backend",
"postgres_ffi",
"postgres_versioninfo",
"pprof",
"pq_proto",
"rand 0.8.5",
@@ -6159,6 +6176,7 @@ dependencies = [
"const_format",
"pageserver_api",
"postgres_ffi",
"postgres_versioninfo",
"pq_proto",
"serde",
"serde_json",
@@ -6481,6 +6499,17 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "serde_repr"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "serde_spanned"
version = "0.6.6"
@@ -6786,6 +6815,7 @@ dependencies = [
"hex",
"http-utils",
"humantime",
"humantime-serde",
"hyper 0.14.30",
"itertools 0.10.5",
"json-structural-diff",
@@ -6796,6 +6826,7 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"postgres_connection",
"posthog_client_lite",
"rand 0.8.5",
"regex",
"reqwest",

View File

@@ -23,6 +23,7 @@ members = [
"libs/pageserver_api",
"libs/postgres_ffi",
"libs/postgres_ffi_types",
"libs/postgres_versioninfo",
"libs/safekeeper_api",
"libs/desim",
"libs/neon-shmem",
@@ -174,6 +175,7 @@ serde_json = "1"
serde_path_to_error = "0.1"
serde_with = { version = "3", features = [ "base64" ] }
serde_assert = "0.5.0"
serde_repr = "0.1.20"
sha2 = "0.10.2"
signal-hook = "0.3"
smallvec = "1.11"
@@ -261,6 +263,7 @@ postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
postgres_ffi_types = { version = "0.1", path = "./libs/postgres_ffi_types/" }
postgres_versioninfo = { version = "0.1", path = "./libs/postgres_versioninfo/" }
postgres_initdb = { path = "./libs/postgres_initdb" }
posthog_client_lite = { version = "0.1", path = "./libs/posthog_client_lite" }
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }

View File

@@ -159,8 +159,6 @@ postgres-%: postgres-configure-% \
postgres-headers-% # to prevent `make install` conflicts with neon's `postgres-headers`
+@echo "Compiling PostgreSQL $*"
$(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 install
+@echo "Compiling libpq $*"
$(MAKE) -C $(BUILD_DIR)/$*/src/interfaces/libpq install
+@echo "Compiling pg_prewarm $*"
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_prewarm install
+@echo "Compiling pg_buffercache $*"

View File

@@ -22,7 +22,7 @@ sql_exporter.yml: $(jsonnet_files)
--output-file etc/$@ \
--tla-str collector_name=neon_collector \
--tla-str collector_file=neon_collector.yml \
--tla-str 'connection_string=postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter' \
--tla-str 'connection_string=postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter&pgaudit.log=none' \
etc/sql_exporter.jsonnet
sql_exporter_autoscaling.yml: $(jsonnet_files)
@@ -30,7 +30,7 @@ sql_exporter_autoscaling.yml: $(jsonnet_files)
--output-file etc/$@ \
--tla-str collector_name=neon_collector_autoscaling \
--tla-str collector_file=neon_collector_autoscaling.yml \
--tla-str 'connection_string=postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling' \
--tla-str 'connection_string=postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling&pgaudit.log=none' \
etc/sql_exporter.jsonnet
.PHONY: clean

View File

@@ -171,9 +171,6 @@ RUN cd postgres && \
eval $CONFIGURE_CMD && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
# Install headers
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install && \
# Enable some of contrib extensions
echo 'trusted = true' >> /usr/local/pgsql/share/extension/autoinc.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/dblink.control && \
@@ -1568,20 +1565,20 @@ ARG PG_VERSION
WORKDIR /ext-src
RUN case "${PG_VERSION}" in \
"v14") \
export PGAUDIT_VERSION=1.6.2 \
export PGAUDIT_CHECKSUM=1f350d70a0cbf488c0f2b485e3a5c9b11f78ad9e3cbb95ef6904afa1eb3187eb \
export PGAUDIT_VERSION=1.6.3 \
export PGAUDIT_CHECKSUM=37a8f5a7cc8d9188e536d15cf0fdc457fcdab2547caedb54442c37f124110919 \
;; \
"v15") \
export PGAUDIT_VERSION=1.7.0 \
export PGAUDIT_CHECKSUM=8f4a73e451c88c567e516e6cba7dc1e23bc91686bb6f1f77f8f3126d428a8bd8 \
export PGAUDIT_VERSION=1.7.1 \
export PGAUDIT_CHECKSUM=e9c8e6e092d82b2f901d72555ce0fe7780552f35f8985573796cd7e64b09d4ec \
;; \
"v16") \
export PGAUDIT_VERSION=16.0 \
export PGAUDIT_CHECKSUM=d53ef985f2d0b15ba25c512c4ce967dce07b94fd4422c95bd04c4c1a055fe738 \
export PGAUDIT_VERSION=16.1 \
export PGAUDIT_CHECKSUM=3bae908ab70ba0c6f51224009dbcfff1a97bd6104c6273297a64292e1b921fee \
;; \
"v17") \
export PGAUDIT_VERSION=17.0 \
export PGAUDIT_CHECKSUM=7d0d08d030275d525f36cd48b38c6455f1023da863385badff0cec44965bfd8c \
export PGAUDIT_VERSION=17.1 \
export PGAUDIT_CHECKSUM=9c5f37504d393486cc75d2ced83f75f5899be64fa85f689d6babb833b4361e6c \
;; \
*) \
echo "pgaudit is not supported on this PostgreSQL version" && exit 1;; \

View File

@@ -26,7 +26,7 @@ commands:
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
- name: pgbouncer-exporter
user: postgres
sysvInitAction: respawn
@@ -59,7 +59,7 @@ files:
# the rules use ALL as the hostname. Avoid the pointless lookups and the "unable to
# resolve host" log messages that they generate.
Defaults !fqdn
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
# regardless of hostname (ALL)

View File

@@ -26,7 +26,7 @@ commands:
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
- name: pgbouncer-exporter
user: postgres
sysvInitAction: respawn
@@ -59,7 +59,7 @@ files:
# the rules use ALL as the hostname. Avoid the pointless lookups and the "unable to
# resolve host" log messages that they generate.
Defaults !fqdn
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
# regardless of hostname (ALL)

View File

@@ -64,6 +64,7 @@ uuid.workspace = true
walkdir.workspace = true
x509-cert.workspace = true
postgres_versioninfo.workspace = true
postgres_initdb.workspace = true
compute_api.workspace = true
utils.workspace = true

View File

@@ -29,7 +29,7 @@ use anyhow::{Context, bail};
use aws_config::BehaviorVersion;
use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use compute_tools::extension_server::{PostgresMajorVersion, get_pg_version};
use compute_tools::extension_server::get_pg_version;
use nix::unistd::Pid;
use std::ops::Not;
use tracing::{Instrument, error, info, info_span, warn};
@@ -179,12 +179,8 @@ impl PostgresProcess {
.await
.context("create pgdata directory")?;
let pg_version = match get_pg_version(self.pgbin.as_ref()) {
PostgresMajorVersion::V14 => 14,
PostgresMajorVersion::V15 => 15,
PostgresMajorVersion::V16 => 16,
PostgresMajorVersion::V17 => 17,
};
let pg_version = get_pg_version(self.pgbin.as_ref());
postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
superuser: initdb_user,
locale: DEFAULT_LOCALE, // XXX: this shouldn't be hard-coded,

View File

@@ -405,7 +405,7 @@ impl ComputeNode {
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
// Unset them via connection string options before connecting to the database.
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0 -c pgaudit.log=none";
let options = match conn_conf.get_options() {
// Allow the control plane to override any options set by the
// compute

View File

@@ -74,9 +74,11 @@ More specifically, here is an example ext_index.json
use std::path::Path;
use std::str;
use crate::metrics::{REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
use anyhow::{Context, Result, bail};
use bytes::Bytes;
use compute_api::spec::RemoteExtSpec;
use postgres_versioninfo::PgMajorVersion;
use regex::Regex;
use remote_storage::*;
use reqwest::StatusCode;
@@ -86,8 +88,6 @@ use tracing::log::warn;
use url::Url;
use zstd::stream::read::Decoder;
use crate::metrics::{REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
// where argument is a flag like `--version` or `--sharedir`
@@ -106,7 +106,7 @@ fn get_pg_config(argument: &str, pgbin: &str) -> String {
.to_string()
}
pub fn get_pg_version(pgbin: &str) -> PostgresMajorVersion {
pub fn get_pg_version(pgbin: &str) -> PgMajorVersion {
// pg_config --version returns a (platform specific) human readable string
// such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
let human_version = get_pg_config("--version", pgbin);
@@ -114,25 +114,11 @@ pub fn get_pg_version(pgbin: &str) -> PostgresMajorVersion {
}
pub fn get_pg_version_string(pgbin: &str) -> String {
match get_pg_version(pgbin) {
PostgresMajorVersion::V14 => "v14",
PostgresMajorVersion::V15 => "v15",
PostgresMajorVersion::V16 => "v16",
PostgresMajorVersion::V17 => "v17",
}
.to_owned()
get_pg_version(pgbin).v_str()
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PostgresMajorVersion {
V14,
V15,
V16,
V17,
}
fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
use PostgresMajorVersion::*;
fn parse_pg_version(human_version: &str) -> PgMajorVersion {
use PgMajorVersion::*;
// Normal releases have version strings like "PostgreSQL 15.4". But there
// are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
// 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
@@ -143,10 +129,10 @@ fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
.captures(human_version)
{
Some(captures) if captures.len() == 2 => match &captures["major"] {
"14" => return V14,
"15" => return V15,
"16" => return V16,
"17" => return V17,
"14" => return PG14,
"15" => return PG15,
"16" => return PG16,
"17" => return PG17,
_ => {}
},
_ => {}
@@ -343,25 +329,25 @@ mod tests {
#[test]
fn test_parse_pg_version() {
use super::PostgresMajorVersion::*;
assert_eq!(parse_pg_version("PostgreSQL 15.4"), V15);
assert_eq!(parse_pg_version("PostgreSQL 15.14"), V15);
use postgres_versioninfo::PgMajorVersion::*;
assert_eq!(parse_pg_version("PostgreSQL 15.4"), PG15);
assert_eq!(parse_pg_version("PostgreSQL 15.14"), PG15);
assert_eq!(
parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
V15
PG15
);
assert_eq!(parse_pg_version("PostgreSQL 14.15"), V14);
assert_eq!(parse_pg_version("PostgreSQL 14.0"), V14);
assert_eq!(parse_pg_version("PostgreSQL 14.15"), PG14);
assert_eq!(parse_pg_version("PostgreSQL 14.0"), PG14);
assert_eq!(
parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
V14
PG14
);
assert_eq!(parse_pg_version("PostgreSQL 16devel"), V16);
assert_eq!(parse_pg_version("PostgreSQL 16beta1"), V16);
assert_eq!(parse_pg_version("PostgreSQL 16rc2"), V16);
assert_eq!(parse_pg_version("PostgreSQL 16extra"), V16);
assert_eq!(parse_pg_version("PostgreSQL 16devel"), PG16);
assert_eq!(parse_pg_version("PostgreSQL 16beta1"), PG16);
assert_eq!(parse_pg_version("PostgreSQL 16rc2"), PG16);
assert_eq!(parse_pg_version("PostgreSQL 16extra"), PG16);
}
#[test]

View File

@@ -48,7 +48,7 @@ use postgres_connection::parse_host_port;
use safekeeper_api::membership::{SafekeeperGeneration, SafekeeperId};
use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, PgMajorVersion, PgVersionId,
};
use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
use tokio::task::JoinSet;
@@ -64,7 +64,7 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION);
const DEFAULT_PG_VERSION: u32 = 17;
const DEFAULT_PG_VERSION: PgMajorVersion = PgMajorVersion::PG17;
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
@@ -169,7 +169,7 @@ struct TenantCreateCmdArgs {
#[arg(default_value_t = DEFAULT_PG_VERSION)]
#[clap(long, help = "Postgres version to use for the initial timeline")]
pg_version: u32,
pg_version: PgMajorVersion,
#[clap(
long,
@@ -292,7 +292,7 @@ struct TimelineCreateCmdArgs {
#[arg(default_value_t = DEFAULT_PG_VERSION)]
#[clap(long, help = "Postgres version")]
pg_version: u32,
pg_version: PgMajorVersion,
}
#[derive(clap::Args)]
@@ -324,7 +324,7 @@ struct TimelineImportCmdArgs {
#[arg(default_value_t = DEFAULT_PG_VERSION)]
#[clap(long, help = "Postgres version of the backup being imported")]
pg_version: u32,
pg_version: PgMajorVersion,
}
#[derive(clap::Subcommand)]
@@ -603,7 +603,7 @@ struct EndpointCreateCmdArgs {
#[arg(default_value_t = DEFAULT_PG_VERSION)]
#[clap(long, help = "Postgres version")]
pg_version: u32,
pg_version: PgMajorVersion,
/// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings.
///
@@ -1295,7 +1295,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
},
new_members: None,
};
let pg_version = args.pg_version * 10000;
let pg_version = PgVersionId::from(args.pg_version);
let req = safekeeper_api::models::TimelineCreateRequest {
tenant_id,
timeline_id,

View File

@@ -67,6 +67,7 @@ use nix::sys::signal::{Signal, kill};
use pageserver_api::shard::ShardStripeSize;
use pem::Pem;
use reqwest::header::CONTENT_TYPE;
use safekeeper_api::PgMajorVersion;
use safekeeper_api::membership::SafekeeperGeneration;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
@@ -89,7 +90,7 @@ pub struct EndpointConf {
pg_port: u16,
external_http_port: u16,
internal_http_port: u16,
pg_version: u32,
pg_version: PgMajorVersion,
grpc: bool,
skip_pg_catalog_updates: bool,
reconfigure_concurrency: usize,
@@ -192,7 +193,7 @@ impl ComputeControlPlane {
pg_port: Option<u16>,
external_http_port: Option<u16>,
internal_http_port: Option<u16>,
pg_version: u32,
pg_version: PgMajorVersion,
mode: ComputeMode,
grpc: bool,
skip_pg_catalog_updates: bool,
@@ -312,7 +313,7 @@ pub struct Endpoint {
pub internal_http_address: SocketAddr,
// postgres major version in the format: 14, 15, etc.
pg_version: u32,
pg_version: PgMajorVersion,
// These are not part of the endpoint as such, but the environment
// the endpoint runs in.
@@ -557,7 +558,7 @@ impl Endpoint {
conf.append("hot_standby", "on");
// prefetching of blocks referenced in WAL doesn't make sense for us
// Neon hot standby ignores pages that are not in the shared_buffers
if self.pg_version >= 15 {
if self.pg_version >= PgMajorVersion::PG15 {
conf.append("recovery_prefetch", "off");
}
}

View File

@@ -12,9 +12,11 @@ use std::{env, fs};
use anyhow::{Context, bail};
use clap::ValueEnum;
use pageserver_api::config::PostHogConfig;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::{Certificate, Url};
use safekeeper_api::PgMajorVersion;
use serde::{Deserialize, Serialize};
use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
@@ -212,6 +214,8 @@ pub struct NeonStorageControllerConf {
pub timeline_safekeeper_count: Option<i64>,
pub posthog_config: Option<PostHogConfig>,
pub kick_secondary_downloads: Option<bool>,
}
@@ -244,6 +248,7 @@ impl Default for NeonStorageControllerConf {
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
timeline_safekeeper_count: None,
posthog_config: None,
kick_secondary_downloads: None,
}
}
@@ -424,25 +429,21 @@ impl LocalEnv {
self.pg_distrib_dir.clone()
}
pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_distrib_dir(&self, pg_version: PgMajorVersion) -> anyhow::Result<PathBuf> {
let path = self.pg_distrib_dir.clone();
#[allow(clippy::manual_range_patterns)]
match pg_version {
14 | 15 | 16 | 17 => Ok(path.join(format!("v{pg_version}"))),
_ => bail!("Unsupported postgres version: {}", pg_version),
}
Ok(path.join(pg_version.v_str()))
}
pub fn pg_dir(&self, pg_version: u32, dir_name: &str) -> anyhow::Result<PathBuf> {
pub fn pg_dir(&self, pg_version: PgMajorVersion, dir_name: &str) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir(pg_version)?.join(dir_name))
}
pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_bin_dir(&self, pg_version: PgMajorVersion) -> anyhow::Result<PathBuf> {
self.pg_dir(pg_version, "bin")
}
pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
pub fn pg_lib_dir(&self, pg_version: PgMajorVersion) -> anyhow::Result<PathBuf> {
self.pg_dir(pg_version, "lib")
}

View File

@@ -22,6 +22,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use postgres_backend::AuthType;
use postgres_connection::{PgConnectionConfig, parse_host_port};
use safekeeper_api::PgMajorVersion;
use utils::auth::{Claims, Scope};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -607,7 +608,7 @@ impl PageServerNode {
timeline_id: TimelineId,
base: (Lsn, PathBuf),
pg_wal: Option<(Lsn, PathBuf)>,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<()> {
// Init base reader
let (start_lsn, base_tarfile_path) = base;

View File

@@ -6,6 +6,8 @@ use std::str::FromStr;
use std::sync::OnceLock;
use std::time::{Duration, Instant};
use crate::background_process;
use crate::local_env::{LocalEnv, NeonStorageControllerConf};
use camino::{Utf8Path, Utf8PathBuf};
use hyper0::Uri;
use nix::unistd::Pid;
@@ -22,6 +24,7 @@ use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::{Method, Response};
use safekeeper_api::PgMajorVersion;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
@@ -31,9 +34,6 @@ use utils::auth::{Claims, Scope, encode_from_key_file};
use utils::id::{NodeId, TenantId};
use whoami::username;
use crate::background_process;
use crate::local_env::{LocalEnv, NeonStorageControllerConf};
pub struct StorageController {
env: LocalEnv,
private_key: Option<Pem>,
@@ -48,7 +48,7 @@ pub struct StorageController {
const COMMAND: &str = "storage_controller";
const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;
const STORAGE_CONTROLLER_POSTGRES_VERSION: PgMajorVersion = PgMajorVersion::PG16;
const DB_NAME: &str = "storage_controller";
@@ -184,9 +184,15 @@ impl StorageController {
/// to other versions if that one isn't found. Some automated tests create circumstances
/// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
async fn get_pg_dir(&self, dir_name: &str) -> anyhow::Result<Utf8PathBuf> {
let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 16, 15, 14];
const PREFER_VERSIONS: [PgMajorVersion; 5] = [
STORAGE_CONTROLLER_POSTGRES_VERSION,
PgMajorVersion::PG16,
PgMajorVersion::PG15,
PgMajorVersion::PG14,
PgMajorVersion::PG17,
];
for v in prefer_versions {
for v in PREFER_VERSIONS {
let path = Utf8PathBuf::from_path_buf(self.env.pg_dir(v, dir_name)?).unwrap();
if tokio::fs::try_exists(&path).await? {
return Ok(path);
@@ -636,6 +642,18 @@ impl StorageController {
args.push(format!("--timeline-safekeeper-count={sk_cnt}"));
}
let mut envs = vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
];
if let Some(posthog_config) = &self.config.posthog_config {
envs.push((
"POSTHOG_CONFIG".to_string(),
serde_json::to_string(posthog_config)?,
));
}
println!("Starting storage controller");
background_process::start_process(
@@ -643,10 +661,7 @@ impl StorageController {
&instance_dir,
&self.env.storage_controller_bin(),
args,
vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
],
envs,
background_process::InitialPidFile::Create(self.pid_file(start_args.instance_id)),
&start_args.start_timeout,
|| async {

View File

@@ -18,6 +18,7 @@ bytes.workspace = true
byteorder.workspace = true
utils.workspace = true
postgres_ffi_types.workspace = true
postgres_versioninfo.workspace = true
enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true

View File

@@ -63,7 +63,8 @@ impl Display for NodeMetadata {
}
}
/// PostHog integration config.
/// PostHog integration config. This is used in pageserver, storcon, and neon_local.
/// Ensure backward compatibility when adding new fields.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
/// PostHog project ID
@@ -76,7 +77,9 @@ pub struct PostHogConfig {
pub private_api_url: String,
/// Public API URL
pub public_api_url: String,
/// Refresh interval for the feature flag spec
/// Refresh interval for the feature flag spec.
/// The storcon will push the feature flag spec to the pageserver. If the pageserver does not receive
/// the spec for `refresh_interval`, it will fetch the spec from the PostHog API.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
pub refresh_interval: Option<Duration>,

View File

@@ -11,6 +11,7 @@ use std::time::{Duration, SystemTime};
#[cfg(feature = "testing")]
use camino::Utf8PathBuf;
use postgres_versioninfo::PgMajorVersion;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
pub use utilization::PageserverUtilization;
@@ -398,7 +399,7 @@ pub enum TimelineCreateRequestMode {
// inherits the ancestor's pg_version. Earlier code wasn't
// using a flattened enum, so, it was an accepted field, and
// we continue to accept it by having it here.
pg_version: Option<u32>,
pg_version: Option<PgMajorVersion>,
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
read_only: bool,
},
@@ -410,7 +411,7 @@ pub enum TimelineCreateRequestMode {
Bootstrap {
#[serde(default)]
existing_initdb_timeline_id: Option<TimelineId>,
pg_version: Option<u32>,
pg_version: Option<PgMajorVersion>,
},
}
@@ -1573,7 +1574,7 @@ pub struct TimelineInfo {
pub last_received_msg_lsn: Option<Lsn>,
/// the timestamp (in microseconds) of the last received message
pub last_received_msg_ts: Option<u128>,
pub pg_version: u32,
pub pg_version: PgMajorVersion,
pub state: TimelineState,

View File

@@ -19,6 +19,7 @@ serde.workspace = true
postgres_ffi_types.workspace = true
utils.workspace = true
tracing.workspace = true
postgres_versioninfo.workspace = true
[dev-dependencies]
env_logger.workspace = true

View File

@@ -4,6 +4,7 @@ use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use postgres_ffi::v17::wal_generator::LogicalMessageGenerator;
use postgres_ffi::v17::waldecoder_handler::WalStreamDecoderHandler;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_versioninfo::PgMajorVersion;
use pprof::criterion::{Output, PProfProfiler};
use utils::lsn::Lsn;
@@ -32,7 +33,7 @@ fn bench_complete_record(c: &mut Criterion) {
let value_size = LogicalMessageGenerator::make_value_size(size, PREFIX);
let value = vec![1; value_size];
let mut decoder = WalStreamDecoder::new(Lsn(0), 170000);
let mut decoder = WalStreamDecoder::new(Lsn(0), PgMajorVersion::PG17);
let msg = LogicalMessageGenerator::new(PREFIX, &value)
.next()
.unwrap()

View File

@@ -14,6 +14,8 @@ use bytes::Bytes;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
pub use postgres_versioninfo::PgMajorVersion;
macro_rules! postgres_ffi {
($version:ident) => {
#[path = "."]
@@ -91,21 +93,22 @@ macro_rules! dispatch_pgversion {
$version => $code,
default = $invalid_pgver_handling,
pgversions = [
14 : v14,
15 : v15,
16 : v16,
17 : v17,
$crate::PgMajorVersion::PG14 => v14,
$crate::PgMajorVersion::PG15 => v15,
$crate::PgMajorVersion::PG16 => v16,
$crate::PgMajorVersion::PG17 => v17,
]
)
};
($pgversion:expr => $code:expr,
default = $default:expr,
pgversions = [$($sv:literal : $vsv:ident),+ $(,)?]) => {
match ($pgversion) {
pgversions = [$($sv:pat => $vsv:ident),+ $(,)?]) => {
match ($pgversion.clone().into()) {
$($sv => {
use $crate::$vsv as pgv;
$code
},)+
#[allow(unreachable_patterns)]
_ => {
$default
}
@@ -179,9 +182,9 @@ macro_rules! enum_pgversion {
$($variant ( $crate::$md::$t )),+
}
impl self::$name {
pub fn pg_version(&self) -> u32 {
pub fn pg_version(&self) -> PgMajorVersion {
enum_pgversion_dispatch!(self, $name, _ign, {
pgv::bindings::PG_MAJORVERSION_NUM
pgv::bindings::MY_PGVERSION
})
}
}
@@ -195,15 +198,15 @@ macro_rules! enum_pgversion {
};
{name = $name:ident,
path = $p:ident,
typ = $t:ident,
$(typ = $t:ident,)?
pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
pub enum $name {
$($variant ($crate::$md::$p::$t)),+
$($variant $(($crate::$md::$p::$t))?),+
}
impl $name {
pub fn pg_version(&self) -> u32 {
pub fn pg_version(&self) -> PgMajorVersion {
enum_pgversion_dispatch!(self, $name, _ign, {
pgv::bindings::PG_MAJORVERSION_NUM
pgv::bindings::MY_PGVERSION
})
}
}
@@ -249,22 +252,21 @@ pub use v14::xlog_utils::{
try_from_pg_timestamp,
};
pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> bool {
pub fn bkpimage_is_compressed(bimg_info: u8, version: PgMajorVersion) -> bool {
dispatch_pgversion!(version, pgv::bindings::bkpimg_is_compressed(bimg_info))
}
pub fn generate_wal_segment(
segno: u64,
system_id: u64,
pg_version: u32,
pg_version: PgMajorVersion,
lsn: Lsn,
) -> Result<Bytes, SerializeError> {
assert_eq!(segno, lsn.segment_number(WAL_SEGMENT_SIZE));
dispatch_pgversion!(
pg_version,
pgv::xlog_utils::generate_wal_segment(segno, system_id, lsn),
Err(SerializeError::BadInput)
pgv::xlog_utils::generate_wal_segment(segno, system_id, lsn)
)
}
@@ -272,7 +274,7 @@ pub fn generate_pg_control(
pg_control_bytes: &[u8],
checkpoint_bytes: &[u8],
lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<(Bytes, u64, bool)> {
dispatch_pgversion!(
pg_version,
@@ -352,6 +354,7 @@ pub fn fsm_logical_to_physical(addr: BlockNumber) -> BlockNumber {
pub mod waldecoder {
use std::num::NonZeroU32;
use crate::PgMajorVersion;
use bytes::{Buf, Bytes, BytesMut};
use thiserror::Error;
use utils::lsn::Lsn;
@@ -369,7 +372,7 @@ pub mod waldecoder {
pub struct WalStreamDecoder {
pub lsn: Lsn,
pub pg_version: u32,
pub pg_version: PgMajorVersion,
pub inputbuf: BytesMut,
pub state: State,
}
@@ -382,7 +385,7 @@ pub mod waldecoder {
}
impl WalStreamDecoder {
pub fn new(lsn: Lsn, pg_version: u32) -> WalStreamDecoder {
pub fn new(lsn: Lsn, pg_version: PgMajorVersion) -> WalStreamDecoder {
WalStreamDecoder {
lsn,
pg_version,

View File

@@ -1,3 +1,7 @@
use crate::PgMajorVersion;
pub const MY_PGVERSION: PgMajorVersion = PgMajorVersion::PG14;
pub const XLOG_DBASE_CREATE: u8 = 0x00;
pub const XLOG_DBASE_DROP: u8 = 0x10;

View File

@@ -1,3 +1,7 @@
use crate::PgMajorVersion;
pub const MY_PGVERSION: PgMajorVersion = PgMajorVersion::PG15;
pub const XACT_XINFO_HAS_DROPPED_STATS: u32 = 1u32 << 8;
pub const XLOG_DBASE_CREATE_FILE_COPY: u8 = 0x00;

View File

@@ -1,3 +1,7 @@
use crate::PgMajorVersion;
pub const MY_PGVERSION: PgMajorVersion = PgMajorVersion::PG16;
pub const XACT_XINFO_HAS_DROPPED_STATS: u32 = 1u32 << 8;
pub const XLOG_DBASE_CREATE_FILE_COPY: u8 = 0x00;

View File

@@ -1,3 +1,7 @@
use crate::PgMajorVersion;
pub const MY_PGVERSION: PgMajorVersion = PgMajorVersion::PG17;
pub const XACT_XINFO_HAS_DROPPED_STATS: u32 = 1u32 << 8;
pub const XLOG_DBASE_CREATE_FILE_COPY: u8 = 0x00;

View File

@@ -9,8 +9,8 @@ use utils::bin_ser::DeserializeError;
use utils::lsn::Lsn;
use crate::{
BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId,
TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, PgMajorVersion,
RepOriginId, TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
};
#[repr(C)]
@@ -199,20 +199,17 @@ impl DecodedWALRecord {
/// Check if this WAL record represents a legacy "copy" database creation, which populates new relations
/// by reading other existing relations' data blocks. This is more complex to apply than new-style database
/// creations which simply include all the desired blocks in the WAL, so we need a helper function to detect this case.
pub fn is_dbase_create_copy(&self, pg_version: u32) -> bool {
pub fn is_dbase_create_copy(&self, pg_version: PgMajorVersion) -> bool {
if self.xl_rmid == pg_constants::RM_DBASE_ID {
let info = self.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
match pg_version {
14 => {
PgMajorVersion::PG14 => {
// Postgres 14 database creations are always the legacy kind
info == crate::v14::bindings::XLOG_DBASE_CREATE
}
15 => info == crate::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY,
16 => info == crate::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY,
17 => info == crate::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY,
_ => {
panic!("Unsupported postgres version {pg_version}")
}
PgMajorVersion::PG15 => info == crate::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY,
PgMajorVersion::PG16 => info == crate::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY,
PgMajorVersion::PG17 => info == crate::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY,
}
} else {
false
@@ -248,7 +245,7 @@ impl DecodedWALRecord {
pub fn decode_wal_record(
record: Bytes,
decoded: &mut DecodedWALRecord,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<()> {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
@@ -1106,9 +1103,9 @@ pub struct XlClogTruncate {
}
impl XlClogTruncate {
pub fn decode(buf: &mut Bytes, pg_version: u32) -> XlClogTruncate {
pub fn decode(buf: &mut Bytes, pg_version: PgMajorVersion) -> XlClogTruncate {
XlClogTruncate {
pageno: if pg_version < 17 {
pageno: if pg_version < PgMajorVersion::PG17 {
buf.get_u32_le()
} else {
buf.get_u64_le() as u32

View File

@@ -11,9 +11,9 @@ use super::super::waldecoder::WalStreamDecoder;
use super::bindings::{
CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
MY_PGVERSION
};
use super::wal_generator::LogicalMessageGenerator;
use super::PG_MAJORVERSION;
use crate::pg_constants;
use crate::PG_TLI;
use crate::{uint32, uint64, Oid};
@@ -233,7 +233,7 @@ pub fn find_end_of_wal(
let mut result = start_lsn;
let mut curr_lsn = start_lsn;
let mut buf = [0u8; XLOG_BLCKSZ];
let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
let pg_version = MY_PGVERSION;
debug!("find_end_of_wal PG_VERSION: {}", pg_version);
let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);

View File

@@ -4,6 +4,7 @@ use std::str::FromStr;
use anyhow::*;
use clap::{Arg, ArgMatches, Command, value_parser};
use postgres::Client;
use postgres_ffi::PgMajorVersion;
use wal_craft::*;
fn main() -> Result<()> {
@@ -48,7 +49,7 @@ fn main() -> Result<()> {
Some(("with-initdb", arg_matches)) => {
let cfg = Conf {
pg_version: *arg_matches
.get_one::<u32>("pg-version")
.get_one::<PgMajorVersion>("pg-version")
.context("'pg-version' is required")?,
pg_distrib_dir: arg_matches
.get_one::<PathBuf>("pg-distrib-dir")

View File

@@ -9,8 +9,8 @@ use log::*;
use postgres::Client;
use postgres::types::PgLsn;
use postgres_ffi::{
WAL_SEGMENT_SIZE, XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD,
XLOG_SIZE_OF_XLOG_SHORT_PHD,
PgMajorVersion, WAL_SEGMENT_SIZE, XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_LONG_PHD,
XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
macro_rules! xlog_utils_test {
@@ -29,7 +29,7 @@ macro_rules! xlog_utils_test {
postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
pub struct Conf {
pub pg_version: u32,
pub pg_version: PgMajorVersion,
pub pg_distrib_dir: PathBuf,
pub datadir: PathBuf,
}
@@ -52,11 +52,7 @@ impl Conf {
pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
let path = self.pg_distrib_dir.clone();
#[allow(clippy::manual_range_patterns)]
match self.pg_version {
14 | 15 | 16 | 17 => Ok(path.join(format!("v{}", self.pg_version))),
_ => bail!("Unsupported postgres version: {}", self.pg_version),
}
Ok(path.join(self.pg_version.v_str()))
}
fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {

View File

@@ -24,7 +24,7 @@ fn init_logging() {
fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
use crate::*;
let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
let pg_version = MY_PGVERSION;
// Craft some WAL
let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))

View File

@@ -9,4 +9,5 @@ anyhow.workspace = true
tokio.workspace = true
camino.workspace = true
thiserror.workspace = true
postgres_versioninfo.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -7,12 +7,13 @@
use std::fmt;
use camino::Utf8Path;
use postgres_versioninfo::PgMajorVersion;
pub struct RunInitdbArgs<'a> {
pub superuser: &'a str,
pub locale: &'a str,
pub initdb_bin: &'a Utf8Path,
pub pg_version: u32,
pub pg_version: PgMajorVersion,
pub library_search_path: &'a Utf8Path,
pub pgdata: &'a Utf8Path,
}
@@ -79,12 +80,16 @@ pub async fn do_run_initdb(args: RunInitdbArgs<'_>) -> Result<(), Error> {
.stderr(std::process::Stdio::piped());
// Before version 14, only the libc provide was available.
if pg_version > 14 {
if pg_version > PgMajorVersion::PG14 {
// Version 17 brought with it a builtin locale provider which only provides
// C and C.UTF-8. While being safer for collation purposes since it is
// guaranteed to be consistent throughout a major release, it is also more
// performant.
let locale_provider = if pg_version >= 17 { "builtin" } else { "libc" };
let locale_provider = if pg_version >= PgMajorVersion::PG17 {
"builtin"
} else {
"libc"
};
initdb_command.args(["--locale-provider", locale_provider]);
}

View File

@@ -0,0 +1,12 @@
[package]
name = "postgres_versioninfo"
version = "0.1.0"
edition = "2024"
license.workspace = true
[dependencies]
anyhow.workspace = true
thiserror.workspace = true
serde.workspace = true
serde_repr.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -0,0 +1,175 @@
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_repr::{Deserialize_repr, Serialize_repr};
use std::fmt::{Display, Formatter};
use std::str::FromStr;
/// An enum with one variant for each major version of PostgreSQL that we support.
///
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Deserialize_repr, Serialize_repr)]
#[repr(u32)]
pub enum PgMajorVersion {
PG14 = 14,
PG15 = 15,
PG16 = 16,
PG17 = 17,
// !!! When you add a new PgMajorVersion, don't forget to update PgMajorVersion::ALL
}
/// A full PostgreSQL version ID, in MMmmbb numerical format (Major/minor/bugfix)
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
#[repr(transparent)]
pub struct PgVersionId(u32);
impl PgVersionId {
pub const UNKNOWN: PgVersionId = PgVersionId(0);
pub fn from_full_pg_version(version: u32) -> PgVersionId {
match version {
0 => PgVersionId(version), // unknown version
140000..180000 => PgVersionId(version),
_ => panic!("Invalid full PostgreSQL version ID {version}"),
}
}
}
impl Display for PgVersionId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
u32::fmt(&self.0, f)
}
}
impl Serialize for PgVersionId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
u32::serialize(&self.0, serializer)
}
}
impl<'de> Deserialize<'de> for PgVersionId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
u32::deserialize(deserializer).map(PgVersionId)
}
fn deserialize_in_place<D>(deserializer: D, place: &mut Self) -> Result<(), D::Error>
where
D: Deserializer<'de>,
{
u32::deserialize_in_place(deserializer, &mut place.0)
}
}
impl PgMajorVersion {
/// Get the numerical representation of the represented Major Version
pub const fn major_version_num(&self) -> u32 {
match self {
PgMajorVersion::PG14 => 14,
PgMajorVersion::PG15 => 15,
PgMajorVersion::PG16 => 16,
PgMajorVersion::PG17 => 17,
}
}
/// Get the contents of this version's PG_VERSION file.
///
/// The PG_VERSION file is used to determine the PostgreSQL version that currently
/// owns the data in a PostgreSQL data directory.
pub fn versionfile_string(&self) -> &'static str {
match self {
PgMajorVersion::PG14 => "14",
PgMajorVersion::PG15 => "15",
PgMajorVersion::PG16 => "16\x0A",
PgMajorVersion::PG17 => "17\x0A",
}
}
/// Get the v{version} string of this major PostgreSQL version.
///
/// Because this was hand-coded in various places, this was moved into a shared
/// implementation.
pub fn v_str(&self) -> String {
match self {
PgMajorVersion::PG14 => "v14",
PgMajorVersion::PG15 => "v15",
PgMajorVersion::PG16 => "v16",
PgMajorVersion::PG17 => "v17",
}
.to_string()
}
/// All currently supported major versions of PostgreSQL.
pub const ALL: &'static [PgMajorVersion] = &[
PgMajorVersion::PG14,
PgMajorVersion::PG15,
PgMajorVersion::PG16,
PgMajorVersion::PG17,
];
}
impl Display for PgMajorVersion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
PgMajorVersion::PG14 => "PgMajorVersion::PG14",
PgMajorVersion::PG15 => "PgMajorVersion::PG15",
PgMajorVersion::PG16 => "PgMajorVersion::PG16",
PgMajorVersion::PG17 => "PgMajorVersion::PG17",
})
}
}
#[derive(Debug, thiserror::Error)]
#[allow(dead_code)]
pub struct InvalidPgVersion(u32);
impl Display for InvalidPgVersion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "InvalidPgVersion({})", self.0)
}
}
impl TryFrom<PgVersionId> for PgMajorVersion {
type Error = InvalidPgVersion;
fn try_from(value: PgVersionId) -> Result<Self, Self::Error> {
Ok(match value.0 / 10000 {
14 => PgMajorVersion::PG14,
15 => PgMajorVersion::PG15,
16 => PgMajorVersion::PG16,
17 => PgMajorVersion::PG17,
_ => return Err(InvalidPgVersion(value.0)),
})
}
}
impl From<PgMajorVersion> for PgVersionId {
fn from(value: PgMajorVersion) -> Self {
PgVersionId((value as u32) * 10000)
}
}
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
pub struct PgMajorVersionParseError(String);
impl Display for PgMajorVersionParseError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "PgMajorVersionParseError({})", self.0)
}
}
impl FromStr for PgMajorVersion {
type Err = PgMajorVersionParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"14" => PgMajorVersion::PG14,
"15" => PgMajorVersion::PG15,
"16" => PgMajorVersion::PG16,
"17" => PgMajorVersion::PG17,
_ => return Err(PgMajorVersionParseError(s.to_string())),
})
}
}

View File

@@ -1,17 +1,22 @@
//! A background loop that fetches feature flags from PostHog and updates the feature store.
use std::{sync::Arc, time::Duration};
use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use arc_swap::ArcSwap;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info_span};
use crate::{CaptureEvent, FeatureStore, PostHogClient, PostHogClientConfig};
use crate::{
CaptureEvent, FeatureStore, LocalEvaluationResponse, PostHogClient, PostHogClientConfig,
};
/// A background loop that fetches feature flags from PostHog and updates the feature store.
pub struct FeatureResolverBackgroundLoop {
posthog_client: PostHogClient,
feature_store: ArcSwap<FeatureStore>,
feature_store: ArcSwap<(SystemTime, Arc<FeatureStore>)>,
cancel: CancellationToken,
}
@@ -19,11 +24,35 @@ impl FeatureResolverBackgroundLoop {
pub fn new(config: PostHogClientConfig, shutdown_pageserver: CancellationToken) -> Self {
Self {
posthog_client: PostHogClient::new(config),
feature_store: ArcSwap::new(Arc::new(FeatureStore::new())),
feature_store: ArcSwap::new(Arc::new((
SystemTime::UNIX_EPOCH,
Arc::new(FeatureStore::new()),
))),
cancel: shutdown_pageserver,
}
}
/// Update the feature store with a new feature flag spec bypassing the normal refresh loop.
pub fn update(&self, spec: String) -> anyhow::Result<()> {
let resp: LocalEvaluationResponse = serde_json::from_str(&spec)?;
self.update_feature_store_nofail(resp, "http_propagate");
Ok(())
}
fn update_feature_store_nofail(&self, resp: LocalEvaluationResponse, source: &'static str) {
let project_id = self.posthog_client.config.project_id.parse::<u64>().ok();
match FeatureStore::new_with_flags(resp.flags, project_id) {
Ok(feature_store) => {
self.feature_store
.store(Arc::new((SystemTime::now(), Arc::new(feature_store))));
tracing::info!("Feature flag updated from {}", source);
}
Err(e) => {
tracing::warn!("Cannot process feature flag spec from {}: {}", source, e);
}
}
}
pub fn spawn(
self: Arc<Self>,
handle: &tokio::runtime::Handle,
@@ -47,6 +76,17 @@ impl FeatureResolverBackgroundLoop {
_ = ticker.tick() => {}
_ = cancel.cancelled() => break
}
{
let last_update = this.feature_store.load().0;
if let Ok(elapsed) = last_update.elapsed() {
if elapsed < refresh_period {
tracing::debug!(
"Skipping feature flag refresh because it's too soon"
);
continue;
}
}
}
let resp = match this
.posthog_client
.get_feature_flags_local_evaluation()
@@ -58,16 +98,7 @@ impl FeatureResolverBackgroundLoop {
continue;
}
};
let project_id = this.posthog_client.config.project_id.parse::<u64>().ok();
match FeatureStore::new_with_flags(resp.flags, project_id) {
Ok(feature_store) => {
this.feature_store.store(Arc::new(feature_store));
tracing::info!("Feature flag updated");
}
Err(e) => {
tracing::warn!("Cannot process feature flag spec: {}", e);
}
}
this.update_feature_store_nofail(resp, "refresh_loop");
}
tracing::info!("PostHog feature resolver stopped");
}
@@ -92,6 +123,6 @@ impl FeatureResolverBackgroundLoop {
}
pub fn feature_store(&self) -> Arc<FeatureStore> {
self.feature_store.load_full()
self.feature_store.load().1.clone()
}
}

View File

@@ -544,17 +544,8 @@ impl PostHogClient {
self.config.server_api_key.starts_with("phs_")
}
/// Fetch the feature flag specs from the server.
///
/// This is unfortunately an undocumented API at:
/// - <https://posthog.com/docs/api/feature-flags#get-api-projects-project_id-feature_flags-local_evaluation>
/// - <https://posthog.com/docs/feature-flags/local-evaluation>
///
/// The handling logic in [`FeatureStore`] mostly follows the Python API implementation.
/// See `_compute_flag_locally` in <https://github.com/PostHog/posthog-python/blob/master/posthog/client.py>
pub async fn get_feature_flags_local_evaluation(
&self,
) -> anyhow::Result<LocalEvaluationResponse> {
/// Get the raw JSON spec, same as `get_feature_flags_local_evaluation` but without parsing.
pub async fn get_feature_flags_local_evaluation_raw(&self) -> anyhow::Result<String> {
// BASE_URL/api/projects/:project_id/feature_flags/local_evaluation
// with bearer token of self.server_api_key
// OR
@@ -588,7 +579,22 @@ impl PostHogClient {
body
));
}
Ok(serde_json::from_str(&body)?)
Ok(body)
}
/// Fetch the feature flag specs from the server.
///
/// This is unfortunately an undocumented API at:
/// - <https://posthog.com/docs/api/feature-flags#get-api-projects-project_id-feature_flags-local_evaluation>
/// - <https://posthog.com/docs/feature-flags/local-evaluation>
///
/// The handling logic in [`FeatureStore`] mostly follows the Python API implementation.
/// See `_compute_flag_locally` in <https://github.com/PostHog/posthog-python/blob/master/posthog/client.py>
pub async fn get_feature_flags_local_evaluation(
&self,
) -> Result<LocalEvaluationResponse, anyhow::Error> {
let raw = self.get_feature_flags_local_evaluation_raw().await?;
Ok(serde_json::from_str(&raw)?)
}
/// Capture an event. This will only be used to report the feature flag usage back to PostHog, though

View File

@@ -12,7 +12,9 @@ use tokio::net::TcpStream;
use crate::connect::connect;
use crate::connect_raw::{RawConnection, connect_raw};
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::connect_tls::connect_tls;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::{MakeTlsConnect, TlsConnect, TlsStream};
use crate::{Client, Connection, Error};
/// TLS configuration.
@@ -238,7 +240,7 @@ impl Config {
connect(tls, self).await
}
pub async fn connect_raw<S, T>(
pub async fn tls_and_authenticate<S, T>(
&self,
stream: S,
tls: T,
@@ -247,7 +249,19 @@ impl Config {
S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>,
{
connect_raw(stream, tls, self).await
let stream = connect_tls(stream, self.ssl_mode, tls).await?;
connect_raw(stream, self).await
}
pub async fn authenticate<S, T>(
&self,
stream: MaybeTlsStream<S, T>,
) -> Result<RawConnection<S, T>, Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsStream + Unpin,
{
connect_raw(stream, self).await
}
}

View File

@@ -9,6 +9,7 @@ use crate::codec::BackendMessage;
use crate::config::Host;
use crate::connect_raw::connect_raw;
use crate::connect_socket::connect_socket;
use crate::connect_tls::connect_tls;
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Client, Config, Connection, Error, RawConnection};
@@ -44,13 +45,14 @@ where
T: TlsConnect<TcpStream>,
{
let socket = connect_socket(host_addr, host, port, config.connect_timeout).await?;
let stream = connect_tls(socket, config.ssl_mode, tls).await?;
let RawConnection {
stream,
parameters,
delayed_notice,
process_id,
secret_key,
} = connect_raw(socket, tls, config).await?;
} = connect_raw(stream, config).await?;
let socket_config = SocketConfig {
host_addr,

View File

@@ -16,9 +16,8 @@ use tokio_util::codec::Framed;
use crate::Error;
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::config::{self, AuthKeys, Config};
use crate::connect_tls::connect_tls;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::{TlsConnect, TlsStream};
use crate::tls::TlsStream;
pub struct StartupStream<S, T> {
inner: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
@@ -87,16 +86,13 @@ pub struct RawConnection<S, T> {
}
pub async fn connect_raw<S, T>(
stream: S,
tls: T,
stream: MaybeTlsStream<S, T>,
config: &Config,
) -> Result<RawConnection<S, T::Stream>, Error>
) -> Result<RawConnection<S, T>, Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>,
T: TlsStream + Unpin,
{
let stream = connect_tls(stream, config.ssl_mode, tls).await?;
let mut stream = StartupStream {
inner: Framed::new(stream, PostgresCodec),
buf: BackendMessages::empty(),

View File

@@ -10,6 +10,7 @@ const_format.workspace = true
serde.workspace = true
serde_json.workspace = true
postgres_ffi.workspace = true
postgres_versioninfo.workspace = true
pq_proto.workspace = true
tokio.workspace = true
utils.workspace = true

View File

@@ -8,6 +8,8 @@ pub mod membership;
/// Public API types
pub mod models;
pub use postgres_versioninfo::{PgMajorVersion, PgVersionId};
/// Consensus logical timestamp. Note: it is a part of sk control file.
pub type Term = u64;
/// With this term timeline is created initially. It
@@ -20,7 +22,7 @@ pub const INITIAL_TERM: Term = 0;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerInfo {
/// Postgres server version
pub pg_version: u32,
pub pg_version: PgVersionId,
pub system_id: SystemId,
pub wal_seg_size: u32,
}

View File

@@ -4,6 +4,7 @@ use std::net::SocketAddr;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::TimestampTz;
use postgres_versioninfo::PgVersionId;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
@@ -23,8 +24,7 @@ pub struct TimelineCreateRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub mconf: Configuration,
/// In the PG_VERSION_NUM macro format, like 140017.
pub pg_version: u32,
pub pg_version: PgVersionId,
pub system_id: Option<u64>,
// By default WAL_SEGMENT_SIZE
pub wal_seg_size: Option<u32>,

View File

@@ -10,7 +10,7 @@ use futures::StreamExt;
use futures::stream::FuturesUnordered;
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, WAL_SEGMENT_SIZE};
use pprof::criterion::{Output, PProfProfiler};
use remote_storage::{
DownloadOpts, GenericRemoteStorage, ListingMode, RemoteStorageConfig, RemoteStorageKind,
@@ -115,7 +115,7 @@ struct BenchmarkData {
#[derive(Deserialize)]
struct BenchmarkMetadata {
pg_version: u32,
pg_version: PgMajorVersion,
start_lsn: Lsn,
}

View File

@@ -7,8 +7,8 @@ use bytes::{Buf, Bytes};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::pg_constants;
use postgres_ffi::walrecord::*;
use postgres_ffi::{PgMajorVersion, pg_constants};
use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM;
use utils::lsn::Lsn;
@@ -24,7 +24,7 @@ impl InterpretedWalRecord {
buf: Bytes,
shards: &[ShardIdentity],
next_record_lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<HashMap<ShardIdentity, InterpretedWalRecord>> {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(buf, &mut decoded, pg_version)?;
@@ -78,7 +78,7 @@ impl MetadataRecord {
decoded: &DecodedWALRecord,
shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
next_record_lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<()> {
// Note: this doesn't actually copy the bytes since
// the [`Bytes`] type implements it via a level of indirection.
@@ -193,7 +193,7 @@ impl MetadataRecord {
fn decode_heapam_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<Option<MetadataRecord>> {
// Handle VM bit updates that are implicitly part of heap records.
@@ -205,7 +205,7 @@ impl MetadataRecord {
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
match pg_version {
14 => {
PgMajorVersion::PG14 => {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -272,7 +272,7 @@ impl MetadataRecord {
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
}
}
15 => {
PgMajorVersion::PG15 => {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -339,7 +339,7 @@ impl MetadataRecord {
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
}
}
16 => {
PgMajorVersion::PG16 => {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -406,7 +406,7 @@ impl MetadataRecord {
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
}
}
17 => {
PgMajorVersion::PG17 => {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -473,7 +473,6 @@ impl MetadataRecord {
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
}
}
_ => {}
}
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
@@ -500,7 +499,7 @@ impl MetadataRecord {
fn decode_neonmgr_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<Option<MetadataRecord>> {
// Handle VM bit updates that are implicitly part of heap records.
@@ -514,7 +513,7 @@ impl MetadataRecord {
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
match pg_version {
16 | 17 => {
PgMajorVersion::PG16 | PgMajorVersion::PG17 => {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
match info {
@@ -574,7 +573,7 @@ impl MetadataRecord {
info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
}
}
_ => anyhow::bail!(
PgMajorVersion::PG15 | PgMajorVersion::PG14 => anyhow::bail!(
"Neon RMGR has no known compatibility with PostgreSQL version {}",
pg_version
),
@@ -629,116 +628,121 @@ impl MetadataRecord {
fn decode_dbase_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<Option<MetadataRecord>> {
// TODO: Refactor this to avoid the duplication between postgres versions.
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
if pg_version == 14 {
if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
let createdb = XlCreateDatabase::decode(buf);
tracing::debug!("XLOG_DBASE_CREATE v14");
match pg_version {
PgMajorVersion::PG14 => {
if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
let createdb = XlCreateDatabase::decode(buf);
tracing::debug!("XLOG_DBASE_CREATE v14");
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
return Ok(Some(record));
} else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
return Ok(Some(record));
}
}
} else if pg_version == 15 {
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
PgMajorVersion::PG15 => {
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
let createdb = XlCreateDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
return Ok(Some(record));
}
}
} else if pg_version == 16 {
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
PgMajorVersion::PG16 => {
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
let createdb = XlCreateDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
return Ok(Some(record));
}
}
} else if pg_version == 17 {
if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
PgMajorVersion::PG17 => {
if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
let createdb = XlCreateDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
return Ok(Some(record));
}
}
}
@@ -748,12 +752,12 @@ impl MetadataRecord {
fn decode_clog_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<Option<MetadataRecord>> {
let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
let pageno = if pg_version < 17 {
let pageno = if pg_version < PgMajorVersion::PG17 {
buf.get_u32_le()
} else {
buf.get_u64_le() as u32
@@ -765,7 +769,7 @@ impl MetadataRecord {
ClogZeroPage { segno, rpageno },
))))
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
assert_eq!(info, pg_constants::CLOG_TRUNCATE);
let xlrec = XlClogTruncate::decode(buf, pg_version);
Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
@@ -838,14 +842,14 @@ impl MetadataRecord {
fn decode_multixact_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<Option<MetadataRecord>> {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
|| info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
{
let pageno = if pg_version < 17 {
let pageno = if pg_version < PgMajorVersion::PG17 {
buf.get_u32_le()
} else {
buf.get_u64_le() as u32

View File

@@ -13,7 +13,7 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn, pg_constants};
use postgres_ffi::{BLCKSZ, PgMajorVersion, page_is_new, page_set_lsn, pg_constants};
use serde::{Deserialize, Serialize};
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
@@ -139,7 +139,7 @@ impl SerializedValueBatch {
decoded: DecodedWALRecord,
shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
next_record_lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<()> {
// First determine how big the buffers need to be and allocate it up-front.
// This duplicates some of the work below, but it's empirically much faster.
@@ -267,7 +267,7 @@ impl SerializedValueBatch {
fn estimate_buffer_size(
decoded: &DecodedWALRecord,
shard: &ShardIdentity,
pg_version: u32,
pg_version: PgMajorVersion,
) -> usize {
let mut estimate: usize = 0;
@@ -303,7 +303,11 @@ impl SerializedValueBatch {
estimate
}
fn block_is_image(decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, pg_version: u32) -> bool {
fn block_is_image(
decoded: &DecodedWALRecord,
blk: &DecodedBkpBlock,
pg_version: PgMajorVersion,
) -> bool {
blk.apply_image
&& blk.has_image
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID

File diff suppressed because one or more lines are too long

View File

@@ -18,6 +18,7 @@ workspace_hack = { version = "0.1", path = "../../workspace_hack" }
tokio-postgres.workspace = true
tokio-stream.workspace = true
tokio.workspace = true
postgres_versioninfo.workspace = true
futures.workspace = true
tokio-util.workspace = true
anyhow.workspace = true

View File

@@ -7,6 +7,7 @@ use detach_ancestor::AncestorDetached;
use http_utils::error::HttpErrorBody;
use pageserver_api::models::*;
use pageserver_api::shard::TenantShardId;
use postgres_versioninfo::PgMajorVersion;
pub use reqwest::Body as ReqwestBody;
use reqwest::{IntoUrl, Method, StatusCode, Url};
use utils::id::{TenantId, TimelineId};
@@ -745,9 +746,11 @@ impl Client {
timeline_id: TimelineId,
base_lsn: Lsn,
end_lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
basebackup_tarball: ReqwestBody,
) -> Result<()> {
let pg_version = pg_version.major_version_num();
let uri = format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_basebackup?base_lsn={base_lsn}&end_lsn={end_lsn}&pg_version={pg_version}",
self.mgmt_api_endpoint,
@@ -841,4 +844,13 @@ impl Client {
.await
.map_err(Error::ReceiveBody)
}
pub async fn update_feature_flag_spec(&self, spec: String) -> Result<()> {
let uri = format!("{}/v1/feature_flag_spec", self.mgmt_api_endpoint);
self.request(Method::POST, uri, spec)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}

View File

@@ -20,7 +20,8 @@ use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{PG_HBA, PGDATA_SPECIAL_FILES};
use postgres_ffi::{
BLCKSZ, PG_TLI, RELSEG_SIZE, WAL_SEGMENT_SIZE, XLogFileName, dispatch_pgversion, pg_constants,
BLCKSZ, PG_TLI, PgMajorVersion, RELSEG_SIZE, WAL_SEGMENT_SIZE, XLogFileName,
dispatch_pgversion, pg_constants,
};
use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi_types::forknum::{INIT_FORKNUM, MAIN_FORKNUM};
@@ -619,10 +620,7 @@ where
};
if spcnode == GLOBALTABLESPACE_OID {
let pg_version_str = match self.timeline.pg_version {
14 | 15 => self.timeline.pg_version.to_string(),
ver => format!("{ver}\x0A"),
};
let pg_version_str = self.timeline.pg_version.versionfile_string();
let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
self.ar
.append(&header, pg_version_str.as_bytes())
@@ -679,10 +677,7 @@ where
if let Some(img) = relmap_img {
let dst_path = format!("base/{dbnode}/PG_VERSION");
let pg_version_str = match self.timeline.pg_version {
14 | 15 => self.timeline.pg_version.to_string(),
ver => format!("{ver}\x0A"),
};
let pg_version_str = self.timeline.pg_version.versionfile_string();
let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
self.ar
.append(&header, pg_version_str.as_bytes())
@@ -713,7 +708,7 @@ where
buf.extend_from_slice(&img[..]);
let crc = crc32c::crc32c(&img[..]);
buf.put_u32_le(crc);
let path = if self.timeline.pg_version < 17 {
let path = if self.timeline.pg_version < PgMajorVersion::PG17 {
format!("pg_twophase/{xid:>08X}")
} else {
format!("pg_twophase/{xid:>016X}")

View File

@@ -11,7 +11,7 @@ use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, bail, ensure};
use anyhow::{Context, ensure};
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use pageserver_api::config::{
@@ -22,6 +22,7 @@ use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pem::Pem;
use postgres_backend::AuthType;
use postgres_ffi::PgMajorVersion;
use remote_storage::{RemotePath, RemoteStorageConfig};
use reqwest::Url;
use storage_broker::Uri;
@@ -338,20 +339,16 @@ impl PageServerConf {
//
// Postgres distribution paths
//
pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
pub fn pg_distrib_dir(&self, pg_version: PgMajorVersion) -> anyhow::Result<Utf8PathBuf> {
let path = self.pg_distrib_dir.clone();
#[allow(clippy::manual_range_patterns)]
match pg_version {
14 | 15 | 16 | 17 => Ok(path.join(format!("v{pg_version}"))),
_ => bail!("Unsupported postgres version: {}", pg_version),
}
Ok(path.join(pg_version.v_str()))
}
pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
pub fn pg_bin_dir(&self, pg_version: PgMajorVersion) -> anyhow::Result<Utf8PathBuf> {
Ok(self.pg_distrib_dir(pg_version)?.join("bin"))
}
pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
pub fn pg_lib_dir(&self, pg_version: PgMajorVersion) -> anyhow::Result<Utf8PathBuf> {
Ok(self.pg_distrib_dir(pg_version)?.join("lib"))
}

View File

@@ -31,6 +31,13 @@ impl FeatureResolver {
}
}
pub fn update(&self, spec: String) -> anyhow::Result<()> {
if let Some(inner) = &self.inner {
inner.update(spec)?;
}
Ok(())
}
pub fn spawn(
conf: &PageServerConf,
shutdown_pageserver: CancellationToken,

View File

@@ -41,6 +41,7 @@ use pageserver_api::models::{
TopTenantShardItem, TopTenantShardsRequest, TopTenantShardsResponse,
};
use pageserver_api::shard::{ShardCount, TenantShardId};
use postgres_ffi::PgMajorVersion;
use remote_storage::{DownloadError, GenericRemoteStorage, TimeTravelError};
use scopeguard::defer;
use serde_json::json;
@@ -3385,7 +3386,7 @@ async fn put_tenant_timeline_import_basebackup(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let base_lsn: Lsn = must_parse_query_param(&request, "base_lsn")?;
let end_lsn: Lsn = must_parse_query_param(&request, "end_lsn")?;
let pg_version: u32 = must_parse_query_param(&request, "pg_version")?;
let pg_version: PgMajorVersion = must_parse_query_param(&request, "pg_version")?;
check_permission(&request, Some(tenant_id))?;
@@ -3742,6 +3743,20 @@ async fn force_override_feature_flag_for_testing_delete(
json_response(StatusCode::OK, ())
}
async fn update_feature_flag_spec(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let body = json_request(&mut request).await?;
let state = get_state(&request);
state
.feature_resolver
.update(body)
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
@@ -4127,5 +4142,8 @@ pub fn make_router(
.delete("/v1/feature_flag/:flag_key", |r| {
testing_api_handler("force override feature flag - delete", r, force_override_feature_flag_for_testing_delete)
})
.post("/v1/feature_flag_spec", |r| {
api_handler(r, update_feature_flag_spec)
})
.any(handler_404))
}

View File

@@ -38,6 +38,7 @@ pub mod walredo;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use postgres_ffi::PgMajorVersion;
use tenant::mgr::{BackgroundPurges, TenantManager};
use tenant::secondary;
use tracing::{info, info_span};
@@ -51,7 +52,7 @@ use tracing::{info, info_span};
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const DEFAULT_PG_VERSION: u32 = 17;
pub const DEFAULT_PG_VERSION: PgMajorVersion = PgMajorVersion::PG17;
// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;

View File

@@ -25,7 +25,7 @@ use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
use pageserver_api::models::RelSizeMigration;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::{BLCKSZ, TimestampTz, TransactionId};
use postgres_ffi::{BLCKSZ, PgMajorVersion, TimestampTz, TransactionId};
use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi_types::{Oid, RepOriginId};
use serde::{Deserialize, Serialize};
@@ -1081,7 +1081,7 @@ impl Timeline {
// fetch directory entry
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
if self.pg_version >= 17 {
if self.pg_version >= PgMajorVersion::PG17 {
Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
} else {
Ok(TwoPhaseDirectory::des(&buf)?
@@ -1613,7 +1613,7 @@ impl DatadirModification<'_> {
.push((DirectoryKind::Db, MetricsUpdate::Set(0)));
self.put(DBDIR_KEY, Value::Image(buf.into()));
let buf = if self.tline.pg_version >= 17 {
let buf = if self.tline.pg_version >= PgMajorVersion::PG17 {
TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
xids: HashSet::new(),
})
@@ -1967,7 +1967,7 @@ impl DatadirModification<'_> {
) -> Result<(), WalIngestError> {
// Add it to the directory entry
let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let newdirbuf = if self.tline.pg_version >= 17 {
let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
if !dir.xids.insert(xid) {
Err(WalIngestErrorKind::FileAlreadyExists(xid))?;
@@ -2383,7 +2383,7 @@ impl DatadirModification<'_> {
) -> Result<(), WalIngestError> {
// Remove it from the directory entry
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let newdirbuf = if self.tline.pg_version >= 17 {
let newdirbuf = if self.tline.pg_version >= PgMajorVersion::PG17 {
let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
if !dir.xids.remove(&xid) {

View File

@@ -38,6 +38,7 @@ use pageserver_api::models::{
WalRedoManagerStatus,
};
use pageserver_api::shard::{ShardIdentity, ShardStripeSize, TenantShardId};
use postgres_ffi::PgMajorVersion;
use remote_storage::{DownloadError, GenericRemoteStorage, TimeoutOrCancel};
use remote_timeline_client::index::GcCompactionState;
use remote_timeline_client::manifest::{
@@ -497,7 +498,7 @@ impl WalRedoManager {
lsn: Lsn,
base_img: Option<(Lsn, bytes::Bytes)>,
records: Vec<(Lsn, wal_decoder::models::record::NeonWalRecord)>,
pg_version: u32,
pg_version: PgMajorVersion,
redo_attempt_type: RedoAttemptType,
) -> Result<bytes::Bytes, walredo::Error> {
match self {
@@ -933,7 +934,7 @@ pub(crate) enum CreateTimelineParams {
pub(crate) struct CreateTimelineParamsBootstrap {
pub(crate) new_timeline_id: TimelineId,
pub(crate) existing_initdb_timeline_id: Option<TimelineId>,
pub(crate) pg_version: u32,
pub(crate) pg_version: PgMajorVersion,
}
/// NB: See comment on [`CreateTimelineIdempotency::Branch`] for why there's no `pg_version` here.
@@ -971,7 +972,7 @@ pub(crate) enum CreateTimelineIdempotency {
/// NB: special treatment, see comment in [`Self`].
FailWithConflict,
Bootstrap {
pg_version: u32,
pg_version: PgMajorVersion,
},
/// NB: branches always have the same `pg_version` as their ancestor.
/// While [`pageserver_api::models::TimelineCreateRequestMode::Branch::pg_version`]
@@ -2541,7 +2542,7 @@ impl TenantShard {
self: &Arc<Self>,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
ctx: &RequestContext,
) -> anyhow::Result<(UninitializedTimeline, RequestContext)> {
anyhow::ensure!(
@@ -2593,7 +2594,7 @@ impl TenantShard {
self: &Arc<Self>,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let (uninit_tl, ctx) = self
@@ -2632,7 +2633,7 @@ impl TenantShard {
self: &Arc<Self>,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
ctx: &RequestContext,
in_memory_layer_desc: Vec<timeline::InMemoryLayerTestDesc>,
delta_layer_desc: Vec<timeline::DeltaLayerTestDesc>,
@@ -2898,7 +2899,7 @@ impl TenantShard {
Lsn(0),
initdb_lsn,
initdb_lsn,
15,
PgMajorVersion::PG15,
);
this.prepare_new_timeline(
new_timeline_id,
@@ -5090,7 +5091,7 @@ impl TenantShard {
pub(crate) async fn bootstrap_timeline_test(
self: &Arc<Self>,
timeline_id: TimelineId,
pg_version: u32,
pg_version: PgMajorVersion,
load_existing_initdb: Option<TimelineId>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
@@ -5232,7 +5233,7 @@ impl TenantShard {
async fn bootstrap_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
pg_version: u32,
pg_version: PgMajorVersion,
load_existing_initdb: Option<TimelineId>,
ctx: &RequestContext,
) -> Result<CreateTimelineResult, CreateTimelineError> {
@@ -5770,7 +5771,7 @@ impl TenantShard {
async fn run_initdb(
conf: &'static PageServerConf,
initdb_target_dir: &Utf8Path,
pg_version: u32,
pg_version: PgMajorVersion,
cancel: &CancellationToken,
) -> Result<(), InitdbError> {
let initdb_bin_path = conf
@@ -6051,7 +6052,7 @@ pub(crate) mod harness {
lsn: Lsn,
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
_pg_version: PgMajorVersion,
_redo_attempt_type: RedoAttemptType,
) -> Result<Bytes, walredo::Error> {
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
@@ -6223,7 +6224,7 @@ mod tests {
async fn randomize_timeline(
tenant: &Arc<TenantShard>,
new_timeline_id: TimelineId,
pg_version: u32,
pg_version: PgMajorVersion,
spec: TestTimelineSpecification,
random: &mut rand::rngs::StdRng,
ctx: &RequestContext,

View File

@@ -18,6 +18,7 @@
//! [`IndexPart`]: super::remote_timeline_client::index::IndexPart
use anyhow::ensure;
use postgres_ffi::PgMajorVersion;
use serde::{Deserialize, Serialize};
use utils::bin_ser::{BeSer, SerializeError};
use utils::id::TimelineId;
@@ -136,7 +137,7 @@ struct TimelineMetadataBodyV2 {
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -167,7 +168,7 @@ impl TimelineMetadata {
ancestor_lsn: Lsn,
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
) -> Self {
Self {
hdr: TimelineMetadataHeader {
@@ -215,7 +216,7 @@ impl TimelineMetadata {
ancestor_lsn: body.ancestor_lsn,
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: 14, // All timelines created before this version had pg_version 14
pg_version: PgMajorVersion::PG14, // All timelines created before this version had pg_version 14
};
hdr.format_version = METADATA_FORMAT_VERSION;
@@ -317,7 +318,7 @@ impl TimelineMetadata {
self.body.initdb_lsn
}
pub fn pg_version(&self) -> u32 {
pub fn pg_version(&self) -> PgMajorVersion {
self.body.pg_version
}
@@ -331,7 +332,7 @@ impl TimelineMetadata {
Lsn::from_hex("00000000").unwrap(),
Lsn::from_hex("00000000").unwrap(),
Lsn::from_hex("00000000").unwrap(),
0,
PgMajorVersion::PG14,
);
let bytes = instance.to_bytes().unwrap();
Self::from_bytes(&bytes).unwrap()
@@ -545,7 +546,7 @@ mod tests {
Lsn(0),
Lsn(0),
Lsn(0),
14, // All timelines created before this version had pg_version 14
PgMajorVersion::PG14, // All timelines created before this version had pg_version 14
);
assert_eq!(
@@ -565,7 +566,7 @@ mod tests {
Lsn(0),
// Updating this version to 17 will cause the test to fail at the
// next assert_eq!().
16,
PgMajorVersion::PG16,
);
let expected_bytes = vec![
/* TimelineMetadataHeader */

View File

@@ -427,8 +427,8 @@ impl GcBlocking {
#[cfg(test)]
mod tests {
use postgres_ffi::PgMajorVersion;
use std::str::FromStr;
use utils::id::TimelineId;
use super::*;
@@ -831,7 +831,7 @@ mod tests {
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
PgMajorVersion::PG14,
).with_recalculated_checksum().unwrap(),
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
archived_at: None,
@@ -893,7 +893,7 @@ mod tests {
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
PgMajorVersion::PG14,
).with_recalculated_checksum().unwrap(),
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
archived_at: Some(parse_naive_datetime("2023-04-29T09:00:00.123000000")),
@@ -957,7 +957,7 @@ mod tests {
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
PgMajorVersion::PG14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
@@ -1033,7 +1033,7 @@ mod tests {
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
PgMajorVersion::PG14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
@@ -1114,7 +1114,7 @@ mod tests {
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
PgMajorVersion::PG14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
@@ -1199,7 +1199,7 @@ mod tests {
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
PgMajorVersion::PG14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
@@ -1287,7 +1287,7 @@ mod tests {
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
PgMajorVersion::PG14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),

View File

@@ -1622,11 +1622,6 @@ impl DeltaLayerIterator<'_> {
pub(crate) mod test {
use std::collections::BTreeMap;
use bytes::Bytes;
use itertools::MinMaxResult;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::{Rng, RngCore};
use super::*;
use crate::DEFAULT_PG_VERSION;
use crate::context::DownloadBehavior;
@@ -1636,6 +1631,11 @@ pub(crate) mod test {
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::{TenantShard, Timeline};
use bytes::Bytes;
use itertools::MinMaxResult;
use postgres_ffi::PgMajorVersion;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::{Rng, RngCore};
/// Construct an index for a fictional delta layer and and then
/// traverse in order to plan vectored reads for a query. Finally,
@@ -1995,7 +1995,7 @@ pub(crate) mod test {
let (tenant, ctx) = h.load().await;
let ctx = &ctx;
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
.create_test_timeline(TimelineId::generate(), Lsn(0x10), PgMajorVersion::PG14, ctx)
.await
.unwrap();
let ctx = &ctx.with_scope_timeline(&timeline);

View File

@@ -1,6 +1,7 @@
use std::time::UNIX_EPOCH;
use pageserver_api::key::{CONTROLFILE_KEY, Key};
use postgres_ffi::PgMajorVersion;
use tokio::task::JoinSet;
use utils::completion::{self, Completion};
use utils::id::TimelineId;
@@ -45,7 +46,7 @@ async fn smoke_test() {
.create_test_timeline_with_layers(
TimelineId::generate(),
Lsn(0x10),
14,
PgMajorVersion::PG14,
&ctx,
Default::default(), // in-memory layers
Default::default(),
@@ -256,7 +257,12 @@ async fn evict_and_wait_on_wanted_deleted() {
let (tenant, ctx) = h.load().await;
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.create_test_timeline(
TimelineId::generate(),
Lsn(0x10),
PgMajorVersion::PG14,
&ctx,
)
.await
.unwrap();
@@ -341,7 +347,12 @@ fn read_wins_pending_eviction() {
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.create_test_timeline(
TimelineId::generate(),
Lsn(0x10),
PgMajorVersion::PG14,
&ctx,
)
.await
.unwrap();
let ctx = ctx.with_scope_timeline(&timeline);
@@ -474,7 +485,12 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.create_test_timeline(
TimelineId::generate(),
Lsn(0x10),
PgMajorVersion::PG14,
&ctx,
)
.await
.unwrap();
let ctx = ctx.with_scope_timeline(&timeline);
@@ -644,7 +660,12 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
let (tenant, ctx) = h.load().await;
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.create_test_timeline(
TimelineId::generate(),
Lsn(0x10),
PgMajorVersion::PG14,
&ctx,
)
.await
.unwrap();
let ctx = ctx.with_scope_timeline(&timeline);
@@ -730,7 +751,12 @@ async fn evict_and_wait_does_not_wait_for_download() {
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.create_test_timeline(
TimelineId::generate(),
Lsn(0x10),
PgMajorVersion::PG14,
&ctx,
)
.await
.unwrap();
let ctx = ctx.with_scope_timeline(&timeline);
@@ -836,7 +862,12 @@ async fn eviction_cancellation_on_drop() {
let (tenant, ctx) = h.load().await;
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.create_test_timeline(
TimelineId::generate(),
Lsn(0x10),
PgMajorVersion::PG14,
&ctx,
)
.await
.unwrap();

View File

@@ -58,7 +58,7 @@ use pageserver_api::reltag::{BlockNumber, RelTag};
use pageserver_api::shard::{ShardIdentity, ShardIndex, ShardNumber, TenantShardId};
use postgres_connection::PgConnectionConfig;
use postgres_ffi::v14::xlog_utils;
use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp};
use postgres_ffi::{PgMajorVersion, WAL_SEGMENT_SIZE, to_pg_timestamp};
use rand::Rng;
use remote_storage::DownloadError;
use serde_with::serde_as;
@@ -225,7 +225,7 @@ pub struct Timeline {
/// to shards, and is constant through the lifetime of this Timeline.
shard_identity: ShardIdentity,
pub pg_version: u32,
pub pg_version: PgMajorVersion,
/// The tuple has two elements.
/// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
@@ -2913,7 +2913,7 @@ impl Timeline {
shard_identity: ShardIdentity,
walredo_mgr: Option<Arc<super::WalRedoManager>>,
resources: TimelineResources,
pg_version: u32,
pg_version: PgMajorVersion,
state: TimelineState,
attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
create_idempotency: crate::tenant::CreateTimelineIdempotency,
@@ -7593,6 +7593,7 @@ mod tests {
use std::sync::Arc;
use pageserver_api::key::Key;
use postgres_ffi::PgMajorVersion;
use std::iter::Iterator;
use tracing::Instrument;
use utils::id::TimelineId;
@@ -7667,7 +7668,7 @@ mod tests {
.create_test_timeline_with_layers(
TimelineId::generate(),
Lsn(0x10),
14,
PgMajorVersion::PG14,
&ctx,
Vec::new(), // in-memory layers
delta_layers,
@@ -7803,7 +7804,7 @@ mod tests {
.create_test_timeline_with_layers(
TimelineId::generate(),
Lsn(0x10),
14,
PgMajorVersion::PG14,
&ctx,
Vec::new(), // in-memory layers
delta_layers,
@@ -7863,7 +7864,12 @@ mod tests {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.create_test_timeline(
TimelineId::generate(),
Lsn(0x10),
PgMajorVersion::PG14,
&ctx,
)
.await
.unwrap();

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::Context;
use bytes::Bytes;
use postgres_ffi::ControlFileData;
use postgres_ffi::{ControlFileData, PgMajorVersion};
use remote_storage::{
Download, DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, Listing,
ListingObject, RemotePath, RemoteStorageConfig,
@@ -264,7 +264,7 @@ impl ControlFile {
pub(crate) fn base_lsn(&self) -> Lsn {
Lsn(self.control_file_data.checkPoint).align()
}
pub(crate) fn pg_version(&self) -> u32 {
pub(crate) fn pg_version(&self) -> PgMajorVersion {
self.try_pg_version()
.expect("prepare() checks that try_pg_version doesn't error")
}
@@ -274,13 +274,14 @@ impl ControlFile {
pub(crate) fn control_file_buf(&self) -> &Bytes {
&self.control_file_buf
}
fn try_pg_version(&self) -> anyhow::Result<u32> {
fn try_pg_version(&self) -> anyhow::Result<PgMajorVersion> {
Ok(match self.control_file_data.catalog_version_no {
// thesea are from catversion.h
202107181 => 14,
202209061 => 15,
202307071 => 16,
202406281 => 17,
202107181 => PgMajorVersion::PG14,
202209061 => PgMajorVersion::PG15,
202307071 => PgMajorVersion::PG16,
202406281 => PgMajorVersion::PG17,
catversion => {
anyhow::bail!("unrecognized catalog version {catversion}")
}

View File

@@ -275,12 +275,20 @@ pub(super) async fn handle_walreceiver_connection(
let copy_stream = replication_client.copy_both_simple(&query).await?;
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
.await
.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let walingest_future = WalIngest::new(timeline.as_ref(), startpoint, &ctx);
let walingest_res = select! {
walingest_res = walingest_future => walingest_res,
_ = cancellation.cancelled() => {
// We are doing reads in WalIngest::new, and those can hang as they come from the network.
// Timeline cancellation hits the walreceiver cancellation token before it hits the timeline global one.
debug!("Connection cancelled");
return Err(WalReceiverError::Cancelled);
},
};
let mut walingest = walingest_res.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let (format, compression) = match protocol {
PostgresClientProtocol::Interpreted {

View File

@@ -32,8 +32,8 @@ use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::walrecord::*;
use postgres_ffi::{
TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
fsm_logical_to_physical, pg_constants,
PgMajorVersion, TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion,
enum_pgversion_dispatch, fsm_logical_to_physical, pg_constants,
};
use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use tracing::*;
@@ -781,7 +781,7 @@ impl WalIngest {
) -> Result<(), WalIngestError> {
let (xact_common, is_commit, is_prepared) = match record {
XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
let xid: u64 = if modification.tline.pg_version >= 17 {
let xid: u64 = if modification.tline.pg_version >= PgMajorVersion::PG17 {
self.adjust_to_full_transaction_id(xl_xid)?
} else {
xl_xid as u64
@@ -886,7 +886,7 @@ impl WalIngest {
xl_xid, parsed.xid, lsn,
);
let xid: u64 = if modification.tline.pg_version >= 17 {
let xid: u64 = if modification.tline.pg_version >= PgMajorVersion::PG17 {
self.adjust_to_full_transaction_id(parsed.xid)?
} else {
parsed.xid as u64
@@ -1241,7 +1241,7 @@ impl WalIngest {
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let oldest_active_xid = if pg_version >= 17 {
let oldest_active_xid = if pg_version >= PgMajorVersion::PG17 {
let mut oldest_active_full_xid = cp.nextXid.value;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if xid < oldest_active_full_xid {
@@ -1475,10 +1475,11 @@ impl WalIngest {
const fn rate_limiter(
&self,
pg_version: u32,
pg_version: PgMajorVersion,
) -> Option<&Lazy<Mutex<RateLimit>>> {
const MIN_PG_VERSION: u32 = 14;
const MAX_PG_VERSION: u32 = 17;
const MIN_PG_VERSION: u32 = PgMajorVersion::PG14.major_version_num();
const MAX_PG_VERSION: u32 = PgMajorVersion::PG17.major_version_num();
let pg_version = pg_version.major_version_num();
if pg_version < MIN_PG_VERSION || pg_version > MAX_PG_VERSION {
return None;
@@ -1603,6 +1604,7 @@ async fn get_relsize(
#[cfg(test)]
mod tests {
use anyhow::Result;
use postgres_ffi::PgMajorVersion;
use postgres_ffi::RELSEG_SIZE;
use super::*;
@@ -1625,7 +1627,7 @@ mod tests {
#[tokio::test]
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<(), anyhow::Error> {
for i in 14..=16 {
for i in PgMajorVersion::ALL {
dispatch_pgversion!(i, {
pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
});
@@ -2335,7 +2337,7 @@ mod tests {
// 5. Grep sk logs for "restart decoder" to get startpoint
// 6. Run just the decoder from this test to get the endpoint.
// It's the last LSN the decoder will output.
let pg_version = 15; // The test data was generated by pg15
let pg_version = PgMajorVersion::PG15; // The test data was generated by pg15
let path = "test_data/sk_wal_segment_from_pgbench";
let wal_segment_path = format!("{path}/000000010000000000000001.zst");
let source_initdb_path = format!("{path}/{INITDB_PATH}");

View File

@@ -33,6 +33,7 @@ use bytes::{Bytes, BytesMut};
use pageserver_api::key::Key;
use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
use pageserver_api::shard::TenantShardId;
use postgres_ffi::PgMajorVersion;
use tracing::*;
use utils::lsn::Lsn;
use utils::sync::gate::GateError;
@@ -165,7 +166,7 @@ impl PostgresRedoManager {
lsn: Lsn,
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
pg_version: PgMajorVersion,
redo_attempt_type: RedoAttemptType,
) -> Result<Bytes, Error> {
if records.is_empty() {
@@ -232,7 +233,7 @@ impl PostgresRedoManager {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn ping(&self, pg_version: u32) -> Result<(), Error> {
pub async fn ping(&self, pg_version: PgMajorVersion) -> Result<(), Error> {
self.do_with_walredo_process(pg_version, |proc| async move {
proc.ping(Duration::from_secs(1))
.await
@@ -342,7 +343,7 @@ impl PostgresRedoManager {
O,
>(
&self,
pg_version: u32,
pg_version: PgMajorVersion,
closure: F,
) -> Result<O, Error> {
let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
@@ -442,7 +443,7 @@ impl PostgresRedoManager {
base_img_lsn: Lsn,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
pg_version: PgMajorVersion,
max_retry_attempts: u32,
) -> Result<Bytes, Error> {
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
@@ -572,6 +573,7 @@ mod tests {
use bytes::Bytes;
use pageserver_api::key::Key;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::PgMajorVersion;
use tracing::Instrument;
use utils::id::TenantId;
use utils::lsn::Lsn;
@@ -586,7 +588,7 @@ mod tests {
let h = RedoHarness::new().unwrap();
h.manager
.ping(14)
.ping(PgMajorVersion::PG14)
.instrument(h.span())
.await
.expect("ping should work");
@@ -612,7 +614,7 @@ mod tests {
Lsn::from_str("0/16E2408").unwrap(),
None,
short_records(),
14,
PgMajorVersion::PG14,
RedoAttemptType::ReadPage,
)
.instrument(h.span())
@@ -641,7 +643,7 @@ mod tests {
Lsn::from_str("0/16E2408").unwrap(),
None,
short_records(),
14,
PgMajorVersion::PG14,
RedoAttemptType::ReadPage,
)
.instrument(h.span())
@@ -663,7 +665,7 @@ mod tests {
Lsn::INVALID,
None,
short_records(),
16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
PgMajorVersion::PG16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
RedoAttemptType::ReadPage,
)
.instrument(h.span())

View File

@@ -12,7 +12,7 @@ use anyhow::Context;
use bytes::Bytes;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::BLCKSZ;
use postgres_ffi::{BLCKSZ, PgMajorVersion};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{Instrument, debug, error, instrument};
use utils::lsn::Lsn;
@@ -54,11 +54,11 @@ impl WalRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(pg_version=pg_version))]
#[instrument(skip_all,fields(pg_version=pg_version.major_version_num()))]
pub(crate) fn launch(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<Self> {
crate::span::debug_assert_current_span_has_tenant_id();

View File

@@ -6,7 +6,6 @@ use std::collections::BTreeMap;
use std::pin::pin;
use std::sync::Mutex;
use futures::future::Either;
use scopeguard::ScopeGuard;
use tokio::sync::oneshot::error::TryRecvError;
@@ -49,37 +48,67 @@ impl<P: QueueProcessing> BatchQueue<P> {
}
}
pub async fn call(&self, req: P::Req) -> P::Res {
/// Perform a single request-response process, this may be batched internally.
///
/// This function is not cancel safe.
pub async fn call<R>(
&self,
req: P::Req,
cancelled: impl Future<Output = R>,
) -> Result<P::Res, R> {
let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req);
let guard = scopeguard::guard(id, move |id| {
let mut inner = self.inner.lock_propagate_poison();
if inner.queue.remove(&id).is_some() {
tracing::debug!("batched task cancelled before completion");
}
});
let mut cancelled = pin!(cancelled);
let resp = loop {
// try become the leader, or try wait for success.
let mut processor = match futures::future::select(rx, pin!(self.processor.lock())).await
{
// we got the resp.
Either::Left((resp, _)) => break resp.ok(),
// we are the leader.
Either::Right((p, rx_)) => {
rx = rx_;
p
}
let mut processor = tokio::select! {
// try become leader.
p = self.processor.lock() => p,
// wait for success.
resp = &mut rx => break resp.ok(),
// wait for cancellation.
cancel = cancelled.as_mut() => {
let mut inner = self.inner.lock_propagate_poison();
if inner.queue.remove(&id).is_some() {
tracing::warn!("batched task cancelled before completion");
}
return Err(cancel);
},
};
tracing::debug!(id, "batch: became leader");
let (reqs, resps) = self.inner.lock_propagate_poison().get_batch(&processor);
// snitch incase the task gets cancelled.
let cancel_safety = scopeguard::guard((), |()| {
if !std::thread::panicking() {
tracing::error!(
id,
"batch: leader cancelled, despite not being cancellation safe"
);
}
});
// apply a batch.
// if this is cancelled, jobs will not be completed and will panic.
let values = processor.apply(reqs).await;
// good: we didn't get cancelled.
ScopeGuard::into_inner(cancel_safety);
if values.len() != resps.len() {
tracing::error!(
"batch: invalid response size, expected={}, got={}",
resps.len(),
values.len()
);
}
// send response values.
for (tx, value) in std::iter::zip(resps, values) {
// sender hung up but that's fine.
drop(tx.send(value));
if tx.send(value).is_err() {
// receiver hung up but that's fine.
}
}
match rx.try_recv() {
@@ -98,10 +127,9 @@ impl<P: QueueProcessing> BatchQueue<P> {
}
};
// already removed.
ScopeGuard::into_inner(guard);
tracing::debug!(id, "batch: job completed");
resp.expect("no response found. batch processer should not panic")
Ok(resp.expect("no response found. batch processer should not panic"))
}
}
@@ -125,6 +153,8 @@ impl<P: QueueProcessing> BatchQueueInner<P> {
self.queue.insert(id, BatchJob { req, res: tx });
tracing::debug!(id, "batch: registered job in the queue");
(id, rx)
}
@@ -132,15 +162,19 @@ impl<P: QueueProcessing> BatchQueueInner<P> {
let batch_size = p.batch_size(self.queue.len());
let mut reqs = Vec::with_capacity(batch_size);
let mut resps = Vec::with_capacity(batch_size);
let mut ids = Vec::with_capacity(batch_size);
while reqs.len() < batch_size {
let Some((_, job)) = self.queue.pop_first() else {
let Some((id, job)) = self.queue.pop_first() else {
break;
};
reqs.push(job.req);
resps.push(job.res);
ids.push(id);
}
tracing::debug!(ids=?ids, "batch: acquired jobs");
(reqs, resps)
}
}

View File

@@ -1,5 +1,6 @@
use std::convert::Infallible;
use std::net::{IpAddr, SocketAddr};
use std::pin::pin;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
@@ -98,7 +99,6 @@ impl Pipeline {
impl CancelKeyOp {
fn register(&self, pipe: &mut Pipeline) {
#[allow(clippy::used_underscore_binding)]
match self {
CancelKeyOp::StoreCancelKey { key, value, expire } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
@@ -224,6 +224,7 @@ impl CancellationHandler {
}
}
/// This is not cancel safe
async fn get_cancel_key(
&self,
key: CancelKeyData,
@@ -240,16 +241,21 @@ impl CancellationHandler {
};
const TIMEOUT: Duration = Duration::from_secs(5);
let result = timeout(TIMEOUT, tx.call((guard, op)))
.await
.map_err(|_| {
tracing::warn!("timed out waiting to receive GetCancelData response");
CancelError::RateLimit
})?
.map_err(|e| {
tracing::warn!("failed to receive GetCancelData response: {e}");
CancelError::InternalError
})?;
let result = timeout(
TIMEOUT,
tx.call((guard, op), std::future::pending::<Infallible>()),
)
.await
.map_err(|_| {
tracing::warn!("timed out waiting to receive GetCancelData response");
CancelError::RateLimit
})?
// cannot be cancelled
.unwrap_or_else(|x| match x {})
.map_err(|e| {
tracing::warn!("failed to receive GetCancelData response: {e}");
CancelError::InternalError
})?;
let cancel_state_str = String::from_owned_redis_value(result).map_err(|e| {
tracing::warn!("failed to receive GetCancelData response: {e}");
@@ -271,6 +277,8 @@ impl CancellationHandler {
/// Will fetch IP allowlist internally.
///
/// return Result primarily for tests
///
/// This is not cancel safe
pub(crate) async fn cancel_session<T: ControlPlaneApi>(
&self,
key: CancelKeyData,
@@ -394,6 +402,8 @@ impl Session {
/// Ensure the cancel key is continously refreshed,
/// but stop when the channel is dropped.
///
/// This is not cancel safe
pub(crate) async fn maintain_cancel_key(
&self,
session_id: uuid::Uuid,
@@ -401,27 +411,6 @@ impl Session {
cancel_closure: &CancelClosure,
compute_config: &ComputeConfig,
) {
futures::future::select(
std::pin::pin!(self.maintain_redis_cancel_key(cancel_closure)),
cancel,
)
.await;
if let Err(err) = cancel_closure
.try_cancel_query(compute_config)
.boxed()
.await
{
tracing::warn!(
?session_id,
?err,
"could not cancel the query in the database"
);
}
}
// Ensure the cancel key is continously refreshed.
async fn maintain_redis_cancel_key(&self, cancel_closure: &CancelClosure) -> ! {
let Some(tx) = self.cancellation_handler.tx.get() else {
tracing::warn!("cancellation handler is not available");
// don't exit, as we only want to exit if cancelled externally.
@@ -432,6 +421,8 @@ impl Session {
.expect("serialising to json string should not fail")
.into_boxed_str();
let mut cancel = pin!(cancel);
loop {
let guard = Metrics::get()
.proxy
@@ -449,9 +440,35 @@ impl Session {
"registering cancellation key"
);
if tx.call((guard, op)).await.is_ok() {
tokio::time::sleep(CANCEL_KEY_REFRESH).await;
match tx.call((guard, op), cancel.as_mut()).await {
Ok(Ok(_)) => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"registered cancellation key"
);
// wait before continuing.
tokio::time::sleep(CANCEL_KEY_REFRESH).await;
}
// retry immediately.
Ok(Err(error)) => {
tracing::warn!(?error, "error registering cancellation key");
}
Err(Err(_cancelled)) => break,
}
}
if let Err(err) = cancel_closure
.try_cancel_query(compute_config)
.boxed()
.await
{
tracing::warn!(
?session_id,
?err,
"could not cancel the query in the database"
);
}
}
}

View File

@@ -6,7 +6,7 @@ use std::net::{IpAddr, SocketAddr};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use postgres_client::config::{AuthKeys, SslMode};
use postgres_client::config::{AuthKeys, ChannelBinding, SslMode};
use postgres_client::maybe_tls_stream::MaybeTlsStream;
use postgres_client::tls::MakeTlsConnect;
use postgres_client::{NoTls, RawCancelToken, RawConnection};
@@ -33,12 +33,51 @@ use crate::types::Host;
pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
#[derive(Debug, Error)]
pub(crate) enum ConnectionError {
pub(crate) enum PostgresError {
/// This error doesn't seem to reveal any secrets; for instance,
/// `postgres_client::error::Kind` doesn't contain ip addresses and such.
#[error("{COULD_NOT_CONNECT}: {0}")]
Postgres(#[from] postgres_client::Error),
}
impl UserFacingError for PostgresError {
fn to_string_client(&self) -> String {
match self {
// This helps us drop irrelevant library-specific prefixes.
// TODO: propagate severity level and other parameters.
PostgresError::Postgres(err) => match err.as_db_error() {
Some(err) => {
let msg = err.message();
if msg.starts_with("unsupported startup parameter: ")
|| msg.starts_with("unsupported startup parameter in options: ")
{
format!(
"{msg}. Please use unpooled connection or remove this parameter from the startup package. More details: https://neon.tech/docs/connect/connection-errors#unsupported-startup-parameter"
)
} else {
msg.to_owned()
}
}
None => err.to_string(),
},
}
}
}
impl ReportableError for PostgresError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
PostgresError::Postgres(e) if e.as_db_error().is_some() => {
crate::error::ErrorKind::Postgres
}
PostgresError::Postgres(_) => crate::error::ErrorKind::Compute,
}
}
}
#[derive(Debug, Error)]
pub(crate) enum ConnectionError {
#[error("{COULD_NOT_CONNECT}: {0}")]
TlsError(#[from] TlsError),
@@ -52,22 +91,6 @@ pub(crate) enum ConnectionError {
impl UserFacingError for ConnectionError {
fn to_string_client(&self) -> String {
match self {
// This helps us drop irrelevant library-specific prefixes.
// TODO: propagate severity level and other parameters.
ConnectionError::Postgres(err) => match err.as_db_error() {
Some(err) => {
let msg = err.message();
if msg.starts_with("unsupported startup parameter: ")
|| msg.starts_with("unsupported startup parameter in options: ")
{
format!("{msg}. Please use unpooled connection or remove this parameter from the startup package. More details: https://neon.tech/docs/connect/connection-errors#unsupported-startup-parameter")
} else {
msg.to_owned()
}
}
None => err.to_string(),
},
ConnectionError::WakeComputeError(err) => err.to_string_client(),
ConnectionError::TooManyConnectionAttempts(_) => {
"Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
@@ -80,10 +103,6 @@ impl UserFacingError for ConnectionError {
impl ReportableError for ConnectionError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
ConnectionError::Postgres(e) if e.as_db_error().is_some() => {
crate::error::ErrorKind::Postgres
}
ConnectionError::Postgres(_) => crate::error::ErrorKind::Compute,
ConnectionError::TlsError(_) => crate::error::ErrorKind::Compute,
ConnectionError::WakeComputeError(e) => e.get_error_kind(),
ConnectionError::TooManyConnectionAttempts(e) => e.get_error_kind(),
@@ -110,6 +129,8 @@ pub(crate) struct AuthInfo {
auth: Option<Auth>,
server_params: StartupMessageParams,
channel_binding: ChannelBinding,
/// Console redirect sets user and database, we shouldn't re-use those from the params.
skip_db_user: bool,
}
@@ -133,6 +154,8 @@ impl AuthInfo {
auth: pw.map(|pw| Auth::Password(pw.as_bytes().to_owned())),
server_params,
skip_db_user: true,
// pg-sni-router is a mitm so this would fail.
channel_binding: ChannelBinding::Disable,
}
}
@@ -146,6 +169,7 @@ impl AuthInfo {
},
server_params: StartupMessageParams::default(),
skip_db_user: false,
channel_binding: ChannelBinding::Prefer,
}
}
}
@@ -168,6 +192,7 @@ impl AuthInfo {
Some(Auth::Password(pw)) => config.password(pw),
None => &mut config,
};
config.channel_binding(self.channel_binding);
for (k, v) in self.server_params.iter() {
config.set_param(k, v);
}
@@ -206,6 +231,56 @@ impl AuthInfo {
}
}
}
pub async fn authenticate(
&self,
ctx: &RequestContext,
compute: &mut ComputeConnection,
user_info: ComputeUserInfo,
) -> Result<PostgresSettings, PostgresError> {
// client config with stubbed connect info.
// TODO(conrad): should we rewrite this to bypass tokio-postgres2 entirely,
// utilising pqproto.rs.
let mut tmp_config = postgres_client::Config::new(String::new(), 0);
// We have already established SSL if necessary.
tmp_config.ssl_mode(SslMode::Disable);
let tmp_config = self.enrich(tmp_config);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let connection = tmp_config
.tls_and_authenticate(&mut compute.stream, NoTls)
.await?;
drop(pause);
let RawConnection {
stream: _,
parameters,
delayed_notice,
process_id,
secret_key,
} = connection;
tracing::Span::current().record("pid", tracing::field::display(process_id));
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
// Yet another reason to rework the connection establishing code.
let cancel_closure = CancelClosure::new(
compute.socket_addr,
RawCancelToken {
ssl_mode: compute.ssl_mode,
process_id,
secret_key,
},
compute.hostname.to_string(),
user_info,
);
Ok(PostgresSettings {
params: parameters,
cancel_closure,
delayed_notice,
})
}
}
impl ConnectInfo {
@@ -268,51 +343,42 @@ impl ConnectInfo {
pub type RustlsStream = <ComputeConfig as MakeTlsConnect<tokio::net::TcpStream>>::Stream;
pub type MaybeRustlsStream = MaybeTlsStream<tokio::net::TcpStream, RustlsStream>;
pub(crate) struct PostgresConnection {
/// Socket connected to a compute node.
pub(crate) stream: MaybeTlsStream<tokio::net::TcpStream, RustlsStream>,
// TODO(conrad): we don't need to parse these.
// These are just immediately forwarded back to the client.
// We could instead stream them out instead of reading them into memory.
pub struct PostgresSettings {
/// PostgreSQL connection parameters.
pub(crate) params: std::collections::HashMap<String, String>,
pub params: std::collections::HashMap<String, String>,
/// Query cancellation token.
pub(crate) cancel_closure: CancelClosure,
/// Labels for proxy's metrics.
pub(crate) aux: MetricsAuxInfo,
pub cancel_closure: CancelClosure,
/// Notices received from compute after authenticating
pub(crate) delayed_notice: Vec<NoticeResponseBody>,
pub delayed_notice: Vec<NoticeResponseBody>,
}
pub(crate) guage: NumDbConnectionsGuard<'static>,
pub struct ComputeConnection {
/// Socket connected to a compute node.
pub stream: MaybeTlsStream<tokio::net::TcpStream, RustlsStream>,
/// Labels for proxy's metrics.
pub aux: MetricsAuxInfo,
pub hostname: Host,
pub ssl_mode: SslMode,
pub socket_addr: SocketAddr,
pub guage: NumDbConnectionsGuard<'static>,
}
impl ConnectInfo {
/// Connect to a corresponding compute node.
pub(crate) async fn connect(
pub async fn connect(
&self,
ctx: &RequestContext,
aux: MetricsAuxInfo,
auth: &AuthInfo,
aux: &MetricsAuxInfo,
config: &ComputeConfig,
user_info: ComputeUserInfo,
) -> Result<PostgresConnection, ConnectionError> {
let mut tmp_config = auth.enrich(self.to_postgres_client_config());
// we setup SSL early in `ConnectInfo::connect_raw`.
tmp_config.ssl_mode(SslMode::Disable);
) -> Result<ComputeConnection, ConnectionError> {
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (socket_addr, stream) = self.connect_raw(config).await?;
let connection = tmp_config.connect_raw(stream, NoTls).await?;
drop(pause);
let RawConnection {
stream,
parameters,
delayed_notice,
process_id,
secret_key,
} = connection;
tracing::Span::current().record("pid", tracing::field::display(process_id));
tracing::Span::current().record("compute_id", tracing::field::display(&aux.compute_id));
let MaybeTlsStream::Raw(stream) = stream.into_inner();
// TODO: lots of useful info but maybe we can move it elsewhere (eg traces?)
info!(
@@ -324,25 +390,12 @@ impl ConnectInfo {
ctx.get_testodrome_id().unwrap_or_default(),
);
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
// Yet another reason to rework the connection establishing code.
let cancel_closure = CancelClosure::new(
socket_addr,
RawCancelToken {
ssl_mode: self.ssl_mode,
process_id,
secret_key,
},
self.host.to_string(),
user_info,
);
let connection = PostgresConnection {
let connection = ComputeConnection {
stream,
params: parameters,
delayed_notice,
cancel_closure,
aux,
socket_addr,
hostname: self.host.clone(),
ssl_mode: self.ssl_mode,
aux: aux.clone(),
guage: Metrics::get().proxy.db_connections.guard(ctx.protocol()),
};

View File

@@ -218,11 +218,9 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
};
auth_info.set_startup_params(&params, true);
let node = connect_to_compute(
let mut node = connect_to_compute(
ctx,
&TcpMechanism {
user_info,
auth: auth_info,
locks: &config.connect_compute_locks,
},
&node_info,
@@ -232,9 +230,14 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
.or_else(|e| async { Err(stream.throw_error(e, Some(ctx)).await) })
.await?;
let pg_settings = auth_info
.authenticate(ctx, &mut node, user_info)
.or_else(|e| async { Err(stream.throw_error(e, Some(ctx)).await) })
.await?;
let session = cancellation_handler.get_key();
prepare_client_connection(&node, *session.key(), &mut stream);
prepare_client_connection(&pg_settings, *session.key(), &mut stream);
let stream = stream.flush_and_into_inner().await?;
let session_id = ctx.session_id();
@@ -244,7 +247,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
.maintain_cancel_key(
session_id,
cancel,
&node.cancel_closure,
&pg_settings.cancel_closure,
&config.connect_to_compute,
)
.await;

View File

@@ -76,13 +76,9 @@ impl NodeInfo {
pub(crate) async fn connect(
&self,
ctx: &RequestContext,
auth: &compute::AuthInfo,
config: &ComputeConfig,
user_info: ComputeUserInfo,
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
self.conn_info
.connect(ctx, self.aux.clone(), auth, config, user_info)
.await
) -> Result<compute::ComputeConnection, compute::ConnectionError> {
self.conn_info.connect(ctx, &self.aux, config).await
}
}

View File

@@ -2,8 +2,7 @@ use async_trait::async_trait;
use tokio::time;
use tracing::{debug, info, warn};
use crate::auth::backend::ComputeUserInfo;
use crate::compute::{self, AuthInfo, COULD_NOT_CONNECT, PostgresConnection};
use crate::compute::{self, COULD_NOT_CONNECT, ComputeConnection};
use crate::config::{ComputeConfig, RetryConfig};
use crate::context::RequestContext;
use crate::control_plane::errors::WakeComputeError;
@@ -50,15 +49,13 @@ pub(crate) trait ConnectMechanism {
}
pub(crate) struct TcpMechanism {
pub(crate) auth: AuthInfo,
/// connect_to_compute concurrency lock
pub(crate) locks: &'static ApiLocks<Host>,
pub(crate) user_info: ComputeUserInfo,
}
#[async_trait]
impl ConnectMechanism for TcpMechanism {
type Connection = PostgresConnection;
type Connection = ComputeConnection;
type ConnectError = compute::ConnectionError;
type Error = compute::ConnectionError;
@@ -71,13 +68,9 @@ impl ConnectMechanism for TcpMechanism {
ctx: &RequestContext,
node_info: &control_plane::CachedNodeInfo,
config: &ComputeConfig,
) -> Result<PostgresConnection, Self::Error> {
) -> Result<ComputeConnection, Self::Error> {
let permit = self.locks.get_permit(&node_info.conn_info.host).await?;
permit.release_result(
node_info
.connect(ctx, &self.auth, config, self.user_info.clone())
.await,
)
permit.release_result(node_info.connect(ctx, config).await)
}
}

View File

@@ -357,24 +357,28 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
let res = connect_to_compute(
ctx,
&TcpMechanism {
user_info: creds.info.clone(),
auth: auth_info,
locks: &config.connect_compute_locks,
},
&auth::Backend::ControlPlane(cplane, creds.info),
&auth::Backend::ControlPlane(cplane, creds.info.clone()),
config.wake_compute_retry_config,
&config.connect_to_compute,
)
.await;
let node = match res {
let mut node = match res {
Ok(node) => node,
Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
};
let pg_settings = auth_info.authenticate(ctx, &mut node, creds.info).await;
let pg_settings = match pg_settings {
Ok(pg_settings) => pg_settings,
Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
};
let session = cancellation_handler.get_key();
prepare_client_connection(&node, *session.key(), &mut stream);
prepare_client_connection(&pg_settings, *session.key(), &mut stream);
let stream = stream.flush_and_into_inner().await?;
let session_id = ctx.session_id();
@@ -384,7 +388,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
.maintain_cancel_key(
session_id,
cancel,
&node.cancel_closure,
&pg_settings.cancel_closure,
&config.connect_to_compute,
)
.await;
@@ -413,19 +417,19 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
/// Finish client connection initialization: confirm auth success, send params, etc.
pub(crate) fn prepare_client_connection(
node: &compute::PostgresConnection,
settings: &compute::PostgresSettings,
cancel_key_data: CancelKeyData,
stream: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) {
// Forward all deferred notices to the client.
for notice in &node.delayed_notice {
for notice in &settings.delayed_notice {
stream.write_raw(notice.as_bytes().len(), b'N', |buf| {
buf.extend_from_slice(notice.as_bytes());
});
}
// Forward all postgres connection params to the client.
for (name, value) in &node.params {
for (name, value) in &settings.params {
stream.write_message(BeMessage::ParameterStatus {
name: name.as_bytes(),
value: value.as_bytes(),

View File

@@ -99,7 +99,6 @@ impl ShouldRetryWakeCompute for postgres_client::Error {
impl CouldRetry for compute::ConnectionError {
fn could_retry(&self) -> bool {
match self {
compute::ConnectionError::Postgres(err) => err.could_retry(),
compute::ConnectionError::TlsError(err) => err.could_retry(),
compute::ConnectionError::WakeComputeError(err) => err.could_retry(),
compute::ConnectionError::TooManyConnectionAttempts(_) => false,
@@ -109,7 +108,6 @@ impl CouldRetry for compute::ConnectionError {
impl ShouldRetryWakeCompute for compute::ConnectionError {
fn should_retry_wake_compute(&self) -> bool {
match self {
compute::ConnectionError::Postgres(err) => err.should_retry_wake_compute(),
// the cache entry was not checked for validity
compute::ConnectionError::TooManyConnectionAttempts(_) => false,
_ => true,

View File

@@ -169,7 +169,7 @@ async fn scram_auth_disable_channel_binding() -> anyhow::Result<()> {
.dbname("db")
.password("password")
.ssl_mode(SslMode::Require)
.connect_raw(server, client_config.make_tls_connect()?)
.tls_and_authenticate(server, client_config.make_tls_connect()?)
.await?;
proxy.await?
@@ -252,7 +252,7 @@ async fn connect_failure(
.dbname("db")
.password("password")
.ssl_mode(SslMode::Require)
.connect_raw(server, client_config.make_tls_connect()?)
.tls_and_authenticate(server, client_config.make_tls_connect()?)
.await
.err()
.context("client shouldn't be able to connect")?;

View File

@@ -199,7 +199,7 @@ async fn handshake_tls_is_enforced_by_proxy() -> anyhow::Result<()> {
.user("john_doe")
.dbname("earth")
.ssl_mode(SslMode::Disable)
.connect_raw(server, NoTls)
.tls_and_authenticate(server, NoTls)
.await
.err() // -> Option<E>
.context("client shouldn't be able to connect")?;
@@ -228,7 +228,7 @@ async fn handshake_tls() -> anyhow::Result<()> {
.user("john_doe")
.dbname("earth")
.ssl_mode(SslMode::Require)
.connect_raw(server, client_config.make_tls_connect()?)
.tls_and_authenticate(server, client_config.make_tls_connect()?)
.await?;
proxy.await?
@@ -245,7 +245,7 @@ async fn handshake_raw() -> anyhow::Result<()> {
.dbname("earth")
.set_param("options", "project=generic-project-name")
.ssl_mode(SslMode::Prefer)
.connect_raw(server, NoTls)
.tls_and_authenticate(server, NoTls)
.await?;
proxy.await?
@@ -293,7 +293,7 @@ async fn scram_auth_good(#[case] password: &str) -> anyhow::Result<()> {
.dbname("db")
.password(password)
.ssl_mode(SslMode::Require)
.connect_raw(server, client_config.make_tls_connect()?)
.tls_and_authenticate(server, client_config.make_tls_connect()?)
.await?;
proxy.await?
@@ -317,7 +317,7 @@ async fn scram_auth_disable_channel_binding() -> anyhow::Result<()> {
.dbname("db")
.password("password")
.ssl_mode(SslMode::Require)
.connect_raw(server, client_config.make_tls_connect()?)
.tls_and_authenticate(server, client_config.make_tls_connect()?)
.await?;
proxy.await?
@@ -344,7 +344,7 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
.dbname("db")
.password(&password) // no password will match the mocked secret
.ssl_mode(SslMode::Require)
.connect_raw(server, client_config.make_tls_connect()?)
.tls_and_authenticate(server, client_config.make_tls_connect()?)
.await
.err() // -> Option<E>
.context("client shouldn't be able to connect")?;

View File

@@ -23,9 +23,8 @@ impl KeyPrefix {
#[cfg(test)]
mod tests {
use crate::pqproto::id_to_cancel_key;
use super::*;
use crate::pqproto::id_to_cancel_key;
#[test]
fn test_build_redis_key() {

View File

@@ -399,7 +399,7 @@ async fn collect_metrics_iteration(
fn create_remote_path_prefix(now: DateTime<Utc>) -> String {
format!(
"year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z",
"year={year:04}/month={month:02}/day={day:02}/hour={hour:02}/{hour:02}:{minute:02}:{second:02}Z",
year = now.year(),
month = now.month(),
day = now.day(),
@@ -461,7 +461,7 @@ async fn upload_backup_events(
real_now.second().into(),
real_now.nanosecond(),
));
let path = format!("{path_prefix}_{id}.json.gz");
let path = format!("{path_prefix}_{id}.ndjson.gz");
let remote_path = match RemotePath::from_string(&path) {
Ok(remote_path) => remote_path,
Err(e) => {
@@ -471,9 +471,12 @@ async fn upload_backup_events(
// TODO: This is async compression from Vec to Vec. Rewrite as byte stream.
// Use sync compression in blocking threadpool.
let data = serde_json::to_vec(chunk).context("serialize metrics")?;
let mut encoder = GzipEncoder::new(Vec::new());
encoder.write_all(&data).await.context("compress metrics")?;
for event in chunk.events.iter() {
let data = serde_json::to_vec(event).context("serialize metrics")?;
encoder.write_all(&data).await.context("compress metrics")?;
encoder.write_all(b"\n").await.context("compress metrics")?;
}
encoder.shutdown().await.context("compress metrics")?;
let compressed_data: Bytes = encoder.get_ref().clone().into();
backoff::retry(
@@ -499,7 +502,7 @@ async fn upload_backup_events(
#[cfg(test)]
mod tests {
use std::fs;
use std::io::BufReader;
use std::io::{BufRead, BufReader};
use std::sync::{Arc, Mutex};
use anyhow::Error;
@@ -673,11 +676,22 @@ mod tests {
{
let path = local_fs_path.join(&path_prefix).to_string();
if entry.path().to_str().unwrap().starts_with(&path) {
let chunk = serde_json::from_reader(flate2::bufread::GzDecoder::new(
BufReader::new(fs::File::open(entry.into_path()).unwrap()),
))
.unwrap();
stored_chunks.push(chunk);
let file = fs::File::open(entry.into_path()).unwrap();
let decoder = flate2::bufread::GzDecoder::new(BufReader::new(file));
let reader = BufReader::new(decoder);
let mut events: Vec<Event<Extra, String>> = Vec::new();
for line in reader.lines() {
let line = line.unwrap();
let event: Event<Extra, String> = serde_json::from_str(&line).unwrap();
events.push(event);
}
let report = Report {
events: Cow::Owned(events),
};
stored_chunks.push(report);
}
}
storage_test_dir.close().ok();

View File

@@ -58,6 +58,7 @@ metrics.workspace = true
pem.workspace = true
postgres_backend.workspace = true
postgres_ffi.workspace = true
postgres_versioninfo.workspace = true
pq_proto.workspace = true
remote_storage.workspace = true
safekeeper_api.workspace = true

View File

@@ -18,7 +18,8 @@ use metrics::set_build_info_metric;
use remote_storage::RemoteStorageConfig;
use safekeeper::defaults::{
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE,
DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
};
@@ -138,6 +139,11 @@ struct Args {
/// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes
#[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)]
max_offloader_lag: u64,
/* BEGIN_HADRON */
/// Safekeeper will re-elect a new offloader if the current backup lagging for more than this value in bytes
#[arg(long, default_value_t = DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES)]
max_reelect_offloader_lag_bytes: u64,
/* END_HADRON */
/// Number of max parallel WAL segments to be offloaded to remote storage.
#[arg(long, default_value = "5")]
wal_backup_parallel_jobs: usize,
@@ -391,6 +397,9 @@ async fn main() -> anyhow::Result<()> {
peer_recovery_enabled: args.peer_recovery,
remote_storage: args.remote_storage,
max_offloader_lag_bytes: args.max_offloader_lag,
/* BEGIN_HADRON */
max_reelect_offloader_lag_bytes: args.max_reelect_offloader_lag_bytes,
/* END_HADRON */
wal_backup_enabled: !args.disable_wal_backup,
backup_parallel_jobs: args.wal_backup_parallel_jobs,
pg_auth,

View File

@@ -2,6 +2,7 @@
use std::vec;
use anyhow::{Result, bail};
use postgres_versioninfo::PgVersionId;
use pq_proto::SystemId;
use safekeeper_api::membership::{Configuration, INVALID_GENERATION};
use safekeeper_api::{ServerInfo, Term};
@@ -46,7 +47,7 @@ struct SafeKeeperStateV1 {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerInfoV2 {
/// Postgres server version
pub pg_version: u32,
pub pg_version: PgVersionId,
pub system_id: SystemId,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
@@ -75,7 +76,7 @@ pub struct SafeKeeperStateV2 {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerInfoV3 {
/// Postgres server version
pub pg_version: u32,
pub pg_version: PgVersionId,
pub system_id: SystemId,
#[serde(with = "hex")]
pub tenant_id: TenantId,
@@ -444,13 +445,13 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
} else if version == 6 {
info!("reading safekeeper control file version {}", version);
let mut oldstate = TimelinePersistentState::des(&buf[..buf.len()])?;
if oldstate.server.pg_version != 0 {
if oldstate.server.pg_version != PgVersionId::UNKNOWN {
return Ok(oldstate);
}
// set pg_version to the default v14
info!("setting pg_version to 140005");
oldstate.server.pg_version = 140005;
oldstate.server.pg_version = PgVersionId::from_full_pg_version(140005);
return Ok(oldstate);
} else if version == 7 {
@@ -547,6 +548,7 @@ pub fn downgrade_v10_to_v9(state: &TimelinePersistentState) -> TimelinePersisten
mod tests {
use std::str::FromStr;
use postgres_versioninfo::PgMajorVersion;
use utils::Hex;
use utils::id::NodeId;
@@ -563,7 +565,7 @@ mod tests {
epoch: 43,
},
server: ServerInfoV2 {
pg_version: 14,
pg_version: PgVersionId::from(PgMajorVersion::PG14),
system_id: 0x1234567887654321,
tenant_id,
timeline_id,
@@ -586,8 +588,8 @@ mod tests {
0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
// epoch
0x2b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
// pg_version
0x0e, 0x00, 0x00, 0x00,
// pg_version = 140000
0xE0, 0x22, 0x02, 0x00,
// system_id
0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12,
// tenant_id
@@ -626,7 +628,7 @@ mod tests {
}]),
},
server: ServerInfoV2 {
pg_version: 14,
pg_version: PgVersionId::from(PgMajorVersion::PG14),
system_id: 0x1234567887654321,
tenant_id,
timeline_id,
@@ -646,7 +648,7 @@ mod tests {
let expected = [
0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x0e, 0x00, 0x00, 0x00, 0x21, 0x43, 0x65, 0x87, 0x78, 0x56,
0x00, 0x00, 0x00, 0x00, 0xE0, 0x22, 0x02, 0x00, 0x21, 0x43, 0x65, 0x87, 0x78, 0x56,
0x34, 0x12, 0xcf, 0x04, 0x80, 0x92, 0x97, 0x07, 0xee, 0x75, 0x37, 0x23, 0x37, 0xef,
0xaa, 0x5e, 0xcf, 0x96, 0x11, 0x2d, 0xed, 0x66, 0x42, 0x2a, 0xa5, 0xe9, 0x53, 0xe5,
0x44, 0x0f, 0xa5, 0x42, 0x7a, 0xc4, 0x78, 0x56, 0x34, 0x12, 0xc4, 0x7a, 0x42, 0xa5,
@@ -675,7 +677,7 @@ mod tests {
}]),
},
server: ServerInfoV3 {
pg_version: 14,
pg_version: PgVersionId::from(PgMajorVersion::PG14),
system_id: 0x1234567887654321,
tenant_id,
timeline_id,
@@ -695,7 +697,7 @@ mod tests {
let expected = [
0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x0e, 0x00, 0x00, 0x00, 0x21, 0x43, 0x65, 0x87, 0x78, 0x56,
0x00, 0x00, 0x00, 0x00, 0xE0, 0x22, 0x02, 0x00, 0x21, 0x43, 0x65, 0x87, 0x78, 0x56,
0x34, 0x12, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x63, 0x66, 0x30, 0x34,
0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37,
0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36,
@@ -731,7 +733,7 @@ mod tests {
}]),
},
server: ServerInfo {
pg_version: 14,
pg_version: PgVersionId::from(PgMajorVersion::PG14),
system_id: 0x1234567887654321,
wal_seg_size: 0x12345678,
},
@@ -765,7 +767,7 @@ mod tests {
0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34, 0x2a, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x29, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x0e, 0x00, 0x00, 0x00, 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12, 0x78, 0x56,
0xE0, 0x22, 0x02, 0x00, 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12, 0x78, 0x56,
0x34, 0x12, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x63, 0x34, 0x37, 0x61,
0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39,
0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31,

View File

@@ -61,6 +61,9 @@ pub mod defaults {
pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
/* BEGIN_HADRON */
pub const DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
/* END_HADRON */
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
@@ -99,6 +102,9 @@ pub struct SafeKeeperConf {
pub peer_recovery_enabled: bool,
pub remote_storage: Option<RemoteStorageConfig>,
pub max_offloader_lag_bytes: u64,
/* BEGIN_HADRON */
pub max_reelect_offloader_lag_bytes: u64,
/* END_HADRON */
pub backup_parallel_jobs: usize,
pub wal_backup_enabled: bool,
pub pg_auth: Option<Arc<JwtAuth>>,
@@ -151,6 +157,9 @@ impl SafeKeeperConf {
sk_auth_token: None,
heartbeat_timeout: Duration::new(5, 0),
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
/* BEGIN_HADRON */
max_reelect_offloader_lag_bytes: defaults::DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES,
/* END_HADRON */
current_thread_runtime: false,
walsenders_keep_horizon: false,
partial_backup_timeout: Duration::from_secs(0),

View File

@@ -138,6 +138,15 @@ pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_backup_errors_total counter")
});
/* BEGIN_HADRON */
pub static BACKUP_REELECT_LEADER_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_backup_reelect_leader_total",
"Number of times the backup leader was reelected"
)
.expect("Failed to register safekeeper_backup_reelect_leader_total counter")
});
/* END_HADRON */
pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_broker_push_update_seconds",

View File

@@ -9,6 +9,7 @@ use anyhow::{Context, Result, bail};
use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::{MAX_SEND_SIZE, TimeLineID};
use postgres_versioninfo::{PgMajorVersion, PgVersionId};
use pq_proto::SystemId;
use safekeeper_api::membership::{
INVALID_GENERATION, MemberSet, SafekeeperGeneration as Generation, SafekeeperId,
@@ -29,7 +30,7 @@ use crate::{control_file, wal_storage};
pub const SK_PROTO_VERSION_2: u32 = 2;
pub const SK_PROTO_VERSION_3: u32 = 3;
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
pub const UNKNOWN_SERVER_VERSION: PgVersionId = PgVersionId::UNKNOWN;
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct TermLsn {
@@ -218,7 +219,7 @@ pub struct ProposerGreeting {
pub timeline_id: TimelineId,
pub mconf: membership::Configuration,
/// Postgres server version
pub pg_version: u32,
pub pg_version: PgVersionId,
pub system_id: SystemId,
pub wal_seg_size: u32,
}
@@ -229,7 +230,7 @@ pub struct ProposerGreetingV2 {
/// proposer-acceptor protocol version
pub protocol_version: u32,
/// Postgres server version
pub pg_version: u32,
pub pg_version: PgVersionId,
pub proposer_id: PgUuid,
pub system_id: SystemId,
pub timeline_id: TimelineId,
@@ -511,7 +512,7 @@ impl ProposerAcceptorMessage {
tenant_id,
timeline_id,
mconf,
pg_version,
pg_version: PgVersionId::from_full_pg_version(pg_version),
system_id,
wal_seg_size,
};
@@ -961,7 +962,8 @@ where
* because safekeepers parse WAL headers and the format
* may change between versions.
*/
if msg.pg_version / 10000 != self.state.server.pg_version / 10000
if PgMajorVersion::try_from(msg.pg_version)?
!= PgMajorVersion::try_from(self.state.server.pg_version)?
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION
{
bail!(
@@ -1748,7 +1750,7 @@ mod tests {
}]),
},
server: ServerInfo {
pg_version: 14,
pg_version: PgVersionId::from_full_pg_version(140000),
system_id: 0x1234567887654321,
wal_seg_size: 0x12345678,
},

View File

@@ -8,8 +8,8 @@ use futures::StreamExt;
use futures::future::Either;
use pageserver_api::shard::ShardIdentity;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
use postgres_ffi::get_current_timestamp;
use postgres_ffi::waldecoder::{WalDecodeError, WalStreamDecoder};
use postgres_ffi::{PgMajorVersion, get_current_timestamp};
use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc::error::SendError;
@@ -78,7 +78,7 @@ pub(crate) struct InterpretedWalReader {
shard_senders: HashMap<ShardIdentity, smallvec::SmallVec<[ShardSenderState; 1]>>,
shard_notification_rx: Option<tokio::sync::mpsc::UnboundedReceiver<AttachShardNotification>>,
state: Arc<std::sync::RwLock<InterpretedWalReaderState>>,
pg_version: u32,
pg_version: PgMajorVersion,
}
/// A handle for [`InterpretedWalReader`] which allows for interacting with it
@@ -258,7 +258,7 @@ impl InterpretedWalReader {
start_pos: Lsn,
tx: tokio::sync::mpsc::Sender<Batch>,
shard: ShardIdentity,
pg_version: u32,
pg_version: PgMajorVersion,
appname: &Option<String>,
) -> InterpretedWalReaderHandle {
let state = Arc::new(std::sync::RwLock::new(InterpretedWalReaderState::Running {
@@ -322,7 +322,7 @@ impl InterpretedWalReader {
start_pos: Lsn,
tx: tokio::sync::mpsc::Sender<Batch>,
shard: ShardIdentity,
pg_version: u32,
pg_version: PgMajorVersion,
shard_notification_rx: Option<
tokio::sync::mpsc::UnboundedReceiver<AttachShardNotification>,
>,
@@ -718,7 +718,7 @@ mod tests {
use std::time::Duration;
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
use postgres_ffi::MAX_SEND_SIZE;
use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion};
use tokio::sync::mpsc::error::TryRecvError;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
@@ -734,7 +734,7 @@ mod tests {
const SIZE: usize = 8 * 1024;
const MSG_COUNT: usize = 200;
const PG_VERSION: u32 = 17;
const PG_VERSION: PgMajorVersion = PgMajorVersion::PG17;
const SHARD_COUNT: u8 = 2;
let start_lsn = Lsn::from_str("0/149FD18").unwrap();
@@ -876,7 +876,7 @@ mod tests {
const SIZE: usize = 8 * 1024;
const MSG_COUNT: usize = 200;
const PG_VERSION: u32 = 17;
const PG_VERSION: PgMajorVersion = PgMajorVersion::PG17;
const SHARD_COUNT: u8 = 2;
let start_lsn = Lsn::from_str("0/149FD18").unwrap();
@@ -1025,7 +1025,7 @@ mod tests {
const SIZE: usize = 64 * 1024;
const MSG_COUNT: usize = 10;
const PG_VERSION: u32 = 17;
const PG_VERSION: PgMajorVersion = PgMajorVersion::PG17;
const SHARD_COUNT: u8 = 2;
const WAL_READER_BATCH_SIZE: usize = 8192;
@@ -1148,7 +1148,7 @@ mod tests {
const SIZE: usize = 8 * 1024;
const MSG_COUNT: usize = 10;
const PG_VERSION: u32 = 17;
const PG_VERSION: PgMajorVersion = PgMajorVersion::PG17;
let start_lsn = Lsn::from_str("0/149FD18").unwrap();
let env = Env::new(true).unwrap();

View File

@@ -12,7 +12,7 @@ use futures::FutureExt;
use itertools::Itertools;
use parking_lot::Mutex;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend, PostgresBackendReader, QueryError};
use postgres_ffi::{MAX_SEND_SIZE, TimestampTz, get_current_timestamp};
use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, TimestampTz, get_current_timestamp};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use safekeeper_api::Term;
use safekeeper_api::models::{
@@ -559,7 +559,9 @@ impl SafekeeperPostgresHandler {
format,
compression,
} => {
let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000;
let pg_version =
PgMajorVersion::try_from(tli.tli.get_state().await.1.server.pg_version)
.unwrap();
let end_watch_view = end_watch.view();
let wal_residence_guard = tli.wal_residence_guard().await?;
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(2);

View File

@@ -7,6 +7,7 @@ use std::time::SystemTime;
use anyhow::{Result, bail};
use postgres_ffi::WAL_SEGMENT_SIZE;
use postgres_versioninfo::{PgMajorVersion, PgVersionId};
use safekeeper_api::membership::Configuration;
use safekeeper_api::models::{TimelineMembershipSwitchResponse, TimelineTermBumpResponse};
use safekeeper_api::{INITIAL_TERM, ServerInfo, Term};
@@ -149,8 +150,8 @@ impl TimelinePersistentState {
&TenantTimelineId::empty(),
Configuration::empty(),
ServerInfo {
pg_version: 170000, /* Postgres server version (major * 10000) */
system_id: 0, /* Postgres system identifier */
pg_version: PgVersionId::from(PgMajorVersion::PG17),
system_id: 0, /* Postgres system identifier */
wal_seg_size: WAL_SEGMENT_SIZE as u32,
},
Lsn::INVALID,

View File

@@ -26,7 +26,9 @@ use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::{backoff, pausable_failpoint};
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
use crate::metrics::{
BACKED_UP_SEGMENTS, BACKUP_ERRORS, BACKUP_REELECT_LEADER_COUNT, WAL_BACKUP_TASKS,
};
use crate::timeline::WalResidentTimeline;
use crate::timeline_manager::{Manager, StateSnapshot};
use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME};
@@ -70,8 +72,7 @@ pub(crate) async fn update_task(
need_backup: bool,
state: &StateSnapshot,
) {
let (offloader, election_dbg_str) =
determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
let (offloader, election_dbg_str) = hadron_determine_offloader(mgr, state);
let elected_me = Some(mgr.conf.my_id) == offloader;
let should_task_run = need_backup && elected_me;
@@ -127,6 +128,71 @@ async fn shut_down_task(entry: &mut Option<WalBackupTaskHandle>) {
}
}
/* BEGIN_HADRON */
// On top of the neon determine_offloader, we also check if the current offloader is lagging behind too much.
// If it is, we re-elect a new offloader. This mitigates the below issue. It also helps distribute the load across SKs.
//
// We observe that the offloader fails to upload a segment due to race conditions on XLOG SWITCH and PG start streaming WALs.
// wal_backup task continously failing to upload a full segment while the segment remains partial on the disk.
// The consequence is that commit_lsn for all SKs move forward but backup_lsn stays the same. Then, all SKs run out of disk space.
// See go/sk-ood-xlog-switch for more details.
//
// To mitigate this issue, we will re-elect a new offloader if the current offloader is lagging behind too much.
// Each SK makes the decision locally but they are aware of each other's commit and backup lsns.
//
// determine_offloader will pick a SK. say SK-1.
// Each SK checks
// -- if commit_lsn - back_lsn > threshold,
// -- -- remove SK-1 from the candidate and call determine_offloader again.
// SK-1 will step down and all SKs will elect the same leader again.
// After the backup is caught up, the leader will become SK-1 again.
fn hadron_determine_offloader(mgr: &Manager, state: &StateSnapshot) -> (Option<NodeId>, String) {
let mut offloader: Option<NodeId>;
let mut election_dbg_str: String;
let caughtup_peers_count: usize;
(offloader, election_dbg_str, caughtup_peers_count) =
determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
if offloader.is_none() || caughtup_peers_count <= 1 {
return (offloader, election_dbg_str);
}
let offloader_sk_id = offloader.unwrap();
let backup_lag = state.commit_lsn.checked_sub(state.backup_lsn);
if backup_lag.is_none() {
info!("Backup lag is None. Skipping re-election.");
return (offloader, election_dbg_str);
}
let backup_lag = backup_lag.unwrap().0;
if backup_lag < mgr.conf.max_reelect_offloader_lag_bytes {
info!(
"Backup lag {} is lower than the threshold {}. Skipping re-election.",
backup_lag, mgr.conf.max_reelect_offloader_lag_bytes
);
return (offloader, election_dbg_str);
}
info!(
"Electing a new leader: Backup lag is too high backup lsn lag {} threshold {}: {}",
backup_lag, mgr.conf.max_reelect_offloader_lag_bytes, election_dbg_str
);
BACKUP_REELECT_LEADER_COUNT.inc();
// Remove the current offloader if lag is too high.
let new_peers: Vec<_> = state
.peers
.iter()
.filter(|p| p.sk_id != offloader_sk_id)
.cloned()
.collect();
(offloader, election_dbg_str, _) =
determine_offloader(&new_peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
(offloader, election_dbg_str)
}
/* END_HADRON */
/// The goal is to ensure that normally only one safekeepers offloads. However,
/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
/// time we have several ones as they PUT the same files. Also,
@@ -141,13 +207,13 @@ fn determine_offloader(
wal_backup_lsn: Lsn,
ttid: TenantTimelineId,
conf: &SafeKeeperConf,
) -> (Option<NodeId>, String) {
) -> (Option<NodeId>, String, usize) {
// TODO: remove this once we fill newly joined safekeepers since backup_lsn.
let capable_peers = alive_peers
.iter()
.filter(|p| p.local_start_lsn <= wal_backup_lsn);
match capable_peers.clone().map(|p| p.commit_lsn).max() {
None => (None, "no connected peers to elect from".to_string()),
None => (None, "no connected peers to elect from".to_string(), 0),
Some(max_commit_lsn) => {
let threshold = max_commit_lsn
.checked_sub(conf.max_offloader_lag_bytes)
@@ -175,6 +241,7 @@ fn determine_offloader(
capable_peers_dbg,
caughtup_peers.len()
),
caughtup_peers.len(),
)
}
}

View File

@@ -19,6 +19,7 @@ use futures::future::BoxFuture;
use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo, dispatch_pgversion};
use postgres_versioninfo::{PgMajorVersion, PgVersionId};
use pq_proto::SystemId;
use remote_storage::RemotePath;
use std::sync::Arc;
@@ -92,7 +93,7 @@ pub struct PhysicalStorage {
/// Size of WAL segment in bytes.
wal_seg_size: usize,
pg_version: u32,
pg_version: PgVersionId,
system_id: u64,
/// Written to disk, but possibly still in the cache and not fully persisted.
@@ -180,7 +181,7 @@ impl PhysicalStorage {
let write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
let version = state.server.pg_version / 10000;
let version = PgMajorVersion::try_from(state.server.pg_version).unwrap();
dispatch_pgversion!(
version,
@@ -226,7 +227,10 @@ impl PhysicalStorage {
write_record_lsn: write_lsn,
flush_lsn,
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
decoder: WalStreamDecoder::new(
write_lsn,
PgMajorVersion::try_from(state.server.pg_version).unwrap(),
),
file: None,
pending_wal_truncation: true,
})
@@ -408,7 +412,7 @@ impl Storage for PhysicalStorage {
let segno = init_lsn.segment_number(self.wal_seg_size);
let (mut file, _) = self.open_or_create(segno).await?;
let major_pg_version = self.pg_version / 10000;
let major_pg_version = PgMajorVersion::try_from(self.pg_version).unwrap();
let wal_seg =
postgres_ffi::generate_wal_segment(segno, self.system_id, major_pg_version, init_lsn)?;
file.seek(SeekFrom::Start(0)).await?;
@@ -654,7 +658,7 @@ pub struct WalReader {
// pos is in the same segment as timeline_start_lsn.
timeline_start_lsn: Lsn,
// integer version number of PostgreSQL, e.g. 14; 15; 16
pg_version: u32,
pg_version: PgMajorVersion,
system_id: SystemId,
timeline_start_segment: Option<Bytes>,
}
@@ -697,7 +701,7 @@ impl WalReader {
wal_backup,
local_start_lsn: state.local_start_lsn,
timeline_start_lsn: state.timeline_start_lsn,
pg_version: state.server.pg_version / 10000,
pg_version: PgMajorVersion::try_from(state.server.pg_version).unwrap(),
system_id: state.server.system_id,
timeline_start_segment: None,
})

View File

@@ -159,6 +159,9 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
heartbeat_timeout: Duration::from_secs(0),
remote_storage: None,
max_offloader_lag_bytes: 0,
/* BEGIN_HADRON */
max_reelect_offloader_lag_bytes: 0,
/* END_HADRON */
wal_backup_enabled: false,
listen_pg_addr_tenant_only: None,
advertise_pg_addr: None,

View File

@@ -7,8 +7,8 @@ use anyhow::Result;
use bytes::{Buf, BytesMut};
use futures::future::BoxFuture;
use parking_lot::Mutex;
use postgres_ffi::XLogSegNo;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::{PgMajorVersion, XLogSegNo};
use safekeeper::metrics::WalStorageMetrics;
use safekeeper::state::TimelinePersistentState;
use safekeeper::{control_file, wal_storage};
@@ -142,7 +142,7 @@ impl DiskWALStorage {
write_lsn,
write_record_lsn: flush_lsn,
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(flush_lsn, 16),
decoder: WalStreamDecoder::new(flush_lsn, PgMajorVersion::PG16),
unflushed_bytes: BytesMut::new(),
disk,
})
@@ -151,7 +151,7 @@ impl DiskWALStorage {
fn find_end_of_wal(disk: Arc<TimelineDisk>, start_lsn: Lsn) -> Result<Lsn> {
let mut buf = [0; 8192];
let mut pos = start_lsn.0;
let mut decoder = WalStreamDecoder::new(start_lsn, 16);
let mut decoder = WalStreamDecoder::new(start_lsn, PgMajorVersion::PG16);
let mut result = start_lsn;
loop {
disk.wal.lock().read(pos, &mut buf);
@@ -204,7 +204,7 @@ impl wal_storage::Storage for DiskWALStorage {
self.decoder.available(),
startpos,
);
self.decoder = WalStreamDecoder::new(startpos, 16);
self.decoder = WalStreamDecoder::new(startpos, PgMajorVersion::PG16);
}
self.decoder.feed_bytes(buf);
loop {
@@ -242,7 +242,7 @@ impl wal_storage::Storage for DiskWALStorage {
self.write_record_lsn = end_pos;
self.flush_record_lsn = end_pos;
self.unflushed_bytes.clear();
self.decoder = WalStreamDecoder::new(end_pos, 16);
self.decoder = WalStreamDecoder::new(end_pos, PgMajorVersion::PG16);
Ok(())
}

View File

@@ -27,6 +27,7 @@ governor.workspace = true
hex.workspace = true
hyper0.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
json-structural-diff.workspace = true
lasso.workspace = true
@@ -34,6 +35,7 @@ once_cell.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_connection.workspace = true
posthog_client_lite.workspace = true
rand.workspace = true
reqwest = { workspace = true, features = ["stream"] }
routerify.workspace = true

View File

@@ -14,11 +14,13 @@ use http_utils::tls_certs::ReloadingCertificateResolver;
use hyper0::Uri;
use metrics::BuildInfo;
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::config::PostHogConfig;
use reqwest::Certificate;
use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
use storage_controller::service::chaos_injector::ChaosInjector;
use storage_controller::service::feature_flag::FeatureFlagService;
use storage_controller::service::{
Config, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
@@ -252,6 +254,8 @@ struct Secrets {
peer_jwt_token: Option<String>,
}
const POSTHOG_CONFIG_ENV: &str = "POSTHOG_CONFIG";
impl Secrets {
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
@@ -409,6 +413,18 @@ async fn async_main() -> anyhow::Result<()> {
None => Vec::new(),
};
let posthog_config = if let Ok(json) = std::env::var(POSTHOG_CONFIG_ENV) {
let res: Result<PostHogConfig, _> = serde_json::from_str(&json);
if let Ok(config) = res {
Some(config)
} else {
tracing::warn!("Invalid posthog config: {json}");
None
}
} else {
None
};
let config = Config {
pageserver_jwt_token: secrets.pageserver_jwt_token,
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
@@ -455,6 +471,7 @@ async fn async_main() -> anyhow::Result<()> {
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
use_local_compute_notifications: args.use_local_compute_notifications,
timeline_safekeeper_count: args.timeline_safekeeper_count,
posthog_config: posthog_config.clone(),
#[cfg(feature = "testing")]
kick_secondary_downloads: args.kick_secondary_downloads,
};
@@ -537,6 +554,23 @@ async fn async_main() -> anyhow::Result<()> {
)
});
let feature_flag_task = if let Some(posthog_config) = posthog_config {
let service = service.clone();
let cancel = CancellationToken::new();
let cancel_bg = cancel.clone();
let task = tokio::task::spawn(
async move {
let feature_flag_service = FeatureFlagService::new(service, posthog_config);
let feature_flag_service = Arc::new(feature_flag_service);
feature_flag_service.run(cancel_bg).await
}
.instrument(tracing::info_span!("feature_flag_service")),
);
Some((task, cancel))
} else {
None
};
// Wait until we receive a signal
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
@@ -584,6 +618,12 @@ async fn async_main() -> anyhow::Result<()> {
chaos_jh.await.ok();
}
// If we were running the feature flag service, stop that so that we're not calling into Service while it shuts down
if let Some((feature_flag_task, feature_flag_cancel)) = feature_flag_task {
feature_flag_cancel.cancel();
feature_flag_task.await.ok();
}
service.shutdown().await;
tracing::info!("Service shutdown complete");

View File

@@ -376,4 +376,13 @@ impl PageserverClient {
.await
)
}
pub(crate) async fn update_feature_flag_spec(&self, spec: String) -> Result<()> {
measured_request!(
"update_feature_flag_spec",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.update_feature_flag_spec(spec).await
)
}
}

View File

@@ -1,5 +1,6 @@
pub mod chaos_injector;
mod context_iterator;
pub mod feature_flag;
pub(crate) mod safekeeper_reconciler;
mod safekeeper_service;
@@ -25,6 +26,7 @@ use futures::stream::FuturesUnordered;
use http_utils::error::ApiError;
use hyper::Uri;
use itertools::Itertools;
use pageserver_api::config::PostHogConfig;
use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
@@ -471,6 +473,9 @@ pub struct Config {
/// Safekeepers will be choosen from different availability zones.
pub timeline_safekeeper_count: i64,
/// PostHog integration config
pub posthog_config: Option<PostHogConfig>,
#[cfg(feature = "testing")]
pub kick_secondary_downloads: bool,
}

View File

@@ -0,0 +1,117 @@
use std::{sync::Arc, time::Duration};
use futures::StreamExt;
use pageserver_api::config::PostHogConfig;
use pageserver_client::mgmt_api;
use posthog_client_lite::{PostHogClient, PostHogClientConfig};
use reqwest::StatusCode;
use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken;
use crate::{pageserver_client::PageserverClient, service::Service};
pub struct FeatureFlagService {
service: Arc<Service>,
config: PostHogConfig,
client: PostHogClient,
http_client: reqwest::Client,
}
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
impl FeatureFlagService {
pub fn new(service: Arc<Service>, config: PostHogConfig) -> Self {
let client = PostHogClient::new(PostHogClientConfig {
project_id: config.project_id.clone(),
server_api_key: config.server_api_key.clone(),
client_api_key: config.client_api_key.clone(),
private_api_url: config.private_api_url.clone(),
public_api_url: config.public_api_url.clone(),
});
Self {
service,
config,
client,
http_client: reqwest::Client::new(),
}
}
async fn refresh(self: Arc<Self>, cancel: CancellationToken) -> Result<(), anyhow::Error> {
let nodes = {
let inner = self.service.inner.read().unwrap();
inner.nodes.clone()
};
let feature_flag_spec = self.client.get_feature_flags_local_evaluation_raw().await?;
let stream = futures::stream::iter(nodes.values().cloned()).map(|node| {
let this = self.clone();
let feature_flag_spec = feature_flag_spec.clone();
async move {
let res = async {
let client = PageserverClient::new(
node.get_id(),
this.http_client.clone(),
node.base_url(),
// TODO: what if we rotate the token during storcon lifetime?
this.service.config.pageserver_jwt_token.as_deref(),
);
client.update_feature_flag_spec(feature_flag_spec).await?;
tracing::info!(
"Updated {}({}) with feature flag spec",
node.get_id(),
node.base_url()
);
Ok::<_, mgmt_api::Error>(())
};
if let Err(e) = res.await {
if let mgmt_api::Error::ApiError(status, _) = e {
if status == StatusCode::NOT_FOUND {
// This is expected during deployments where the API is not available, so we can ignore it
return;
}
}
tracing::warn!(
"Failed to update feature flag spec for {}: {e}",
node.get_id()
);
}
}
});
let mut stream = stream.buffer_unordered(8);
while stream.next().await.is_some() {
if cancel.is_cancelled() {
return Ok(());
}
}
Ok(())
}
pub async fn run(self: Arc<Self>, cancel: CancellationToken) {
let refresh_interval = self
.config
.refresh_interval
.unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL);
let mut interval = tokio::time::interval(refresh_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
tracing::info!(
"Starting feature flag service with refresh interval: {:?}",
refresh_interval
);
loop {
tokio::select! {
_ = interval.tick() => {}
_ = cancel.cancelled() => {
break;
}
}
let res = self.clone().refresh(cancel.clone()).await;
if let Err(e) = res {
tracing::error!("Failed to refresh feature flags: {e:#?}");
}
}
}
}

View File

@@ -18,6 +18,7 @@ use pageserver_api::controller_api::{
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
};
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::PgVersionId;
use safekeeper_api::membership::{MemberSet, SafekeeperGeneration, SafekeeperId};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -44,7 +45,7 @@ impl Service {
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
pg_version: u32,
pg_version: PgVersionId,
timeline_persistence: &TimelinePersistence,
) -> Result<Vec<NodeId>, ApiError> {
// If quorum is reached, return if we are outside of a specified timeout
@@ -219,7 +220,7 @@ impl Service {
read_only: bool,
) -> Result<SafekeepersInfo, ApiError> {
let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version * 10000;
let pg_version = PgVersionId::from(timeline_info.pg_version);
// Initially start_lsn is determined by last_record_lsn in pageserver
// response as it does initdb. However, later we persist it and in sk
// creation calls replace with the value from the timeline row if it

View File

@@ -172,7 +172,7 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
env.initial_tenant,
env.initial_timeline,
MembershipConfiguration(generation=1, members=[sk.safekeeper_id()], new_members=None),
int(env.pg_version),
int(env.pg_version) * 10000,
Lsn(0),
None,
)

View File

@@ -418,7 +418,7 @@ def test_sql_exporter_metrics_e2e(
pg_user = conn_options["user"]
pg_dbname = conn_options["dbname"]
pg_application_name = f"sql_exporter{stem_suffix}"
connstr = f"postgresql://{pg_user}@{pg_host}:{pg_port}/{pg_dbname}?sslmode=disable&application_name={pg_application_name}"
connstr = f"postgresql://{pg_user}@{pg_host}:{pg_port}/{pg_dbname}?sslmode=disable&application_name={pg_application_name}&pgaudit.log=none"
def escape_go_filepath_match_characters(s: str) -> str:
"""

View File

@@ -671,12 +671,6 @@ def test_layer_download_cancelled_by_config_location(neon_env_builder: NeonEnvBu
"""
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
# On the new mode, the test runs into a cancellation issue, i.e. the walproposer can't shut down
# as it is hang-waiting on the timeline_checkpoint call in WalIngest::new.
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
# turn off background tasks so that they don't interfere with the downloads
env = neon_env_builder.init_start(
initial_tenant_conf={