Compare commits

...

21 Commits

Author SHA1 Message Date
Alex Chi Z
ab7e5fbf95 feat(pageserver): add PostHog config section
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-07 14:34:21 +08:00
Tristan Partin
0ef6851219 Make the audience claim in compute JWTs a vector (#11845)
According to RFC 7519, `aud` is generally an array of StringOrURI, but
in special cases may be a single StringOrURI value. To accomodate future
control plane work where a single token may work for multiple services,
make the claim a vector.

Link: https://www.rfc-editor.org/rfc/rfc7519#section-4.1.3

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-05-06 22:19:15 +00:00
Mikhail
5c356c63eb endpoint_storage compute_ctl integration (#11550)
Add `/lfc/(prewarm|offload)` routes to `compute_ctl` which interact with
endpoint storage.

Add `prewarm_lfc_on_startup` spec option which, if enabled, downloads
LFC prewarm data on compute startup.

Resolves: https://github.com/neondatabase/cloud/issues/26343
2025-05-06 22:02:12 +00:00
Suhas Thalanki
384e3df2ad fix: pinned anon extension to v2.1.0 (#11844)
## Problem

Currently the setup for `anon` v2 in the compute image downloads the
latest version of the extension. This can be problematic as on a compute
start/restart it can download a version that is newer than what we have
tested and potentially break things, hence not giving us the ability to
control when the extension is updated.

We were also using `v2.2.0`, which is not ready for production yet and
has been clarified by the maintainer.

Additional context:
https://gitlab.com/dalibo/postgresql_anonymizer/-/issues/530

## Summary of changes

Changed the URL from which we download the `anon` extension to point to
`v2.1.0` instead of `latest`.
2025-05-06 21:52:15 +00:00
Tristan Partin
f9b3a2e059 Add scoping to compute_ctl JWT claims (#11639)
Currently we only have an admin scope which allows a user to bypass the
compute_id check. When the admin scope is provided, validate the
audience of the JWT to be "compute".

Closes: https://github.com/neondatabase/cloud/issues/27614

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-05-06 19:51:10 +00:00
Jakub Kołodziejczak
79ee78ea32 feat(compute): enable audit logs for pg_session_jwt extension (#11829)
related to https://github.com/neondatabase/cloud/issues/28480
related to https://github.com/neondatabase/pg_session_jwt/pull/36

cc @MihaiBojin @conradludgate @lneves12
2025-05-06 15:18:50 +00:00
Erik Grinaker
0e0ad073bf storcon: fix split aborts removing other tenants (#11837)
## Problem

When aborting a split, the code accidentally removes all other tenant
shards from the in-memory map that have the same shard count as the
aborted split, causing "tenant not found" errors. It will recover on a
storcon restart, when it loads the persisted state. This issue has been
present for at least a year.

Resolves https://github.com/neondatabase/cloud/issues/28589.

## Summary of changes

Only remove shards belonging to the relevant tenant when aborting a
split.

Also adds a regression test.
2025-05-06 13:57:34 +00:00
Alex Chi Z.
6827f2f58c fix(pageserver): only keep iter_with_options API, improve docs in gc-compact (#11804)
## Problem

Address comments in https://github.com/neondatabase/neon/pull/11709

## Summary of changes

- remove `iter` API, users always need to specify buffer size depending
on the expected memory usage.
- several doc improvements

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2025-05-06 12:27:16 +00:00
Peter Bendel
c82e363ed9 cleanup orphan projects created by python tests, too (#11836)
## Problem

- some projects are created during GitHub workflows but not by action
project_create but by python test scripts.

If the python test fails the project is not deleted

## Summary of changes

- make sure we cleanup those python created projects a few days after
they are no longer used, too
2025-05-06 12:26:13 +00:00
Alexander Bayandin
50dc2fae77 compute-node.Dockerfile: remove layer with duplicated name (#11807)
## Problem

Two `rust-extensions-build-pgrx14` layers were added independently in
two different PRs, and the layers are exactly the same

## Summary of changes
- Remove one of `rust-extensions-build-pgrx14` layers
2025-05-06 10:52:21 +00:00
Folke Behrens
62ac5b94b3 proxy: Include the exp/nbf timestamps in the errors (#11828)
## Problem

It's difficult to tell when the JWT expired from current logs and error
messages.

## Summary of changes

Add exp/nbf timestamps to the respective error variants.
Also use checked_add when deserializing a SystemTime from JWT.

Related to INC-509
2025-05-06 09:28:25 +00:00
Konstantin Knizhnik
f0e7b3e0ef Use unlogged build for gist_indexsortbuild_flush_ready_pages (#11753)
## Problem

See https://github.com/neondatabase/neon/issues/11718

GIST index can be constructed in two ways: GIST_SORTED_BUILD and
GIST_BUFFERING.
We used unlogged build in the second case but not in the first.

## Summary of changes

Use unlogged build in `gist_indexsortbuild_flush_ready_pages`

Correspondent Postgres PRsL:
https://github.com/neondatabase/postgres/pull/624
https://github.com/neondatabase/postgres/pull/625
https://github.com/neondatabase/postgres/pull/626

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2025-05-06 07:24:27 +00:00
Dmitrii Kovalkov
c6ff18affc cosmetics(pgxn/neon): WP code small clean up (#11824)
## Problem
Some small cosmetic changes I made while reading the code. Should not
affect anything.

## Summary of changes
- Remove `n_votes` field because it's not used anymore
- Explicitly initialize `safekeepers_generation` with
`INVALID_GENERATION` if the generation is not present (the struct is
zero-initialized anyway, but the explicit initialization is better IMHO)
- Access SafekeeperId via pointer `sk_id` created above
2025-05-06 06:51:51 +00:00
Heikki Linnakangas
16ca74a3f4 Add SAFETY comment on libc::sysconf() call (#11581)
I got an 'undocumented_unsafe_blocks' clippy warning about it. Not sure
why I got the warning now and not before, but in any case a comment is a
good idea.
2025-05-06 06:49:23 +00:00
Peter Bendel
cb67f9a651 delete orphan left over projects (#11826)
## Problem

sometimes our benchmarking GitHub workflow is terminated by side-effects
beyond our control (e.g. GitHub runner looses connection to server) and
then we have left-over Neon projects created during the workflow

[Example where GitHub runner lost connection and project was not
deleted](https://github.com/neondatabase/neon/actions/runs/14017400543/job/39244816485)

Fixes https://github.com/neondatabase/cloud/issues/28546

## Summary of changes

- Add a cleanup step that cleans up left-over projects
- also give each project created during workflows a name that references
the testcase and GitHub runid

## Example run (test of new job steps)


https://github.com/neondatabase/neon/actions/runs/14837092399/job/41650741922#step:6:63

---------

Co-authored-by: a-masterov <72613290+a-masterov@users.noreply.github.com>
2025-05-05 14:30:13 +00:00
devin-ai-integration[bot]
baf425a2cd [pageserver/virtual_file] impr: Improve OpenOptions API ergonomics (#11789)
# Improve OpenOptions API ergonomics

Closes #11787

This PR improves the OpenOptions API ergonomics by:

1. Making OpenOptions methods take and return owned Self instead of &mut
self
2. Changing VirtualFile::open_with_options_v2 to take an owned
OpenOptions
3. Removing unnecessary .clone() and .to_owned() calls

These changes make the API more idiomatic Rust by leveraging the builder
pattern with owned values, which is cleaner and more ergonomic than the
previous approach.

Link to Devin run:
https://app.devin.ai/sessions/c2a4b24f7aca40a3b3777f4259bf8ee1
Requested by: christian@neon.tech

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: christian@neon.tech <christian@neon.tech>
2025-05-05 13:06:37 +00:00
Alex Chi Z.
0b243242df fix(test): allow flush error in gc-compaction tests (#11822)
## Problem

Part of https://github.com/neondatabase/neon/issues/11762

## Summary of changes

While #11762 needs some work to refactor the error propagating thing, we
can do a hacky fix for the gc-compaction tests to allow flush error
during shutdown. It does not affect correctness.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-05 12:15:22 +00:00
Conrad Ludgate
6131d86ec9 proxy: allow invalid SNI (#11792)
## Problem

Some PrivateLink customers are unable to use Private DNS. As such they
use an invalid domain name to address Neon. We currently are rejecting
those connections because we cannot resolve the correct certificate.

## Summary of changes

1. Ensure a certificate is always returned.
2. If there is an SNI field, use endpoint fallback if it doesn't match.

I suggest reviewing each commit separately.
2025-05-05 11:18:55 +00:00
Konstantin Knizhnik
4b9087651c Checked that stored LwLSN >= FirstNormalUnloggedLSN (#11750)
## Problem

Undo unintended change 60b9fb1baf

## Summary of changes

Add assert that we are not storing fake LSN in LwLSN.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-02 19:27:59 +00:00
Konstantin Knizhnik
79699aebc8 Reserve in file descriptor pool sockets used for connections to page servers (#11798)
## Problem

See https://github.com/neondatabase/neon/issues/11790

The neon extension opens extensions to the pageservers, which consumes
file descriptors. Postgres has a mechanism to count how many FDs are in
use, but it doesn't know about those FDs. We should call
ReserveExternalFD() or AcquireExternalFD() to account for them.

## Summary of changes

Call `ReserveExternalFD()` for each shard

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Mikhail Kot <mikhail@neon.tech>
2025-05-02 14:36:10 +00:00
Alexander Bayandin
22290eb7ba CI: notify relevant team about release deploy failures (#11797)
## Problem

We notify only Storage team about failed deploys, but Compute and Proxy
teams can also benefit from that

## Summary of changes
- Adjust `notify-storage-release-deploy-failure` to notify the relevant
team about failed deploy
2025-05-02 12:46:21 +00:00
71 changed files with 1322 additions and 444 deletions

View File

@@ -33,9 +33,14 @@ config-variables:
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID
- SLACK_COMPUTE_CHANNEL_ID
- SLACK_ON_CALL_DEVPROD_STREAM
- SLACK_ON_CALL_QA_STAGING_STREAM
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_ONCALL_COMPUTE_GROUP
- SLACK_ONCALL_PROXY_GROUP
- SLACK_ONCALL_STORAGE_GROUP
- SLACK_PROXY_CHANNEL_ID
- SLACK_RUST_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- SLACK_UPCOMING_RELEASE_CHANNEL_ID

View File

@@ -53,6 +53,77 @@ concurrency:
cancel-in-progress: true
jobs:
cleanup:
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
env:
ORG_ID: org-solitary-dew-09443886
LIMIT: 100
SEARCH: "GITHUB_RUN_ID="
BASE_URL: https://console-stage.neon.build/api/v2
DRY_RUN: "false" # Set to "true" to just test out the workflow
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Cleanup inactive Neon projects left over from prior runs
env:
API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
run: |
set -euo pipefail
NOW=$(date -u +%s)
DAYS_AGO=$((NOW - 5 * 86400))
REQUEST_URL="$BASE_URL/projects?limit=$LIMIT&search=$(printf '%s' "$SEARCH" | jq -sRr @uri)&org_id=$ORG_ID"
echo "Requesting project list from:"
echo "$REQUEST_URL"
response=$(curl -s -X GET "$REQUEST_URL" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}" )
echo "Response:"
echo "$response" | jq .
projects_to_delete=$(echo "$response" | jq --argjson cutoff "$DAYS_AGO" '
.projects[]
| select(.compute_last_active_at != null)
| select((.compute_last_active_at | fromdateiso8601) < $cutoff)
| {id, name, compute_last_active_at}
')
if [ -z "$projects_to_delete" ]; then
echo "No projects eligible for deletion."
exit 0
fi
echo "Projects that will be deleted:"
echo "$projects_to_delete" | jq -r '.id'
if [ "$DRY_RUN" = "false" ]; then
echo "$projects_to_delete" | jq -r '.id' | while read -r project_id; do
echo "Deleting project: $project_id"
curl -s -X DELETE "$BASE_URL/projects/$project_id" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}"
done
else
echo "Dry run enabled — no projects were deleted."
fi
bench:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:

View File

@@ -1434,10 +1434,10 @@ jobs:
;;
esac
notify-storage-release-deploy-failure:
needs: [ deploy ]
notify-release-deploy-failure:
needs: [ meta, deploy ]
# We want this to run even if (transitive) dependencies are skipped, because deploy should really be successful on release branch workflow runs.
if: github.ref_name == 'release' && needs.deploy.result != 'success' && always()
if: contains(fromJSON('["storage-release", "compute-release", "proxy-release"]'), needs.meta.outputs.run-kind) && needs.deploy.result != 'success' && always()
runs-on: ubuntu-22.04
steps:
- name: Harden the runner (Audit all outbound calls)
@@ -1445,15 +1445,40 @@ jobs:
with:
egress-policy: audit
- name: Post release-deploy failure to team-storage slack channel
- name: Post release-deploy failure to team slack channel
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
env:
TEAM_ONCALL: >-
${{
fromJSON(format('{
"storage-release": "<!subteam^{0}|@oncall-storage>",
"compute-release": "<!subteam^{1}|@oncall-compute>",
"proxy-release": "<!subteam^{2}|@oncall-proxy>"
}',
vars.SLACK_ONCALL_STORAGE_GROUP,
vars.SLACK_ONCALL_COMPUTE_GROUP,
vars.SLACK_ONCALL_PROXY_GROUP
))[needs.meta.outputs.run-kind]
}}
CHANNEL: >-
${{
fromJSON(format('{
"storage-release": "{0}",
"compute-release": "{1}",
"proxy-release": "{2}"
}',
vars.SLACK_STORAGE_CHANNEL_ID,
vars.SLACK_COMPUTE_CHANNEL_ID,
vars.SLACK_PROXY_CHANNEL_ID
))[needs.meta.outputs.run-kind]
}}
with:
method: chat.postMessage
token: ${{ secrets.SLACK_BOT_TOKEN }}
payload: |
channel: ${{ vars.SLACK_STORAGE_CHANNEL_ID }}
channel: ${{ env.CHANNEL }}
text: |
🔴 <!subteam^S06CJ87UMNY|@oncall-storage>: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
🔴 ${{ env.TEAM_ONCALL }}: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
# The job runs on `release` branch and copies compatibility data and Neon artifact from the last *release PR* to the latest directory
promote-compatibility-data:

2
Cargo.lock generated
View File

@@ -1284,6 +1284,7 @@ name = "compute_tools"
version = "0.1.0"
dependencies = [
"anyhow",
"async-compression",
"aws-config",
"aws-sdk-kms",
"aws-sdk-s3",
@@ -1420,6 +1421,7 @@ dependencies = [
"clap",
"comfy-table",
"compute_api",
"endpoint_storage",
"futures",
"http-utils",
"humantime",

View File

@@ -243,6 +243,7 @@ azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rus
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
http-utils = { version = "0.1", path = "./libs/http-utils/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver = { path = "./pageserver" }

View File

@@ -1084,23 +1084,12 @@ RUN cargo install --locked --version 0.12.9 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
#
# Layer "rust extensions pgrx14"
#
#########################################################################################
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx14
ARG PG_VERSION
RUN cargo install --locked --version 0.14.1 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
#
# Layer "rust extensions pgrx14"
#
# Version 14 is now required by a few
# Version 14 is now required by a few
# This layer should be used as a base for new pgrx extensions,
# and eventually get merged with `rust-extensions-build`
#
@@ -1333,8 +1322,8 @@ ARG PG_VERSION
# Do not update without approve from proxy team
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
WORKDIR /ext-src
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.0.tar.gz -O pg_session_jwt.tar.gz && \
echo "19be2dc0b3834d643706ed430af998bb4c2cdf24b3c45e7b102bb3a550e8660c pg_session_jwt.tar.gz" | sha256sum --check && \
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.1.tar.gz -O pg_session_jwt.tar.gz && \
echo "62fec9e472cb805c53ba24a0765afdb8ea2720cfc03ae7813e61687b36d1b0ad pg_session_jwt.tar.gz" | sha256sum --check && \
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/version = "0.12.6"/version = "0.12.9"/g' pgrx-tests/Cargo.toml && \
@@ -1362,7 +1351,8 @@ COPY compute/patches/anon_v2.patch .
# This is an experimental extension, never got to real production.
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/latest/postgresql_anonymizer-latest.tar.gz -O pg_anon.tar.gz && \
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/2.1.0/postgresql_anonymizer-latest.tar.gz -O pg_anon.tar.gz && \
echo "48e7f5ae2f1ca516df3da86c5c739d48dd780a4e885705704ccaad0faa89d6c0 pg_anon.tar.gz" | sha256sum --check && \
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt && \
sed -i 's/pgrx = "0.14.1"/pgrx = { version = "=0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \

View File

@@ -10,6 +10,7 @@ default = []
testing = ["fail/failpoints"]
[dependencies]
async-compression.workspace = true
base64.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true

View File

@@ -1,17 +1,10 @@
use std::collections::HashMap;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus};
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
LfcPrewarmState,
};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
};
@@ -25,6 +18,16 @@ use postgres;
use postgres::NoTls;
use postgres::error::SqlState;
use remote_storage::{DownloadError, RemotePath};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::spawn;
use tracing::{Instrument, debug, error, info, instrument, warn};
use utils::id::{TenantId, TimelineId};
@@ -150,6 +153,9 @@ pub struct ComputeState {
/// set up the span relationship ourselves.
pub startup_span: Option<tracing::span::Span>,
pub lfc_prewarm_state: LfcPrewarmState,
pub lfc_offload_state: LfcOffloadState,
pub metrics: ComputeMetrics,
}
@@ -163,6 +169,8 @@ impl ComputeState {
pspec: None,
startup_span: None,
metrics: ComputeMetrics::default(),
lfc_prewarm_state: LfcPrewarmState::default(),
lfc_offload_state: LfcOffloadState::default(),
}
}
@@ -198,6 +206,8 @@ pub struct ParsedSpec {
pub pageserver_connstr: String,
pub safekeeper_connstrings: Vec<String>,
pub storage_auth_token: Option<String>,
pub endpoint_storage_addr: Option<SocketAddr>,
pub endpoint_storage_token: Option<String>,
}
impl TryFrom<ComputeSpec> for ParsedSpec {
@@ -251,6 +261,18 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
.or(Err("invalid timeline id"))?
};
let endpoint_storage_addr: Option<SocketAddr> = spec
.endpoint_storage_addr
.clone()
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_addr"))
.unwrap_or_default()
.parse()
.ok();
let endpoint_storage_token = spec
.endpoint_storage_token
.clone()
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_token"));
Ok(ParsedSpec {
spec,
pageserver_connstr,
@@ -258,6 +280,8 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
storage_auth_token,
tenant_id,
timeline_id,
endpoint_storage_addr,
endpoint_storage_token,
})
}
}
@@ -736,6 +760,9 @@ impl ComputeNode {
// Log metrics so that we can search for slow operations in logs
info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished");
if pspec.spec.prewarm_lfc_on_startup {
self.prewarm_lfc();
}
Ok(())
}

View File

@@ -0,0 +1,202 @@
use crate::compute::ComputeNode;
use anyhow::{Context, Result, bail};
use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
use compute_api::responses::LfcOffloadState;
use compute_api::responses::LfcPrewarmState;
use http::StatusCode;
use reqwest::Client;
use std::sync::Arc;
use tokio::{io::AsyncReadExt, spawn};
use tracing::{error, info};
#[derive(serde::Serialize, Default)]
pub struct LfcPrewarmStateWithProgress {
#[serde(flatten)]
base: LfcPrewarmState,
total: i32,
prewarmed: i32,
skipped: i32,
}
/// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
struct EndpointStoragePair {
url: String,
token: String,
}
const KEY: &str = "lfc_state";
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
type Error = anyhow::Error;
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
bail!("pspec.endpoint_id missing")
};
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
bail!("pspec.endpoint_storage_addr missing")
};
let tenant_id = pspec.tenant_id;
let timeline_id = pspec.timeline_id;
let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
let Some(ref token) = pspec.endpoint_storage_token else {
bail!("pspec.endpoint_storage_token missing")
};
let token = token.clone();
Ok(EndpointStoragePair { url, token })
}
}
impl ComputeNode {
// If prewarm failed, we want to get overall number of segments as well as done ones.
// However, this function should be reliable even if querying postgres failed.
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
info!("requesting LFC prewarm state from postgres");
let mut state = LfcPrewarmStateWithProgress::default();
{
state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
}
let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
Ok(client) => client,
Err(err) => {
error!(%err, "connecting to postgres");
return state;
}
};
let row = match client
.query_one("select * from get_prewarm_info()", &[])
.await
{
Ok(row) => row,
Err(err) => {
error!(%err, "querying LFC prewarm status");
return state;
}
};
state.total = row.try_get(0).unwrap_or_default();
state.prewarmed = row.try_get(1).unwrap_or_default();
state.skipped = row.try_get(2).unwrap_or_default();
state
}
pub fn lfc_offload_state(&self) -> LfcOffloadState {
self.state.lock().unwrap().lfc_offload_state.clone()
}
/// Returns false if there is a prewarm request ongoing, true otherwise
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
crate::metrics::LFC_PREWARM_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
if let LfcPrewarmState::Prewarming =
std::mem::replace(state, LfcPrewarmState::Prewarming)
{
return false;
}
}
let cloned = self.clone();
spawn(async move {
let Err(err) = cloned.prewarm_impl().await else {
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
return;
};
error!(%err);
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
error: err.to_string(),
};
});
true
}
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
let state = self.state.lock().unwrap();
state.pspec.as_ref().unwrap().try_into()
}
async fn prewarm_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(token);
let res = request.send().await.context("querying endpoint storage")?;
let status = res.status();
if status != StatusCode::OK {
bail!("{status} querying endpoint storage")
}
let mut uncompressed = Vec::new();
let lfc_state = res
.bytes()
.await
.context("getting request body from endpoint storage")?;
ZstdDecoder::new(lfc_state.iter().as_slice())
.read_to_end(&mut uncompressed)
.await
.context("decoding LFC state")?;
let uncompressed_len = uncompressed.len();
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres");
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select prewarm_local_cache($1)", &[&uncompressed])
.await
.context("loading LFC state into postgres")
.map(|_| ())
}
/// Returns false if there is an offload request ongoing, true otherwise
pub fn offload_lfc(self: &Arc<Self>) -> bool {
crate::metrics::LFC_OFFLOAD_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap().lfc_offload_state;
if let LfcOffloadState::Offloading =
std::mem::replace(state, LfcOffloadState::Offloading)
{
return false;
}
}
let cloned = self.clone();
spawn(async move {
let Err(err) = cloned.offload_lfc_impl().await else {
cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
return;
};
error!(%err);
cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: err.to_string(),
};
});
true
}
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
info!(%url, "requesting LFC state from postgres");
let mut compressed = Vec::new();
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select get_local_cache_state()", &[])
.await
.context("querying LFC state")?
.try_get::<usize, &[u8]>(0)
.context("deserializing LFC state")
.map(ZstdEncoder::new)?
.read_to_end(&mut compressed)
.await
.context("compressing LFC state")?;
let compressed_len = compressed.len();
info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
let request = Client::new().put(url).bearer_auth(token).body(compressed);
match request.send().await {
Ok(res) if res.status() == StatusCode::OK => Ok(()),
Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()),
Err(err) => Err(err).context("writing to endpoint storage"),
}
}
}

View File

@@ -223,6 +223,9 @@ pub fn write_postgres_conf(
// TODO: tune this after performance testing
writeln!(file, "pgaudit.log_rotation_age=5")?;
// Enable audit logs for pg_session_jwt extension
writeln!(file, "pg_session_jwt.audit_log=on")?;
// Add audit shared_preload_libraries, if they are not present.
//
// The caller who sets the flag is responsible for ensuring that the necessary

View File

@@ -1,12 +1,10 @@
use std::collections::HashSet;
use anyhow::{Result, anyhow};
use axum::{RequestExt, body::Body};
use axum_extra::{
TypedHeader,
headers::{Authorization, authorization::Bearer},
};
use compute_api::requests::ComputeClaims;
use compute_api::requests::{COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope};
use futures::future::BoxFuture;
use http::{Request, Response, StatusCode};
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
@@ -25,13 +23,14 @@ pub(in crate::http) struct Authorize {
impl Authorize {
pub fn new(compute_id: String, jwks: JwkSet) -> Self {
let mut validation = Validation::new(Algorithm::EdDSA);
// Nothing is currently required
validation.required_spec_claims = HashSet::new();
validation.validate_exp = true;
// Unused by the control plane
validation.validate_aud = false;
// Unused by the control plane
validation.validate_nbf = false;
// Unused by the control plane
validation.validate_aud = false;
validation.set_audience(&[COMPUTE_AUDIENCE]);
// Nothing is currently required
validation.set_required_spec_claims(&[] as &[&str; 0]);
Self {
compute_id,
@@ -64,11 +63,47 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
Err(e) => return Err(JsonResponse::error(StatusCode::UNAUTHORIZED, e)),
};
if data.claims.compute_id != compute_id {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"invalid compute ID in authorization token claims",
));
match data.claims.scope {
// TODO: We should validate audience for every token, but
// instead of this ad-hoc validation, we should turn
// [`Validation::validate_aud`] on. This is merely a stopgap
// while we roll out `aud` deployment. We return a 401
// Unauthorized because when we eventually do use
// [`Validation`], we will hit the above `Err` match arm which
// returns 401 Unauthorized.
Some(ComputeClaimsScope::Admin) => {
let Some(ref audience) = data.claims.audience else {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"missing audience in authorization token claims",
));
};
if !audience.iter().any(|a| a == COMPUTE_AUDIENCE) {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"invalid audience in authorization token claims",
));
}
}
// If the scope is not [`ComputeClaimsScope::Admin`], then we
// must validate the compute_id
_ => {
let Some(ref claimed_compute_id) = data.claims.compute_id else {
return Err(JsonResponse::error(
StatusCode::FORBIDDEN,
"missing compute_id in authorization token claims",
));
};
if *claimed_compute_id != compute_id {
return Err(JsonResponse::error(
StatusCode::FORBIDDEN,
"invalid compute ID in authorization token claims",
));
}
}
}
// Make claims available to any subsequent middleware or request

View File

@@ -0,0 +1,39 @@
use crate::compute_prewarm::LfcPrewarmStateWithProgress;
use crate::http::JsonResponse;
use axum::response::{IntoResponse, Response};
use axum::{Json, http::StatusCode};
use compute_api::responses::LfcOffloadState;
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmStateWithProgress> {
Json(compute.lfc_prewarm_state().await)
}
// Following functions are marked async for axum, as it's more convenient than wrapping these
// in async lambdas at call site
pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadState> {
Json(compute.lfc_offload_state())
}
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
if compute.prewarm_lfc() {
StatusCode::ACCEPTED.into_response()
} else {
JsonResponse::error(
StatusCode::TOO_MANY_REQUESTS,
"Multiple requests for prewarm are not allowed",
)
}
}
pub(in crate::http) async fn offload(compute: Compute) -> Response {
if compute.offload_lfc() {
StatusCode::ACCEPTED.into_response()
} else {
JsonResponse::error(
StatusCode::TOO_MANY_REQUESTS,
"Multiple requests for prewarm offload are not allowed",
)
}
}

View File

@@ -11,6 +11,7 @@ pub(in crate::http) mod extensions;
pub(in crate::http) mod failpoints;
pub(in crate::http) mod grants;
pub(in crate::http) mod insights;
pub(in crate::http) mod lfc;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod status;

View File

@@ -23,7 +23,7 @@ use super::{
middleware::authorize::Authorize,
routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
grants, insights, metrics, metrics_json, status, terminate,
grants, insights, lfc, metrics, metrics_json, status, terminate,
},
};
use crate::compute::ComputeNode;
@@ -85,6 +85,8 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))

View File

@@ -11,6 +11,7 @@ pub mod http;
pub mod logger;
pub mod catalog;
pub mod compute;
pub mod compute_prewarm;
pub mod disk_quota;
pub mod extension_server;
pub mod installed_extensions;

View File

@@ -1,7 +1,7 @@
use metrics::core::{AtomicF64, AtomicU64, Collector, GenericCounter, GenericGauge};
use metrics::proto::MetricFamily;
use metrics::{
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
IntCounter, IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec,
};
use once_cell::sync::Lazy;
@@ -97,6 +97,24 @@ pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::
.expect("failed to define a metric")
});
/// Needed as neon.file_cache_prewarm_batch == 0 doesn't mean we never tried to prewarm.
/// On the other hand, LFC_PREWARMED_PAGES is excessive as we can GET /lfc/prewarm
pub(crate) static LFC_PREWARM_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_prewarm_requests_total",
"Total number of LFC prewarm requests made by compute_ctl",
)
.expect("failed to define a metric")
});
pub(crate) static LFC_OFFLOAD_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_offload_requests_total",
"Total number of LFC offload requests made by compute_ctl",
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
@@ -106,5 +124,7 @@ pub fn collect() -> Vec<MetricFamily> {
metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
metrics.extend(LFC_PREWARM_REQUESTS.collect());
metrics.extend(LFC_OFFLOAD_REQUESTS.collect());
metrics
}

View File

@@ -30,6 +30,7 @@ mod pg_helpers_tests {
r#"fsync = off
wal_level = logical
hot_standby = on
prewarm_lfc_on_startup = off
neon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501'
wal_log_hints = on
log_connections = on

View File

@@ -41,7 +41,7 @@ storage_broker.workspace = true
http-utils.workspace = true
utils.workspace = true
whoami.workspace = true
endpoint_storage.workspace = true
compute_api.workspace = true
workspace_hack.workspace = true
tracing.workspace = true

View File

@@ -16,10 +16,11 @@ use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::requests::ComputeClaimsScope;
use compute_api::spec::ComputeMode;
use control_plane::broker::StorageBroker;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
@@ -705,6 +706,9 @@ struct EndpointStopCmdArgs {
struct EndpointGenerateJwtCmdArgs {
#[clap(help = "Postgres endpoint id")]
endpoint_id: String,
#[clap(short = 's', long, help = "Scope to generate the JWT with", value_parser = ComputeClaimsScope::from_str)]
scope: Option<ComputeClaimsScope>,
}
#[derive(clap::Subcommand)]
@@ -1018,7 +1022,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
})
.collect(),
endpoint_storage: EndpointStorageConf {
port: ENDPOINT_STORAGE_DEFAULT_PORT,
listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
},
pg_distrib_dir: None,
neon_distrib_dir: None,
@@ -1484,10 +1488,25 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
None
};
let exp = (std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?
+ Duration::from_secs(86400))
.as_secs();
let claims = endpoint_storage::claims::EndpointStorageClaims {
tenant_id: endpoint.tenant_id,
timeline_id: endpoint.timeline_id,
endpoint_id: endpoint_id.to_string(),
exp,
};
let endpoint_storage_token = env.generate_auth_token(&claims)?;
let endpoint_storage_addr = env.endpoint_storage.listen_addr.to_string();
println!("Starting existing endpoint {endpoint_id}...");
endpoint
.start(
&auth_token,
endpoint_storage_token,
endpoint_storage_addr,
safekeepers_generation,
safekeepers,
pageservers,
@@ -1540,12 +1559,16 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
endpoint.stop(&args.mode, args.destroy)?;
}
EndpointCmd::GenerateJwt(args) => {
let endpoint_id = &args.endpoint_id;
let endpoint = cplane
.endpoints
.get(endpoint_id)
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let jwt = endpoint.generate_jwt()?;
let endpoint = {
let endpoint_id = &args.endpoint_id;
cplane
.endpoints
.get(endpoint_id)
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?
};
let jwt = endpoint.generate_jwt(args.scope)?;
print!("{jwt}");
}

View File

@@ -45,7 +45,9 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow, bail};
use compute_api::requests::{ComputeClaims, ConfigurationRequest};
use compute_api::requests::{
COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
};
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
};
@@ -630,9 +632,17 @@ impl Endpoint {
}
/// Generate a JWT with the correct claims.
pub fn generate_jwt(&self) -> Result<String> {
pub fn generate_jwt(&self, scope: Option<ComputeClaimsScope>) -> Result<String> {
self.env.generate_auth_token(&ComputeClaims {
compute_id: self.endpoint_id.clone(),
audience: match scope {
Some(ComputeClaimsScope::Admin) => Some(vec![COMPUTE_AUDIENCE.to_owned()]),
_ => None,
},
compute_id: match scope {
Some(ComputeClaimsScope::Admin) => None,
_ => Some(self.endpoint_id.clone()),
},
scope,
})
}
@@ -640,6 +650,8 @@ impl Endpoint {
pub async fn start(
&self,
auth_token: &Option<String>,
endpoint_storage_token: String,
endpoint_storage_addr: String,
safekeepers_generation: Option<SafekeeperGeneration>,
safekeepers: Vec<NodeId>,
pageservers: Vec<(Host, u16)>,
@@ -733,6 +745,9 @@ impl Endpoint {
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
audit_log_level: ComputeAudit::Disabled,
logs_export_host: None::<String>,
endpoint_storage_addr: Some(endpoint_storage_addr),
endpoint_storage_token: Some(endpoint_storage_token),
prewarm_lfc_on_startup: false,
};
// this strange code is needed to support respec() in tests
@@ -903,7 +918,7 @@ impl Endpoint {
self.external_http_address.port()
),
)
.bearer_auth(self.generate_jwt()?)
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
.send()
.await?;
@@ -980,7 +995,7 @@ impl Endpoint {
self.external_http_address.port()
))
.header(CONTENT_TYPE.as_str(), "application/json")
.bearer_auth(self.generate_jwt()?)
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
.body(
serde_json::to_string(&ConfigurationRequest {
spec,

View File

@@ -3,17 +3,19 @@ use crate::local_env::LocalEnv;
use anyhow::{Context, Result};
use camino::Utf8PathBuf;
use std::io::Write;
use std::net::SocketAddr;
use std::time::Duration;
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage";
pub const ENDPOINT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub const ENDPOINT_STORAGE_DEFAULT_ADDR: SocketAddr =
SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9993);
pub struct EndpointStorage {
pub bin: Utf8PathBuf,
pub data_dir: Utf8PathBuf,
pub pemfile: Utf8PathBuf,
pub port: u16,
pub addr: SocketAddr,
}
impl EndpointStorage {
@@ -22,7 +24,7 @@ impl EndpointStorage {
bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(),
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
port: env.endpoint_storage.port,
addr: env.endpoint_storage.listen_addr,
}
}
@@ -31,7 +33,7 @@ impl EndpointStorage {
}
fn listen_addr(&self) -> Utf8PathBuf {
format!("127.0.0.1:{}", self.port).into()
format!("{}:{}", self.addr.ip(), self.addr.port()).into()
}
pub fn init(&self) -> Result<()> {

View File

@@ -20,7 +20,9 @@ use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::broker::StorageBroker;
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
use crate::endpoint_storage::{
ENDPOINT_STORAGE_DEFAULT_ADDR, ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage,
};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -151,10 +153,10 @@ pub struct NeonLocalInitConf {
pub generate_local_ssl_certs: bool,
}
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct EndpointStorageConf {
pub port: u16,
pub listen_addr: SocketAddr,
}
/// Broker config for cluster internal communication.
@@ -241,6 +243,14 @@ impl Default for NeonStorageControllerConf {
}
}
impl Default for EndpointStorageConf {
fn default() -> Self {
Self {
listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
}
}
}
impl NeonBroker {
pub fn client_url(&self) -> Url {
let url = if let Some(addr) = self.listen_https_addr {

View File

@@ -12,6 +12,7 @@ ERROR: invalid JWT encoding
-- Test creating a session with an expired JWT
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjE3NDI1NjQ0MzIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MjQyNDIsInN1YiI6InVzZXIxMjMifQ.A6FwKuaSduHB9O7Gz37g0uoD_U9qVS0JNtT7YABGVgB7HUD1AMFc9DeyhNntWBqncg8k5brv-hrNTuUh5JYMAw');
ERROR: Token used after it has expired
DETAIL: exp=1742564432
-- Test creating a session with a valid JWT
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjQ4OTYxNjQyNTIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MzQzNDMsInN1YiI6InVzZXIxMjMifQ.2TXVgjb6JSUq6_adlvp-m_SdOxZSyGS30RS9TLB0xu2N83dMSs2NybwE1NMU8Fb0tcAZR_ET7M2rSxbTrphfCg');
jwt_session_init

View File

@@ -343,7 +343,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
fn token() -> String {
let claims = endpoint_storage::Claims {
let claims = endpoint_storage::claims::EndpointStorageClaims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
@@ -489,16 +489,8 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
}
fn delete_prefix_token(uri: &str) -> String {
use serde::Serialize;
let parts = uri.split("/").collect::<Vec<&str>>();
#[derive(Serialize)]
struct PrefixClaims {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<endpoint_storage::EndpointId>,
exp: u64,
}
let claims = PrefixClaims {
let claims = endpoint_storage::claims::DeletePrefixClaims {
tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
endpoint_id: parts.get(3).map(ToString::to_string),

View File

@@ -0,0 +1,52 @@
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use utils::id::{EndpointId, TenantId, TimelineId};
/// Claims to add, remove, or retrieve endpoint data. Used by compute_ctl
#[derive(Deserialize, Serialize, PartialEq)]
pub struct EndpointStorageClaims {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub endpoint_id: EndpointId,
pub exp: u64,
}
/// Claims to remove tenant, timeline, or endpoint data. Used by control plane
#[derive(Deserialize, Serialize, PartialEq)]
pub struct DeletePrefixClaims {
pub tenant_id: TenantId,
/// None when tenant is deleted (endpoint_id is also None in this case)
pub timeline_id: Option<TimelineId>,
/// None when timeline is deleted
pub endpoint_id: Option<EndpointId>,
pub exp: u64,
}
impl Display for EndpointStorageClaims {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"EndpointClaims(tenant_id={} timeline_id={} endpoint_id={} exp={})",
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
)
}
}
impl Display for DeletePrefixClaims {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"DeletePrefixClaims(tenant_id={} timeline_id={} endpoint_id={}, exp={})",
self.tenant_id,
self.timeline_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string()),
self.endpoint_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string()),
self.exp
)
}
}

View File

@@ -1,3 +1,5 @@
pub mod claims;
use crate::claims::{DeletePrefixClaims, EndpointStorageClaims};
use anyhow::Result;
use axum::extract::{FromRequestParts, Path};
use axum::response::{IntoResponse, Response};
@@ -13,7 +15,7 @@ use std::result::Result as StdResult;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use utils::id::{TenantId, TimelineId};
use utils::id::{EndpointId, TenantId, TimelineId};
// simplified version of utils::auth::JwtAuth
pub struct JwtAuth {
@@ -79,26 +81,6 @@ pub struct Storage {
pub max_upload_file_limit: usize,
}
pub type EndpointId = String; // If needed, reuse small string from proxy/src/types.rc
#[derive(Deserialize, Serialize, PartialEq)]
pub struct Claims {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub endpoint_id: EndpointId,
pub exp: u64,
}
impl Display for Claims {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Claims(tenant_id {} timeline_id {} endpoint_id {} exp {})",
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
)
}
}
#[derive(Deserialize, Serialize)]
struct KeyRequest {
tenant_id: TenantId,
@@ -107,6 +89,13 @@ struct KeyRequest {
path: String,
}
#[derive(Deserialize, Serialize, PartialEq)]
struct PrefixKeyRequest {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<EndpointId>,
}
#[derive(Debug, PartialEq)]
pub struct S3Path {
pub path: RemotePath,
@@ -165,7 +154,7 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
.extract::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|e| bad_request(e, "invalid token"))?;
let claims: Claims = state
let claims: EndpointStorageClaims = state
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "decoding token"))?;
@@ -178,7 +167,7 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
path.endpoint_id.clone()
};
let route = Claims {
let route = EndpointStorageClaims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id,
@@ -193,38 +182,13 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
}
}
#[derive(Deserialize, Serialize, PartialEq)]
pub struct PrefixKeyPath {
pub tenant_id: TenantId,
pub timeline_id: Option<TimelineId>,
pub endpoint_id: Option<EndpointId>,
}
impl Display for PrefixKeyPath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"PrefixKeyPath(tenant_id {} timeline_id {} endpoint_id {})",
self.tenant_id,
self.timeline_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string()),
self.endpoint_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string())
)
}
}
#[derive(Debug, PartialEq)]
pub struct PrefixS3Path {
pub path: RemotePath,
}
impl From<&PrefixKeyPath> for PrefixS3Path {
fn from(path: &PrefixKeyPath) -> Self {
impl From<&DeletePrefixClaims> for PrefixS3Path {
fn from(path: &DeletePrefixClaims) -> Self {
let timeline_id = path
.timeline_id
.as_ref()
@@ -250,21 +214,27 @@ impl FromRequestParts<Arc<Storage>> for PrefixS3Path {
state: &Arc<Storage>,
) -> Result<Self, Self::Rejection> {
let Path(path) = parts
.extract::<Path<PrefixKeyPath>>()
.extract::<Path<PrefixKeyRequest>>()
.await
.map_err(|e| bad_request(e, "invalid route"))?;
let TypedHeader(Authorization(bearer)) = parts
.extract::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|e| bad_request(e, "invalid token"))?;
let claims: PrefixKeyPath = state
let claims: DeletePrefixClaims = state
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "invalid token"))?;
if path != claims {
return Err(unauthorized(path, claims));
let route = DeletePrefixClaims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id: path.endpoint_id,
exp: claims.exp,
};
if route != claims {
return Err(unauthorized(route, claims));
}
Ok((&path).into())
Ok((&route).into())
}
}
@@ -297,7 +267,7 @@ mod tests {
#[test]
fn s3_path() {
let auth = Claims {
let auth = EndpointStorageClaims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
@@ -327,10 +297,11 @@ mod tests {
#[test]
fn prefix_s3_path() {
let mut path = PrefixKeyPath {
let mut path = DeletePrefixClaims {
tenant_id: TENANT_ID,
timeline_id: None,
endpoint_id: None,
exp: 0,
};
let prefix_path = |s: String| RemotePath::from_string(&s).unwrap();
assert_eq!(

View File

@@ -1,16 +1,58 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use std::str::FromStr;
use serde::{Deserialize, Serialize};
use crate::privilege::Privilege;
use crate::responses::ComputeCtlConfig;
use crate::spec::{ComputeSpec, ExtVersion, PgIdent};
/// The value to place in the [`ComputeClaims::audience`] claim.
pub static COMPUTE_AUDIENCE: &str = "compute";
/// Available scopes for a compute's JWT.
#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ComputeClaimsScope {
/// An admin-scoped token allows access to all of `compute_ctl`'s authorized
/// facilities.
Admin,
}
impl FromStr for ComputeClaimsScope {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"admin" => Ok(ComputeClaimsScope::Admin),
_ => Err(anyhow::anyhow!("invalid compute claims scope \"{s}\"")),
}
}
}
/// When making requests to the `compute_ctl` external HTTP server, the client
/// must specify a set of claims in `Authorization` header JWTs such that
/// `compute_ctl` can authorize the request.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename = "snake_case")]
pub struct ComputeClaims {
pub compute_id: String,
/// The compute ID that will validate the token. The only case in which this
/// can be [`None`] is if [`Self::scope`] is
/// [`ComputeClaimsScope::Admin`].
pub compute_id: Option<String>,
/// The scope of what the token authorizes.
pub scope: Option<ComputeClaimsScope>,
/// The recipient the token is intended for.
///
/// See [RFC 7519](https://www.rfc-editor.org/rfc/rfc7519#section-4.1.3) for
/// more information.
///
/// TODO: Remove the [`Option`] wrapper when control plane learns to send
/// the claim.
#[serde(rename = "aud")]
pub audience: Option<Vec<String>>,
}
/// Request of the /configure API

View File

@@ -46,6 +46,30 @@ pub struct ExtensionInstallResponse {
pub version: ExtVersion,
}
#[derive(Serialize, Default, Debug, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcPrewarmState {
#[default]
NotPrewarmed,
Prewarming,
Completed,
Failed {
error: String,
},
}
#[derive(Serialize, Default, Debug, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcOffloadState {
#[default]
NotOffloaded,
Offloading,
Completed,
Failed {
error: String,
},
}
/// Response of the /status API
#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "snake_case")]

View File

@@ -172,6 +172,15 @@ pub struct ComputeSpec {
/// Hostname and the port of the otel collector. Leave empty to disable Postgres logs forwarding.
/// Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:10514
pub logs_export_host: Option<String>,
/// Address of endpoint storage service
pub endpoint_storage_addr: Option<String>,
/// JWT for authorizing requests to endpoint storage service
pub endpoint_storage_token: Option<String>,
/// If true, download LFC state from endpoint_storage and pass it to Postgres on startup
#[serde(default)]
pub prewarm_lfc_on_startup: bool,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.

View File

@@ -84,6 +84,11 @@
"value": "on",
"vartype": "bool"
},
{
"name": "prewarm_lfc_on_startup",
"value": "off",
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501",

View File

@@ -16,6 +16,7 @@ pub struct Collector {
const NMETRICS: usize = 2;
static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
// SAFETY: libc::sysconf is safe, it merely returns a value.
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
if long == -1 {
panic!("sysconf(_SC_CLK_TCK) failed");

View File

@@ -43,6 +43,21 @@ pub struct NodeMetadata {
pub other: HashMap<String, serde_json::Value>,
}
/// PostHog integration config
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
/// PostHog project ID
project_id: String,
/// Server-side (private) API key
server_api_key: String,
/// Client-side (public) API key
client_api_key: String,
/// Private API URL
private_api_url: String,
/// Public API URL
public_api_url: String,
}
/// `pageserver.toml`
///
/// We use serde derive with `#[serde(default)]` to generate a deserializer
@@ -182,6 +197,8 @@ pub struct ConfigToml {
pub tracing: Option<Tracing>,
pub enable_tls_page_service_api: bool,
pub dev_mode: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub posthog_config: Option<PostHogConfig>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -659,6 +676,7 @@ impl Default for ConfigToml {
tracing: None,
enable_tls_page_service_api: false,
dev_mode: false,
posthog_config: None,
}
}
}

View File

@@ -295,6 +295,9 @@ pub struct TenantId(Id);
id_newtype!(TenantId);
/// If needed, reuse small string from proxy/src/types.rc
pub type EndpointId = String;
// A pair uniquely identifying Neon instance.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TenantTimelineId {

View File

@@ -14,7 +14,7 @@ use std::time::Duration;
use anyhow::{Context, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes, PostHogConfig};
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pem::Pem;
@@ -230,6 +230,9 @@ pub struct PageServerConf {
/// such as authentication requirements for HTTP and PostgreSQL APIs.
/// This is insecure and should only be used in development environments.
pub dev_mode: bool,
/// PostHog integration config
pub posthog_config: Option<PostHogConfig>,
}
/// Token for authentication to safekeepers
@@ -404,6 +407,7 @@ impl PageServerConf {
tracing,
enable_tls_page_service_api,
dev_mode,
posthog_config,
} = config_toml;
let mut conf = PageServerConf {
@@ -513,6 +517,7 @@ impl PageServerConf {
}
None => Vec::new(),
},
posthog_config,
};
// ------------------------------------------------------------

View File

@@ -1441,14 +1441,6 @@ impl DeltaLayerInner {
offset
}
pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
self.iter_with_options(
ctx,
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
)
}
pub fn iter_with_options<'a>(
&'a self,
ctx: &'a RequestContext,
@@ -1634,7 +1626,6 @@ pub(crate) mod test {
use crate::tenant::disk_btree::tests::TestDisk;
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::{TenantShard, Timeline};
/// Construct an index for a fictional delta layer and and then
@@ -2311,8 +2302,7 @@ pub(crate) mod test {
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
// Test if the batch size is correctly determined
let mut iter = delta_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
let mut num_items = 0;
for _ in 0..3 {
iter.next_batch().await.unwrap();
@@ -2329,8 +2319,7 @@ pub(crate) mod test {
iter.key_values_batch.clear();
}
// Test if the result is correct
let mut iter = delta_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
assert_delta_iter_equal(&mut iter, &test_deltas).await;
}
}

View File

@@ -157,7 +157,7 @@ mod tests {
.await
.unwrap();
let merge_iter = MergeIterator::create(
let merge_iter = MergeIterator::create_for_testing(
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
&[],
&ctx,
@@ -182,7 +182,7 @@ mod tests {
result.extend(test_deltas1[90..100].iter().cloned());
assert_filter_iter_equal(&mut filter_iter, &result).await;
let merge_iter = MergeIterator::create(
let merge_iter = MergeIterator::create_for_testing(
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
&[],
&ctx,

View File

@@ -684,14 +684,6 @@ impl ImageLayerInner {
}
}
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
self.iter_with_options(
ctx,
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
)
}
pub(crate) fn iter_with_options<'a>(
&'a self,
ctx: &'a RequestContext,
@@ -1240,7 +1232,6 @@ mod test {
use crate::context::RequestContext;
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::{TenantShard, Timeline};
#[tokio::test]
@@ -1507,8 +1498,7 @@ mod test {
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
// Test if the batch size is correctly determined
let mut iter = img_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
let mut num_items = 0;
for _ in 0..3 {
iter.next_batch().await.unwrap();
@@ -1525,8 +1515,7 @@ mod test {
iter.key_values_batch.clear();
}
// Test if the result is correct
let mut iter = img_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
}
}

View File

@@ -19,14 +19,6 @@ pub(crate) enum LayerRef<'a> {
}
impl<'a> LayerRef<'a> {
#[allow(dead_code)]
fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
match self {
Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
}
}
fn iter_with_options(
self,
ctx: &'a RequestContext,
@@ -322,6 +314,28 @@ impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
}
impl<'a> MergeIterator<'a> {
#[cfg(test)]
pub(crate) fn create_for_testing(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
) -> Self {
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
}
/// Create a new merge iterator with custom options.
///
/// Adjust `max_read_size` and `max_batch_size` to trade memory usage for performance. The size should scale
/// with the number of layers to compact. If there are a lot of layers, consider reducing the values, so that
/// the buffer does not take too much memory.
///
/// The default options for L0 compactions are:
/// - max_read_size: 1024 * 8192 (8MB)
/// - max_batch_size: 1024
///
/// The default options for gc-compaction are:
/// - max_read_size: 128 * 8192 (1MB)
/// - max_batch_size: 128
pub fn create_with_options(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
@@ -351,14 +365,6 @@ impl<'a> MergeIterator<'a> {
}
}
pub fn create(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
) -> Self {
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
}
pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
while let Some(mut iter) = self.heap.peek_mut() {
if !iter.is_loaded() {
@@ -477,7 +483,7 @@ mod tests {
let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
.await
.unwrap();
let mut merge_iter = MergeIterator::create(
let mut merge_iter = MergeIterator::create_for_testing(
&[
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
@@ -549,7 +555,7 @@ mod tests {
let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
.await
.unwrap();
let mut merge_iter = MergeIterator::create(
let mut merge_iter = MergeIterator::create_for_testing(
&[
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
@@ -670,7 +676,7 @@ mod tests {
// Test with different layer order for MergeIterator::create to ensure the order
// is stable.
let mut merge_iter = MergeIterator::create(
let mut merge_iter = MergeIterator::create_for_testing(
&[
resident_layer_4.get_as_delta(&ctx).await.unwrap(),
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
@@ -682,7 +688,7 @@ mod tests {
);
assert_merge_iter_equal(&mut merge_iter, &expect).await;
let mut merge_iter = MergeIterator::create(
let mut merge_iter = MergeIterator::create_for_testing(
&[
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
resident_layer_4.get_as_delta(&ctx).await.unwrap(),

View File

@@ -1994,7 +1994,13 @@ impl Timeline {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
MergeIterator::create_with_options(
&deltas,
&[],
ctx,
1024 * 8192, /* 8 MiB buffer per layer iterator */
1024,
)
};
// This iterator walks through all keys and is needed to calculate size used by each key
@@ -2828,7 +2834,7 @@ impl Timeline {
Ok(())
}
/// Check if the memory usage is within the limit.
/// Check to bail out of gc compaction early if it would use too much memory.
async fn check_memory_usage(
self: &Arc<Self>,
layer_selection: &[Layer],
@@ -2841,7 +2847,8 @@ impl Timeline {
let layer_desc = layer.layer_desc();
if layer_desc.is_delta() {
// Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
// Multiply the layer size so that tests can pass.
// Scale it by target_layer_size_bytes so that tests can pass (some tests, e.g., `test_pageserver_gc_compaction_preempt
// use 3MB layer size and we need to account for that).
estimated_memory_usage_mb +=
3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
num_delta_layers += 1;

View File

@@ -14,8 +14,6 @@
use std::fs::File;
use std::io::{Error, ErrorKind};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
use std::sync::LazyLock;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
@@ -99,7 +97,7 @@ impl VirtualFile {
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
path: P,
open_options: &OpenOptions,
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut open_options: OpenOptions,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let mode = get_io_mode();
@@ -112,21 +110,16 @@ impl VirtualFile {
#[cfg(target_os = "linux")]
(IoMode::DirectRw, _) => true,
};
let open_options = open_options.clone();
let open_options = if set_o_direct {
if set_o_direct {
#[cfg(target_os = "linux")]
{
let mut open_options = open_options;
open_options.custom_flags(nix::libc::O_DIRECT);
open_options
open_options = open_options.custom_flags(nix::libc::O_DIRECT);
}
#[cfg(not(target_os = "linux"))]
unreachable!(
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
);
} else {
open_options
};
}
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(VirtualFile { inner, _mode: mode })
}
@@ -530,7 +523,7 @@ impl VirtualFileInner {
path: P,
ctx: &RequestContext,
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
/// Open a file with given options.
@@ -558,10 +551,11 @@ impl VirtualFileInner {
// It would perhaps be nicer to check just for the read and write flags
// explicitly, but OpenOptions doesn't contain any functions to read flags,
// only to set them.
let mut reopen_options = open_options.clone();
reopen_options.create(false);
reopen_options.create_new(false);
reopen_options.truncate(false);
let reopen_options = open_options
.clone()
.create(false)
.create_new(false)
.truncate(false);
let vfile = VirtualFileInner {
handle: RwLock::new(handle),
@@ -1307,7 +1301,7 @@ mod tests {
opts: OpenOptions,
ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
let vf = VirtualFile::open_with_options_v2(&path, opts, ctx).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
}
}
@@ -1374,7 +1368,7 @@ mod tests {
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
// Close the file and re-open for reading
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
let mut file_a = A::open(path_a, OpenOptions::new().read(true), &ctx).await?;
// cannot write to a file opened in read-only mode
let _ = file_a
@@ -1393,8 +1387,7 @@ mod tests {
.read(true)
.write(true)
.create(true)
.truncate(true)
.to_owned(),
.truncate(true),
&ctx,
)
.await?;
@@ -1412,12 +1405,7 @@ mod tests {
let mut vfiles = Vec::new();
for _ in 0..100 {
let mut vfile = A::open(
path_b.clone(),
OpenOptions::new().read(true).to_owned(),
&ctx,
)
.await?;
let mut vfile = A::open(path_b.clone(), OpenOptions::new().read(true), &ctx).await?;
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
vfiles.push(vfile);
}
@@ -1466,7 +1454,7 @@ mod tests {
for _ in 0..VIRTUAL_FILES {
let f = VirtualFileInner::open_with_options(
&test_file_path,
OpenOptions::new().read(true).clone(),
OpenOptions::new().read(true),
&ctx,
)
.await?;

View File

@@ -1,6 +1,7 @@
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
use std::os::fd::OwnedFd;
use std::os::unix::fs::OpenOptionsExt;
use std::path::Path;
use super::io_engine::IoEngine;
@@ -43,7 +44,7 @@ impl OpenOptions {
self.write
}
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
pub fn read(mut self, read: bool) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.read(read);
@@ -56,7 +57,7 @@ impl OpenOptions {
self
}
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
pub fn write(mut self, write: bool) -> Self {
self.write = write;
match &mut self.inner {
Inner::StdFs(x) => {
@@ -70,7 +71,7 @@ impl OpenOptions {
self
}
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
pub fn create(mut self, create: bool) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.create(create);
@@ -83,7 +84,7 @@ impl OpenOptions {
self
}
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
pub fn create_new(mut self, create_new: bool) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.create_new(create_new);
@@ -96,7 +97,7 @@ impl OpenOptions {
self
}
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
pub fn truncate(mut self, truncate: bool) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.truncate(truncate);
@@ -124,10 +125,8 @@ impl OpenOptions {
}
}
}
}
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
pub fn mode(mut self, mode: u32) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.mode(mode);
@@ -140,7 +139,7 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
self
}
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
pub fn custom_flags(mut self, flags: i32) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.custom_flags(flags);

View File

@@ -26,6 +26,7 @@
#include "portability/instr_time.h"
#include "postmaster/interrupt.h"
#include "storage/buf_internals.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
@@ -79,6 +80,7 @@ int neon_protocol_version = 3;
static int neon_compute_mode = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
static int max_sockets;
static int pageserver_response_log_timeout = 10000;
/* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
@@ -336,6 +338,13 @@ load_shard_map(shardno_t shard_no, char *connstr_p, shardno_t *num_shards_p)
pageserver_disconnect(i);
}
pagestore_local_counter = end_update_counter;
/* Reserve file descriptors for sockets */
while (max_sockets < num_shards)
{
max_sockets += 1;
ReserveExternalFD();
}
}
if (num_shards_p)

View File

@@ -4,6 +4,7 @@
#include "miscadmin.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "storage/ipc.h"
#include "storage/shmem.h"
#include "storage/buf_internals.h"
@@ -396,9 +397,10 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn,
XLogRecPtr
neon_set_lwlsn_block_range(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum, BlockNumber from, BlockNumber n_blocks)
{
if (lsn < FirstNormalUnloggedLSN || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
if (lsn == InvalidXLogRecPtr || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
return lsn;
Assert(lsn >= WalSegMinSize);
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
lsn = SetLastWrittenLSNForBlockRangeInternal(lsn, rlocator, forknum, from, n_blocks);
LWLockRelease(LastWrittenLsnLock);
@@ -435,7 +437,6 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
NInfoGetRelNumber(relfilenode) == InvalidOid)
return InvalidXLogRecPtr;
BufTagInit(key, relNumber, forknum, blockno, spcOid, dbOid);
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
@@ -444,6 +445,10 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
{
XLogRecPtr lsn = lsns[i];
if (lsn == InvalidXLogRecPtr)
continue;
Assert(lsn >= WalSegMinSize);
key.blockNum = blockno + i;
entry = hash_search(lastWrittenLsnCache, &key, HASH_ENTER, &found);
if (found)

View File

@@ -150,7 +150,7 @@ NeonWALReaderFree(NeonWALReader *state)
* fetched from timeline 'tli'.
*
* Returns NEON_WALREAD_SUCCESS if succeeded, NEON_WALREAD_ERROR if an error
* occurs, in which case 'err' has the desciption. Error always closes remote
* occurs, in which case 'err' has the description. Error always closes remote
* connection, if there was any, so socket subscription should be removed.
*
* NEON_WALREAD_WOULDBLOCK means caller should obtain socket to wait for with

View File

@@ -1989,8 +1989,14 @@ neon_start_unlogged_build(SMgrRelation reln)
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
#if PG_MAJORVERSION_NUM >= 17
/*
* We have to disable this check for pg14-16 because sorted build of GIST index requires
* to perform unlogged build several times
*/
if (smgrnblocks(reln, MAIN_FORKNUM) != 0)
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
#endif
unlogged_build_rel = reln;
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;

View File

@@ -124,6 +124,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
}
else
{
wp->safekeepers_generation = INVALID_GENERATION;
host = wp->config->safekeepers_list;
}
wp_log(LOG, "safekeepers_generation=%u", wp->safekeepers_generation);
@@ -756,7 +757,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
{
SafekeeperId *sk_id = &wp->mconf.members.m[i];
if (wp->mconf.members.m[i].node_id == sk->greetResponse.nodeId)
if (sk_id->node_id == sk->greetResponse.nodeId)
{
/*
* If mconf or list of safekeepers to connect to changed (the
@@ -781,7 +782,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
{
SafekeeperId *sk_id = &wp->mconf.new_members.m[i];
if (wp->mconf.new_members.m[i].node_id == sk->greetResponse.nodeId)
if (sk_id->node_id == sk->greetResponse.nodeId)
{
if (wp->new_members_safekeepers[i] != NULL && wp->new_members_safekeepers[i] != sk)
{
@@ -1071,7 +1072,6 @@ RecvVoteResponse(Safekeeper *sk)
/* ready for elected message */
sk->state = SS_WAIT_ELECTED;
wp->n_votes++;
/* Are we already elected? */
if (wp->state == WPS_CAMPAIGN)
{

View File

@@ -845,9 +845,6 @@ typedef struct WalProposer
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
/* number of votes collected from safekeepers */
int n_votes;
/* number of successful connections over the lifetime of walproposer */
int n_connected;

View File

@@ -409,14 +409,22 @@ impl JwkCacheEntryLock {
if let Some(exp) = payload.expiration {
if now >= exp + CLOCK_SKEW_LEEWAY {
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired));
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired(
exp.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
)));
}
}
if let Some(nbf) = payload.not_before {
if nbf >= now + CLOCK_SKEW_LEEWAY {
return Err(JwtError::InvalidClaims(
JwtClaimsError::JwtTokenNotYetReadyToUse,
JwtClaimsError::JwtTokenNotYetReadyToUse(
nbf.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
),
));
}
}
@@ -534,10 +542,10 @@ struct JwtPayload<'a> {
#[serde(rename = "aud", default)]
audience: OneOrMany,
/// Expiration - Time after which the JWT expires
#[serde(deserialize_with = "numeric_date_opt", rename = "exp", default)]
#[serde(rename = "exp", deserialize_with = "numeric_date_opt", default)]
expiration: Option<SystemTime>,
/// Not before - Time after which the JWT expires
#[serde(deserialize_with = "numeric_date_opt", rename = "nbf", default)]
/// Not before - Time before which the JWT is not valid
#[serde(rename = "nbf", deserialize_with = "numeric_date_opt", default)]
not_before: Option<SystemTime>,
// the following entries are only extracted for the sake of debug logging.
@@ -609,8 +617,15 @@ impl<'de> Deserialize<'de> for OneOrMany {
}
fn numeric_date_opt<'de, D: Deserializer<'de>>(d: D) -> Result<Option<SystemTime>, D::Error> {
let d = <Option<u64>>::deserialize(d)?;
Ok(d.map(|n| SystemTime::UNIX_EPOCH + Duration::from_secs(n)))
<Option<u64>>::deserialize(d)?
.map(|t| {
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(t))
.ok_or_else(|| {
serde::de::Error::custom(format_args!("timestamp out of bounds: {t}"))
})
})
.transpose()
}
struct JwkRenewalPermit<'a> {
@@ -746,11 +761,11 @@ pub enum JwtClaimsError {
#[error("invalid JWT token audience")]
InvalidJwtTokenAudience,
#[error("JWT token has expired")]
JwtTokenHasExpired,
#[error("JWT token has expired (exp={0})")]
JwtTokenHasExpired(u64),
#[error("JWT token is not yet ready to use")]
JwtTokenNotYetReadyToUse,
#[error("JWT token is not yet ready to use (nbf={0})")]
JwtTokenNotYetReadyToUse(u64),
}
#[allow(dead_code, reason = "Debug use only")]
@@ -1233,14 +1248,14 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
"nbf": now + 60,
"aud": "neon",
}},
error: JwtClaimsError::JwtTokenNotYetReadyToUse,
error: JwtClaimsError::JwtTokenNotYetReadyToUse(now + 60),
},
Test {
body: json! {{
"exp": now - 60,
"aud": ["neon"],
}},
error: JwtClaimsError::JwtTokenHasExpired,
error: JwtClaimsError::JwtTokenHasExpired(now - 60),
},
Test {
body: json! {{

View File

@@ -32,12 +32,6 @@ pub(crate) enum ComputeUserInfoParseError {
option: EndpointId,
},
#[error(
"Common name inferred from SNI ('{}') is not known",
.cn,
)]
UnknownCommonName { cn: String },
#[error("Project name ('{0}') must contain only alphanumeric characters and hyphen.")]
MalformedProjectName(EndpointId),
}
@@ -66,22 +60,15 @@ impl ComputeUserInfoMaybeEndpoint {
}
}
pub(crate) fn endpoint_sni(
sni: &str,
common_names: &HashSet<String>,
) -> Result<Option<EndpointId>, ComputeUserInfoParseError> {
let Some((subdomain, common_name)) = sni.split_once('.') else {
return Err(ComputeUserInfoParseError::UnknownCommonName { cn: sni.into() });
};
pub(crate) fn endpoint_sni(sni: &str, common_names: &HashSet<String>) -> Option<EndpointId> {
let (subdomain, common_name) = sni.split_once('.')?;
if !common_names.contains(common_name) {
return Err(ComputeUserInfoParseError::UnknownCommonName {
cn: common_name.into(),
});
return None;
}
if subdomain == SERVERLESS_DRIVER_SNI {
return Ok(None);
return None;
}
Ok(Some(EndpointId::from(subdomain)))
Some(EndpointId::from(subdomain))
}
impl ComputeUserInfoMaybeEndpoint {
@@ -113,15 +100,8 @@ impl ComputeUserInfoMaybeEndpoint {
})
.map(|name| name.into());
let endpoint_from_domain = if let Some(sni_str) = sni {
if let Some(cn) = common_names {
endpoint_sni(sni_str, cn)?
} else {
None
}
} else {
None
};
let endpoint_from_domain =
sni.and_then(|sni_str| common_names.and_then(|cn| endpoint_sni(sni_str, cn)));
let endpoint = match (endpoint_option, endpoint_from_domain) {
// Invariant: if we have both project name variants, they should match.
@@ -424,21 +404,34 @@ mod tests {
}
#[test]
fn parse_inconsistent_sni() {
fn parse_unknown_sni() {
let options = StartupMessageParams::new([("user", "john_doe")]);
let sni = Some("project.localhost");
let common_names = Some(["example.com".into()].into());
let ctx = RequestContext::test();
let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.expect_err("should fail");
match err {
UnknownCommonName { cn } => {
assert_eq!(cn, "localhost");
}
_ => panic!("bad error: {err:?}"),
}
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.unwrap();
assert!(info.endpoint_id.is_none());
}
#[test]
fn parse_unknown_sni_with_options() {
let options = StartupMessageParams::new([
("user", "john_doe"),
("options", "endpoint=foo-bar-baz-1234"),
]);
let sni = Some("project.localhost");
let common_names = Some(["example.com".into()].into());
let ctx = RequestContext::test();
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.unwrap();
assert_eq!(info.endpoint_id.as_deref(), Some("foo-bar-baz-1234"));
}
#[test]

View File

@@ -24,9 +24,6 @@ pub(crate) enum HandshakeError {
#[error("protocol violation")]
ProtocolViolation,
#[error("missing certificate")]
MissingCertificate,
#[error("{0}")]
StreamUpgradeError(#[from] StreamUpgradeError),
@@ -42,10 +39,6 @@ impl ReportableError for HandshakeError {
match self {
HandshakeError::EarlyData => crate::error::ErrorKind::User,
HandshakeError::ProtocolViolation => crate::error::ErrorKind::User,
// This error should not happen, but will if we have no default certificate and
// the client sends no SNI extension.
// If they provide SNI then we can be sure there is a certificate that matches.
HandshakeError::MissingCertificate => crate::error::ErrorKind::Service,
HandshakeError::StreamUpgradeError(upgrade) => match upgrade {
StreamUpgradeError::AlreadyTls => crate::error::ErrorKind::Service,
StreamUpgradeError::Io(_) => crate::error::ErrorKind::ClientDisconnect,
@@ -146,7 +139,7 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
// try parse endpoint
let ep = conn_info
.server_name()
.and_then(|sni| endpoint_sni(sni, &tls.common_names).ok().flatten());
.and_then(|sni| endpoint_sni(sni, &tls.common_names));
if let Some(ep) = ep {
ctx.set_endpoint_id(ep);
}
@@ -161,10 +154,8 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
}
let (_, tls_server_end_point) = tls
.cert_resolver
.resolve(conn_info.server_name())
.ok_or(HandshakeError::MissingCertificate)?;
let (_, tls_server_end_point) =
tls.cert_resolver.resolve(conn_info.server_name());
stream = PqStream {
framed: Framed {

View File

@@ -98,8 +98,7 @@ fn generate_tls_config<'a>(
.with_no_client_auth()
.with_single_cert(vec![cert.clone()], key.clone_key())?;
let mut cert_resolver = CertResolver::new();
cert_resolver.add_cert(key, vec![cert], true)?;
let cert_resolver = CertResolver::new(key, vec![cert])?;
let common_names = cert_resolver.get_common_names();

View File

@@ -41,7 +41,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::Metrics;
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
pub(crate) const EXT_VERSION: &str = "0.3.0";
pub(crate) const EXT_VERSION: &str = "0.3.1";
pub(crate) const EXT_SCHEMA: &str = "auth";
#[derive(Clone)]

View File

@@ -199,8 +199,7 @@ fn get_conn_info(
let endpoint = match connection_url.host() {
Some(url::Host::Domain(hostname)) => {
if let Some(tls) = tls {
endpoint_sni(hostname, &tls.common_names)?
.ok_or(ConnInfoError::MalformedEndpoint)?
endpoint_sni(hostname, &tls.common_names).ok_or(ConnInfoError::MalformedEndpoint)?
} else {
hostname
.split_once('.')

View File

@@ -5,6 +5,7 @@ use anyhow::{Context, bail};
use itertools::Itertools;
use rustls::crypto::ring::{self, sign};
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls::sign::CertifiedKey;
use x509_cert::der::{Reader, SliceReader};
use super::{PG_ALPN_PROTOCOL, TlsServerEndPoint};
@@ -25,10 +26,8 @@ pub fn configure_tls(
certs_dir: Option<&String>,
allow_tls_keylogfile: bool,
) -> anyhow::Result<TlsConfig> {
let mut cert_resolver = CertResolver::new();
// add default certificate
cert_resolver.add_cert_path(key_path, cert_path, true)?;
let mut cert_resolver = CertResolver::parse_new(key_path, cert_path)?;
// add extra certificates
if let Some(certs_dir) = certs_dir {
@@ -40,11 +39,8 @@ pub fn configure_tls(
let key_path = path.join("tls.key");
let cert_path = path.join("tls.crt");
if key_path.exists() && cert_path.exists() {
cert_resolver.add_cert_path(
&key_path.to_string_lossy(),
&cert_path.to_string_lossy(),
false,
)?;
cert_resolver
.add_cert_path(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
}
}
}
@@ -83,92 +79,42 @@ pub fn configure_tls(
})
}
#[derive(Default, Debug)]
#[derive(Debug)]
pub struct CertResolver {
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
default: (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint),
}
impl CertResolver {
pub fn new() -> Self {
Self::default()
fn parse_new(key_path: &str, cert_path: &str) -> anyhow::Result<Self> {
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
Self::new(priv_key, cert_chain)
}
fn add_cert_path(
&mut self,
key_path: &str,
cert_path: &str,
is_default: bool,
) -> anyhow::Result<()> {
let priv_key = {
let key_bytes = std::fs::read(key_path)
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
rustls_pemfile::private_key(&mut &key_bytes[..])
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
};
pub fn new(
priv_key: PrivateKeyDer<'static>,
cert_chain: Vec<CertificateDer<'static>>,
) -> anyhow::Result<Self> {
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.try_collect()
.with_context(|| {
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
})?
};
self.add_cert(priv_key, cert_chain, is_default)
let mut certs = HashMap::new();
let default = (cert.clone(), tls_server_end_point);
certs.insert(common_name, (cert, tls_server_end_point));
Ok(Self { certs, default })
}
pub fn add_cert(
fn add_cert_path(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
self.add_cert(priv_key, cert_chain)
}
fn add_cert(
&mut self,
priv_key: PrivateKeyDer<'static>,
cert_chain: Vec<CertificateDer<'static>>,
is_default: bool,
) -> anyhow::Result<()> {
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
let first_cert = &cert_chain[0];
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
let certificate = SliceReader::new(first_cert)
.context("Failed to parse cerficiate")?
.decode::<x509_cert::Certificate>()
.context("Failed to parse cerficiate")?;
let common_name = certificate.tbs_certificate.subject.to_string();
// We need to get the canonical name for this certificate so we can match them against any domain names
// seen within the proxy codebase.
//
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
// We need to remove the wildcard prefix for the purposes of certificate selection.
//
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
//
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
// validation, so let's we can continue with any common-name
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=") {
s.to_string()
} else {
bail!("Failed to parse common name from certificate")
};
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
if is_default {
self.default = Some((cert.clone(), tls_server_end_point));
}
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
self.certs.insert(common_name, (cert, tls_server_end_point));
Ok(())
}
@@ -177,12 +123,82 @@ impl CertResolver {
}
}
fn parse_key_cert(
key_path: &str,
cert_path: &str,
) -> anyhow::Result<(PrivateKeyDer<'static>, Vec<CertificateDer<'static>>)> {
let priv_key = {
let key_bytes = std::fs::read(key_path)
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
rustls_pemfile::private_key(&mut &key_bytes[..])
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
};
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.try_collect()
.with_context(|| {
format!(
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
)
})?
};
Ok((priv_key, cert_chain))
}
fn process_key_cert(
priv_key: PrivateKeyDer<'static>,
cert_chain: Vec<CertificateDer<'static>>,
) -> anyhow::Result<(String, Arc<CertifiedKey>, TlsServerEndPoint)> {
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
let first_cert = &cert_chain[0];
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
let certificate = SliceReader::new(first_cert)
.context("Failed to parse cerficiate")?
.decode::<x509_cert::Certificate>()
.context("Failed to parse cerficiate")?;
let common_name = certificate.tbs_certificate.subject.to_string();
// We need to get the canonical name for this certificate so we can match them against any domain names
// seen within the proxy codebase.
//
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
// We need to remove the wildcard prefix for the purposes of certificate selection.
//
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
//
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
// validation, so let's we can continue with any common-name
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=") {
s.to_string()
} else {
bail!("Failed to parse common name from certificate")
};
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
Ok((common_name, cert, tls_server_end_point))
}
impl rustls::server::ResolvesServerCert for CertResolver {
fn resolve(
&self,
client_hello: rustls::server::ClientHello<'_>,
) -> Option<Arc<rustls::sign::CertifiedKey>> {
self.resolve(client_hello.server_name()).map(|x| x.0)
Some(self.resolve(client_hello.server_name()).0)
}
}
@@ -190,7 +206,7 @@ impl CertResolver {
pub fn resolve(
&self,
server_name: Option<&str>,
) -> Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)> {
) -> (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint) {
// loop here and cut off more and more subdomains until we find
// a match to get a proper wildcard support. OTOH, we now do not
// use nested domains, so keep this simple for now.
@@ -200,12 +216,17 @@ impl CertResolver {
if let Some(mut sni_name) = server_name {
loop {
if let Some(cert) = self.certs.get(sni_name) {
return Some(cert.clone());
return cert.clone();
}
if let Some((_, rest)) = sni_name.split_once('.') {
sni_name = rest;
} else {
return None;
// The customer has some custom DNS mapping - just return
// a default certificate.
//
// This will error if the customer uses anything stronger
// than sslmode=require. That's a choice they can make.
return self.default.clone();
}
}
} else {

View File

@@ -5181,7 +5181,8 @@ impl Service {
}
// We don't expect any new_shard_count shards to exist here, but drop them just in case
tenants.retain(|_id, s| s.shard.count != *new_shard_count);
tenants
.retain(|id, s| !(id.tenant_id == *tenant_id && s.shard.count == *new_shard_count));
detach_locations
};

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import urllib.parse
from enum import StrEnum
from typing import TYPE_CHECKING, final
import requests
@@ -9,11 +10,23 @@ from requests.auth import AuthBase
from typing_extensions import override
from fixtures.log_helper import log
from fixtures.utils import wait_until
if TYPE_CHECKING:
from requests import PreparedRequest
COMPUTE_AUDIENCE = "compute"
"""
The value to place in the `aud` claim.
"""
@final
class ComputeClaimsScope(StrEnum):
ADMIN = "admin"
@final
class BearerAuth(AuthBase):
"""
@@ -50,6 +63,35 @@ class EndpointHttpClient(requests.Session):
res.raise_for_status()
return res.json()
def prewarm_lfc_status(self) -> dict[str, str]:
res = self.get(f"http://localhost:{self.external_port}/lfc/prewarm")
res.raise_for_status()
json: dict[str, str] = res.json()
return json
def prewarm_lfc(self):
self.post(f"http://localhost:{self.external_port}/lfc/prewarm").raise_for_status()
def prewarmed():
json = self.prewarm_lfc_status()
status, err = json["status"], json.get("error")
assert status == "completed", f"{status}, error {err}"
wait_until(prewarmed)
def offload_lfc(self):
url = f"http://localhost:{self.external_port}/lfc/offload"
self.post(url).raise_for_status()
def offloaded():
res = self.get(url)
res.raise_for_status()
json = res.json()
status, err = json["status"], json.get("error")
assert status == "completed", f"{status}, error {err}"
wait_until(offloaded)
def database_schema(self, database: str):
res = self.get(
f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}",

View File

@@ -21,6 +21,7 @@ if TYPE_CHECKING:
Any,
)
from fixtures.endpoint.http import ComputeClaimsScope
from fixtures.pg_version import PgVersion
@@ -535,12 +536,16 @@ class NeonLocalCli(AbstractNeonCli):
res.check_returncode()
return res
def endpoint_generate_jwt(self, endpoint_id: str) -> str:
def endpoint_generate_jwt(
self, endpoint_id: str, scope: ComputeClaimsScope | None = None
) -> str:
"""
Generate a JWT for making requests to the endpoint's external HTTP
server.
"""
args = ["endpoint", "generate-jwt", endpoint_id]
if scope:
args += ["--scope", str(scope)]
cmd = self.raw_cli(args)
cmd.check_returncode()

View File

@@ -51,7 +51,7 @@ from fixtures.common_types import (
TimelineId,
)
from fixtures.compute_migrations import NUM_COMPUTE_MIGRATIONS
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.endpoint.http import ComputeClaimsScope, EndpointHttpClient
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.neon_cli import NeonLocalCli, Pagectl
@@ -1185,7 +1185,9 @@ class NeonEnv:
"broker": {},
"safekeepers": [],
"pageservers": [],
"endpoint_storage": {"port": self.port_distributor.get_port()},
"endpoint_storage": {
"listen_addr": f"127.0.0.1:{self.port_distributor.get_port()}",
},
"generate_local_ssl_certs": self.generate_local_ssl_certs,
}
@@ -4218,7 +4220,7 @@ class Endpoint(PgProtocol, LogUtils):
self.config(config_lines)
self.__jwt = self.env.neon_cli.endpoint_generate_jwt(self.endpoint_id)
self.__jwt = self.generate_jwt()
return self
@@ -4265,6 +4267,14 @@ class Endpoint(PgProtocol, LogUtils):
return self
def generate_jwt(self, scope: ComputeClaimsScope | None = None) -> str:
"""
Generate a JWT for making requests to the endpoint's external HTTP
server.
"""
assert self.endpoint_id is not None
return self.env.neon_cli.endpoint_generate_jwt(self.endpoint_id, scope)
def endpoint_path(self) -> Path:
"""Path to endpoint directory"""
assert self.endpoint_id

View File

@@ -1,4 +1,5 @@
import math # Add this import
import os
import time
import traceback
from pathlib import Path
@@ -87,7 +88,10 @@ def test_cumulative_statistics_persistence(
- insert additional tuples that by itself are not enough to trigger auto-vacuum but in combination with the previous tuples are
- verify that autovacuum is triggered by the combination of tuples inserted before and after endpoint suspension
"""
project = neon_api.create_project(pg_version)
project = neon_api.create_project(
pg_version,
f"Test cumulative statistics persistence, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}",
)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
endpoint_id = project["endpoints"][0]["id"]

View File

@@ -62,7 +62,9 @@ def test_ro_replica_lag(
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
project = neon_api.create_project(pg_version)
project = neon_api.create_project(
pg_version, f"Test readonly replica lag, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
@@ -195,7 +197,9 @@ def test_replication_start_stop(
pgbench_duration = f"-T{2**num_replicas * configuration_test_time_sec}"
error_occurred = False
project = neon_api.create_project(pg_version)
project = neon_api.create_project(
pg_version, f"Test replication start stop, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])

View File

@@ -206,7 +206,7 @@ class NeonProject:
self.neon_api = neon_api
self.pg_bin = pg_bin
proj = self.neon_api.create_project(
pg_version, f"Automatic random API test {os.getenv('GITHUB_RUN_ID')}"
pg_version, f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
)
self.id: str = proj["project"]["id"]
self.name: str = proj["project"]["name"]

View File

@@ -202,6 +202,8 @@ def test_pageserver_gc_compaction_preempt(
env = neon_env_builder.init_start(initial_tenant_conf=conf)
env.pageserver.allowed_errors.append(".*The timeline or pageserver is shutting down.*")
env.pageserver.allowed_errors.append(".*flush task cancelled.*")
env.pageserver.allowed_errors.append(".*failed to pipe.*")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
from http.client import FORBIDDEN, UNAUTHORIZED
from typing import TYPE_CHECKING
import jwt
import pytest
from fixtures.endpoint.http import COMPUTE_AUDIENCE, ComputeClaimsScope, EndpointHttpClient
from fixtures.utils import run_only_on_default_postgres
from requests import RequestException
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
@run_only_on_default_postgres("The code path being tested is not dependent on Postgres version")
def test_compute_no_scope_claim(neon_simple_env: NeonEnv):
"""
Test that if the JWT scope is not admin and no compute_id is specified,
the external HTTP server returns a 403 Forbidden error.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
# Encode nothing in the token
token = jwt.encode({}, env.auth_keys.priv, algorithm="EdDSA")
# Create an admin-scoped HTTP client
client = EndpointHttpClient(
external_port=endpoint.external_http_port,
internal_port=endpoint.internal_http_port,
jwt=token,
)
try:
client.status()
pytest.fail("Exception should have been raised")
except RequestException as e:
assert e.response is not None
assert e.response.status_code == FORBIDDEN
@pytest.mark.parametrize(
"audience",
(COMPUTE_AUDIENCE, "invalid", None),
ids=["with_audience", "with_invalid_audience", "without_audience"],
)
@run_only_on_default_postgres("The code path being tested is not dependent on Postgres version")
def test_compute_admin_scope_claim(neon_simple_env: NeonEnv, audience: str | None):
"""
Test that an admin-scoped JWT can access the compute's external HTTP server
without the compute_id being specified in the claims.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
data: dict[str, str | list[str]] = {"scope": str(ComputeClaimsScope.ADMIN)}
if audience:
data["aud"] = [audience]
token = jwt.encode(data, env.auth_keys.priv, algorithm="EdDSA")
# Create an admin-scoped HTTP client
client = EndpointHttpClient(
external_port=endpoint.external_http_port,
internal_port=endpoint.internal_http_port,
jwt=token,
)
try:
client.status()
if audience != COMPUTE_AUDIENCE:
pytest.fail("Exception should have been raised")
except RequestException as e:
assert e.response is not None
assert e.response.status_code == UNAUTHORIZED

View File

@@ -4,10 +4,12 @@ import pytest
from aiohttp import ClientSession
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import run_only_on_default_postgres
from jwcrypto import jwk, jwt
@pytest.mark.asyncio
@run_only_on_default_postgres("test doesn't use postgres")
async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
"""
Inserts, retrieves, and deletes test file using a JWT token
@@ -35,7 +37,6 @@ async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv)
key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key"
headers = {"Authorization": f"Bearer {token}"}
log.info(f"cache key url {key}")
log.info(f"token {token}")
async with ClientSession(headers=headers) as session:
async with session.get(key) as res:

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
#
# Test unlogged build for GIST index
#
def test_gist(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
con = endpoint.connect()
cur = con.cursor()
iterations = 100
for _ in range(iterations):
cur.execute(
"CREATE TABLE pvactst (i INT, a INT[], p POINT) with (autovacuum_enabled = off)"
)
cur.execute(
"INSERT INTO pvactst SELECT i, array[1,2,3], point(i, i+1) FROM generate_series(1,1000) i"
)
cur.execute("CREATE INDEX gist_pvactst ON pvactst USING gist (p)")
cur.execute("VACUUM pvactst")
cur.execute("DROP TABLE pvactst")

View File

@@ -1,11 +1,24 @@
import random
import threading
import time
from enum import Enum
import pytest
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import USE_LFC
from prometheus_client.parser import text_string_to_metric_families as prom_parse_impl
class LfcQueryMethod(Enum):
COMPUTE_CTL = False
POSTGRES = True
PREWARM_LABEL = "compute_ctl_lfc_prewarm_requests_total"
OFFLOAD_LABEL = "compute_ctl_lfc_offload_requests_total"
QUERY_OPTIONS = LfcQueryMethod.POSTGRES, LfcQueryMethod.COMPUTE_CTL
def check_pinned_entries(cur):
@@ -19,11 +32,20 @@ def check_pinned_entries(cur):
assert n_pinned == 0
def prom_parse(client: EndpointHttpClient) -> dict[str, float]:
return {
sample.name: sample.value
for family in prom_parse_impl(client.metrics())
for sample in family.samples
if sample.name in (PREWARM_LABEL, OFFLOAD_LABEL)
}
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm(neon_simple_env: NeonEnv):
@pytest.mark.parametrize("query", QUERY_OPTIONS, ids=["postgres", "compute-ctl"])
def test_lfc_prewarm(neon_simple_env: NeonEnv, query: LfcQueryMethod):
env = neon_simple_env
n_records = 1000000
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
@@ -34,30 +56,57 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv):
"neon.file_cache_prewarm_limit=1000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create extension neon version '1.6'")
pg_cur.execute("create database lfc")
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
log.info(f"Inserting {n_records} rows")
lfc_cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
lfc_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
log.info(f"Inserted {n_records} rows")
http_client = endpoint.http_client()
if query is LfcQueryMethod.COMPUTE_CTL:
status = http_client.prewarm_lfc_status()
assert status["status"] == "not_prewarmed"
assert "error" not in status
http_client.offload_lfc()
assert http_client.prewarm_lfc_status()["status"] == "not_prewarmed"
assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0}
else:
pg_cur.execute("select get_local_cache_state()")
lfc_state = pg_cur.fetchall()[0][0]
endpoint.stop()
endpoint.start()
conn = endpoint.connect()
cur = conn.cursor()
time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version
cur.execute("alter extension neon update to '1.6'")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
# wait until compute_ctl completes downgrade of extension to default version
time.sleep(1)
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("alter extension neon update to '1.6'")
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = cur.fetchall()[0][0]
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
if query is LfcQueryMethod.COMPUTE_CTL:
http_client.prewarm_lfc()
else:
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
pg_cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = pg_cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}")
cur.execute("select * from get_prewarm_info()")
prewarm_info = cur.fetchall()[0]
pg_cur.execute("select * from get_prewarm_info()")
prewarm_info = pg_cur.fetchall()[0]
log.info(f"Prewarm info: {prewarm_info}")
log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%")
total, prewarmed, skipped, _ = prewarm_info
progress = (prewarmed + skipped) * 100 // total
log.info(f"Prewarm progress: {progress}%")
assert lfc_used_pages > 10000
assert (
@@ -66,18 +115,23 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv):
and prewarm_info[0] == prewarm_info[1] + prewarm_info[2]
)
cur.execute("select sum(pk) from t")
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
lfc_cur.execute("select sum(pk) from t")
assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
check_pinned_entries(cur)
check_pinned_entries(pg_cur)
desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped}
if query is LfcQueryMethod.COMPUTE_CTL:
assert http_client.prewarm_lfc_status() == desired
assert prom_parse(http_client) == {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1}
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
@pytest.mark.parametrize("query", QUERY_OPTIONS, ids=["postgres", "compute-ctl"])
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMethod):
env = neon_simple_env
n_records = 10000
n_threads = 4
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
@@ -87,40 +141,58 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
"neon.file_cache_prewarm_limit=1000000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute(
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create extension neon version '1.6'")
pg_cur.execute("CREATE DATABASE lfc")
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
lfc_cur.execute(
"create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 1000)) with (fillfactor=10)"
)
cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
log.info(f"Inserting {n_records} rows")
lfc_cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
log.info(f"Inserted {n_records} rows")
http_client = endpoint.http_client()
if query is LfcQueryMethod.COMPUTE_CTL:
http_client.offload_lfc()
else:
pg_cur.execute("select get_local_cache_state()")
lfc_state = pg_cur.fetchall()[0][0]
running = True
n_prewarms = 0
def workload():
conn = endpoint.connect()
cur = conn.cursor()
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
n_transfers = 0
while running:
src = random.randint(1, n_records)
dst = random.randint(1, n_records)
cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
lfc_cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
lfc_cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
n_transfers += 1
log.info(f"Number of transfers: {n_transfers}")
def prewarm():
conn = endpoint.connect()
cur = conn.cursor()
n_prewarms = 0
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
while running:
cur.execute("alter system set neon.file_cache_size_limit='1MB'")
cur.execute("select pg_reload_conf()")
cur.execute("alter system set neon.file_cache_size_limit='1GB'")
cur.execute("select pg_reload_conf()")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
pg_cur.execute("alter system set neon.file_cache_size_limit='1MB'")
pg_cur.execute("select pg_reload_conf()")
pg_cur.execute("alter system set neon.file_cache_size_limit='1GB'")
pg_cur.execute("select pg_reload_conf()")
if query is LfcQueryMethod.COMPUTE_CTL:
http_client.prewarm_lfc()
else:
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
nonlocal n_prewarms
n_prewarms += 1
log.info(f"Number of prewarms: {n_prewarms}")
@@ -140,8 +212,10 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
t.join()
prewarm_thread.join()
cur.execute("select sum(balance) from accounts")
total_balance = cur.fetchall()[0][0]
lfc_cur.execute("select sum(balance) from accounts")
total_balance = lfc_cur.fetchall()[0][0]
assert total_balance == 0
check_pinned_entries(cur)
check_pinned_entries(pg_cur)
if query is LfcQueryMethod.COMPUTE_CTL:
assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: n_prewarms}

View File

@@ -1334,6 +1334,13 @@ def test_sharding_split_failures(
tenant_id, timeline_id, shard_count=initial_shard_count, placement_policy='{"Attached":1}'
)
# Create bystander tenants with various shard counts. They should not be affected by the aborted
# splits. Regression test for https://github.com/neondatabase/cloud/issues/28589.
bystanders = {} # id → shard_count
for bystander_shard_count in [1, 2, 4, 8]:
id, _ = env.create_tenant(shard_count=bystander_shard_count)
bystanders[id] = bystander_shard_count
env.storage_controller.allowed_errors.extend(
[
# All split failures log a warning when then enqueue the abort operation
@@ -1394,6 +1401,8 @@ def test_sharding_split_failures(
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
if tenant_shard_id.tenant_id != tenant_id:
continue # skip bystanders
log.info(f"Shard {tenant_shard_id} seen on node {ps.id} in mode {loc[1]['mode']}")
assert tenant_shard_id.shard_count == initial_shard_count
if loc[1]["mode"] == "Secondary":
@@ -1414,6 +1423,8 @@ def test_sharding_split_failures(
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
if tenant_shard_id.tenant_id != tenant_id:
continue # skip bystanders
log.info(f"Shard {tenant_shard_id} seen on node {ps.id} in mode {loc[1]['mode']}")
assert tenant_shard_id.shard_count == split_shard_count
if loc[1]["mode"] == "Secondary":
@@ -1496,6 +1507,12 @@ def test_sharding_split_failures(
# the scheduler reaches an idle state
env.storage_controller.reconcile_until_idle(timeout_secs=30)
# Check that all bystanders are still around.
for bystander_id, bystander_shard_count in bystanders.items():
response = env.storage_controller.tenant_describe(bystander_id)
assert TenantId(response["tenant_id"]) == bystander_id
assert len(response["shards"]) == bystander_shard_count
env.storage_controller.consistency_check()

View File

@@ -5,14 +5,14 @@
],
"v16": [
"16.8",
"37496f87b5324af53c56127e278ee5b1e8435253"
"05ddf212e2e07b788b5c8b88bdcf98630941f6ae"
],
"v15": [
"15.12",
"8ecb12f21d862dfa39f7204b8f5e1c00a2a225b3"
"b838c8969b7c63f3e637a769656f5f36793b797c"
],
"v14": [
"14.17",
"d3c9d61fb7a362a165dac7060819dd9d6ad68c28"
"c8dab02bfc003ae7bd59096919042d7840f3c194"
]
}