mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 13:10:38 +00:00
Compare commits
47 Commits
rc/release
...
rel_size_c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d9788d7cbd | ||
|
|
374495c041 | ||
|
|
ca4d758504 | ||
|
|
d5a7ecade5 | ||
|
|
93b964f829 | ||
|
|
d0aaec2abb | ||
|
|
d0dc65da12 | ||
|
|
03d635b916 | ||
|
|
5cd7f936f9 | ||
|
|
101e115b38 | ||
|
|
b37bb7d7ed | ||
|
|
bef5954fd7 | ||
|
|
8477d15f95 | ||
|
|
622b3b2993 | ||
|
|
659366060d | ||
|
|
42d93031a1 | ||
|
|
d22377c754 | ||
|
|
6c70789cfd | ||
|
|
7e55497e13 | ||
|
|
40f32ea326 | ||
|
|
1d1502bc16 | ||
|
|
7eb85c56ac | ||
|
|
24d62c647f | ||
|
|
4d2e4b19c3 | ||
|
|
0691b73f53 | ||
|
|
3cf5e1386c | ||
|
|
608afc3055 | ||
|
|
0ef6851219 | ||
|
|
5c356c63eb | ||
|
|
384e3df2ad | ||
|
|
f9b3a2e059 | ||
|
|
79ee78ea32 | ||
|
|
0e0ad073bf | ||
|
|
6827f2f58c | ||
|
|
c82e363ed9 | ||
|
|
50dc2fae77 | ||
|
|
62ac5b94b3 | ||
|
|
f0e7b3e0ef | ||
|
|
c6ff18affc | ||
|
|
16ca74a3f4 | ||
|
|
cb67f9a651 | ||
|
|
baf425a2cd | ||
|
|
0b243242df | ||
|
|
6131d86ec9 | ||
|
|
4b9087651c | ||
|
|
79699aebc8 | ||
|
|
22290eb7ba |
5
.github/actionlint.yml
vendored
5
.github/actionlint.yml
vendored
@@ -33,9 +33,14 @@ config-variables:
|
||||
- REMOTE_STORAGE_AZURE_CONTAINER
|
||||
- REMOTE_STORAGE_AZURE_REGION
|
||||
- SLACK_CICD_CHANNEL_ID
|
||||
- SLACK_COMPUTE_CHANNEL_ID
|
||||
- SLACK_ON_CALL_DEVPROD_STREAM
|
||||
- SLACK_ON_CALL_QA_STAGING_STREAM
|
||||
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
|
||||
- SLACK_ONCALL_COMPUTE_GROUP
|
||||
- SLACK_ONCALL_PROXY_GROUP
|
||||
- SLACK_ONCALL_STORAGE_GROUP
|
||||
- SLACK_PROXY_CHANNEL_ID
|
||||
- SLACK_RUST_CHANNEL_ID
|
||||
- SLACK_STORAGE_CHANNEL_ID
|
||||
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
|
||||
|
||||
71
.github/workflows/benchmarking.yml
vendored
71
.github/workflows/benchmarking.yml
vendored
@@ -53,6 +53,77 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
cleanup:
  # Deletes stale Neon staging projects left behind by earlier benchmark runs.
  runs-on: [ self-hosted, us-east-2, x64 ]
  container:
    image: ghcr.io/neondatabase/build-tools:pinned-bookworm
    credentials:
      username: ${{ github.actor }}
      password: ${{ secrets.GITHUB_TOKEN }}
    options: --init
  env:
    ORG_ID: org-solitary-dew-09443886
    # Page size of the project list request; only this single page is processed per run.
    LIMIT: "100"
    # Search term passed to the project list endpoint.
    SEARCH: "GITHUB_RUN_ID="
    BASE_URL: https://console-stage.neon.build/api/v2
    DRY_RUN: "false" # Set to "true" to just test out the workflow

  steps:
    - name: Harden the runner (Audit all outbound calls)
      uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
      with:
        egress-policy: audit

    - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2

    - name: Cleanup inactive Neon projects left over from prior runs
      env:
        API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
      run: |
        set -euo pipefail

        # Epoch timestamp five days in the past; projects whose compute was
        # last active before it are eligible for deletion.
        NOW=$(date -u +%s)
        CUTOFF=$((NOW - 5 * 86400))

        REQUEST_URL="$BASE_URL/projects?limit=$LIMIT&search=$(printf '%s' "$SEARCH" | jq -sRr @uri)&org_id=$ORG_ID"

        echo "Requesting project list from:"
        echo "$REQUEST_URL"

        # --fail turns HTTP error statuses into a non-zero exit code, so an
        # auth or server error stops the step here instead of being fed to jq
        # as if it were a project list.
        response=$(curl --silent --show-error --fail -X GET "$REQUEST_URL" \
          --header "Accept: application/json" \
          --header "Content-Type: application/json" \
          --header "Authorization: Bearer ${API_KEY}")

        echo "Response:"
        echo "$response" | jq .

        projects_to_delete=$(echo "$response" | jq --argjson cutoff "$CUTOFF" '
          .projects[]
          | select(.compute_last_active_at != null)
          | select((.compute_last_active_at | fromdateiso8601) < $cutoff)
          | {id, name, compute_last_active_at}
        ')

        if [ -z "$projects_to_delete" ]; then
          echo "No projects eligible for deletion."
          exit 0
        fi

        echo "Projects that will be deleted:"
        echo "$projects_to_delete" | jq -r '.id'

        if [ "$DRY_RUN" = "false" ]; then
          echo "$projects_to_delete" | jq -r '.id' | while read -r project_id; do
            echo "Deleting project: $project_id"
            # A failed deletion is surfaced as a workflow warning but does not
            # abort the loop, so the remaining projects are still attempted.
            curl --silent --show-error --fail -X DELETE "$BASE_URL/projects/$project_id" \
              --header "Accept: application/json" \
              --header "Content-Type: application/json" \
              --header "Authorization: Bearer ${API_KEY}" \
              || echo "::warning::Failed to delete project $project_id"
          done
        else
          echo "Dry run enabled — no projects were deleted."
        fi
|
||||
bench:
|
||||
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
|
||||
permissions:
|
||||
|
||||
37
.github/workflows/build_and_test.yml
vendored
37
.github/workflows/build_and_test.yml
vendored
@@ -1434,10 +1434,10 @@ jobs:
|
||||
;;
|
||||
esac
|
||||
|
||||
notify-storage-release-deploy-failure:
|
||||
needs: [ deploy ]
|
||||
notify-release-deploy-failure:
|
||||
needs: [ meta, deploy ]
|
||||
# We want this to run even if (transitive) dependencies are skipped, because deploy should really be successful on release branch workflow runs.
|
||||
if: github.ref_name == 'release' && needs.deploy.result != 'success' && always()
|
||||
if: contains(fromJSON('["storage-release", "compute-release", "proxy-release"]'), needs.meta.outputs.run-kind) && needs.deploy.result != 'success' && always()
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- name: Harden the runner (Audit all outbound calls)
|
||||
@@ -1445,15 +1445,40 @@ jobs:
|
||||
with:
|
||||
egress-policy: audit
|
||||
|
||||
- name: Post release-deploy failure to team-storage slack channel
|
||||
- name: Post release-deploy failure to team slack channel
|
||||
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
|
||||
env:
|
||||
TEAM_ONCALL: >-
|
||||
${{
|
||||
fromJSON(format('{
|
||||
"storage-release": "<!subteam^{0}|@oncall-storage>",
|
||||
"compute-release": "<!subteam^{1}|@oncall-compute>",
|
||||
"proxy-release": "<!subteam^{2}|@oncall-proxy>"
|
||||
}',
|
||||
vars.SLACK_ONCALL_STORAGE_GROUP,
|
||||
vars.SLACK_ONCALL_COMPUTE_GROUP,
|
||||
vars.SLACK_ONCALL_PROXY_GROUP
|
||||
))[needs.meta.outputs.run-kind]
|
||||
}}
|
||||
CHANNEL: >-
|
||||
${{
|
||||
fromJSON(format('{
|
||||
"storage-release": "{0}",
|
||||
"compute-release": "{1}",
|
||||
"proxy-release": "{2}"
|
||||
}',
|
||||
vars.SLACK_STORAGE_CHANNEL_ID,
|
||||
vars.SLACK_COMPUTE_CHANNEL_ID,
|
||||
vars.SLACK_PROXY_CHANNEL_ID
|
||||
))[needs.meta.outputs.run-kind]
|
||||
}}
|
||||
with:
|
||||
method: chat.postMessage
|
||||
token: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
payload: |
|
||||
channel: ${{ vars.SLACK_STORAGE_CHANNEL_ID }}
|
||||
channel: ${{ env.CHANNEL }}
|
||||
text: |
|
||||
🔴 <!subteam^S06CJ87UMNY|@oncall-storage>: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
|
||||
🔴 ${{ env.TEAM_ONCALL }}: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
|
||||
|
||||
# The job runs on `release` branch and copies compatibility data and Neon artifact from the last *release PR* to the latest directory
|
||||
promote-compatibility-data:
|
||||
|
||||
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -1284,6 +1284,7 @@ name = "compute_tools"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-compression",
|
||||
"aws-config",
|
||||
"aws-sdk-kms",
|
||||
"aws-sdk-s3",
|
||||
@@ -1302,6 +1303,7 @@ dependencies = [
|
||||
"futures",
|
||||
"http 1.1.0",
|
||||
"indexmap 2.0.1",
|
||||
"itertools 0.10.5",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
@@ -1420,6 +1422,7 @@ dependencies = [
|
||||
"clap",
|
||||
"comfy-table",
|
||||
"compute_api",
|
||||
"endpoint_storage",
|
||||
"futures",
|
||||
"http-utils",
|
||||
"humantime",
|
||||
|
||||
@@ -243,6 +243,7 @@ azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rus
|
||||
## Local libraries
|
||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
|
||||
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
|
||||
http-utils = { version = "0.1", path = "./libs/http-utils/" }
|
||||
metrics = { version = "0.1", path = "./libs/metrics/" }
|
||||
pageserver = { path = "./pageserver" }
|
||||
|
||||
@@ -1084,23 +1084,12 @@ RUN cargo install --locked --version 0.12.9 cargo-pgrx && \
|
||||
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
|
||||
|
||||
USER root
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions pgrx14"
|
||||
#
|
||||
#########################################################################################
|
||||
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx14
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN cargo install --locked --version 0.14.1 cargo-pgrx && \
|
||||
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
|
||||
|
||||
USER root
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions pgrx14"
|
||||
#
|
||||
# Version 14 is now required by a few
|
||||
# Version 14 is now required by a few
|
||||
# This layer should be used as a base for new pgrx extensions,
|
||||
# and eventually get merged with `rust-extensions-build`
|
||||
#
|
||||
@@ -1333,8 +1322,8 @@ ARG PG_VERSION
|
||||
# Do not update without approve from proxy team
|
||||
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
|
||||
WORKDIR /ext-src
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.0.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "19be2dc0b3834d643706ed430af998bb4c2cdf24b3c45e7b102bb3a550e8660c pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.1.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "62fec9e472cb805c53ba24a0765afdb8ea2720cfc03ae7813e61687b36d1b0ad pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/version = "0.12.6"/version = "0.12.9"/g' pgrx-tests/Cargo.toml && \
|
||||
@@ -1362,7 +1351,8 @@ COPY compute/patches/anon_v2.patch .
|
||||
# This is an experimental extension, never got to real production.
|
||||
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
|
||||
ENV PATH="/usr/local/pgsql/bin/:$PATH"
|
||||
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/latest/postgresql_anonymizer-latest.tar.gz -O pg_anon.tar.gz && \
|
||||
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/2.1.0/postgresql_anonymizer-latest.tar.gz -O pg_anon.tar.gz && \
|
||||
echo "48e7f5ae2f1ca516df3da86c5c739d48dd780a4e885705704ccaad0faa89d6c0 pg_anon.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C . && \
|
||||
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt && \
|
||||
sed -i 's/pgrx = "0.14.1"/pgrx = { version = "=0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
|
||||
@@ -10,6 +10,7 @@ default = []
|
||||
testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
async-compression.workspace = true
|
||||
base64.workspace = true
|
||||
aws-config.workspace = true
|
||||
aws-sdk-s3.workspace = true
|
||||
@@ -27,6 +28,7 @@ flate2.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
indexmap.workspace = true
|
||||
itertools.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
|
||||
@@ -60,12 +60,16 @@ use utils::failpoint_support;
|
||||
// Compatibility hack: if the control plane specified any remote-ext-config
|
||||
// use the default value for extension storage proxy gateway.
|
||||
// Remove this once the control plane is updated to pass the gateway URL
|
||||
fn parse_remote_ext_config(arg: &str) -> Result<String> {
|
||||
if arg.starts_with("http") {
|
||||
Ok(arg.trim_end_matches('/').to_string())
|
||||
fn parse_remote_ext_base_url(arg: &str) -> Result<String> {
|
||||
const FALLBACK_PG_EXT_GATEWAY_BASE_URL: &str =
|
||||
"http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local";
|
||||
|
||||
Ok(if arg.starts_with("http") {
|
||||
arg
|
||||
} else {
|
||||
Ok("http://pg-ext-s3-gateway".to_string())
|
||||
FALLBACK_PG_EXT_GATEWAY_BASE_URL
|
||||
}
|
||||
.to_owned())
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -74,8 +78,10 @@ struct Cli {
|
||||
#[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
|
||||
pub pgbin: String,
|
||||
|
||||
#[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
|
||||
pub remote_ext_config: Option<String>,
|
||||
/// The base URL for the remote extension storage proxy gateway.
|
||||
/// Should be in the form of `http(s)://<gateway-hostname>[:<port>]`.
|
||||
#[arg(short = 'r', long, value_parser = parse_remote_ext_base_url, alias = "remote-ext-config")]
|
||||
pub remote_ext_base_url: Option<String>,
|
||||
|
||||
/// The port to bind the external listening HTTP server to. Clients running
|
||||
/// outside the compute will talk to the compute through this port. Keep
|
||||
@@ -164,7 +170,7 @@ fn main() -> Result<()> {
|
||||
pgversion: get_pg_version_string(&cli.pgbin),
|
||||
external_http_port: cli.external_http_port,
|
||||
internal_http_port: cli.internal_http_port,
|
||||
ext_remote_storage: cli.remote_ext_config.clone(),
|
||||
remote_ext_base_url: cli.remote_ext_base_url.clone(),
|
||||
resize_swap_on_bind: cli.resize_swap_on_bind,
|
||||
set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -265,4 +271,18 @@ mod test {
|
||||
fn verify_cli() {
|
||||
Cli::command().debug_assert()
|
||||
}
|
||||
|
||||
#[test]
fn parse_pg_ext_gateway_base_url() {
    // An argument that already starts with "http" must come back unchanged.
    let url_arg = "http://pg-ext-s3-gateway2";
    assert_eq!(super::parse_remote_ext_base_url(url_arg).unwrap(), url_arg);

    // Any other argument must be replaced by the fallback gateway base URL.
    let legacy_arg = "pg-ext-s3-gateway";
    let parsed = super::parse_remote_ext_base_url(legacy_arg).unwrap();
    assert_eq!(
        parsed,
        "http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local"
    );
}
|
||||
}
|
||||
|
||||
@@ -348,6 +348,7 @@ async fn run_dump_restore(
|
||||
"--no-security-labels".to_string(),
|
||||
"--no-subscriptions".to_string(),
|
||||
"--no-tablespaces".to_string(),
|
||||
"--no-event-triggers".to_string(),
|
||||
// format
|
||||
"--format".to_string(),
|
||||
"directory".to_string(),
|
||||
|
||||
@@ -1,4 +1,26 @@
|
||||
use std::collections::HashMap;
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use compute_api::privilege::Privilege;
|
||||
use compute_api::responses::{
|
||||
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
|
||||
LfcPrewarmState,
|
||||
};
|
||||
use compute_api::spec::{
|
||||
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use futures::future::join_all;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use itertools::Itertools;
|
||||
use nix::sys::signal::{Signal, kill};
|
||||
use nix::unistd::Pid;
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres;
|
||||
use postgres::NoTls;
|
||||
use postgres::error::SqlState;
|
||||
use remote_storage::{DownloadError, RemotePath};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::net::SocketAddr;
|
||||
use std::os::unix::fs::{PermissionsExt, symlink};
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
@@ -7,24 +29,6 @@ use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, Condvar, Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{env, fs};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use compute_api::privilege::Privilege;
|
||||
use compute_api::responses::{ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{
|
||||
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use futures::future::join_all;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use nix::sys::signal::{Signal, kill};
|
||||
use nix::unistd::Pid;
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres;
|
||||
use postgres::NoTls;
|
||||
use postgres::error::SqlState;
|
||||
use remote_storage::{DownloadError, RemotePath};
|
||||
use tokio::spawn;
|
||||
use tracing::{Instrument, debug, error, info, instrument, warn};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -92,7 +96,7 @@ pub struct ComputeNodeParams {
|
||||
pub internal_http_port: u16,
|
||||
|
||||
/// the address of extension storage proxy gateway
|
||||
pub ext_remote_storage: Option<String>,
|
||||
pub remote_ext_base_url: Option<String>,
|
||||
}
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
@@ -150,6 +154,9 @@ pub struct ComputeState {
|
||||
/// set up the span relationship ourselves.
|
||||
pub startup_span: Option<tracing::span::Span>,
|
||||
|
||||
pub lfc_prewarm_state: LfcPrewarmState,
|
||||
pub lfc_offload_state: LfcOffloadState,
|
||||
|
||||
pub metrics: ComputeMetrics,
|
||||
}
|
||||
|
||||
@@ -163,6 +170,8 @@ impl ComputeState {
|
||||
pspec: None,
|
||||
startup_span: None,
|
||||
metrics: ComputeMetrics::default(),
|
||||
lfc_prewarm_state: LfcPrewarmState::default(),
|
||||
lfc_offload_state: LfcOffloadState::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,6 +207,8 @@ pub struct ParsedSpec {
|
||||
pub pageserver_connstr: String,
|
||||
pub safekeeper_connstrings: Vec<String>,
|
||||
pub storage_auth_token: Option<String>,
|
||||
pub endpoint_storage_addr: Option<SocketAddr>,
|
||||
pub endpoint_storage_token: Option<String>,
|
||||
}
|
||||
|
||||
impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
@@ -251,6 +262,18 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
.or(Err("invalid timeline id"))?
|
||||
};
|
||||
|
||||
let endpoint_storage_addr: Option<SocketAddr> = spec
|
||||
.endpoint_storage_addr
|
||||
.clone()
|
||||
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_addr"))
|
||||
.unwrap_or_default()
|
||||
.parse()
|
||||
.ok();
|
||||
let endpoint_storage_token = spec
|
||||
.endpoint_storage_token
|
||||
.clone()
|
||||
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_token"));
|
||||
|
||||
Ok(ParsedSpec {
|
||||
spec,
|
||||
pageserver_connstr,
|
||||
@@ -258,6 +281,8 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
storage_auth_token,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
endpoint_storage_addr,
|
||||
endpoint_storage_token,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -305,11 +330,39 @@ struct StartVmMonitorResult {
|
||||
impl ComputeNode {
|
||||
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
|
||||
let connstr = params.connstr.as_str();
|
||||
let conn_conf = postgres::config::Config::from_str(connstr)
|
||||
let mut conn_conf = postgres::config::Config::from_str(connstr)
|
||||
.context("cannot build postgres config from connstr")?;
|
||||
let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr)
|
||||
let mut tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr)
|
||||
.context("cannot build tokio postgres config from connstr")?;
|
||||
|
||||
// Users can set some configuration parameters per database with
|
||||
// ALTER DATABASE ... SET ...
|
||||
//
|
||||
// There are at least these parameters:
|
||||
//
|
||||
// - role=some_other_role
|
||||
// - default_transaction_read_only=on
|
||||
// - statement_timeout=1, i.e., 1ms, which will cause most of the queries to fail
|
||||
// - search_path=non_public_schema, this should be actually safe because
|
||||
// we don't call any functions in user databases, but better to always reset
|
||||
// it to public.
|
||||
//
|
||||
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
|
||||
// Unset them via connection string options before connecting to the database.
|
||||
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
|
||||
//
|
||||
// TODO(ololobus): we currently pass `-c default_transaction_read_only=off` from control plane
|
||||
// as well. After rolling out this code, we can remove this parameter from control plane.
|
||||
// In the meantime, double-passing is fine, the last value is applied.
|
||||
// See: <https://github.com/neondatabase/cloud/blob/133dd8c4dbbba40edfbad475bf6a45073ca63faf/goapp/controlplane/internal/pkg/compute/provisioner/provisioner_common.go#L70>
|
||||
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
|
||||
let options = match conn_conf.get_options() {
|
||||
Some(options) => format!("{} {}", options, EXTRA_OPTIONS),
|
||||
None => EXTRA_OPTIONS.to_string(),
|
||||
};
|
||||
conn_conf.options(&options);
|
||||
tokio_conn_conf.options(&options);
|
||||
|
||||
let mut new_state = ComputeState::new();
|
||||
if let Some(spec) = config.spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
@@ -736,6 +789,9 @@ impl ComputeNode {
|
||||
// Log metrics so that we can search for slow operations in logs
|
||||
info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished");
|
||||
|
||||
if pspec.spec.prewarm_lfc_on_startup {
|
||||
self.prewarm_lfc();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1422,15 +1478,20 @@ impl ComputeNode {
|
||||
Err(e) => match e.code() {
|
||||
Some(&SqlState::INVALID_PASSWORD)
|
||||
| Some(&SqlState::INVALID_AUTHORIZATION_SPECIFICATION) => {
|
||||
// Connect with zenith_admin if cloud_admin could not authenticate
|
||||
// Connect with `zenith_admin` if `cloud_admin` could not authenticate
|
||||
info!(
|
||||
"cannot connect to postgres: {}, retrying with `zenith_admin` username",
|
||||
"cannot connect to Postgres: {}, retrying with 'zenith_admin' username",
|
||||
e
|
||||
);
|
||||
let mut zenith_admin_conf = postgres::config::Config::from(conf.clone());
|
||||
zenith_admin_conf.application_name("compute_ctl:apply_config");
|
||||
zenith_admin_conf.user("zenith_admin");
|
||||
|
||||
// It doesn't matter what were the options before, here we just want
|
||||
// to connect and create a new superuser role.
|
||||
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
|
||||
zenith_admin_conf.options(ZENITH_OPTIONS);
|
||||
|
||||
let mut client =
|
||||
zenith_admin_conf.connect(NoTls)
|
||||
.context("broken cloud_admin credential: tried connecting with cloud_admin but could not authenticate, and zenith_admin does not work either")?;
|
||||
@@ -1596,9 +1657,7 @@ impl ComputeNode {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
let mut conf =
|
||||
tokio_postgres::Config::from_str(self.params.connstr.as_str()).unwrap();
|
||||
conf.application_name("apply_config");
|
||||
let conf = self.get_tokio_conn_conf(Some("compute_ctl:reconfigure"));
|
||||
let conf = Arc::new(conf);
|
||||
|
||||
let spec = Arc::new(spec.clone());
|
||||
@@ -1838,9 +1897,9 @@ LIMIT 100",
|
||||
real_ext_name: String,
|
||||
ext_path: RemotePath,
|
||||
) -> Result<u64, DownloadError> {
|
||||
let ext_remote_storage =
|
||||
let remote_ext_base_url =
|
||||
self.params
|
||||
.ext_remote_storage
|
||||
.remote_ext_base_url
|
||||
.as_ref()
|
||||
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
|
||||
"Remote extensions storage is not configured",
|
||||
@@ -1902,7 +1961,7 @@ LIMIT 100",
|
||||
let download_size = extension_server::download_extension(
|
||||
&real_ext_name,
|
||||
&ext_path,
|
||||
ext_remote_storage,
|
||||
remote_ext_base_url,
|
||||
&self.params.pgbin,
|
||||
)
|
||||
.await
|
||||
@@ -1937,23 +1996,40 @@ LIMIT 100",
|
||||
tokio::spawn(conn);
|
||||
|
||||
// TODO: support other types of grants apart from schemas?
|
||||
let query = format!(
|
||||
"GRANT {} ON SCHEMA {} TO {}",
|
||||
privileges
|
||||
.iter()
|
||||
// should not be quoted as it's part of the command.
|
||||
// is already sanitized so it's ok
|
||||
.map(|p| p.as_str())
|
||||
.collect::<Vec<&'static str>>()
|
||||
.join(", "),
|
||||
// quote the schema and role name as identifiers to sanitize them.
|
||||
schema_name.pg_quote(),
|
||||
role_name.pg_quote(),
|
||||
);
|
||||
db_client
|
||||
.simple_query(&query)
|
||||
|
||||
// check the role grants first - to gracefully handle read-replicas.
|
||||
let select = "SELECT privilege_type
|
||||
FROM pg_namespace
|
||||
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) acl ON true
|
||||
JOIN pg_user users ON acl.grantee = users.usesysid
|
||||
WHERE users.usename = $1
|
||||
AND nspname = $2";
|
||||
let rows = db_client
|
||||
.query(select, &[role_name, schema_name])
|
||||
.await
|
||||
.with_context(|| format!("Failed to execute query: {}", query))?;
|
||||
.with_context(|| format!("Failed to execute query: {select}"))?;
|
||||
|
||||
let already_granted: HashSet<String> = rows.into_iter().map(|row| row.get(0)).collect();
|
||||
|
||||
let grants = privileges
|
||||
.iter()
|
||||
.filter(|p| !already_granted.contains(p.as_str()))
|
||||
// should not be quoted as it's part of the command.
|
||||
// is already sanitized so it's ok
|
||||
.map(|p| p.as_str())
|
||||
.join(", ");
|
||||
|
||||
if !grants.is_empty() {
|
||||
// quote the schema and role name as identifiers to sanitize them.
|
||||
let schema_name = schema_name.pg_quote();
|
||||
let role_name = role_name.pg_quote();
|
||||
|
||||
let query = format!("GRANT {grants} ON SCHEMA {schema_name} TO {role_name}",);
|
||||
db_client
|
||||
.simple_query(&query)
|
||||
.await
|
||||
.with_context(|| format!("Failed to execute query: {}", query))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2011,7 +2087,7 @@ LIMIT 100",
|
||||
&self,
|
||||
spec: &ComputeSpec,
|
||||
) -> Result<RemoteExtensionMetrics> {
|
||||
if self.params.ext_remote_storage.is_none() {
|
||||
if self.params.remote_ext_base_url.is_none() {
|
||||
return Ok(RemoteExtensionMetrics {
|
||||
num_ext_downloaded: 0,
|
||||
largest_ext_size: 0,
|
||||
|
||||
202
compute_tools/src/compute_prewarm.rs
Normal file
202
compute_tools/src/compute_prewarm.rs
Normal file
@@ -0,0 +1,202 @@
|
||||
use crate::compute::ComputeNode;
|
||||
use anyhow::{Context, Result, bail};
|
||||
use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
|
||||
use compute_api::responses::LfcOffloadState;
|
||||
use compute_api::responses::LfcPrewarmState;
|
||||
use http::StatusCode;
|
||||
use reqwest::Client;
|
||||
use std::sync::Arc;
|
||||
use tokio::{io::AsyncReadExt, spawn};
|
||||
use tracing::{error, info};
|
||||
|
||||
#[derive(serde::Serialize, Default)]
|
||||
pub struct LfcPrewarmStateWithProgress {
|
||||
#[serde(flatten)]
|
||||
base: LfcPrewarmState,
|
||||
total: i32,
|
||||
prewarmed: i32,
|
||||
skipped: i32,
|
||||
}
|
||||
|
||||
/// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
|
||||
struct EndpointStoragePair {
|
||||
url: String,
|
||||
token: String,
|
||||
}
|
||||
|
||||
const KEY: &str = "lfc_state";
|
||||
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
|
||||
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
|
||||
bail!("pspec.endpoint_id missing")
|
||||
};
|
||||
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
|
||||
bail!("pspec.endpoint_storage_addr missing")
|
||||
};
|
||||
let tenant_id = pspec.tenant_id;
|
||||
let timeline_id = pspec.timeline_id;
|
||||
|
||||
let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
|
||||
let Some(ref token) = pspec.endpoint_storage_token else {
|
||||
bail!("pspec.endpoint_storage_token missing")
|
||||
};
|
||||
let token = token.clone();
|
||||
Ok(EndpointStoragePair { url, token })
|
||||
}
|
||||
}
|
||||
|
||||
impl ComputeNode {
|
||||
// If prewarm failed, we want to get overall number of segments as well as done ones.
|
||||
// However, this function should be reliable even if querying postgres failed.
|
||||
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
|
||||
info!("requesting LFC prewarm state from postgres");
|
||||
let mut state = LfcPrewarmStateWithProgress::default();
|
||||
{
|
||||
state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
|
||||
}
|
||||
|
||||
let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
|
||||
Ok(client) => client,
|
||||
Err(err) => {
|
||||
error!(%err, "connecting to postgres");
|
||||
return state;
|
||||
}
|
||||
};
|
||||
let row = match client
|
||||
.query_one("select * from get_prewarm_info()", &[])
|
||||
.await
|
||||
{
|
||||
Ok(row) => row,
|
||||
Err(err) => {
|
||||
error!(%err, "querying LFC prewarm status");
|
||||
return state;
|
||||
}
|
||||
};
|
||||
state.total = row.try_get(0).unwrap_or_default();
|
||||
state.prewarmed = row.try_get(1).unwrap_or_default();
|
||||
state.skipped = row.try_get(2).unwrap_or_default();
|
||||
state
|
||||
}
|
||||
|
||||
/// Returns a clone of the currently stored LFC offload state.
pub fn lfc_offload_state(&self) -> LfcOffloadState {
    let guard = self.state.lock().unwrap();
    guard.lfc_offload_state.clone()
}
|
||||
|
||||
/// Returns false if there is a prewarm request ongoing, true otherwise
|
||||
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
|
||||
crate::metrics::LFC_PREWARM_REQUESTS.inc();
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
|
||||
if let LfcPrewarmState::Prewarming =
|
||||
std::mem::replace(state, LfcPrewarmState::Prewarming)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
let cloned = self.clone();
|
||||
spawn(async move {
|
||||
let Err(err) = cloned.prewarm_impl().await else {
|
||||
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
|
||||
return;
|
||||
};
|
||||
error!(%err);
|
||||
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
|
||||
error: err.to_string(),
|
||||
};
|
||||
});
|
||||
true
|
||||
}
|
||||
|
||||
/// Builds the endpoint storage URL/token pair from the currently stored
/// parsed spec.
///
/// Returns an error when no parsed spec is stored, or when the conversion
/// from the spec fails. A missing spec is reported as an error rather than a
/// panic, because panicking while the state lock guard is alive would poison
/// the mutex for every other `lock().unwrap()` caller.
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
    let state = self.state.lock().unwrap();
    state
        .pspec
        .as_ref()
        .context("pspec missing: compute spec is not set")?
        .try_into()
}
|
||||
|
||||
async fn prewarm_impl(&self) -> Result<()> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
|
||||
info!(%url, "requesting LFC state from endpoint storage");
|
||||
|
||||
let request = Client::new().get(&url).bearer_auth(token);
|
||||
let res = request.send().await.context("querying endpoint storage")?;
|
||||
let status = res.status();
|
||||
if status != StatusCode::OK {
|
||||
bail!("{status} querying endpoint storage")
|
||||
}
|
||||
|
||||
let mut uncompressed = Vec::new();
|
||||
let lfc_state = res
|
||||
.bytes()
|
||||
.await
|
||||
.context("getting request body from endpoint storage")?;
|
||||
ZstdDecoder::new(lfc_state.iter().as_slice())
|
||||
.read_to_end(&mut uncompressed)
|
||||
.await
|
||||
.context("decoding LFC state")?;
|
||||
let uncompressed_len = uncompressed.len();
|
||||
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres");
|
||||
|
||||
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
.await
|
||||
.context("connecting to postgres")?
|
||||
.query_one("select prewarm_local_cache($1)", &[&uncompressed])
|
||||
.await
|
||||
.context("loading LFC state into postgres")
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
/// Returns false if there is an offload request ongoing, true otherwise
|
||||
pub fn offload_lfc(self: &Arc<Self>) -> bool {
|
||||
crate::metrics::LFC_OFFLOAD_REQUESTS.inc();
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap().lfc_offload_state;
|
||||
if let LfcOffloadState::Offloading =
|
||||
std::mem::replace(state, LfcOffloadState::Offloading)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
let cloned = self.clone();
|
||||
spawn(async move {
|
||||
let Err(err) = cloned.offload_lfc_impl().await else {
|
||||
cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
|
||||
return;
|
||||
};
|
||||
error!(%err);
|
||||
cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
|
||||
error: err.to_string(),
|
||||
};
|
||||
});
|
||||
true
|
||||
}
|
||||
|
||||
async fn offload_lfc_impl(&self) -> Result<()> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
|
||||
info!(%url, "requesting LFC state from postgres");
|
||||
|
||||
let mut compressed = Vec::new();
|
||||
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
.await
|
||||
.context("connecting to postgres")?
|
||||
.query_one("select get_local_cache_state()", &[])
|
||||
.await
|
||||
.context("querying LFC state")?
|
||||
.try_get::<usize, &[u8]>(0)
|
||||
.context("deserializing LFC state")
|
||||
.map(ZstdEncoder::new)?
|
||||
.read_to_end(&mut compressed)
|
||||
.await
|
||||
.context("compressing LFC state")?;
|
||||
let compressed_len = compressed.len();
|
||||
info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
|
||||
|
||||
let request = Client::new().put(url).bearer_auth(token).body(compressed);
|
||||
match request.send().await {
|
||||
Ok(res) if res.status() == StatusCode::OK => Ok(()),
|
||||
Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()),
|
||||
Err(err) => Err(err).context("writing to endpoint storage"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -223,6 +223,9 @@ pub fn write_postgres_conf(
|
||||
// TODO: tune this after performance testing
|
||||
writeln!(file, "pgaudit.log_rotation_age=5")?;
|
||||
|
||||
// Enable audit logs for pg_session_jwt extension
|
||||
writeln!(file, "pg_session_jwt.audit_log=on")?;
|
||||
|
||||
// Add audit shared_preload_libraries, if they are not present.
|
||||
//
|
||||
// The caller who sets the flag is responsible for ensuring that the necessary
|
||||
|
||||
@@ -158,14 +158,14 @@ fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
|
||||
pub async fn download_extension(
|
||||
ext_name: &str,
|
||||
ext_path: &RemotePath,
|
||||
ext_remote_storage: &str,
|
||||
remote_ext_base_url: &str,
|
||||
pgbin: &str,
|
||||
) -> Result<u64> {
|
||||
info!("Download extension {:?} from {:?}", ext_name, ext_path);
|
||||
|
||||
// TODO add retry logic
|
||||
let download_buffer =
|
||||
match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
|
||||
match download_extension_tar(remote_ext_base_url, &ext_path.to_string()).await {
|
||||
Ok(buffer) => buffer,
|
||||
Err(error_message) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
@@ -272,8 +272,8 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
|
||||
// Do request to extension storage proxy, e.g.,
|
||||
// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
|
||||
// using HTTP GET and return the response body as bytes.
|
||||
async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
|
||||
let uri = format!("{}/{}", ext_remote_storage, ext_path);
|
||||
async fn download_extension_tar(remote_ext_base_url: &str, ext_path: &str) -> Result<Bytes> {
|
||||
let uri = format!("{}/{}", remote_ext_base_url, ext_path);
|
||||
let filename = Path::new(ext_path)
|
||||
.file_name()
|
||||
.unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use anyhow::{Result, anyhow};
|
||||
use axum::{RequestExt, body::Body};
|
||||
use axum_extra::{
|
||||
TypedHeader,
|
||||
headers::{Authorization, authorization::Bearer},
|
||||
};
|
||||
use compute_api::requests::ComputeClaims;
|
||||
use compute_api::requests::{COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope};
|
||||
use futures::future::BoxFuture;
|
||||
use http::{Request, Response, StatusCode};
|
||||
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
|
||||
@@ -25,13 +23,14 @@ pub(in crate::http) struct Authorize {
|
||||
impl Authorize {
|
||||
pub fn new(compute_id: String, jwks: JwkSet) -> Self {
|
||||
let mut validation = Validation::new(Algorithm::EdDSA);
|
||||
// Nothing is currently required
|
||||
validation.required_spec_claims = HashSet::new();
|
||||
validation.validate_exp = true;
|
||||
// Unused by the control plane
|
||||
validation.validate_aud = false;
|
||||
// Unused by the control plane
|
||||
validation.validate_nbf = false;
|
||||
// Unused by the control plane
|
||||
validation.validate_aud = false;
|
||||
validation.set_audience(&[COMPUTE_AUDIENCE]);
|
||||
// Nothing is currently required
|
||||
validation.set_required_spec_claims(&[] as &[&str; 0]);
|
||||
|
||||
Self {
|
||||
compute_id,
|
||||
@@ -64,11 +63,47 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
Err(e) => return Err(JsonResponse::error(StatusCode::UNAUTHORIZED, e)),
|
||||
};
|
||||
|
||||
if data.claims.compute_id != compute_id {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"invalid compute ID in authorization token claims",
|
||||
));
|
||||
match data.claims.scope {
|
||||
// TODO: We should validate audience for every token, but
|
||||
// instead of this ad-hoc validation, we should turn
|
||||
// [`Validation::validate_aud`] on. This is merely a stopgap
|
||||
// while we roll out `aud` deployment. We return a 401
|
||||
// Unauthorized because when we eventually do use
|
||||
// [`Validation`], we will hit the above `Err` match arm which
|
||||
// returns 401 Unauthorized.
|
||||
Some(ComputeClaimsScope::Admin) => {
|
||||
let Some(ref audience) = data.claims.audience else {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"missing audience in authorization token claims",
|
||||
));
|
||||
};
|
||||
|
||||
if !audience.iter().any(|a| a == COMPUTE_AUDIENCE) {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"invalid audience in authorization token claims",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// If the scope is not [`ComputeClaimsScope::Admin`], then we
|
||||
// must validate the compute_id
|
||||
_ => {
|
||||
let Some(ref claimed_compute_id) = data.claims.compute_id else {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::FORBIDDEN,
|
||||
"missing compute_id in authorization token claims",
|
||||
));
|
||||
};
|
||||
|
||||
if *claimed_compute_id != compute_id {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::FORBIDDEN,
|
||||
"invalid compute ID in authorization token claims",
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Make claims available to any subsequent middleware or request
|
||||
|
||||
@@ -22,7 +22,7 @@ pub(in crate::http) async fn download_extension(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
) -> Response {
|
||||
// Don't even try to download extensions if no remote storage is configured
|
||||
if compute.params.ext_remote_storage.is_none() {
|
||||
if compute.params.remote_ext_base_url.is_none() {
|
||||
return JsonResponse::error(
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
"remote storage is not configured",
|
||||
|
||||
39
compute_tools/src/http/routes/lfc.rs
Normal file
39
compute_tools/src/http/routes/lfc.rs
Normal file
@@ -0,0 +1,39 @@
|
||||
use crate::compute_prewarm::LfcPrewarmStateWithProgress;
|
||||
use crate::http::JsonResponse;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{Json, http::StatusCode};
|
||||
use compute_api::responses::LfcOffloadState;
|
||||
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
|
||||
|
||||
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmStateWithProgress> {
|
||||
Json(compute.lfc_prewarm_state().await)
|
||||
}
|
||||
|
||||
// The following functions are marked `async` for axum, as that is more convenient
// than wrapping them in async lambdas at the call site
|
||||
|
||||
pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadState> {
|
||||
Json(compute.lfc_offload_state())
|
||||
}
|
||||
|
||||
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
|
||||
if compute.prewarm_lfc() {
|
||||
StatusCode::ACCEPTED.into_response()
|
||||
} else {
|
||||
JsonResponse::error(
|
||||
StatusCode::TOO_MANY_REQUESTS,
|
||||
"Multiple requests for prewarm are not allowed",
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(in crate::http) async fn offload(compute: Compute) -> Response {
|
||||
if compute.offload_lfc() {
|
||||
StatusCode::ACCEPTED.into_response()
|
||||
} else {
|
||||
JsonResponse::error(
|
||||
StatusCode::TOO_MANY_REQUESTS,
|
||||
"Multiple requests for prewarm offload are not allowed",
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,7 @@ pub(in crate::http) mod extensions;
|
||||
pub(in crate::http) mod failpoints;
|
||||
pub(in crate::http) mod grants;
|
||||
pub(in crate::http) mod insights;
|
||||
pub(in crate::http) mod lfc;
|
||||
pub(in crate::http) mod metrics;
|
||||
pub(in crate::http) mod metrics_json;
|
||||
pub(in crate::http) mod status;
|
||||
|
||||
@@ -23,7 +23,7 @@ use super::{
|
||||
middleware::authorize::Authorize,
|
||||
routes::{
|
||||
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
|
||||
grants, insights, metrics, metrics_json, status, terminate,
|
||||
grants, insights, lfc, metrics, metrics_json, status, terminate,
|
||||
},
|
||||
};
|
||||
use crate::compute::ComputeNode;
|
||||
@@ -85,6 +85,8 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
|
||||
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
|
||||
|
||||
let authenticated_router = Router::<Arc<ComputeNode>>::new()
|
||||
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
|
||||
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
|
||||
.route("/check_writability", post(check_writability::is_writable))
|
||||
.route("/configure", post(configure::configure))
|
||||
.route("/database_schema", get(database_schema::get_schema_dump))
|
||||
|
||||
@@ -11,6 +11,7 @@ pub mod http;
|
||||
pub mod logger;
|
||||
pub mod catalog;
|
||||
pub mod compute;
|
||||
pub mod compute_prewarm;
|
||||
pub mod disk_quota;
|
||||
pub mod extension_server;
|
||||
pub mod installed_extensions;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use metrics::core::{AtomicF64, AtomicU64, Collector, GenericCounter, GenericGauge};
|
||||
use metrics::proto::MetricFamily;
|
||||
use metrics::{
|
||||
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
|
||||
IntCounter, IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
|
||||
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -97,6 +97,24 @@ pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
/// Needed as neon.file_cache_prewarm_batch == 0 doesn't mean we never tried to prewarm.
|
||||
/// On the other hand, LFC_PREWARMED_PAGES is excessive as we can GET /lfc/prewarm
|
||||
pub(crate) static LFC_PREWARM_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"compute_ctl_lfc_prewarm_requests_total",
|
||||
"Total number of LFC prewarm requests made by compute_ctl",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static LFC_OFFLOAD_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"compute_ctl_lfc_offload_requests_total",
|
||||
"Total number of LFC offload requests made by compute_ctl",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub fn collect() -> Vec<MetricFamily> {
|
||||
let mut metrics = COMPUTE_CTL_UP.collect();
|
||||
metrics.extend(INSTALLED_EXTENSIONS.collect());
|
||||
@@ -106,5 +124,7 @@ pub fn collect() -> Vec<MetricFamily> {
|
||||
metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
|
||||
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
|
||||
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
|
||||
metrics.extend(LFC_PREWARM_REQUESTS.collect());
|
||||
metrics.extend(LFC_OFFLOAD_REQUESTS.collect());
|
||||
metrics
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ mod pg_helpers_tests {
|
||||
r#"fsync = off
|
||||
wal_level = logical
|
||||
hot_standby = on
|
||||
prewarm_lfc_on_startup = off
|
||||
neon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501'
|
||||
wal_log_hints = on
|
||||
log_connections = on
|
||||
|
||||
@@ -41,7 +41,7 @@ storage_broker.workspace = true
|
||||
http-utils.workspace = true
|
||||
utils.workspace = true
|
||||
whoami.workspace = true
|
||||
|
||||
endpoint_storage.workspace = true
|
||||
compute_api.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
@@ -16,10 +16,11 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result, anyhow, bail};
|
||||
use clap::Parser;
|
||||
use compute_api::requests::ComputeClaimsScope;
|
||||
use compute_api::spec::ComputeMode;
|
||||
use control_plane::broker::StorageBroker;
|
||||
use control_plane::endpoint::ComputeControlPlane;
|
||||
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
|
||||
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
|
||||
use control_plane::local_env;
|
||||
use control_plane::local_env::{
|
||||
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
|
||||
@@ -643,9 +644,10 @@ struct EndpointStartCmdArgs {
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
help = "Configure the remote extensions storage proxy gateway to request for extensions."
|
||||
help = "Configure the remote extensions storage proxy gateway URL to request for extensions.",
|
||||
alias = "remote-ext-config"
|
||||
)]
|
||||
remote_ext_config: Option<String>,
|
||||
remote_ext_base_url: Option<String>,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
@@ -705,6 +707,9 @@ struct EndpointStopCmdArgs {
|
||||
struct EndpointGenerateJwtCmdArgs {
|
||||
#[clap(help = "Postgres endpoint id")]
|
||||
endpoint_id: String,
|
||||
|
||||
#[clap(short = 's', long, help = "Scope to generate the JWT with", value_parser = ComputeClaimsScope::from_str)]
|
||||
scope: Option<ComputeClaimsScope>,
|
||||
}
|
||||
|
||||
#[derive(clap::Subcommand)]
|
||||
@@ -1018,7 +1023,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
})
|
||||
.collect(),
|
||||
endpoint_storage: EndpointStorageConf {
|
||||
port: ENDPOINT_STORAGE_DEFAULT_PORT,
|
||||
listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
|
||||
},
|
||||
pg_distrib_dir: None,
|
||||
neon_distrib_dir: None,
|
||||
@@ -1410,9 +1415,16 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
EndpointCmd::Start(args) => {
|
||||
let endpoint_id = &args.endpoint_id;
|
||||
let pageserver_id = args.endpoint_pageserver_id;
|
||||
let remote_ext_config = &args.remote_ext_config;
|
||||
let remote_ext_base_url = &args.remote_ext_base_url;
|
||||
|
||||
let safekeepers_generation = args.safekeepers_generation.map(SafekeeperGeneration::new);
|
||||
let default_generation = env
|
||||
.storage_controller
|
||||
.timelines_onto_safekeepers
|
||||
.then_some(1);
|
||||
let safekeepers_generation = args
|
||||
.safekeepers_generation
|
||||
.or(default_generation)
|
||||
.map(SafekeeperGeneration::new);
|
||||
// If --safekeepers argument is given, use only the listed
|
||||
// safekeeper nodes; otherwise all from the env.
|
||||
let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {
|
||||
@@ -1484,14 +1496,29 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
None
|
||||
};
|
||||
|
||||
let exp = (std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?
|
||||
+ Duration::from_secs(86400))
|
||||
.as_secs();
|
||||
let claims = endpoint_storage::claims::EndpointStorageClaims {
|
||||
tenant_id: endpoint.tenant_id,
|
||||
timeline_id: endpoint.timeline_id,
|
||||
endpoint_id: endpoint_id.to_string(),
|
||||
exp,
|
||||
};
|
||||
|
||||
let endpoint_storage_token = env.generate_auth_token(&claims)?;
|
||||
let endpoint_storage_addr = env.endpoint_storage.listen_addr.to_string();
|
||||
|
||||
println!("Starting existing endpoint {endpoint_id}...");
|
||||
endpoint
|
||||
.start(
|
||||
&auth_token,
|
||||
endpoint_storage_token,
|
||||
endpoint_storage_addr,
|
||||
safekeepers_generation,
|
||||
safekeepers,
|
||||
pageservers,
|
||||
remote_ext_config.as_ref(),
|
||||
remote_ext_base_url.as_ref(),
|
||||
stripe_size.0 as usize,
|
||||
args.create_test_user,
|
||||
args.start_timeout,
|
||||
@@ -1540,12 +1567,16 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
endpoint.stop(&args.mode, args.destroy)?;
|
||||
}
|
||||
EndpointCmd::GenerateJwt(args) => {
|
||||
let endpoint_id = &args.endpoint_id;
|
||||
let endpoint = cplane
|
||||
.endpoints
|
||||
.get(endpoint_id)
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
|
||||
let jwt = endpoint.generate_jwt()?;
|
||||
let endpoint = {
|
||||
let endpoint_id = &args.endpoint_id;
|
||||
|
||||
cplane
|
||||
.endpoints
|
||||
.get(endpoint_id)
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?
|
||||
};
|
||||
|
||||
let jwt = endpoint.generate_jwt(args.scope)?;
|
||||
|
||||
print!("{jwt}");
|
||||
}
|
||||
|
||||
@@ -45,7 +45,9 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::{Context, Result, anyhow, bail};
|
||||
use compute_api::requests::{ComputeClaims, ConfigurationRequest};
|
||||
use compute_api::requests::{
|
||||
COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
|
||||
};
|
||||
use compute_api::responses::{
|
||||
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
|
||||
};
|
||||
@@ -630,9 +632,17 @@ impl Endpoint {
|
||||
}
|
||||
|
||||
/// Generate a JWT with the correct claims.
|
||||
pub fn generate_jwt(&self) -> Result<String> {
|
||||
pub fn generate_jwt(&self, scope: Option<ComputeClaimsScope>) -> Result<String> {
|
||||
self.env.generate_auth_token(&ComputeClaims {
|
||||
compute_id: self.endpoint_id.clone(),
|
||||
audience: match scope {
|
||||
Some(ComputeClaimsScope::Admin) => Some(vec![COMPUTE_AUDIENCE.to_owned()]),
|
||||
_ => None,
|
||||
},
|
||||
compute_id: match scope {
|
||||
Some(ComputeClaimsScope::Admin) => None,
|
||||
_ => Some(self.endpoint_id.clone()),
|
||||
},
|
||||
scope,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -640,10 +650,12 @@ impl Endpoint {
|
||||
pub async fn start(
|
||||
&self,
|
||||
auth_token: &Option<String>,
|
||||
endpoint_storage_token: String,
|
||||
endpoint_storage_addr: String,
|
||||
safekeepers_generation: Option<SafekeeperGeneration>,
|
||||
safekeepers: Vec<NodeId>,
|
||||
pageservers: Vec<(Host, u16)>,
|
||||
remote_ext_config: Option<&String>,
|
||||
remote_ext_base_url: Option<&String>,
|
||||
shard_stripe_size: usize,
|
||||
create_test_user: bool,
|
||||
start_timeout: Duration,
|
||||
@@ -733,6 +745,9 @@ impl Endpoint {
|
||||
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
|
||||
audit_log_level: ComputeAudit::Disabled,
|
||||
logs_export_host: None::<String>,
|
||||
endpoint_storage_addr: Some(endpoint_storage_addr),
|
||||
endpoint_storage_token: Some(endpoint_storage_token),
|
||||
prewarm_lfc_on_startup: false,
|
||||
};
|
||||
|
||||
// this strange code is needed to support respec() in tests
|
||||
@@ -810,8 +825,8 @@ impl Endpoint {
|
||||
.stderr(logfile.try_clone()?)
|
||||
.stdout(logfile);
|
||||
|
||||
if let Some(remote_ext_config) = remote_ext_config {
|
||||
cmd.args(["--remote-ext-config", remote_ext_config]);
|
||||
if let Some(remote_ext_base_url) = remote_ext_base_url {
|
||||
cmd.args(["--remote-ext-base-url", remote_ext_base_url]);
|
||||
}
|
||||
|
||||
let child = cmd.spawn()?;
|
||||
@@ -903,7 +918,7 @@ impl Endpoint {
|
||||
self.external_http_address.port()
|
||||
),
|
||||
)
|
||||
.bearer_auth(self.generate_jwt()?)
|
||||
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
@@ -980,7 +995,7 @@ impl Endpoint {
|
||||
self.external_http_address.port()
|
||||
))
|
||||
.header(CONTENT_TYPE.as_str(), "application/json")
|
||||
.bearer_auth(self.generate_jwt()?)
|
||||
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
|
||||
.body(
|
||||
serde_json::to_string(&ConfigurationRequest {
|
||||
spec,
|
||||
|
||||
@@ -3,17 +3,19 @@ use crate::local_env::LocalEnv;
|
||||
use anyhow::{Context, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use std::io::Write;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Directory within .neon which will be used by default for LocalFs remote storage.
|
||||
pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage";
|
||||
pub const ENDPOINT_STORAGE_DEFAULT_PORT: u16 = 9993;
|
||||
pub const ENDPOINT_STORAGE_DEFAULT_ADDR: SocketAddr =
|
||||
SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9993);
|
||||
|
||||
pub struct EndpointStorage {
|
||||
pub bin: Utf8PathBuf,
|
||||
pub data_dir: Utf8PathBuf,
|
||||
pub pemfile: Utf8PathBuf,
|
||||
pub port: u16,
|
||||
pub addr: SocketAddr,
|
||||
}
|
||||
|
||||
impl EndpointStorage {
|
||||
@@ -22,7 +24,7 @@ impl EndpointStorage {
|
||||
bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(),
|
||||
data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(),
|
||||
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
|
||||
port: env.endpoint_storage.port,
|
||||
addr: env.endpoint_storage.listen_addr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +33,7 @@ impl EndpointStorage {
|
||||
}
|
||||
|
||||
fn listen_addr(&self) -> Utf8PathBuf {
|
||||
format!("127.0.0.1:{}", self.port).into()
|
||||
format!("{}:{}", self.addr.ip(), self.addr.port()).into()
|
||||
}
|
||||
|
||||
pub fn init(&self) -> Result<()> {
|
||||
|
||||
@@ -20,7 +20,9 @@ use utils::auth::encode_from_key_file;
|
||||
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
use crate::broker::StorageBroker;
|
||||
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
|
||||
use crate::endpoint_storage::{
|
||||
ENDPOINT_STORAGE_DEFAULT_ADDR, ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage,
|
||||
};
|
||||
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
|
||||
use crate::safekeeper::SafekeeperNode;
|
||||
|
||||
@@ -151,10 +153,10 @@ pub struct NeonLocalInitConf {
|
||||
pub generate_local_ssl_certs: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct EndpointStorageConf {
|
||||
pub port: u16,
|
||||
pub listen_addr: SocketAddr,
|
||||
}
|
||||
|
||||
/// Broker config for cluster internal communication.
|
||||
@@ -241,6 +243,14 @@ impl Default for NeonStorageControllerConf {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EndpointStorageConf {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NeonBroker {
|
||||
pub fn client_url(&self) -> Url {
|
||||
let url = if let Some(addr) = self.listen_https_addr {
|
||||
|
||||
@@ -10,7 +10,8 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hyper0::Uri;
|
||||
use nix::unistd::Pid;
|
||||
use pageserver_api::controller_api::{
|
||||
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
|
||||
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest,
|
||||
SafekeeperSchedulingPolicyRequest, SkSchedulingPolicy, TenantCreateRequest,
|
||||
TenantCreateResponse, TenantLocateResponse,
|
||||
};
|
||||
use pageserver_api::models::{
|
||||
@@ -20,7 +21,7 @@ use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
|
||||
use pem::Pem;
|
||||
use postgres_backend::AuthType;
|
||||
use reqwest::Method;
|
||||
use reqwest::{Method, Response};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::process::Command;
|
||||
@@ -570,6 +571,11 @@ impl StorageController {
|
||||
let peer_jwt_token = encode_from_key_file(&peer_claims, private_key)
|
||||
.expect("failed to generate jwt token");
|
||||
args.push(format!("--peer-jwt-token={peer_jwt_token}"));
|
||||
|
||||
let claims = Claims::new(None, Scope::SafekeeperData);
|
||||
let jwt_token =
|
||||
encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
|
||||
args.push(format!("--safekeeper-jwt-token={jwt_token}"));
|
||||
}
|
||||
|
||||
if let Some(public_key) = &self.public_key {
|
||||
@@ -614,6 +620,10 @@ impl StorageController {
|
||||
self.env.base_data_dir.display()
|
||||
));
|
||||
|
||||
if self.env.safekeepers.iter().any(|sk| sk.auth_enabled) && self.private_key.is_none() {
|
||||
anyhow::bail!("Safekeeper set up for auth but no private key specified");
|
||||
}
|
||||
|
||||
if self.config.timelines_onto_safekeepers {
|
||||
args.push("--timelines-onto-safekeepers".to_string());
|
||||
}
|
||||
@@ -640,6 +650,10 @@ impl StorageController {
|
||||
)
|
||||
.await?;
|
||||
|
||||
if self.config.timelines_onto_safekeepers {
|
||||
self.register_safekeepers().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -743,6 +757,23 @@ impl StorageController {
|
||||
where
|
||||
RQ: Serialize + Sized,
|
||||
RS: DeserializeOwned + Sized,
|
||||
{
|
||||
let response = self.dispatch_inner(method, path, body).await?;
|
||||
Ok(response
|
||||
.json()
|
||||
.await
|
||||
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
|
||||
}
|
||||
|
||||
/// Simple HTTP request wrapper for calling into storage controller
|
||||
async fn dispatch_inner<RQ>(
|
||||
&self,
|
||||
method: reqwest::Method,
|
||||
path: String,
|
||||
body: Option<RQ>,
|
||||
) -> anyhow::Result<Response>
|
||||
where
|
||||
RQ: Serialize + Sized,
|
||||
{
|
||||
// In the special case of the `storage_controller start` subcommand, we wish
|
||||
// to use the API endpoint of the newly started storage controller in order
|
||||
@@ -785,10 +816,31 @@ impl StorageController {
|
||||
let response = builder.send().await?;
|
||||
let response = response.error_from_body().await?;
|
||||
|
||||
Ok(response
|
||||
.json()
|
||||
.await
|
||||
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// Register the safekeepers in the storage controller
|
||||
#[instrument(skip(self))]
|
||||
async fn register_safekeepers(&self) -> anyhow::Result<()> {
|
||||
for sk in self.env.safekeepers.iter() {
|
||||
let sk_id = sk.id;
|
||||
let body = serde_json::json!({
|
||||
"id": sk_id,
|
||||
"created_at": "2023-10-25T09:11:25Z",
|
||||
"updated_at": "2024-08-28T11:32:43Z",
|
||||
"region_id": "aws-us-east-2",
|
||||
"host": "127.0.0.1",
|
||||
"port": sk.pg_port,
|
||||
"http_port": sk.http_port,
|
||||
"https_port": sk.https_port,
|
||||
"version": 5957,
|
||||
"availability_zone_id": format!("us-east-2b-{sk_id}"),
|
||||
});
|
||||
self.upsert_safekeeper(sk_id, body).await?;
|
||||
self.safekeeper_scheduling_policy(sk_id, SkSchedulingPolicy::Active)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Call into the attach_hook API, for use before handing out attachments to pageservers
|
||||
@@ -816,6 +868,42 @@ impl StorageController {
|
||||
Ok(response.generation)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn upsert_safekeeper(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
request: serde_json::Value,
|
||||
) -> anyhow::Result<()> {
|
||||
let resp = self
|
||||
.dispatch_inner::<serde_json::Value>(
|
||||
Method::POST,
|
||||
format!("control/v1/safekeeper/{node_id}"),
|
||||
Some(request),
|
||||
)
|
||||
.await?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!(
|
||||
"setting scheduling policy unsuccessful for safekeeper {node_id}: {}",
|
||||
resp.status()
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn safekeeper_scheduling_policy(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
scheduling_policy: SkSchedulingPolicy,
|
||||
) -> anyhow::Result<()> {
|
||||
self.dispatch::<SafekeeperSchedulingPolicyRequest, ()>(
|
||||
Method::POST,
|
||||
format!("control/v1/safekeeper/{node_id}/scheduling_policy"),
|
||||
Some(SafekeeperSchedulingPolicyRequest { scheduling_policy }),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn inspect(
|
||||
&self,
|
||||
|
||||
@@ -12,6 +12,7 @@ ERROR: invalid JWT encoding
|
||||
-- Test creating a session with an expired JWT
|
||||
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjE3NDI1NjQ0MzIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MjQyNDIsInN1YiI6InVzZXIxMjMifQ.A6FwKuaSduHB9O7Gz37g0uoD_U9qVS0JNtT7YABGVgB7HUD1AMFc9DeyhNntWBqncg8k5brv-hrNTuUh5JYMAw');
|
||||
ERROR: Token used after it has expired
|
||||
DETAIL: exp=1742564432
|
||||
-- Test creating a session with a valid JWT
|
||||
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjQ4OTYxNjQyNTIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MzQzNDMsInN1YiI6InVzZXIxMjMifQ.2TXVgjb6JSUq6_adlvp-m_SdOxZSyGS30RS9TLB0xu2N83dMSs2NybwE1NMU8Fb0tcAZR_ET7M2rSxbTrphfCg');
|
||||
jwt_session_init
|
||||
|
||||
@@ -343,7 +343,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
|
||||
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
|
||||
fn token() -> String {
|
||||
let claims = endpoint_storage::Claims {
|
||||
let claims = endpoint_storage::claims::EndpointStorageClaims {
|
||||
tenant_id: TENANT_ID,
|
||||
timeline_id: TIMELINE_ID,
|
||||
endpoint_id: ENDPOINT_ID.into(),
|
||||
@@ -489,16 +489,8 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
}
|
||||
|
||||
fn delete_prefix_token(uri: &str) -> String {
|
||||
use serde::Serialize;
|
||||
let parts = uri.split("/").collect::<Vec<&str>>();
|
||||
#[derive(Serialize)]
|
||||
struct PrefixClaims {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
endpoint_id: Option<endpoint_storage::EndpointId>,
|
||||
exp: u64,
|
||||
}
|
||||
let claims = PrefixClaims {
|
||||
let claims = endpoint_storage::claims::DeletePrefixClaims {
|
||||
tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
|
||||
timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
|
||||
endpoint_id: parts.get(3).map(ToString::to_string),
|
||||
|
||||
52
endpoint_storage/src/claims.rs
Normal file
52
endpoint_storage/src/claims.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Display;
|
||||
use utils::id::{EndpointId, TenantId, TimelineId};
|
||||
|
||||
/// Claims to add, remove, or retrieve endpoint data. Used by compute_ctl
|
||||
#[derive(Deserialize, Serialize, PartialEq)]
|
||||
pub struct EndpointStorageClaims {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub endpoint_id: EndpointId,
|
||||
pub exp: u64,
|
||||
}
|
||||
|
||||
/// Claims to remove tenant, timeline, or endpoint data. Used by control plane
|
||||
#[derive(Deserialize, Serialize, PartialEq)]
|
||||
pub struct DeletePrefixClaims {
|
||||
pub tenant_id: TenantId,
|
||||
/// None when tenant is deleted (endpoint_id is also None in this case)
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
/// None when timeline is deleted
|
||||
pub endpoint_id: Option<EndpointId>,
|
||||
pub exp: u64,
|
||||
}
|
||||
|
||||
impl Display for EndpointStorageClaims {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"EndpointClaims(tenant_id={} timeline_id={} endpoint_id={} exp={})",
|
||||
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for DeletePrefixClaims {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"DeletePrefixClaims(tenant_id={} timeline_id={} endpoint_id={}, exp={})",
|
||||
self.tenant_id,
|
||||
self.timeline_id
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or("".to_string()),
|
||||
self.endpoint_id
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or("".to_string()),
|
||||
self.exp
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,5 @@
|
||||
pub mod claims;
|
||||
use crate::claims::{DeletePrefixClaims, EndpointStorageClaims};
|
||||
use anyhow::Result;
|
||||
use axum::extract::{FromRequestParts, Path};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
@@ -13,7 +15,7 @@ use std::result::Result as StdResult;
|
||||
use std::sync::Arc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::id::{EndpointId, TenantId, TimelineId};
|
||||
|
||||
// simplified version of utils::auth::JwtAuth
|
||||
pub struct JwtAuth {
|
||||
@@ -79,26 +81,6 @@ pub struct Storage {
|
||||
pub max_upload_file_limit: usize,
|
||||
}
|
||||
|
||||
pub type EndpointId = String; // If needed, reuse small string from proxy/src/types.rc
|
||||
|
||||
#[derive(Deserialize, Serialize, PartialEq)]
|
||||
pub struct Claims {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub endpoint_id: EndpointId,
|
||||
pub exp: u64,
|
||||
}
|
||||
|
||||
impl Display for Claims {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Claims(tenant_id {} timeline_id {} endpoint_id {} exp {})",
|
||||
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
struct KeyRequest {
|
||||
tenant_id: TenantId,
|
||||
@@ -107,6 +89,13 @@ struct KeyRequest {
|
||||
path: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, PartialEq)]
|
||||
struct PrefixKeyRequest {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
endpoint_id: Option<EndpointId>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct S3Path {
|
||||
pub path: RemotePath,
|
||||
@@ -165,7 +154,7 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
|
||||
.extract::<TypedHeader<Authorization<Bearer>>>()
|
||||
.await
|
||||
.map_err(|e| bad_request(e, "invalid token"))?;
|
||||
let claims: Claims = state
|
||||
let claims: EndpointStorageClaims = state
|
||||
.auth
|
||||
.decode(bearer.token())
|
||||
.map_err(|e| bad_request(e, "decoding token"))?;
|
||||
@@ -178,7 +167,7 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
|
||||
path.endpoint_id.clone()
|
||||
};
|
||||
|
||||
let route = Claims {
|
||||
let route = EndpointStorageClaims {
|
||||
tenant_id: path.tenant_id,
|
||||
timeline_id: path.timeline_id,
|
||||
endpoint_id,
|
||||
@@ -193,38 +182,13 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, PartialEq)]
|
||||
pub struct PrefixKeyPath {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
pub endpoint_id: Option<EndpointId>,
|
||||
}
|
||||
|
||||
impl Display for PrefixKeyPath {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"PrefixKeyPath(tenant_id {} timeline_id {} endpoint_id {})",
|
||||
self.tenant_id,
|
||||
self.timeline_id
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or("".to_string()),
|
||||
self.endpoint_id
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or("".to_string())
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct PrefixS3Path {
|
||||
pub path: RemotePath,
|
||||
}
|
||||
|
||||
impl From<&PrefixKeyPath> for PrefixS3Path {
|
||||
fn from(path: &PrefixKeyPath) -> Self {
|
||||
impl From<&DeletePrefixClaims> for PrefixS3Path {
|
||||
fn from(path: &DeletePrefixClaims) -> Self {
|
||||
let timeline_id = path
|
||||
.timeline_id
|
||||
.as_ref()
|
||||
@@ -250,21 +214,27 @@ impl FromRequestParts<Arc<Storage>> for PrefixS3Path {
|
||||
state: &Arc<Storage>,
|
||||
) -> Result<Self, Self::Rejection> {
|
||||
let Path(path) = parts
|
||||
.extract::<Path<PrefixKeyPath>>()
|
||||
.extract::<Path<PrefixKeyRequest>>()
|
||||
.await
|
||||
.map_err(|e| bad_request(e, "invalid route"))?;
|
||||
let TypedHeader(Authorization(bearer)) = parts
|
||||
.extract::<TypedHeader<Authorization<Bearer>>>()
|
||||
.await
|
||||
.map_err(|e| bad_request(e, "invalid token"))?;
|
||||
let claims: PrefixKeyPath = state
|
||||
let claims: DeletePrefixClaims = state
|
||||
.auth
|
||||
.decode(bearer.token())
|
||||
.map_err(|e| bad_request(e, "invalid token"))?;
|
||||
if path != claims {
|
||||
return Err(unauthorized(path, claims));
|
||||
let route = DeletePrefixClaims {
|
||||
tenant_id: path.tenant_id,
|
||||
timeline_id: path.timeline_id,
|
||||
endpoint_id: path.endpoint_id,
|
||||
exp: claims.exp,
|
||||
};
|
||||
if route != claims {
|
||||
return Err(unauthorized(route, claims));
|
||||
}
|
||||
Ok((&path).into())
|
||||
Ok((&route).into())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -297,7 +267,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn s3_path() {
|
||||
let auth = Claims {
|
||||
let auth = EndpointStorageClaims {
|
||||
tenant_id: TENANT_ID,
|
||||
timeline_id: TIMELINE_ID,
|
||||
endpoint_id: ENDPOINT_ID.into(),
|
||||
@@ -327,10 +297,11 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn prefix_s3_path() {
|
||||
let mut path = PrefixKeyPath {
|
||||
let mut path = DeletePrefixClaims {
|
||||
tenant_id: TENANT_ID,
|
||||
timeline_id: None,
|
||||
endpoint_id: None,
|
||||
exp: 0,
|
||||
};
|
||||
let prefix_path = |s: String| RemotePath::from_string(&s).unwrap();
|
||||
assert_eq!(
|
||||
|
||||
@@ -1,16 +1,58 @@
|
||||
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
|
||||
use std::str::FromStr;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::privilege::Privilege;
|
||||
use crate::responses::ComputeCtlConfig;
|
||||
use crate::spec::{ComputeSpec, ExtVersion, PgIdent};
|
||||
|
||||
/// The value to place in the [`ComputeClaims::audience`] claim.
|
||||
pub static COMPUTE_AUDIENCE: &str = "compute";
|
||||
|
||||
/// Available scopes for a compute's JWT.
|
||||
#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ComputeClaimsScope {
|
||||
/// An admin-scoped token allows access to all of `compute_ctl`'s authorized
|
||||
/// facilities.
|
||||
Admin,
|
||||
}
|
||||
|
||||
impl FromStr for ComputeClaimsScope {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"admin" => Ok(ComputeClaimsScope::Admin),
|
||||
_ => Err(anyhow::anyhow!("invalid compute claims scope \"{s}\"")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// When making requests to the `compute_ctl` external HTTP server, the client
|
||||
/// must specify a set of claims in `Authorization` header JWTs such that
|
||||
/// `compute_ctl` can authorize the request.
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename = "snake_case")]
|
||||
pub struct ComputeClaims {
|
||||
pub compute_id: String,
|
||||
/// The compute ID that will validate the token. The only case in which this
|
||||
/// can be [`None`] is if [`Self::scope`] is
|
||||
/// [`ComputeClaimsScope::Admin`].
|
||||
pub compute_id: Option<String>,
|
||||
|
||||
/// The scope of what the token authorizes.
|
||||
pub scope: Option<ComputeClaimsScope>,
|
||||
|
||||
/// The recipient the token is intended for.
|
||||
///
|
||||
/// See [RFC 7519](https://www.rfc-editor.org/rfc/rfc7519#section-4.1.3) for
|
||||
/// more information.
|
||||
///
|
||||
/// TODO: Remove the [`Option`] wrapper when control plane learns to send
|
||||
/// the claim.
|
||||
#[serde(rename = "aud")]
|
||||
pub audience: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
/// Request of the /configure API
|
||||
|
||||
@@ -46,6 +46,30 @@ pub struct ExtensionInstallResponse {
|
||||
pub version: ExtVersion,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Debug, Clone)]
|
||||
#[serde(tag = "status", rename_all = "snake_case")]
|
||||
pub enum LfcPrewarmState {
|
||||
#[default]
|
||||
NotPrewarmed,
|
||||
Prewarming,
|
||||
Completed,
|
||||
Failed {
|
||||
error: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Debug, Clone)]
|
||||
#[serde(tag = "status", rename_all = "snake_case")]
|
||||
pub enum LfcOffloadState {
|
||||
#[default]
|
||||
NotOffloaded,
|
||||
Offloading,
|
||||
Completed,
|
||||
Failed {
|
||||
error: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// Response of the /status API
|
||||
#[derive(Serialize, Debug, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
|
||||
@@ -172,6 +172,15 @@ pub struct ComputeSpec {
|
||||
/// Hostname and the port of the otel collector. Leave empty to disable Postgres logs forwarding.
|
||||
/// Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:10514
|
||||
pub logs_export_host: Option<String>,
|
||||
|
||||
/// Address of endpoint storage service
|
||||
pub endpoint_storage_addr: Option<String>,
|
||||
/// JWT for authorizing requests to endpoint storage service
|
||||
pub endpoint_storage_token: Option<String>,
|
||||
|
||||
/// If true, download LFC state from endpoint_storage and pass it to Postgres on startup
|
||||
#[serde(default)]
|
||||
pub prewarm_lfc_on_startup: bool,
|
||||
}
|
||||
|
||||
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
|
||||
|
||||
@@ -84,6 +84,11 @@
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "prewarm_lfc_on_startup",
|
||||
"value": "off",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "neon.safekeepers",
|
||||
"value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501",
|
||||
|
||||
@@ -16,6 +16,7 @@ pub struct Collector {
|
||||
const NMETRICS: usize = 2;
|
||||
|
||||
static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
|
||||
// SAFETY: libc::sysconf is safe, it merely returns a value.
|
||||
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
|
||||
if long == -1 {
|
||||
panic!("sysconf(_SC_CLK_TCK) failed");
|
||||
|
||||
@@ -182,6 +182,7 @@ pub struct ConfigToml {
|
||||
pub tracing: Option<Tracing>,
|
||||
pub enable_tls_page_service_api: bool,
|
||||
pub dev_mode: bool,
|
||||
pub timeline_import_config: TimelineImportConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -300,6 +301,12 @@ impl From<OtelExporterProtocol> for tracing_utils::Protocol {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct TimelineImportConfig {
|
||||
pub import_job_concurrency: NonZeroUsize,
|
||||
pub import_job_soft_size_limit: NonZeroUsize,
|
||||
}
|
||||
|
||||
pub mod statvfs {
|
||||
pub mod mock {
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -659,6 +666,10 @@ impl Default for ConfigToml {
|
||||
tracing: None,
|
||||
enable_tls_page_service_api: false,
|
||||
dev_mode: false,
|
||||
timeline_import_config: TimelineImportConfig {
|
||||
import_job_concurrency: NonZeroUsize::new(128).unwrap(),
|
||||
import_job_soft_size_limit: NonZeroUsize::new(1024 * 1024 * 1024).unwrap(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -561,6 +561,21 @@ pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn key_to_rel_tag(key: Key) -> RelTag {
|
||||
RelTag {
|
||||
spcnode: key.field2,
|
||||
dbnode: key.field3,
|
||||
relnode: key.field4,
|
||||
forknum: key.field5,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn key_to_blknum(key: Key) -> BlockNumber {
|
||||
key.field6
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn rel_size_to_key(rel: RelTag) -> Key {
|
||||
Key {
|
||||
|
||||
@@ -295,6 +295,9 @@ pub struct TenantId(Id);
|
||||
|
||||
id_newtype!(TenantId);
|
||||
|
||||
/// If needed, reuse small string from proxy/src/types.rc
|
||||
pub type EndpointId = String;
|
||||
|
||||
// A pair uniquely identifying Neon instance.
|
||||
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct TenantTimelineId {
|
||||
|
||||
@@ -17,7 +17,7 @@ impl std::fmt::Display for RateLimitStats {
|
||||
}
|
||||
|
||||
impl RateLimit {
|
||||
pub fn new(interval: Duration) -> Self {
|
||||
pub const fn new(interval: Duration) -> Self {
|
||||
Self {
|
||||
last: None,
|
||||
interval,
|
||||
|
||||
@@ -230,6 +230,8 @@ pub struct PageServerConf {
|
||||
/// such as authentication requirements for HTTP and PostgreSQL APIs.
|
||||
/// This is insecure and should only be used in development environments.
|
||||
pub dev_mode: bool,
|
||||
|
||||
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -404,6 +406,7 @@ impl PageServerConf {
|
||||
tracing,
|
||||
enable_tls_page_service_api,
|
||||
dev_mode,
|
||||
timeline_import_config,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -457,6 +460,7 @@ impl PageServerConf {
|
||||
tracing,
|
||||
enable_tls_page_service_api,
|
||||
dev_mode,
|
||||
timeline_import_config,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
|
||||
@@ -1038,21 +1038,23 @@ impl PageServerHandler {
|
||||
tracing::info_span!(
|
||||
parent: &parent_span,
|
||||
"handle_get_page_request",
|
||||
request_id = %req.hdr.reqid,
|
||||
rel = %req.rel,
|
||||
blkno = %req.blkno,
|
||||
req_lsn = %req.hdr.request_lsn,
|
||||
not_modified_since_lsn = %req.hdr.not_modified_since
|
||||
not_modified_since_lsn = %req.hdr.not_modified_since,
|
||||
)
|
||||
}};
|
||||
($shard_id:expr) => {{
|
||||
tracing::info_span!(
|
||||
parent: &parent_span,
|
||||
"handle_get_page_request",
|
||||
request_id = %req.hdr.reqid,
|
||||
rel = %req.rel,
|
||||
blkno = %req.blkno,
|
||||
req_lsn = %req.hdr.request_lsn,
|
||||
not_modified_since_lsn = %req.hdr.not_modified_since,
|
||||
shard_id = %$shard_id
|
||||
shard_id = %$shard_id,
|
||||
)
|
||||
}};
|
||||
}
|
||||
|
||||
@@ -16,9 +16,9 @@ use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use pageserver_api::key::{
|
||||
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
|
||||
TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
|
||||
rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
|
||||
repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
||||
TWOPHASEDIR_KEY, dbdir_key_range, key_to_blknum, key_to_rel_tag, rel_block_to_key,
|
||||
rel_dir_to_key, rel_key_range, rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range,
|
||||
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
||||
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
|
||||
};
|
||||
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
|
||||
@@ -40,7 +40,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
|
||||
use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::{PerfInstrumentFutureExt, RequestContext};
|
||||
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
|
||||
@@ -259,7 +259,7 @@ impl Timeline {
|
||||
let mut result = Vec::with_capacity(pages.len());
|
||||
let result_slots = result.spare_capacity_mut();
|
||||
|
||||
let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
|
||||
let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, Lsn, RequestContext); 1]>> =
|
||||
HashMap::with_capacity(pages.len());
|
||||
|
||||
let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
|
||||
@@ -275,41 +275,21 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
let nblocks = match self
|
||||
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
let key = rel_block_to_key(*tag, *blknum);
|
||||
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_REL_SIZE",
|
||||
reltag=%tag,
|
||||
lsn=%lsn,
|
||||
"GET_BATCH",
|
||||
batch_size = %page_count,
|
||||
)
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(nblocks) => nblocks,
|
||||
Err(err) => {
|
||||
result_slots[response_slot_idx].write(Err(err));
|
||||
slots_filled += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if *blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag, blknum, lsn, nblocks
|
||||
);
|
||||
result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
|
||||
slots_filled += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
let key = rel_block_to_key(*tag, *blknum);
|
||||
.attached_child();
|
||||
|
||||
let key_slots = keys_slots.entry(key).or_default();
|
||||
key_slots.push((response_slot_idx, ctx));
|
||||
key_slots.push((response_slot_idx, lsn, ctx));
|
||||
|
||||
let acc = req_keyspaces.entry(lsn).or_default();
|
||||
acc.add_key(key);
|
||||
@@ -323,56 +303,102 @@ impl Timeline {
|
||||
let query = VersionedKeySpaceQuery::scattered(query);
|
||||
let res = self
|
||||
.get_vectored(query, io_concurrency, ctx)
|
||||
.maybe_perf_instrument(ctx, |current_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: current_perf_span,
|
||||
"GET_BATCH",
|
||||
batch_size = %page_count,
|
||||
)
|
||||
})
|
||||
.maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(results) => {
|
||||
for (key, res) in results {
|
||||
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
|
||||
let (first_slot, first_req_ctx) = key_slots.next().unwrap();
|
||||
|
||||
for (slot, req_ctx) in key_slots {
|
||||
let clone = match &res {
|
||||
Ok(buf) => Ok(buf.clone()),
|
||||
Err(err) => Err(match err {
|
||||
PageReconstructError::Cancelled => PageReconstructError::Cancelled,
|
||||
// Try to check if error is caused by access beyond end of relation
|
||||
match &res {
|
||||
Err(err) => {
|
||||
let tag = key_to_rel_tag(key);
|
||||
let blknum = key_to_blknum(key);
|
||||
let mut first_error_slot: Option<usize> = None;
|
||||
for (slot, lsn, req_ctx) in key_slots {
|
||||
// Check relation size only in case of error
|
||||
let relsize_ctx = RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_REL_SIZE",
|
||||
reltag=%tag,
|
||||
lsn=%lsn,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
x @ PageReconstructError::Other(_)
|
||||
| x @ PageReconstructError::AncestorLsnTimeout(_)
|
||||
| x @ PageReconstructError::WalRedo(_)
|
||||
| x @ PageReconstructError::MissingKey(_) => {
|
||||
PageReconstructError::Other(anyhow::anyhow!(
|
||||
"there was more than one request for this key in the batch, error logged once: {x:?}"
|
||||
))
|
||||
if let Ok(nblocks) = self
|
||||
.get_rel_size(tag, Version::Lsn(lsn), &relsize_ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
crnt_perf_span.clone()
|
||||
})
|
||||
.await
|
||||
{
|
||||
if blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag, blknum, lsn, nblocks
|
||||
);
|
||||
result_slots[slot].write(Ok(ZERO_PAGE.clone()));
|
||||
slots_filled += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}),
|
||||
};
|
||||
if first_error_slot.is_none() {
|
||||
first_error_slot = Some(slot);
|
||||
} else {
|
||||
let err = match err {
|
||||
PageReconstructError::Cancelled => {
|
||||
PageReconstructError::Cancelled
|
||||
}
|
||||
|
||||
result_slots[slot].write(clone);
|
||||
// There is no standardized way to express that the batched span followed from N request spans.
|
||||
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
||||
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
||||
req_ctx.perf_follows_from(ctx);
|
||||
slots_filled += 1;
|
||||
x @ PageReconstructError::Other(_)
|
||||
| x @ PageReconstructError::AncestorLsnTimeout(_)
|
||||
| x @ PageReconstructError::WalRedo(_)
|
||||
| x @ PageReconstructError::MissingKey(_) => {
|
||||
PageReconstructError::Other(anyhow::anyhow!(
|
||||
"there was more than one request for this key in the batch, error logged once: {x:?}"
|
||||
))
|
||||
}
|
||||
};
|
||||
result_slots[slot].write(Err(err));
|
||||
};
|
||||
// There is no standardized way to express that the batched span followed from N request spans.
|
||||
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
||||
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
||||
req_ctx.perf_follows_from(ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
if let Some(slot) = first_error_slot {
|
||||
result_slots[slot].write(res);
|
||||
}
|
||||
}
|
||||
Ok(buf) => {
|
||||
let (first_slot, _first_lsn, first_req_ctx) = key_slots.next().unwrap();
|
||||
|
||||
for (slot, _lsn, req_ctx) in key_slots {
|
||||
result_slots[slot].write(Ok(buf.clone()));
|
||||
// There is no standardized way to express that the batched span followed from N request spans.
|
||||
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
||||
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
||||
req_ctx.perf_follows_from(ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
result_slots[first_slot].write(res);
|
||||
first_req_ctx.perf_follows_from(ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
}
|
||||
|
||||
result_slots[first_slot].write(res);
|
||||
first_req_ctx.perf_follows_from(ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
|
||||
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
|
||||
for (slot, req_ctx) in keys_slots.values().flatten() {
|
||||
for (slot, _lsn, req_ctx) in keys_slots.values().flatten() {
|
||||
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
|
||||
// but without taking ownership of the GetVectoredError
|
||||
let err = match &err {
|
||||
@@ -478,8 +504,6 @@ impl Timeline {
|
||||
let mut buf = version.get(self, key, ctx).await?;
|
||||
let nblocks = buf.get_u32_le();
|
||||
|
||||
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
|
||||
|
||||
Ok(nblocks)
|
||||
}
|
||||
|
||||
@@ -1333,32 +1357,6 @@ impl Timeline {
|
||||
None
|
||||
}
|
||||
|
||||
/// Update cached relation size if there is no more recent update
|
||||
pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
|
||||
if lsn < rel_size_cache.complete_as_of {
|
||||
// Do not cache old values. It's safe to cache the size on read, as long as
|
||||
// the read was at an LSN since we started the WAL ingestion. Reasoning: we
|
||||
// never evict values from the cache, so if the relation size changed after
|
||||
// 'lsn', the new value is already in the cache.
|
||||
return;
|
||||
}
|
||||
|
||||
match rel_size_cache.map.entry(tag) {
|
||||
hash_map::Entry::Occupied(mut entry) => {
|
||||
let cached_lsn = entry.get_mut();
|
||||
if lsn >= cached_lsn.0 {
|
||||
*cached_lsn = (lsn, nblocks);
|
||||
}
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert((lsn, nblocks));
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Store cached relation size
|
||||
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
|
||||
@@ -94,10 +94,23 @@ impl Header {
|
||||
pub enum WriteBlobError {
|
||||
#[error(transparent)]
|
||||
Flush(FlushTaskError),
|
||||
#[error("blob too large ({len} bytes)")]
|
||||
BlobTooLarge { len: usize },
|
||||
#[error(transparent)]
|
||||
WriteBlobRaw(anyhow::Error),
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl WriteBlobError {
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
WriteBlobError::Flush(e) => e.is_cancel(),
|
||||
WriteBlobError::Other(_) => false,
|
||||
}
|
||||
}
|
||||
pub fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
WriteBlobError::Flush(e) => e.into_anyhow(),
|
||||
WriteBlobError::Other(e) => e,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockCursor<'_> {
|
||||
@@ -327,7 +340,9 @@ where
|
||||
return (
|
||||
(
|
||||
io_buf.slice_len(),
|
||||
Err(WriteBlobError::BlobTooLarge { len }),
|
||||
Err(WriteBlobError::Other(anyhow::anyhow!(
|
||||
"blob too large ({len} bytes)"
|
||||
))),
|
||||
),
|
||||
srcbuf,
|
||||
);
|
||||
@@ -391,7 +406,7 @@ where
|
||||
// Verify the header, to ensure we don't write invalid/corrupt data.
|
||||
let header = match Header::decode(&raw_with_header)
|
||||
.context("decoding blob header")
|
||||
.map_err(WriteBlobError::WriteBlobRaw)
|
||||
.map_err(WriteBlobError::Other)
|
||||
{
|
||||
Ok(header) => header,
|
||||
Err(err) => return (raw_with_header, Err(err)),
|
||||
@@ -401,7 +416,7 @@ where
|
||||
let raw_len = raw_with_header.len();
|
||||
return (
|
||||
raw_with_header,
|
||||
Err(WriteBlobError::WriteBlobRaw(anyhow::anyhow!(
|
||||
Err(WriteBlobError::Other(anyhow::anyhow!(
|
||||
"header length mismatch: {header_total_len} != {raw_len}"
|
||||
))),
|
||||
);
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
pub mod batch_split_writer;
|
||||
pub mod delta_layer;
|
||||
pub mod errors;
|
||||
pub mod filter_iterator;
|
||||
pub mod image_layer;
|
||||
pub mod inmemory_layer;
|
||||
|
||||
@@ -10,6 +10,7 @@ use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
use super::errors::PutError;
|
||||
use super::layer::S3_UPLOAD_LIMIT;
|
||||
use super::{
|
||||
DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
|
||||
@@ -235,7 +236,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), PutError> {
|
||||
// The current estimation is an upper bound of the space that the key/image could take
|
||||
// because we did not consider compression in this estimation. The resulting image layer
|
||||
// could be smaller than the target size.
|
||||
@@ -253,7 +254,8 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(PutError::Other)?;
|
||||
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
|
||||
self.batches.add_unfinished_image_writer(
|
||||
prev_image_writer,
|
||||
@@ -346,7 +348,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), PutError> {
|
||||
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
|
||||
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
|
||||
//
|
||||
@@ -366,7 +368,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
.await
|
||||
.map_err(PutError::Other)?,
|
||||
));
|
||||
}
|
||||
let (_, inner) = self.inner.as_mut().unwrap();
|
||||
@@ -386,7 +389,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(PutError::Other)?;
|
||||
let (start_key, prev_delta_writer) =
|
||||
self.inner.replace((key, next_delta_writer)).unwrap();
|
||||
self.batches.add_unfinished_delta_writer(
|
||||
@@ -396,11 +400,11 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
);
|
||||
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
|
||||
// We have to produce a very large file b/c a key is updated too often.
|
||||
anyhow::bail!(
|
||||
return Err(PutError::Other(anyhow::anyhow!(
|
||||
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
|
||||
key,
|
||||
inner.estimated_size()
|
||||
);
|
||||
)));
|
||||
}
|
||||
}
|
||||
self.last_key_written = key;
|
||||
|
||||
@@ -55,6 +55,7 @@ use utils::bin_ser::SerializeError;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use super::errors::PutError;
|
||||
use super::{
|
||||
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
|
||||
ValuesReconstructState,
|
||||
@@ -477,12 +478,15 @@ impl DeltaLayerWriterInner {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), PutError> {
|
||||
let (_, res) = self
|
||||
.put_value_bytes(
|
||||
key,
|
||||
lsn,
|
||||
Value::ser(&val)?.slice_len(),
|
||||
Value::ser(&val)
|
||||
.map_err(anyhow::Error::new)
|
||||
.map_err(PutError::Other)?
|
||||
.slice_len(),
|
||||
val.will_init(),
|
||||
ctx,
|
||||
)
|
||||
@@ -497,7 +501,7 @@ impl DeltaLayerWriterInner {
|
||||
val: FullSlice<Buf>,
|
||||
will_init: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, anyhow::Result<()>)
|
||||
) -> (FullSlice<Buf>, Result<(), PutError>)
|
||||
where
|
||||
Buf: IoBuf + Send,
|
||||
{
|
||||
@@ -513,19 +517,24 @@ impl DeltaLayerWriterInner {
|
||||
.blob_writer
|
||||
.write_blob_maybe_compressed(val, ctx, compression)
|
||||
.await;
|
||||
let res = res.map_err(PutError::WriteBlob);
|
||||
let off = match res {
|
||||
Ok((off, _)) => off,
|
||||
Err(e) => return (val, Err(anyhow::anyhow!(e))),
|
||||
Err(e) => return (val, Err(e)),
|
||||
};
|
||||
|
||||
let blob_ref = BlobRef::new(off, will_init);
|
||||
|
||||
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
|
||||
let res = self.tree.append(&delta_key.0, blob_ref.0);
|
||||
let res = self
|
||||
.tree
|
||||
.append(&delta_key.0, blob_ref.0)
|
||||
.map_err(anyhow::Error::new)
|
||||
.map_err(PutError::Other);
|
||||
|
||||
self.num_keys += 1;
|
||||
|
||||
(val, res.map_err(|e| anyhow::anyhow!(e)))
|
||||
(val, res)
|
||||
}
|
||||
|
||||
fn size(&self) -> u64 {
|
||||
@@ -694,7 +703,7 @@ impl DeltaLayerWriter {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), PutError> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
@@ -709,7 +718,7 @@ impl DeltaLayerWriter {
|
||||
val: FullSlice<Buf>,
|
||||
will_init: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, anyhow::Result<()>)
|
||||
) -> (FullSlice<Buf>, Result<(), PutError>)
|
||||
where
|
||||
Buf: IoBuf + Send,
|
||||
{
|
||||
@@ -1441,14 +1450,6 @@ impl DeltaLayerInner {
|
||||
offset
|
||||
}
|
||||
|
||||
pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
|
||||
self.iter_with_options(
|
||||
ctx,
|
||||
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
|
||||
1024, // The default value. Unit tests might use a different value
|
||||
)
|
||||
}
|
||||
|
||||
pub fn iter_with_options<'a>(
|
||||
&'a self,
|
||||
ctx: &'a RequestContext,
|
||||
@@ -1634,7 +1635,6 @@ pub(crate) mod test {
|
||||
use crate::tenant::disk_btree::tests::TestDisk;
|
||||
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
|
||||
use crate::tenant::storage_layer::{Layer, ResidentLayer};
|
||||
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
|
||||
use crate::tenant::{TenantShard, Timeline};
|
||||
|
||||
/// Construct an index for a fictional delta layer and then
|
||||
@@ -2311,8 +2311,7 @@ pub(crate) mod test {
|
||||
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
|
||||
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
|
||||
// Test if the batch size is correctly determined
|
||||
let mut iter = delta_layer.iter(&ctx);
|
||||
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
|
||||
let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
|
||||
let mut num_items = 0;
|
||||
for _ in 0..3 {
|
||||
iter.next_batch().await.unwrap();
|
||||
@@ -2329,8 +2328,7 @@ pub(crate) mod test {
|
||||
iter.key_values_batch.clear();
|
||||
}
|
||||
// Test if the result is correct
|
||||
let mut iter = delta_layer.iter(&ctx);
|
||||
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
|
||||
let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
|
||||
assert_delta_iter_equal(&mut iter, &test_deltas).await;
|
||||
}
|
||||
}
|
||||
|
||||
24
pageserver/src/tenant/storage_layer/errors.rs
Normal file
24
pageserver/src/tenant/storage_layer/errors.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
use crate::tenant::blob_io::WriteBlobError;
|
||||
|
||||
/// Error returned by the layer writers' put operations (`put_value`,
/// `put_value_bytes`, `put_image`).
#[derive(Debug, thiserror::Error)]
|
||||
pub enum PutError {
|
||||
/// Writing the blob failed; wraps the blob writer's `WriteBlobError`.
#[error(transparent)]
|
||||
WriteBlob(WriteBlobError),
|
||||
/// Any other failure, carried as an `anyhow::Error`.
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl PutError {
|
||||
/// Returns whether this error reports a cancellation.
///
/// `WriteBlob` delegates to `WriteBlobError::is_cancel`; `Other` is never
/// treated as a cancellation.
pub fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
PutError::WriteBlob(e) => e.is_cancel(),
|
||||
PutError::Other(_) => false,
|
||||
}
|
||||
}
|
||||
/// Consumes the error and converts it into an `anyhow::Error`.
///
/// `WriteBlob` delegates to `WriteBlobError::into_anyhow`; `Other` yields
/// the wrapped error unchanged.
pub fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
PutError::WriteBlob(e) => e.into_anyhow(),
|
||||
PutError::Other(e) => e,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -157,7 +157,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let merge_iter = MergeIterator::create(
|
||||
let merge_iter = MergeIterator::create_for_testing(
|
||||
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
|
||||
&[],
|
||||
&ctx,
|
||||
@@ -182,7 +182,7 @@ mod tests {
|
||||
result.extend(test_deltas1[90..100].iter().cloned());
|
||||
assert_filter_iter_equal(&mut filter_iter, &result).await;
|
||||
|
||||
let merge_iter = MergeIterator::create(
|
||||
let merge_iter = MergeIterator::create_for_testing(
|
||||
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
|
||||
&[],
|
||||
&ctx,
|
||||
|
||||
@@ -53,6 +53,7 @@ use utils::bin_ser::SerializeError;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use super::errors::PutError;
|
||||
use super::layer_name::ImageLayerName;
|
||||
use super::{
|
||||
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
|
||||
@@ -684,14 +685,6 @@ impl ImageLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
|
||||
self.iter_with_options(
|
||||
ctx,
|
||||
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
|
||||
1024, // The default value. Unit tests might use a different value
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn iter_with_options<'a>(
|
||||
&'a self,
|
||||
ctx: &'a RequestContext,
|
||||
@@ -850,8 +843,14 @@ impl ImageLayerWriterInner {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
) -> Result<(), PutError> {
|
||||
if !self.key_range.contains(&key) {
|
||||
return Err(PutError::Other(anyhow::anyhow!(
|
||||
"key {:?} not in range {:?}",
|
||||
key,
|
||||
self.key_range
|
||||
)));
|
||||
}
|
||||
let compression = self.conf.image_compression;
|
||||
let uncompressed_len = img.len() as u64;
|
||||
self.uncompressed_bytes += uncompressed_len;
|
||||
@@ -861,7 +860,7 @@ impl ImageLayerWriterInner {
|
||||
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
|
||||
.await;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let (off, compression_info) = res?;
|
||||
let (off, compression_info) = res.map_err(PutError::WriteBlob)?;
|
||||
if compression_info.compressed_size.is_some() {
|
||||
// The image has been considered for compression at least
|
||||
self.uncompressed_bytes_eligible += uncompressed_len;
|
||||
@@ -873,7 +872,10 @@ impl ImageLayerWriterInner {
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree.append(&keybuf, off)?;
|
||||
self.tree
|
||||
.append(&keybuf, off)
|
||||
.map_err(anyhow::Error::new)
|
||||
.map_err(PutError::Other)?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
@@ -1093,7 +1095,7 @@ impl ImageLayerWriter {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), PutError> {
|
||||
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
@@ -1240,7 +1242,6 @@ mod test {
|
||||
use crate::context::RequestContext;
|
||||
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
|
||||
use crate::tenant::storage_layer::{Layer, ResidentLayer};
|
||||
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
|
||||
use crate::tenant::{TenantShard, Timeline};
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1507,8 +1508,7 @@ mod test {
|
||||
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
|
||||
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
|
||||
// Test if the batch size is correctly determined
|
||||
let mut iter = img_layer.iter(&ctx);
|
||||
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
|
||||
let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
|
||||
let mut num_items = 0;
|
||||
for _ in 0..3 {
|
||||
iter.next_batch().await.unwrap();
|
||||
@@ -1525,8 +1525,7 @@ mod test {
|
||||
iter.key_values_batch.clear();
|
||||
}
|
||||
// Test if the result is correct
|
||||
let mut iter = img_layer.iter(&ctx);
|
||||
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
|
||||
let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
|
||||
assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ use super::{
|
||||
LayerVisibilityHint, PerfInstrumentFutureExt, PersistentLayerDesc, ValuesReconstructState,
|
||||
};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::context::{RequestContext, RequestContextBuilder};
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::Timeline;
|
||||
@@ -1076,24 +1076,17 @@ impl LayerInner {
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
let ctx = if ctx.has_perf_span() {
|
||||
let dl_ctx = RequestContextBuilder::from(ctx)
|
||||
.task_kind(TaskKind::LayerDownload)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.root_perf_span(|| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
"DOWNLOAD_LAYER",
|
||||
layer = %self,
|
||||
reason = %reason
|
||||
)
|
||||
})
|
||||
.detached_child();
|
||||
ctx.perf_follows_from(&dl_ctx);
|
||||
dl_ctx
|
||||
} else {
|
||||
ctx.attached_child()
|
||||
};
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"DOWNLOAD_LAYER",
|
||||
layer = %self,
|
||||
reason = %reason,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
async move {
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
@@ -1101,7 +1094,7 @@ impl LayerInner {
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
let res = self
|
||||
.download_init_and_wait(timeline, permit, ctx.attached_child())
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
|
||||
.await?;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
@@ -1709,7 +1702,7 @@ impl DownloadError {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, PartialEq, Copy, Clone)]
|
||||
pub(crate) enum NeedsDownload {
|
||||
NotFound,
|
||||
NotFile(std::fs::FileType),
|
||||
|
||||
@@ -19,14 +19,6 @@ pub(crate) enum LayerRef<'a> {
|
||||
}
|
||||
|
||||
impl<'a> LayerRef<'a> {
|
||||
#[allow(dead_code)]
|
||||
fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
|
||||
match self {
|
||||
Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
|
||||
Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
|
||||
}
|
||||
}
|
||||
|
||||
fn iter_with_options(
|
||||
self,
|
||||
ctx: &'a RequestContext,
|
||||
@@ -322,6 +314,28 @@ impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
|
||||
}
|
||||
|
||||
impl<'a> MergeIterator<'a> {
|
||||
/// Test-only convenience constructor.
///
/// Forwards to `create_with_options` with the fixed values `1024 * 8192` and
/// `1024`, which the `create_with_options` documentation lists as the default
/// `max_read_size` / `max_batch_size` for L0 compactions.
#[cfg(test)]
|
||||
pub(crate) fn create_for_testing(
|
||||
deltas: &[&'a DeltaLayerInner],
|
||||
images: &[&'a ImageLayerInner],
|
||||
ctx: &'a RequestContext,
|
||||
) -> Self {
|
||||
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
|
||||
}
|
||||
|
||||
/// Create a new merge iterator with custom options.
|
||||
///
|
||||
/// Adjust `max_read_size` and `max_batch_size` to trade memory usage for performance. The size should scale
|
||||
/// with the number of layers to compact. If there are a lot of layers, consider reducing the values, so that
|
||||
/// the buffer does not take too much memory.
|
||||
///
|
||||
/// The default options for L0 compactions are:
|
||||
/// - max_read_size: 1024 * 8192 (8MB)
|
||||
/// - max_batch_size: 1024
|
||||
///
|
||||
/// The default options for gc-compaction are:
|
||||
/// - max_read_size: 128 * 8192 (1MB)
|
||||
/// - max_batch_size: 128
|
||||
pub fn create_with_options(
|
||||
deltas: &[&'a DeltaLayerInner],
|
||||
images: &[&'a ImageLayerInner],
|
||||
@@ -351,14 +365,6 @@ impl<'a> MergeIterator<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create(
|
||||
deltas: &[&'a DeltaLayerInner],
|
||||
images: &[&'a ImageLayerInner],
|
||||
ctx: &'a RequestContext,
|
||||
) -> Self {
|
||||
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
|
||||
}
|
||||
|
||||
pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
|
||||
while let Some(mut iter) = self.heap.peek_mut() {
|
||||
if !iter.is_loaded() {
|
||||
@@ -477,7 +483,7 @@ mod tests {
|
||||
let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut merge_iter = MergeIterator::create(
|
||||
let mut merge_iter = MergeIterator::create_for_testing(
|
||||
&[
|
||||
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
|
||||
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
|
||||
@@ -549,7 +555,7 @@ mod tests {
|
||||
let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut merge_iter = MergeIterator::create(
|
||||
let mut merge_iter = MergeIterator::create_for_testing(
|
||||
&[
|
||||
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
|
||||
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
|
||||
@@ -670,7 +676,7 @@ mod tests {
|
||||
// Test with different layer order for MergeIterator::create_for_testing to ensure the order
|
||||
// is stable.
|
||||
|
||||
let mut merge_iter = MergeIterator::create(
|
||||
let mut merge_iter = MergeIterator::create_for_testing(
|
||||
&[
|
||||
resident_layer_4.get_as_delta(&ctx).await.unwrap(),
|
||||
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
|
||||
@@ -682,7 +688,7 @@ mod tests {
|
||||
);
|
||||
assert_merge_iter_equal(&mut merge_iter, &expect).await;
|
||||
|
||||
let mut merge_iter = MergeIterator::create(
|
||||
let mut merge_iter = MergeIterator::create_for_testing(
|
||||
&[
|
||||
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
|
||||
resident_layer_4.get_as_delta(&ctx).await.unwrap(),
|
||||
|
||||
@@ -340,7 +340,7 @@ pub(crate) fn log_compaction_error(
|
||||
} else {
|
||||
match level {
|
||||
Level::ERROR if degrade_to_warning => warn!("Compaction failed and discarded: {err:#}"),
|
||||
Level::ERROR => error!("Compaction failed: {err:#}"),
|
||||
Level::ERROR => error!("Compaction failed: {err:?}"),
|
||||
Level::INFO => info!("Compaction failed: {err:#}"),
|
||||
level => unimplemented!("unexpected level {level:?}"),
|
||||
}
|
||||
|
||||
@@ -199,11 +199,8 @@ pub struct TimelineResources {
|
||||
|
||||
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
|
||||
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
|
||||
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
|
||||
/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the
|
||||
/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
|
||||
/// implicitly extends the relation.
|
||||
pub(crate) struct RelSizeCache {
|
||||
pub(crate) complete_as_of: Lsn,
|
||||
pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
|
||||
}
|
||||
|
||||
@@ -987,6 +984,16 @@ impl From<PageReconstructError> for CreateImageLayersError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Maps a layer-writer `PutError` onto `CreateImageLayersError`.
impl From<super::storage_layer::errors::PutError> for CreateImageLayersError {
|
||||
fn from(e: super::storage_layer::errors::PutError) -> Self {
|
||||
// Keep cancellation distinguishable from real failures: it becomes
// `Cancelled`, everything else is wrapped as `Other`.
if e.is_cancel() {
|
||||
CreateImageLayersError::Cancelled
|
||||
} else {
|
||||
CreateImageLayersError::Other(e.into_anyhow())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetVectoredError> for CreateImageLayersError {
|
||||
fn from(e: GetVectoredError) -> Self {
|
||||
match e {
|
||||
@@ -2117,22 +2124,14 @@ impl Timeline {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// Regardless of whether we're going to try_freeze_and_flush
|
||||
// or not, stop ingesting any more data. Walreceiver only provides
|
||||
// cancellation but no "wait until gone", because it uses the Timeline::gate.
|
||||
// So, only after the self.gate.close() below will we know for sure that
|
||||
// no walreceiver tasks are left.
|
||||
// For `try_freeze_and_flush=true`, this means that we might still be ingesting
|
||||
// data during the call to `self.freeze_and_flush()` below.
|
||||
// That's not ideal, but, we don't have the concept of a ChildGuard,
|
||||
// which is what we'd need to properly model early shutdown of the walreceiver
|
||||
// task sub-tree before the other Timeline task sub-trees.
|
||||
// or not, stop ingesting any more data.
|
||||
let walreceiver = self.walreceiver.lock().unwrap().take();
|
||||
tracing::debug!(
|
||||
is_some = walreceiver.is_some(),
|
||||
"Waiting for WalReceiverManager..."
|
||||
);
|
||||
if let Some(walreceiver) = walreceiver {
|
||||
walreceiver.cancel();
|
||||
walreceiver.shutdown().await;
|
||||
}
|
||||
// ... and inform any waiters for newer LSNs that there won't be any.
|
||||
self.last_record_lsn.shutdown();
|
||||
@@ -2968,7 +2967,6 @@ impl Timeline {
|
||||
|
||||
last_received_wal: Mutex::new(None),
|
||||
rel_size_cache: RwLock::new(RelSizeCache {
|
||||
complete_as_of: disk_consistent_lsn,
|
||||
map: HashMap::new(),
|
||||
}),
|
||||
|
||||
@@ -5923,6 +5921,16 @@ impl From<layer_manager::Shutdown> for CompactionError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Maps a layer-writer `PutError` onto `CompactionError`.
impl From<super::storage_layer::errors::PutError> for CompactionError {
|
||||
fn from(e: super::storage_layer::errors::PutError) -> Self {
|
||||
// A cancelled write is reported as `ShuttingDown`; any other failure is
// wrapped as `Other`.
if e.is_cancel() {
|
||||
CompactionError::ShuttingDown
|
||||
} else {
|
||||
CompactionError::Other(e.into_anyhow())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(serde::Serialize)]
|
||||
struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
|
||||
|
||||
@@ -1277,6 +1277,8 @@ impl Timeline {
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
|
||||
let gc_cutoff = *self.applied_gc_cutoff_lsn.read();
|
||||
|
||||
// 2. Repartition and create image layers if necessary
|
||||
match self
|
||||
.repartition(
|
||||
@@ -1287,7 +1289,7 @@ impl Timeline {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) if lsn >= gc_cutoff => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::from(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
@@ -1341,6 +1343,10 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(_) => {
|
||||
info!("skipping repartitioning due to image compaction LSN being below GC cutoff");
|
||||
}
|
||||
|
||||
// Suppress errors when cancelled.
|
||||
Err(_) if self.cancel.is_cancelled() => {}
|
||||
Err(err) if err.is_cancel() => {}
|
||||
@@ -1994,7 +2000,13 @@ impl Timeline {
|
||||
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
|
||||
deltas.push(l);
|
||||
}
|
||||
MergeIterator::create(&deltas, &[], ctx)
|
||||
MergeIterator::create_with_options(
|
||||
&deltas,
|
||||
&[],
|
||||
ctx,
|
||||
1024 * 8192, /* 8 MiB buffer per layer iterator */
|
||||
1024,
|
||||
)
|
||||
};
|
||||
|
||||
// This iterator walks through all keys and is needed to calculate size used by each key
|
||||
@@ -2198,8 +2210,7 @@ impl Timeline {
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.put_value(key, lsn, value, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
.await?;
|
||||
} else {
|
||||
let owner = self.shard_identity.get_shard_number(&key);
|
||||
|
||||
@@ -2828,7 +2839,7 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if the memory usage is within the limit.
|
||||
/// Check to bail out of gc compaction early if it would use too much memory.
|
||||
async fn check_memory_usage(
|
||||
self: &Arc<Self>,
|
||||
layer_selection: &[Layer],
|
||||
@@ -2841,7 +2852,8 @@ impl Timeline {
|
||||
let layer_desc = layer.layer_desc();
|
||||
if layer_desc.is_delta() {
|
||||
// Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
|
||||
// Multiply the layer size so that tests can pass.
|
||||
// Scale it by target_layer_size_bytes so that tests can pass (some tests, e.g., `test_pageserver_gc_compaction_preempt`,
|
||||
// use 3MB layer size and we need to account for that).
|
||||
estimated_memory_usage_mb +=
|
||||
3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
|
||||
num_delta_layers += 1;
|
||||
@@ -3600,6 +3612,13 @@ impl Timeline {
|
||||
last_key = Some(key);
|
||||
}
|
||||
accumulated_values.push((key, lsn, val));
|
||||
|
||||
if accumulated_values.len() >= 65536 {
|
||||
// Assume all of them are images, that would be 512MB of data in memory for a single key.
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"too many values for a single key, giving up gc-compaction"
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
let last_key: &mut Key = last_key.as_mut().unwrap();
|
||||
stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
|
||||
|
||||
@@ -149,14 +149,7 @@ pub async fn doit(
|
||||
}
|
||||
.await?;
|
||||
|
||||
flow::run(
|
||||
timeline.clone(),
|
||||
base_lsn,
|
||||
control_file,
|
||||
storage.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?;
|
||||
|
||||
//
|
||||
// Communicate that shard is done.
|
||||
|
||||
@@ -34,7 +34,9 @@ use std::sync::Arc;
|
||||
|
||||
use anyhow::{bail, ensure};
|
||||
use bytes::Bytes;
|
||||
use futures::stream::FuturesOrdered;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::config::TimelineImportConfig;
|
||||
use pageserver_api::key::{
|
||||
CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, Key, TWOPHASEDIR_KEY, rel_block_to_key,
|
||||
rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
|
||||
@@ -46,8 +48,9 @@ use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::relfile_utils::parse_relfilename;
|
||||
use postgres_ffi::{BLCKSZ, pg_constants};
|
||||
use remote_storage::RemotePath;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::{Instrument, debug, info_span, instrument};
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::{debug, instrument};
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -63,37 +66,39 @@ use crate::tenant::storage_layer::{ImageLayerWriter, Layer};
|
||||
|
||||
pub async fn run(
|
||||
timeline: Arc<Timeline>,
|
||||
pgdata_lsn: Lsn,
|
||||
control_file: ControlFile,
|
||||
storage: RemoteStorageWrapper,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
Flow {
|
||||
timeline,
|
||||
pgdata_lsn,
|
||||
let planner = Planner {
|
||||
control_file,
|
||||
tasks: Vec::new(),
|
||||
storage,
|
||||
}
|
||||
.run(ctx)
|
||||
.await
|
||||
storage: storage.clone(),
|
||||
shard: timeline.shard_identity,
|
||||
tasks: Vec::default(),
|
||||
};
|
||||
|
||||
let import_config = &timeline.conf.timeline_import_config;
|
||||
let plan = planner.plan(import_config).await?;
|
||||
plan.execute(timeline, import_config, ctx).await
|
||||
}
|
||||
|
||||
struct Flow {
|
||||
timeline: Arc<Timeline>,
|
||||
pgdata_lsn: Lsn,
|
||||
struct Planner {
|
||||
control_file: ControlFile,
|
||||
tasks: Vec<AnyImportTask>,
|
||||
storage: RemoteStorageWrapper,
|
||||
shard: ShardIdentity,
|
||||
tasks: Vec<AnyImportTask>,
|
||||
}
|
||||
|
||||
impl Flow {
|
||||
/// Perform the ingestion into [`Self::timeline`].
|
||||
/// Assumes the timeline is empty (= no layers).
|
||||
pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
|
||||
struct Plan {
|
||||
jobs: Vec<ChunkProcessingJob>,
|
||||
}
|
||||
|
||||
self.pgdata_lsn = pgdata_lsn;
|
||||
impl Planner {
|
||||
/// Creates an import plan
|
||||
///
|
||||
/// This function is and must remain pure: given the same input, it will generate the same import plan.
|
||||
async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result<Plan> {
|
||||
let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
|
||||
|
||||
let datadir = PgDataDir::new(&self.storage).await?;
|
||||
|
||||
@@ -115,7 +120,7 @@ impl Flow {
|
||||
}
|
||||
|
||||
// Import SLRUs
|
||||
if self.timeline.tenant_shard_id.is_shard_zero() {
|
||||
if self.shard.is_shard_zero() {
|
||||
// pg_xact (01:00 keyspace)
|
||||
self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
|
||||
.await?;
|
||||
@@ -166,14 +171,16 @@ impl Flow {
|
||||
let mut last_end_key = Key::MIN;
|
||||
let mut current_chunk = Vec::new();
|
||||
let mut current_chunk_size: usize = 0;
|
||||
let mut parallel_jobs = Vec::new();
|
||||
let mut jobs = Vec::new();
|
||||
for task in std::mem::take(&mut self.tasks).into_iter() {
|
||||
if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 {
|
||||
if current_chunk_size + task.total_size()
|
||||
> import_config.import_job_soft_size_limit.into()
|
||||
{
|
||||
let key_range = last_end_key..task.key_range().start;
|
||||
parallel_jobs.push(ChunkProcessingJob::new(
|
||||
jobs.push(ChunkProcessingJob::new(
|
||||
key_range.clone(),
|
||||
std::mem::take(&mut current_chunk),
|
||||
&self,
|
||||
pgdata_lsn,
|
||||
));
|
||||
last_end_key = key_range.end;
|
||||
current_chunk_size = 0;
|
||||
@@ -181,45 +188,13 @@ impl Flow {
|
||||
current_chunk_size += task.total_size();
|
||||
current_chunk.push(task);
|
||||
}
|
||||
parallel_jobs.push(ChunkProcessingJob::new(
|
||||
jobs.push(ChunkProcessingJob::new(
|
||||
last_end_key..Key::MAX,
|
||||
current_chunk,
|
||||
&self,
|
||||
pgdata_lsn,
|
||||
));
|
||||
|
||||
// Start all jobs simultaneosly
|
||||
let mut work = JoinSet::new();
|
||||
// TODO: semaphore?
|
||||
for job in parallel_jobs {
|
||||
let ctx: RequestContext =
|
||||
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
|
||||
work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job")));
|
||||
}
|
||||
let mut results = Vec::new();
|
||||
while let Some(result) = work.join_next().await {
|
||||
match result {
|
||||
Ok(res) => {
|
||||
results.push(res);
|
||||
}
|
||||
Err(_joinset_err) => {
|
||||
results.push(Err(anyhow::anyhow!(
|
||||
"parallel job panicked or cancelled, check pageserver logs"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if results.iter().all(|r| r.is_ok()) {
|
||||
Ok(())
|
||||
} else {
|
||||
let mut msg = String::new();
|
||||
for result in results {
|
||||
if let Err(err) = result {
|
||||
msg.push_str(&format!("{err:?}\n\n"));
|
||||
}
|
||||
}
|
||||
bail!("Some parallel jobs failed:\n\n{msg}");
|
||||
}
|
||||
Ok(Plan { jobs })
|
||||
}
|
||||
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
|
||||
@@ -266,7 +241,7 @@ impl Flow {
|
||||
let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
|
||||
self.tasks
|
||||
.push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
|
||||
*self.timeline.get_shard_identity(),
|
||||
self.shard,
|
||||
start_key..end_key,
|
||||
&file.path,
|
||||
self.storage.clone(),
|
||||
@@ -289,7 +264,7 @@ impl Flow {
|
||||
}
|
||||
|
||||
async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
|
||||
assert!(self.timeline.tenant_shard_id.is_shard_zero());
|
||||
assert!(self.shard.is_shard_zero());
|
||||
|
||||
let segments = self.storage.listfilesindir(path).await?;
|
||||
let segments: Vec<(String, u32, usize)> = segments
|
||||
@@ -344,6 +319,68 @@ impl Flow {
|
||||
}
|
||||
}
|
||||
|
||||
impl Plan {
|
||||
/// Runs every job of the plan against `timeline`.
///
/// Each job is spawned as its own tokio task with a detached child context.
/// A semaphore sized from `import_config.import_job_concurrency` limits how
/// many jobs hold a permit at once. Returns `Ok(())` only if every job
/// succeeded; otherwise bails with the debug-formatted errors of all failed
/// jobs concatenated into one message.
async fn execute(
|
||||
self,
|
||||
timeline: Arc<Timeline>,
|
||||
import_config: &TimelineImportConfig,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut work = FuturesOrdered::new();
|
||||
let semaphore = Arc::new(Semaphore::new(import_config.import_job_concurrency.into()));
|
||||
|
||||
let jobs_in_plan = self.jobs.len();
|
||||
|
||||
let mut jobs = self.jobs.into_iter().enumerate().peekable();
|
||||
let mut results = Vec::new();
|
||||
|
||||
// Run import jobs concurrently up to the limit specified by the pageserver configuration.
|
||||
// Note that we process completed futures in the order of insertion. This will be the
|
||||
// building block for resuming imports across pageserver restarts or tenant migrations.
|
||||
// The loop ends once one result has been recorded per planned job.
while results.len() < jobs_in_plan {
|
||||
tokio::select! {
|
||||
// Only try to start another job while unstarted jobs remain.
permit = semaphore.clone().acquire_owned(), if jobs.peek().is_some() => {
|
||||
let permit = permit.expect("never closed");
|
||||
let (job_idx, job) = jobs.next().expect("we peeked");
|
||||
let job_timeline = timeline.clone();
|
||||
let ctx = ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
|
||||
|
||||
work.push_back(tokio::task::spawn(async move {
|
||||
// Moved into the task so the permit is released only when the job is done.
let _permit = permit;
|
||||
let res = job.run(job_timeline, &ctx).await;
|
||||
(job_idx, res)
|
||||
}));
|
||||
},
|
||||
maybe_complete_job_idx = work.next() => {
|
||||
match maybe_complete_job_idx {
|
||||
Some(Ok((_job_idx, res))) => {
|
||||
results.push(res);
|
||||
},
|
||||
// Joining the spawned task failed; record it as a failed job.
Some(Err(_)) => {
|
||||
results.push(Err(anyhow::anyhow!(
|
||||
"parallel job panicked or cancelled, check pageserver logs"
|
||||
)));
|
||||
}
|
||||
// `work` yielded no item; nothing to record, go around the loop again.
None => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if results.iter().all(|r| r.is_ok()) {
|
||||
Ok(())
|
||||
} else {
|
||||
// Collect every failure so one error reports all failed jobs.
let mut msg = String::new();
|
||||
for result in results {
|
||||
if let Err(err) = result {
|
||||
msg.push_str(&format!("{err:?}\n\n"));
|
||||
}
|
||||
}
|
||||
bail!("Some parallel jobs failed:\n\n{msg}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// dbdir iteration tools
|
||||
//
|
||||
@@ -713,7 +750,6 @@ impl From<ImportSlruBlocksTask> for AnyImportTask {
|
||||
}
|
||||
|
||||
struct ChunkProcessingJob {
|
||||
timeline: Arc<Timeline>,
|
||||
range: Range<Key>,
|
||||
tasks: Vec<AnyImportTask>,
|
||||
|
||||
@@ -721,25 +757,24 @@ struct ChunkProcessingJob {
|
||||
}
|
||||
|
||||
impl ChunkProcessingJob {
|
||||
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &Flow) -> Self {
|
||||
assert!(env.pgdata_lsn.is_valid());
|
||||
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, pgdata_lsn: Lsn) -> Self {
|
||||
assert!(pgdata_lsn.is_valid());
|
||||
Self {
|
||||
timeline: env.timeline.clone(),
|
||||
range,
|
||||
tasks,
|
||||
pgdata_lsn: env.pgdata_lsn,
|
||||
pgdata_lsn,
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
async fn run(self, timeline: Arc<Timeline>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
let mut writer = ImageLayerWriter::new(
|
||||
self.timeline.conf,
|
||||
self.timeline.timeline_id,
|
||||
self.timeline.tenant_shard_id,
|
||||
timeline.conf,
|
||||
timeline.timeline_id,
|
||||
timeline.tenant_shard_id,
|
||||
&self.range,
|
||||
self.pgdata_lsn,
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -751,24 +786,20 @@ impl ChunkProcessingJob {
|
||||
|
||||
let resident_layer = if nimages > 0 {
|
||||
let (desc, path) = writer.finish(ctx).await?;
|
||||
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?
|
||||
Layer::finish_creating(timeline.conf, &timeline, desc, &path)?
|
||||
} else {
|
||||
// dropping the writer cleans up
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// this is sharing the same code as create_image_layers
|
||||
let mut guard = self.timeline.layers.write().await;
|
||||
let mut guard = timeline.layers.write().await;
|
||||
guard
|
||||
.open_mut()?
|
||||
.track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics);
|
||||
.track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
|
||||
crate::tenant::timeline::drop_wlock(guard);
|
||||
|
||||
// Schedule the layer for upload but don't add barriers such as
|
||||
// wait for completion or index upload, so we don't inhibit upload parallelism.
|
||||
// TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?)
|
||||
// TODO: or regulate parallelism by upload queue depth? Prob should happen at a higher level.
|
||||
self.timeline
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_layer_file_upload(resident_layer)?;
|
||||
|
||||
|
||||
@@ -63,6 +63,7 @@ pub struct WalReceiver {
|
||||
/// All tasks spawned by [`WalReceiver::start`] and its children are sensitive to this token.
|
||||
/// It's a child token of [`Timeline`] so that timeline shutdown can cancel WalReceiver tasks early for `freeze_and_flush=true`.
|
||||
cancel: CancellationToken,
|
||||
task: tokio::task::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl WalReceiver {
|
||||
@@ -79,7 +80,7 @@ impl WalReceiver {
|
||||
let loop_status = Arc::new(std::sync::RwLock::new(None));
|
||||
let manager_status = Arc::clone(&loop_status);
|
||||
let cancel = timeline.cancel.child_token();
|
||||
WALRECEIVER_RUNTIME.spawn({
|
||||
let task = WALRECEIVER_RUNTIME.spawn({
|
||||
let cancel = cancel.clone();
|
||||
async move {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
@@ -120,14 +121,25 @@ impl WalReceiver {
|
||||
Self {
|
||||
manager_status,
|
||||
cancel,
|
||||
task,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = tracing::Level::DEBUG)]
|
||||
pub fn cancel(&self) {
|
||||
pub async fn shutdown(self) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
debug!("cancelling walreceiver tasks");
|
||||
self.cancel.cancel();
|
||||
match self.task.await {
|
||||
Ok(()) => debug!("Shutdown success"),
|
||||
Err(je) if je.is_cancelled() => unreachable!("not used"),
|
||||
Err(je) if je.is_panic() => {
|
||||
// already logged by panic hook
|
||||
}
|
||||
Err(je) => {
|
||||
error!("shutdown walreceiver task join error: {je}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {
|
||||
|
||||
@@ -14,8 +14,6 @@
|
||||
use std::fs::File;
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
#[cfg(target_os = "linux")]
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
|
||||
|
||||
@@ -99,7 +97,7 @@ impl VirtualFile {
|
||||
|
||||
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
open_options: &OpenOptions,
|
||||
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut open_options: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let mode = get_io_mode();
|
||||
@@ -112,21 +110,16 @@ impl VirtualFile {
|
||||
#[cfg(target_os = "linux")]
|
||||
(IoMode::DirectRw, _) => true,
|
||||
};
|
||||
let open_options = open_options.clone();
|
||||
let open_options = if set_o_direct {
|
||||
if set_o_direct {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
let mut open_options = open_options;
|
||||
open_options.custom_flags(nix::libc::O_DIRECT);
|
||||
open_options
|
||||
open_options = open_options.custom_flags(nix::libc::O_DIRECT);
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
unreachable!(
|
||||
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
|
||||
);
|
||||
} else {
|
||||
open_options
|
||||
};
|
||||
}
|
||||
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
|
||||
Ok(VirtualFile { inner, _mode: mode })
|
||||
}
|
||||
@@ -530,7 +523,7 @@ impl VirtualFileInner {
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<VirtualFileInner, std::io::Error> {
|
||||
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await
|
||||
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
|
||||
}
|
||||
|
||||
/// Open a file with given options.
|
||||
@@ -558,10 +551,11 @@ impl VirtualFileInner {
|
||||
// It would perhaps be nicer to check just for the read and write flags
|
||||
// explicitly, but OpenOptions doesn't contain any functions to read flags,
|
||||
// only to set them.
|
||||
let mut reopen_options = open_options.clone();
|
||||
reopen_options.create(false);
|
||||
reopen_options.create_new(false);
|
||||
reopen_options.truncate(false);
|
||||
let reopen_options = open_options
|
||||
.clone()
|
||||
.create(false)
|
||||
.create_new(false)
|
||||
.truncate(false);
|
||||
|
||||
let vfile = VirtualFileInner {
|
||||
handle: RwLock::new(handle),
|
||||
@@ -1307,7 +1301,7 @@ mod tests {
|
||||
opts: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<MaybeVirtualFile, anyhow::Error> {
|
||||
let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
|
||||
let vf = VirtualFile::open_with_options_v2(&path, opts, ctx).await?;
|
||||
Ok(MaybeVirtualFile::VirtualFile(vf))
|
||||
}
|
||||
}
|
||||
@@ -1374,7 +1368,7 @@ mod tests {
|
||||
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
|
||||
|
||||
// Close the file and re-open for reading
|
||||
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
|
||||
let mut file_a = A::open(path_a, OpenOptions::new().read(true), &ctx).await?;
|
||||
|
||||
// cannot write to a file opened in read-only mode
|
||||
let _ = file_a
|
||||
@@ -1393,8 +1387,7 @@ mod tests {
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.to_owned(),
|
||||
.truncate(true),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1412,12 +1405,7 @@ mod tests {
|
||||
|
||||
let mut vfiles = Vec::new();
|
||||
for _ in 0..100 {
|
||||
let mut vfile = A::open(
|
||||
path_b.clone(),
|
||||
OpenOptions::new().read(true).to_owned(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
let mut vfile = A::open(path_b.clone(), OpenOptions::new().read(true), &ctx).await?;
|
||||
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
|
||||
vfiles.push(vfile);
|
||||
}
|
||||
@@ -1466,7 +1454,7 @@ mod tests {
|
||||
for _ in 0..VIRTUAL_FILES {
|
||||
let f = VirtualFileInner::open_with_options(
|
||||
&test_file_path,
|
||||
OpenOptions::new().read(true).clone(),
|
||||
OpenOptions::new().read(true),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -111,13 +111,17 @@ pub(crate) fn get() -> IoEngine {
|
||||
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::sync::atomic::{AtomicU8, Ordering};
|
||||
#[cfg(target_os = "linux")]
|
||||
use {std::time::Duration, tracing::info};
|
||||
|
||||
use super::owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use super::owned_buffers_io::slice::SliceMutExt;
|
||||
use super::{FileGuard, Metadata};
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
|
||||
pub(super) fn epoll_uring_error_to_std(
|
||||
e: tokio_epoll_uring::Error<std::io::Error>,
|
||||
) -> std::io::Error {
|
||||
match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
@@ -149,7 +153,11 @@ impl IoEngine {
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let (resources, res) = system.read(file_guard, offset, slice).await;
|
||||
let (resources, res) =
|
||||
retry_ecanceled_once((file_guard, slice), |(file_guard, slice)| async {
|
||||
system.read(file_guard, offset, slice).await
|
||||
})
|
||||
.await;
|
||||
(resources, res.map_err(epoll_uring_error_to_std))
|
||||
}
|
||||
}
|
||||
@@ -164,7 +172,10 @@ impl IoEngine {
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let (resources, res) = system.fsync(file_guard).await;
|
||||
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
|
||||
system.fsync(file_guard).await
|
||||
})
|
||||
.await;
|
||||
(resources, res.map_err(epoll_uring_error_to_std))
|
||||
}
|
||||
}
|
||||
@@ -182,7 +193,10 @@ impl IoEngine {
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let (resources, res) = system.fdatasync(file_guard).await;
|
||||
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
|
||||
system.fdatasync(file_guard).await
|
||||
})
|
||||
.await;
|
||||
(resources, res.map_err(epoll_uring_error_to_std))
|
||||
}
|
||||
}
|
||||
@@ -201,7 +215,10 @@ impl IoEngine {
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let (resources, res) = system.statx(file_guard).await;
|
||||
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
|
||||
system.statx(file_guard).await
|
||||
})
|
||||
.await;
|
||||
(
|
||||
resources,
|
||||
res.map_err(epoll_uring_error_to_std).map(Metadata::from),
|
||||
@@ -224,6 +241,7 @@ impl IoEngine {
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
// TODO: ftruncate op for tokio-epoll-uring
|
||||
// Don't forget to use retry_ecanceled_once
|
||||
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
|
||||
(file_guard, res)
|
||||
}
|
||||
@@ -245,8 +263,11 @@ impl IoEngine {
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let ((file_guard, slice), res) =
|
||||
system.write(file_guard, offset, buf.into_raw_slice()).await;
|
||||
let ((file_guard, slice), res) = retry_ecanceled_once(
|
||||
(file_guard, buf.into_raw_slice()),
|
||||
async |(file_guard, buf)| system.write(file_guard, offset, buf).await,
|
||||
)
|
||||
.await;
|
||||
(
|
||||
(file_guard, FullSlice::must_new(slice)),
|
||||
res.map_err(epoll_uring_error_to_std),
|
||||
@@ -282,6 +303,56 @@ impl IoEngine {
|
||||
}
|
||||
}
|
||||
|
||||
/// We observe in tests that stop pageserver with SIGTERM immediately after it was ingesting data,
|
||||
/// occasionally buffered writers fail (and get retried by BufferedWriter) with ECANCELED.
|
||||
/// The problem is believed to be a race condition in how io_uring handles punted async work (io-wq) and signals.
|
||||
/// Investigation ticket: <https://github.com/neondatabase/neon/issues/11446>
|
||||
///
|
||||
/// This function retries the operation once if it fails with ECANCELED.
|
||||
/// ONLY USE FOR IDEMPOTENT [`super::VirtualFile`] operations.
|
||||
#[cfg(target_os = "linux")]
|
||||
pub(super) async fn retry_ecanceled_once<F, Fut, T, V>(
|
||||
resources: T,
|
||||
f: F,
|
||||
) -> (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)
|
||||
where
|
||||
F: Fn(T) -> Fut,
|
||||
Fut: std::future::Future<Output = (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)>,
|
||||
T: Send,
|
||||
V: Send,
|
||||
{
|
||||
let (resources, res) = f(resources).await;
|
||||
let Err(e) = res else {
|
||||
return (resources, res);
|
||||
};
|
||||
let tokio_epoll_uring::Error::Op(err) = e else {
|
||||
return (resources, Err(e));
|
||||
};
|
||||
if err.raw_os_error() != Some(nix::libc::ECANCELED) {
|
||||
return (resources, Err(tokio_epoll_uring::Error::Op(err)));
|
||||
}
|
||||
{
|
||||
static RATE_LIMIT: std::sync::Mutex<utils::rate_limit::RateLimit> =
|
||||
std::sync::Mutex::new(utils::rate_limit::RateLimit::new(Duration::from_secs(1)));
|
||||
let mut guard = RATE_LIMIT.lock().unwrap();
|
||||
guard.call2(|rate_limit_stats| {
|
||||
info!(
|
||||
%rate_limit_stats, "ECANCELED observed, assuming it is due to a signal being received by the submitting thread, retrying after a delay; this message is rate-limited"
|
||||
);
|
||||
});
|
||||
drop(guard);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await; // something big enough to beat even heavily overcommitted CI runners
|
||||
let (resources, res) = f(resources).await;
|
||||
(resources, res)
|
||||
}
|
||||
|
||||
pub(super) fn panic_operation_must_be_idempotent() {
|
||||
panic!(
|
||||
"unsupported; io_engine may retry operations internally and thus needs them to be idempotent (retry_ecanceled_once)"
|
||||
)
|
||||
}
|
||||
|
||||
pub enum FeatureTestResult {
|
||||
PlatformPreferred(IoEngineKind),
|
||||
Worse {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
|
||||
|
||||
use std::os::fd::OwnedFd;
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::path::Path;
|
||||
|
||||
use super::io_engine::IoEngine;
|
||||
@@ -43,7 +44,7 @@ impl OpenOptions {
|
||||
self.write
|
||||
}
|
||||
|
||||
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
|
||||
pub fn read(mut self, read: bool) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.read(read);
|
||||
@@ -56,7 +57,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
|
||||
pub fn write(mut self, write: bool) -> Self {
|
||||
self.write = write;
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
@@ -70,7 +71,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
|
||||
pub fn create(mut self, create: bool) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.create(create);
|
||||
@@ -83,7 +84,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
|
||||
pub fn create_new(mut self, create_new: bool) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.create_new(create_new);
|
||||
@@ -96,7 +97,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
|
||||
pub fn truncate(mut self, truncate: bool) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.truncate(truncate);
|
||||
@@ -109,25 +110,28 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
/// Don't use, `O_APPEND` is not supported.
|
||||
pub fn append(&mut self, _append: bool) {
|
||||
super::io_engine::panic_operation_must_be_idempotent();
|
||||
}
|
||||
|
||||
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
|
||||
match &self.inner {
|
||||
Inner::StdFs(x) => x.open(path).map(|file| file.into()),
|
||||
#[cfg(target_os = "linux")]
|
||||
Inner::TokioEpollUring(x) => {
|
||||
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
|
||||
system.open(path, x).await.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, system)
|
||||
}
|
||||
let (_, res) = super::io_engine::retry_ecanceled_once((), |()| async {
|
||||
let res = system.open(path, x).await;
|
||||
((), res)
|
||||
})
|
||||
.await;
|
||||
res.map_err(super::io_engine::epoll_uring_error_to_std)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
|
||||
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
|
||||
pub fn mode(mut self, mode: u32) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.mode(mode);
|
||||
@@ -140,7 +144,10 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
|
||||
pub fn custom_flags(mut self, flags: i32) -> Self {
|
||||
if flags & nix::libc::O_APPEND != 0 {
|
||||
super::io_engine::panic_operation_must_be_idempotent();
|
||||
}
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
|
||||
@@ -247,6 +247,19 @@ pub enum FlushTaskError {
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl FlushTaskError {
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
FlushTaskError::Cancelled => true,
|
||||
}
|
||||
}
|
||||
pub fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
FlushTaskError::Cancelled => anyhow::anyhow!(self),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Buf, W> FlushBackgroundTask<Buf, W>
|
||||
where
|
||||
Buf: IoBufAligned + Send + Sync,
|
||||
|
||||
@@ -425,15 +425,12 @@ compact_prefetch_buffers(void)
|
||||
* point inside and outside PostgreSQL.
|
||||
*
|
||||
* This still does throw errors when it receives malformed responses from PS.
|
||||
*
|
||||
* When we're not called from CHECK_FOR_INTERRUPTS (indicated by
|
||||
* IsHandlingInterrupts) we also report we've ended prefetch receive work,
|
||||
* just in case state tracking was lost due to an error in the sync getPage
|
||||
* response code.
|
||||
*/
|
||||
void
|
||||
communicator_prefetch_pump_state(bool IsHandlingInterrupts)
|
||||
communicator_prefetch_pump_state(void)
|
||||
{
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
while (MyPState->ring_receive != MyPState->ring_flush)
|
||||
{
|
||||
NeonResponse *response;
|
||||
@@ -482,9 +479,7 @@ communicator_prefetch_pump_state(bool IsHandlingInterrupts)
|
||||
}
|
||||
}
|
||||
|
||||
/* We never pump the prefetch state while handling other pages */
|
||||
if (!IsHandlingInterrupts)
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
communicator_reconfigure_timeout_if_needed();
|
||||
}
|
||||
@@ -672,9 +667,10 @@ prefetch_wait_for(uint64 ring_index)
|
||||
|
||||
Assert(MyPState->ring_unused > ring_index);
|
||||
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
while (MyPState->ring_receive <= ring_index)
|
||||
{
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
entry = GetPrfSlot(MyPState->ring_receive);
|
||||
|
||||
Assert(entry->status == PRFS_REQUESTED);
|
||||
@@ -683,17 +679,18 @@ prefetch_wait_for(uint64 ring_index)
|
||||
result = false;
|
||||
break;
|
||||
}
|
||||
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
|
||||
if (result)
|
||||
{
|
||||
/* Check that slot is actually received (server can be disconnected in prefetch_pump_state called from CHECK_FOR_INTERRUPTS) */
|
||||
PrefetchRequest *slot = GetPrfSlot(ring_index);
|
||||
return slot->status == PRFS_RECEIVED;
|
||||
result = slot->status == PRFS_RECEIVED;
|
||||
}
|
||||
return false;
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
return result;
|
||||
;
|
||||
}
|
||||
|
||||
@@ -720,6 +717,7 @@ prefetch_read(PrefetchRequest *slot)
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_receive);
|
||||
Assert(readpage_reentrant_guard);
|
||||
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
@@ -802,6 +800,7 @@ communicator_prefetch_receive(BufferTag tag)
|
||||
PrfHashEntry *entry;
|
||||
PrefetchRequest hashkey;
|
||||
|
||||
Assert(readpage_reentrant_guard);
|
||||
hashkey.buftag = tag;
|
||||
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
|
||||
if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index))
|
||||
@@ -821,8 +820,12 @@ communicator_prefetch_receive(BufferTag tag)
|
||||
void
|
||||
prefetch_on_ps_disconnect(void)
|
||||
{
|
||||
bool save_readpage_reentrant_guard = readpage_reentrant_guard;
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
|
||||
/* Prohibit calling of prefetch_pump_state */
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
while (MyPState->ring_receive < MyPState->ring_unused)
|
||||
{
|
||||
PrefetchRequest *slot;
|
||||
@@ -851,6 +854,9 @@ prefetch_on_ps_disconnect(void)
|
||||
MyNeonCounters->getpage_prefetch_discards_total += 1;
|
||||
}
|
||||
|
||||
/* Restore guard */
|
||||
readpage_reentrant_guard = save_readpage_reentrant_guard;
|
||||
|
||||
/*
|
||||
* We can have gone into retry due to network error, so update stats with
|
||||
* the latest available
|
||||
@@ -2509,7 +2515,7 @@ communicator_processinterrupts(void)
|
||||
if (timeout_signaled)
|
||||
{
|
||||
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
|
||||
communicator_prefetch_pump_state(true);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
timeout_signaled = false;
|
||||
communicator_reconfigure_timeout_if_needed();
|
||||
|
||||
@@ -44,7 +44,7 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
|
||||
void *buffer);
|
||||
|
||||
extern void communicator_reconfigure_timeout_if_needed(void);
|
||||
extern void communicator_prefetch_pump_state(bool IsHandlingInterrupts);
|
||||
extern void communicator_prefetch_pump_state(void);
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include "portability/instr_time.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
@@ -79,6 +80,7 @@ int neon_protocol_version = 3;
|
||||
static int neon_compute_mode = 0;
|
||||
static int max_reconnect_attempts = 60;
|
||||
static int stripe_size;
|
||||
static int max_sockets;
|
||||
|
||||
static int pageserver_response_log_timeout = 10000;
|
||||
/* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
|
||||
@@ -336,6 +338,13 @@ load_shard_map(shardno_t shard_no, char *connstr_p, shardno_t *num_shards_p)
|
||||
pageserver_disconnect(i);
|
||||
}
|
||||
pagestore_local_counter = end_update_counter;
|
||||
|
||||
/* Reserve file descriptors for sockets */
|
||||
while (max_sockets < num_shards)
|
||||
{
|
||||
max_sockets += 1;
|
||||
ReserveExternalFD();
|
||||
}
|
||||
}
|
||||
|
||||
if (num_shards_p)
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "storage/buf_internals.h"
|
||||
@@ -396,9 +397,10 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn,
|
||||
XLogRecPtr
|
||||
neon_set_lwlsn_block_range(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum, BlockNumber from, BlockNumber n_blocks)
|
||||
{
|
||||
if (lsn < FirstNormalUnloggedLSN || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
|
||||
if (lsn == InvalidXLogRecPtr || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
|
||||
return lsn;
|
||||
|
||||
Assert(lsn >= WalSegMinSize);
|
||||
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
|
||||
lsn = SetLastWrittenLSNForBlockRangeInternal(lsn, rlocator, forknum, from, n_blocks);
|
||||
LWLockRelease(LastWrittenLsnLock);
|
||||
@@ -435,7 +437,6 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
|
||||
NInfoGetRelNumber(relfilenode) == InvalidOid)
|
||||
return InvalidXLogRecPtr;
|
||||
|
||||
|
||||
BufTagInit(key, relNumber, forknum, blockno, spcOid, dbOid);
|
||||
|
||||
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
|
||||
@@ -444,6 +445,10 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
|
||||
{
|
||||
XLogRecPtr lsn = lsns[i];
|
||||
|
||||
if (lsn == InvalidXLogRecPtr)
|
||||
continue;
|
||||
|
||||
Assert(lsn >= WalSegMinSize);
|
||||
key.blockNum = blockno + i;
|
||||
entry = hash_search(lastWrittenLsnCache, &key, HASH_ENTER, &found);
|
||||
if (found)
|
||||
|
||||
@@ -150,7 +150,7 @@ NeonWALReaderFree(NeonWALReader *state)
|
||||
* fetched from timeline 'tli'.
|
||||
*
|
||||
* Returns NEON_WALREAD_SUCCESS if succeeded, NEON_WALREAD_ERROR if an error
|
||||
* occurs, in which case 'err' has the desciption. Error always closes remote
|
||||
* occurs, in which case 'err' has the description. Error always closes remote
|
||||
* connection, if there was any, so socket subscription should be removed.
|
||||
*
|
||||
* NEON_WALREAD_WOULDBLOCK means caller should obtain socket to wait for with
|
||||
|
||||
@@ -1179,7 +1179,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
blocknum += iterblocks;
|
||||
}
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
return false;
|
||||
}
|
||||
@@ -1218,7 +1218,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
||||
|
||||
communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
return false;
|
||||
}
|
||||
@@ -1262,7 +1262,7 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
*/
|
||||
neon_log(SmgrTrace, "writeback noop");
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
@@ -1315,7 +1315,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
}
|
||||
|
||||
/* Try to read PS results if they are available */
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
|
||||
|
||||
@@ -1339,7 +1339,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
|
||||
@@ -1449,7 +1449,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
nblocks, PG_IOV_MAX);
|
||||
|
||||
/* Try to read PS results if they are available */
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
request_lsns, nblocks);
|
||||
@@ -1480,7 +1480,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (forknum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
|
||||
@@ -1665,7 +1665,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
|
||||
|
||||
lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer);
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
@@ -1727,7 +1727,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
|
||||
|
||||
lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks);
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
@@ -1902,7 +1902,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
|
||||
|
||||
neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop");
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
@@ -1989,8 +1989,14 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
/*
|
||||
* We have to disable this check for pg14-16 because sorted build of GIST index requires
|
||||
* to perform unlogged build several times
|
||||
*/
|
||||
if (smgrnblocks(reln, MAIN_FORKNUM) != 0)
|
||||
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
|
||||
#endif
|
||||
|
||||
unlogged_build_rel = reln;
|
||||
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;
|
||||
|
||||
@@ -124,6 +124,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
}
|
||||
else
|
||||
{
|
||||
wp->safekeepers_generation = INVALID_GENERATION;
|
||||
host = wp->config->safekeepers_list;
|
||||
}
|
||||
wp_log(LOG, "safekeepers_generation=%u", wp->safekeepers_generation);
|
||||
@@ -756,7 +757,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
|
||||
{
|
||||
SafekeeperId *sk_id = &wp->mconf.members.m[i];
|
||||
|
||||
if (wp->mconf.members.m[i].node_id == sk->greetResponse.nodeId)
|
||||
if (sk_id->node_id == sk->greetResponse.nodeId)
|
||||
{
|
||||
/*
|
||||
* If mconf or list of safekeepers to connect to changed (the
|
||||
@@ -781,7 +782,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
|
||||
{
|
||||
SafekeeperId *sk_id = &wp->mconf.new_members.m[i];
|
||||
|
||||
if (wp->mconf.new_members.m[i].node_id == sk->greetResponse.nodeId)
|
||||
if (sk_id->node_id == sk->greetResponse.nodeId)
|
||||
{
|
||||
if (wp->new_members_safekeepers[i] != NULL && wp->new_members_safekeepers[i] != sk)
|
||||
{
|
||||
@@ -1071,7 +1072,6 @@ RecvVoteResponse(Safekeeper *sk)
|
||||
/* ready for elected message */
|
||||
sk->state = SS_WAIT_ELECTED;
|
||||
|
||||
wp->n_votes++;
|
||||
/* Are we already elected? */
|
||||
if (wp->state == WPS_CAMPAIGN)
|
||||
{
|
||||
|
||||
@@ -845,9 +845,6 @@ typedef struct WalProposer
|
||||
/* timeline globally starts at this LSN */
|
||||
XLogRecPtr timelineStartLsn;
|
||||
|
||||
/* number of votes collected from safekeepers */
|
||||
int n_votes;
|
||||
|
||||
/* number of successful connections over the lifetime of walproposer */
|
||||
int n_connected;
|
||||
|
||||
|
||||
@@ -409,14 +409,22 @@ impl JwkCacheEntryLock {
|
||||
|
||||
if let Some(exp) = payload.expiration {
|
||||
if now >= exp + CLOCK_SKEW_LEEWAY {
|
||||
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired));
|
||||
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired(
|
||||
exp.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(nbf) = payload.not_before {
|
||||
if nbf >= now + CLOCK_SKEW_LEEWAY {
|
||||
return Err(JwtError::InvalidClaims(
|
||||
JwtClaimsError::JwtTokenNotYetReadyToUse,
|
||||
JwtClaimsError::JwtTokenNotYetReadyToUse(
|
||||
nbf.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
),
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -534,10 +542,10 @@ struct JwtPayload<'a> {
|
||||
#[serde(rename = "aud", default)]
|
||||
audience: OneOrMany,
|
||||
/// Expiration - Time after which the JWT expires
|
||||
#[serde(deserialize_with = "numeric_date_opt", rename = "exp", default)]
|
||||
#[serde(rename = "exp", deserialize_with = "numeric_date_opt", default)]
|
||||
expiration: Option<SystemTime>,
|
||||
/// Not before - Time after which the JWT expires
|
||||
#[serde(deserialize_with = "numeric_date_opt", rename = "nbf", default)]
|
||||
/// Not before - Time before which the JWT is not valid
|
||||
#[serde(rename = "nbf", deserialize_with = "numeric_date_opt", default)]
|
||||
not_before: Option<SystemTime>,
|
||||
|
||||
// the following entries are only extracted for the sake of debug logging.
|
||||
@@ -609,8 +617,15 @@ impl<'de> Deserialize<'de> for OneOrMany {
|
||||
}
|
||||
|
||||
fn numeric_date_opt<'de, D: Deserializer<'de>>(d: D) -> Result<Option<SystemTime>, D::Error> {
|
||||
let d = <Option<u64>>::deserialize(d)?;
|
||||
Ok(d.map(|n| SystemTime::UNIX_EPOCH + Duration::from_secs(n)))
|
||||
<Option<u64>>::deserialize(d)?
|
||||
.map(|t| {
|
||||
SystemTime::UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(t))
|
||||
.ok_or_else(|| {
|
||||
serde::de::Error::custom(format_args!("timestamp out of bounds: {t}"))
|
||||
})
|
||||
})
|
||||
.transpose()
|
||||
}
|
||||
|
||||
struct JwkRenewalPermit<'a> {
|
||||
@@ -746,11 +761,11 @@ pub enum JwtClaimsError {
|
||||
#[error("invalid JWT token audience")]
|
||||
InvalidJwtTokenAudience,
|
||||
|
||||
#[error("JWT token has expired")]
|
||||
JwtTokenHasExpired,
|
||||
#[error("JWT token has expired (exp={0})")]
|
||||
JwtTokenHasExpired(u64),
|
||||
|
||||
#[error("JWT token is not yet ready to use")]
|
||||
JwtTokenNotYetReadyToUse,
|
||||
#[error("JWT token is not yet ready to use (nbf={0})")]
|
||||
JwtTokenNotYetReadyToUse(u64),
|
||||
}
|
||||
|
||||
#[allow(dead_code, reason = "Debug use only")]
|
||||
@@ -1233,14 +1248,14 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
|
||||
"nbf": now + 60,
|
||||
"aud": "neon",
|
||||
}},
|
||||
error: JwtClaimsError::JwtTokenNotYetReadyToUse,
|
||||
error: JwtClaimsError::JwtTokenNotYetReadyToUse(now + 60),
|
||||
},
|
||||
Test {
|
||||
body: json! {{
|
||||
"exp": now - 60,
|
||||
"aud": ["neon"],
|
||||
}},
|
||||
error: JwtClaimsError::JwtTokenHasExpired,
|
||||
error: JwtClaimsError::JwtTokenHasExpired(now - 60),
|
||||
},
|
||||
Test {
|
||||
body: json! {{
|
||||
|
||||
@@ -12,9 +12,9 @@ use tracing::{debug, warn};
|
||||
use crate::auth::password_hack::parse_endpoint_param;
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::metrics::{Metrics, SniKind};
|
||||
use crate::metrics::{Metrics, SniGroup, SniKind};
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::serverless::SERVERLESS_DRIVER_SNI;
|
||||
use crate::serverless::{AUTH_BROKER_SNI, SERVERLESS_DRIVER_SNI};
|
||||
use crate::types::{EndpointId, RoleName};
|
||||
|
||||
#[derive(Debug, Error, PartialEq, Eq, Clone)]
|
||||
@@ -32,12 +32,6 @@ pub(crate) enum ComputeUserInfoParseError {
|
||||
option: EndpointId,
|
||||
},
|
||||
|
||||
#[error(
|
||||
"Common name inferred from SNI ('{}') is not known",
|
||||
.cn,
|
||||
)]
|
||||
UnknownCommonName { cn: String },
|
||||
|
||||
#[error("Project name ('{0}') must contain only alphanumeric characters and hyphen.")]
|
||||
MalformedProjectName(EndpointId),
|
||||
}
|
||||
@@ -66,22 +60,15 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn endpoint_sni(
|
||||
sni: &str,
|
||||
common_names: &HashSet<String>,
|
||||
) -> Result<Option<EndpointId>, ComputeUserInfoParseError> {
|
||||
let Some((subdomain, common_name)) = sni.split_once('.') else {
|
||||
return Err(ComputeUserInfoParseError::UnknownCommonName { cn: sni.into() });
|
||||
};
|
||||
pub(crate) fn endpoint_sni(sni: &str, common_names: &HashSet<String>) -> Option<EndpointId> {
|
||||
let (subdomain, common_name) = sni.split_once('.')?;
|
||||
if !common_names.contains(common_name) {
|
||||
return Err(ComputeUserInfoParseError::UnknownCommonName {
|
||||
cn: common_name.into(),
|
||||
});
|
||||
return None;
|
||||
}
|
||||
if subdomain == SERVERLESS_DRIVER_SNI {
|
||||
return Ok(None);
|
||||
if subdomain == SERVERLESS_DRIVER_SNI || subdomain == AUTH_BROKER_SNI {
|
||||
return None;
|
||||
}
|
||||
Ok(Some(EndpointId::from(subdomain)))
|
||||
Some(EndpointId::from(subdomain))
|
||||
}
|
||||
|
||||
impl ComputeUserInfoMaybeEndpoint {
|
||||
@@ -113,15 +100,8 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
})
|
||||
.map(|name| name.into());
|
||||
|
||||
let endpoint_from_domain = if let Some(sni_str) = sni {
|
||||
if let Some(cn) = common_names {
|
||||
endpoint_sni(sni_str, cn)?
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let endpoint_from_domain =
|
||||
sni.and_then(|sni_str| common_names.and_then(|cn| endpoint_sni(sni_str, cn)));
|
||||
|
||||
let endpoint = match (endpoint_option, endpoint_from_domain) {
|
||||
// Invariant: if we have both project name variants, they should match.
|
||||
@@ -148,22 +128,23 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
|
||||
let metrics = Metrics::get();
|
||||
debug!(%user, "credentials");
|
||||
if sni.is_some() {
|
||||
|
||||
let protocol = ctx.protocol();
|
||||
let kind = if sni.is_some() {
|
||||
debug!("Connection with sni");
|
||||
metrics.proxy.accepted_connections_by_sni.inc(SniKind::Sni);
|
||||
SniKind::Sni
|
||||
} else if endpoint.is_some() {
|
||||
metrics
|
||||
.proxy
|
||||
.accepted_connections_by_sni
|
||||
.inc(SniKind::NoSni);
|
||||
debug!("Connection without sni");
|
||||
SniKind::NoSni
|
||||
} else {
|
||||
metrics
|
||||
.proxy
|
||||
.accepted_connections_by_sni
|
||||
.inc(SniKind::PasswordHack);
|
||||
debug!("Connection with password hack");
|
||||
}
|
||||
SniKind::PasswordHack
|
||||
};
|
||||
|
||||
metrics
|
||||
.proxy
|
||||
.accepted_connections_by_sni
|
||||
.inc(SniGroup { protocol, kind });
|
||||
|
||||
let options = NeonOptions::parse_params(params);
|
||||
|
||||
@@ -424,21 +405,34 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_inconsistent_sni() {
|
||||
fn parse_unknown_sni() {
|
||||
let options = StartupMessageParams::new([("user", "john_doe")]);
|
||||
|
||||
let sni = Some("project.localhost");
|
||||
let common_names = Some(["example.com".into()].into());
|
||||
|
||||
let ctx = RequestContext::test();
|
||||
let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
|
||||
.expect_err("should fail");
|
||||
match err {
|
||||
UnknownCommonName { cn } => {
|
||||
assert_eq!(cn, "localhost");
|
||||
}
|
||||
_ => panic!("bad error: {err:?}"),
|
||||
}
|
||||
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
|
||||
.unwrap();
|
||||
|
||||
assert!(info.endpoint_id.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_unknown_sni_with_options() {
|
||||
let options = StartupMessageParams::new([
|
||||
("user", "john_doe"),
|
||||
("options", "endpoint=foo-bar-baz-1234"),
|
||||
]);
|
||||
|
||||
let sni = Some("project.localhost");
|
||||
let common_names = Some(["example.com".into()].into());
|
||||
|
||||
let ctx = RequestContext::test();
|
||||
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(info.endpoint_id.as_deref(), Some("foo-bar-baz-1234"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -115,8 +115,8 @@ pub struct ProxyMetrics {
|
||||
#[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
|
||||
pub allowed_vpc_endpoint_ids: Histogram<10>,
|
||||
|
||||
/// Number of connections (per sni).
|
||||
pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,
|
||||
/// Number of connections, by the method we used to determine the endpoint.
|
||||
pub accepted_connections_by_sni: CounterVec<SniSet>,
|
||||
|
||||
/// Number of connection failures (per kind).
|
||||
pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
|
||||
@@ -342,11 +342,20 @@ pub enum LatencyExclusions {
|
||||
ClientCplaneComputeRetry,
|
||||
}
|
||||
|
||||
#[derive(LabelGroup)]
|
||||
#[label(set = SniSet)]
|
||||
pub struct SniGroup {
|
||||
pub protocol: Protocol,
|
||||
pub kind: SniKind,
|
||||
}
|
||||
|
||||
#[derive(FixedCardinalityLabel, Copy, Clone)]
|
||||
#[label(singleton = "kind")]
|
||||
pub enum SniKind {
|
||||
/// Domain name based routing. SNI for libpq/websockets. Host for HTTP
|
||||
Sni,
|
||||
/// Metadata based routing. `options` for libpq/websockets. Header for HTTP
|
||||
NoSni,
|
||||
/// Metadata based routing, using the password field.
|
||||
PasswordHack,
|
||||
}
|
||||
|
||||
|
||||
@@ -24,9 +24,6 @@ pub(crate) enum HandshakeError {
|
||||
#[error("protocol violation")]
|
||||
ProtocolViolation,
|
||||
|
||||
#[error("missing certificate")]
|
||||
MissingCertificate,
|
||||
|
||||
#[error("{0}")]
|
||||
StreamUpgradeError(#[from] StreamUpgradeError),
|
||||
|
||||
@@ -42,10 +39,6 @@ impl ReportableError for HandshakeError {
|
||||
match self {
|
||||
HandshakeError::EarlyData => crate::error::ErrorKind::User,
|
||||
HandshakeError::ProtocolViolation => crate::error::ErrorKind::User,
|
||||
// This error should not happen, but will if we have no default certificate and
|
||||
// the client sends no SNI extension.
|
||||
// If they provide SNI then we can be sure there is a certificate that matches.
|
||||
HandshakeError::MissingCertificate => crate::error::ErrorKind::Service,
|
||||
HandshakeError::StreamUpgradeError(upgrade) => match upgrade {
|
||||
StreamUpgradeError::AlreadyTls => crate::error::ErrorKind::Service,
|
||||
StreamUpgradeError::Io(_) => crate::error::ErrorKind::ClientDisconnect,
|
||||
@@ -146,7 +139,7 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
// try parse endpoint
|
||||
let ep = conn_info
|
||||
.server_name()
|
||||
.and_then(|sni| endpoint_sni(sni, &tls.common_names).ok().flatten());
|
||||
.and_then(|sni| endpoint_sni(sni, &tls.common_names));
|
||||
if let Some(ep) = ep {
|
||||
ctx.set_endpoint_id(ep);
|
||||
}
|
||||
@@ -161,10 +154,8 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
}
|
||||
}
|
||||
|
||||
let (_, tls_server_end_point) = tls
|
||||
.cert_resolver
|
||||
.resolve(conn_info.server_name())
|
||||
.ok_or(HandshakeError::MissingCertificate)?;
|
||||
let (_, tls_server_end_point) =
|
||||
tls.cert_resolver.resolve(conn_info.server_name());
|
||||
|
||||
stream = PqStream {
|
||||
framed: Framed {
|
||||
|
||||
@@ -98,8 +98,7 @@ fn generate_tls_config<'a>(
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(vec![cert.clone()], key.clone_key())?;
|
||||
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
cert_resolver.add_cert(key, vec![cert], true)?;
|
||||
let cert_resolver = CertResolver::new(key, vec![cert])?;
|
||||
|
||||
let common_names = cert_resolver.get_common_names();
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
|
||||
pub(crate) const EXT_VERSION: &str = "0.3.0";
|
||||
pub(crate) const EXT_VERSION: &str = "0.3.1";
|
||||
pub(crate) const EXT_SCHEMA: &str = "auth";
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -56,6 +56,7 @@ use crate::serverless::backend::PoolingBackend;
|
||||
use crate::serverless::http_util::{api_error_into_response, json_response};
|
||||
|
||||
pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
|
||||
pub(crate) const AUTH_BROKER_SNI: &str = "apiauth";
|
||||
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
|
||||
@@ -38,7 +38,7 @@ use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig};
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::http::{ReadBodyError, read_body_with_limit};
|
||||
use crate::metrics::{HttpDirection, Metrics};
|
||||
use crate::metrics::{HttpDirection, Metrics, SniGroup, SniKind};
|
||||
use crate::proxy::{NeonOptions, run_until_cancelled};
|
||||
use crate::serverless::backend::HttpConnError;
|
||||
use crate::types::{DbName, RoleName};
|
||||
@@ -199,8 +199,7 @@ fn get_conn_info(
|
||||
let endpoint = match connection_url.host() {
|
||||
Some(url::Host::Domain(hostname)) => {
|
||||
if let Some(tls) = tls {
|
||||
endpoint_sni(hostname, &tls.common_names)?
|
||||
.ok_or(ConnInfoError::MalformedEndpoint)?
|
||||
endpoint_sni(hostname, &tls.common_names).ok_or(ConnInfoError::MalformedEndpoint)?
|
||||
} else {
|
||||
hostname
|
||||
.split_once('.')
|
||||
@@ -228,6 +227,32 @@ fn get_conn_info(
|
||||
}
|
||||
}
|
||||
|
||||
// check the URL that was used, for metrics
|
||||
{
|
||||
let host_endpoint = headers
|
||||
// get the host header
|
||||
.get("host")
|
||||
// extract the domain
|
||||
.and_then(|h| {
|
||||
let (host, _port) = h.to_str().ok()?.split_once(':')?;
|
||||
Some(host)
|
||||
})
|
||||
// get the endpoint prefix
|
||||
.map(|h| h.split_once('.').map_or(h, |(prefix, _)| prefix));
|
||||
|
||||
let kind = if host_endpoint == Some(&*endpoint) {
|
||||
SniKind::Sni
|
||||
} else {
|
||||
SniKind::NoSni
|
||||
};
|
||||
|
||||
let protocol = ctx.protocol();
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.accepted_connections_by_sni
|
||||
.inc(SniGroup { protocol, kind });
|
||||
}
|
||||
|
||||
ctx.set_user_agent(
|
||||
headers
|
||||
.get(hyper::header::USER_AGENT)
|
||||
|
||||
@@ -5,6 +5,7 @@ use anyhow::{Context, bail};
|
||||
use itertools::Itertools;
|
||||
use rustls::crypto::ring::{self, sign};
|
||||
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
|
||||
use rustls::sign::CertifiedKey;
|
||||
use x509_cert::der::{Reader, SliceReader};
|
||||
|
||||
use super::{PG_ALPN_PROTOCOL, TlsServerEndPoint};
|
||||
@@ -25,10 +26,8 @@ pub fn configure_tls(
|
||||
certs_dir: Option<&String>,
|
||||
allow_tls_keylogfile: bool,
|
||||
) -> anyhow::Result<TlsConfig> {
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
|
||||
// add default certificate
|
||||
cert_resolver.add_cert_path(key_path, cert_path, true)?;
|
||||
let mut cert_resolver = CertResolver::parse_new(key_path, cert_path)?;
|
||||
|
||||
// add extra certificates
|
||||
if let Some(certs_dir) = certs_dir {
|
||||
@@ -40,11 +39,8 @@ pub fn configure_tls(
|
||||
let key_path = path.join("tls.key");
|
||||
let cert_path = path.join("tls.crt");
|
||||
if key_path.exists() && cert_path.exists() {
|
||||
cert_resolver.add_cert_path(
|
||||
&key_path.to_string_lossy(),
|
||||
&cert_path.to_string_lossy(),
|
||||
false,
|
||||
)?;
|
||||
cert_resolver
|
||||
.add_cert_path(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -83,92 +79,42 @@ pub fn configure_tls(
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
#[derive(Debug)]
|
||||
pub struct CertResolver {
|
||||
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
default: (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint),
|
||||
}
|
||||
|
||||
impl CertResolver {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
fn parse_new(key_path: &str, cert_path: &str) -> anyhow::Result<Self> {
|
||||
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
|
||||
Self::new(priv_key, cert_chain)
|
||||
}
|
||||
|
||||
fn add_cert_path(
|
||||
&mut self,
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
rustls_pemfile::private_key(&mut &key_bytes[..])
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
};
|
||||
pub fn new(
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
|
||||
})?
|
||||
};
|
||||
|
||||
self.add_cert(priv_key, cert_chain, is_default)
|
||||
let mut certs = HashMap::new();
|
||||
let default = (cert.clone(), tls_server_end_point);
|
||||
certs.insert(common_name, (cert, tls_server_end_point));
|
||||
Ok(Self { certs, default })
|
||||
}
|
||||
|
||||
pub fn add_cert(
|
||||
fn add_cert_path(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
|
||||
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
|
||||
self.add_cert(priv_key, cert_chain)
|
||||
}
|
||||
|
||||
fn add_cert(
|
||||
&mut self,
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
|
||||
|
||||
let first_cert = &cert_chain[0];
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
|
||||
let certificate = SliceReader::new(first_cert)
|
||||
.context("Failed to parse cerficiate")?
|
||||
.decode::<x509_cert::Certificate>()
|
||||
.context("Failed to parse cerficiate")?;
|
||||
|
||||
let common_name = certificate.tbs_certificate.subject.to_string();
|
||||
|
||||
// We need to get the canonical name for this certificate so we can match them against any domain names
|
||||
// seen within the proxy codebase.
|
||||
//
|
||||
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
|
||||
// We need to remove the wildcard prefix for the purposes of certificate selection.
|
||||
//
|
||||
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
|
||||
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
|
||||
//
|
||||
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
|
||||
// validation, so we can continue with any common-name
|
||||
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=") {
|
||||
s.to_string()
|
||||
} else {
|
||||
bail!("Failed to parse common name from certificate")
|
||||
};
|
||||
|
||||
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
|
||||
|
||||
if is_default {
|
||||
self.default = Some((cert.clone(), tls_server_end_point));
|
||||
}
|
||||
|
||||
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
|
||||
self.certs.insert(common_name, (cert, tls_server_end_point));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -177,12 +123,82 @@ impl CertResolver {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_key_cert(
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
) -> anyhow::Result<(PrivateKeyDer<'static>, Vec<CertificateDer<'static>>)> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
rustls_pemfile::private_key(&mut &key_bytes[..])
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
};
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
|
||||
)
|
||||
})?
|
||||
};
|
||||
|
||||
Ok((priv_key, cert_chain))
|
||||
}
|
||||
|
||||
fn process_key_cert(
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
) -> anyhow::Result<(String, Arc<CertifiedKey>, TlsServerEndPoint)> {
|
||||
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
|
||||
|
||||
let first_cert = &cert_chain[0];
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
|
||||
let certificate = SliceReader::new(first_cert)
|
||||
.context("Failed to parse cerficiate")?
|
||||
.decode::<x509_cert::Certificate>()
|
||||
.context("Failed to parse cerficiate")?;
|
||||
|
||||
let common_name = certificate.tbs_certificate.subject.to_string();
|
||||
|
||||
// We need to get the canonical name for this certificate so we can match them against any domain names
|
||||
// seen within the proxy codebase.
|
||||
//
|
||||
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
|
||||
// We need to remove the wildcard prefix for the purposes of certificate selection.
|
||||
//
|
||||
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
|
||||
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
|
||||
//
|
||||
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
|
||||
// validation, so we can continue with any common-name
|
||||
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=") {
|
||||
s.to_string()
|
||||
} else {
|
||||
bail!("Failed to parse common name from certificate")
|
||||
};
|
||||
|
||||
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
|
||||
|
||||
Ok((common_name, cert, tls_server_end_point))
|
||||
}
|
||||
|
||||
impl rustls::server::ResolvesServerCert for CertResolver {
|
||||
fn resolve(
|
||||
&self,
|
||||
client_hello: rustls::server::ClientHello<'_>,
|
||||
) -> Option<Arc<rustls::sign::CertifiedKey>> {
|
||||
self.resolve(client_hello.server_name()).map(|x| x.0)
|
||||
Some(self.resolve(client_hello.server_name()).0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -190,7 +206,7 @@ impl CertResolver {
|
||||
pub fn resolve(
|
||||
&self,
|
||||
server_name: Option<&str>,
|
||||
) -> Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)> {
|
||||
) -> (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint) {
|
||||
// loop here and cut off more and more subdomains until we find
|
||||
// a match to get a proper wildcard support. OTOH, we now do not
|
||||
// use nested domains, so keep this simple for now.
|
||||
@@ -200,12 +216,17 @@ impl CertResolver {
|
||||
if let Some(mut sni_name) = server_name {
|
||||
loop {
|
||||
if let Some(cert) = self.certs.get(sni_name) {
|
||||
return Some(cert.clone());
|
||||
return cert.clone();
|
||||
}
|
||||
if let Some((_, rest)) = sni_name.split_once('.') {
|
||||
sni_name = rest;
|
||||
} else {
|
||||
return None;
|
||||
// The customer has some custom DNS mapping - just return
|
||||
// a default certificate.
|
||||
//
|
||||
// This will error if the customer uses anything stronger
|
||||
// than sslmode=require. That's a choice they can make.
|
||||
return self.default.clone();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -121,6 +121,20 @@ impl Client {
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn switch_timeline_membership(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &models::TimelineMembershipSwitchRequest,
|
||||
) -> Result<models::TimelineMembershipSwitchResponse> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/membership",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
);
|
||||
let resp = self.put(&uri, req).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
|
||||
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
|
||||
let resp = self
|
||||
|
||||
@@ -243,8 +243,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
|
||||
let resp =
|
||||
pull_timeline::handle_request(data, conf.sk_auth_token.clone(), ca_certs, global_timelines)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
.await?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::{SinkExt, StreamExt, TryStreamExt};
|
||||
use http_utils::error::ApiError;
|
||||
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
|
||||
use reqwest::Certificate;
|
||||
use safekeeper_api::Term;
|
||||
@@ -30,7 +31,7 @@ use utils::pausable_failpoint;
|
||||
|
||||
use crate::control_file::CONTROL_FILE_NAME;
|
||||
use crate::state::{EvictionState, TimelinePersistentState};
|
||||
use crate::timeline::{Timeline, WalResidentTimeline};
|
||||
use crate::timeline::{Timeline, TimelineError, WalResidentTimeline};
|
||||
use crate::timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline};
|
||||
use crate::wal_storage::open_wal_file;
|
||||
use crate::{GlobalTimelines, debug_dump, wal_backup};
|
||||
@@ -395,7 +396,7 @@ pub async fn handle_request(
|
||||
sk_auth_token: Option<SecretString>,
|
||||
ssl_ca_certs: Vec<Certificate>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<PullTimelineResponse> {
|
||||
) -> Result<PullTimelineResponse, ApiError> {
|
||||
let existing_tli = global_timelines.get(TenantTimelineId::new(
|
||||
request.tenant_id,
|
||||
request.timeline_id,
|
||||
@@ -411,7 +412,9 @@ pub async fn handle_request(
|
||||
for ssl_ca_cert in ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(ssl_ca_cert);
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
let http_client = http_client
|
||||
.build()
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
let http_hosts = request.http_hosts.clone();
|
||||
|
||||
@@ -443,10 +446,10 @@ pub async fn handle_request(
|
||||
// offline and C comes online. Then we want a pull on C with A and B as hosts to work.
|
||||
let min_required_successful = (http_hosts.len() - 1).max(1);
|
||||
if statuses.len() < min_required_successful {
|
||||
bail!(
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"only got {} successful status responses. required: {min_required_successful}",
|
||||
statuses.len()
|
||||
)
|
||||
)));
|
||||
}
|
||||
|
||||
// Find the most advanced safekeeper
|
||||
@@ -465,7 +468,7 @@ pub async fn handle_request(
|
||||
assert!(status.tenant_id == request.tenant_id);
|
||||
assert!(status.timeline_id == request.timeline_id);
|
||||
|
||||
pull_timeline(
|
||||
match pull_timeline(
|
||||
status,
|
||||
safekeeper_host,
|
||||
sk_auth_token,
|
||||
@@ -473,6 +476,21 @@ pub async fn handle_request(
|
||||
global_timelines,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(resp) => Ok(resp),
|
||||
Err(e) => {
|
||||
match e.downcast_ref::<TimelineError>() {
|
||||
Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
|
||||
safekeeper_host: None,
|
||||
}),
|
||||
Some(TimelineError::CreationInProgress(_)) => {
|
||||
// We don't return success here because creation might still fail.
|
||||
Err(ApiError::Conflict("Creation in progress".to_owned()))
|
||||
}
|
||||
_ => Err(ApiError::InternalServerError(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn pull_timeline(
|
||||
|
||||
@@ -98,6 +98,23 @@ impl SafekeeperClient {
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn switch_timeline_membership(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &models::TimelineMembershipSwitchRequest,
|
||||
) -> Result<models::TimelineMembershipSwitchResponse> {
|
||||
measured_request!(
|
||||
"switch_timeline_membership",
|
||||
crate::metrics::Method::Put,
|
||||
&self.node_id_label,
|
||||
self.inner
|
||||
.switch_timeline_membership(tenant_id, timeline_id, req)
|
||||
.await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_tenant(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -3886,10 +3886,10 @@ impl Service {
|
||||
|
||||
None
|
||||
} else if safekeepers {
|
||||
// Note that we do not support creating the timeline on the safekeepers
|
||||
// for imported timelines. The `start_lsn` of the timeline is not known
|
||||
// until the import finishes.
|
||||
// https://github.com/neondatabase/neon/issues/11569
|
||||
// Note that for imported timelines, we do not create the timeline on the safekeepers
|
||||
// straight away. Instead, we do it once the import is finalized, so that we know what
|
||||
// start LSN to provide for the safekeepers. This is done in
|
||||
// [`Self::finalize_timeline_import`].
|
||||
let res = self
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
|
||||
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
|
||||
@@ -3966,11 +3966,22 @@ impl Service {
|
||||
let active = self.timeline_active_on_all_shards(&import).await?;
|
||||
|
||||
match active {
|
||||
true => {
|
||||
Some(timeline_info) => {
|
||||
tracing::info!("Timeline became active on all shards");
|
||||
|
||||
if self.config.timelines_onto_safekeepers {
|
||||
// Now that we know the start LSN of this timeline, create it on the
|
||||
// safekeepers.
|
||||
self.tenant_timeline_create_safekeepers_until_success(
|
||||
import.tenant_id,
|
||||
timeline_info,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
false => {
|
||||
None => {
|
||||
tracing::info!("Timeline not active on all shards yet");
|
||||
|
||||
tokio::select! {
|
||||
@@ -4004,9 +4015,6 @@ impl Service {
|
||||
.range_mut(TenantShardId::tenant_range(import.tenant_id))
|
||||
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);
|
||||
|
||||
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
|
||||
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
|
||||
// https://github.com/neondatabase/neon/issues/11569
|
||||
tracing::info!(%import_failed, "Timeline import complete");
|
||||
|
||||
Ok(())
|
||||
@@ -4021,10 +4029,16 @@ impl Service {
|
||||
.await;
|
||||
}
|
||||
|
||||
/// If the timeline is active on all shards, returns the [`TimelineInfo`]
|
||||
/// collected from shard 0.
|
||||
///
|
||||
/// An error is returned if the shard layout has changed during the import.
|
||||
/// This is guarded against within the storage controller and the pageserver,
|
||||
/// and, therefore, unexpected.
|
||||
async fn timeline_active_on_all_shards(
|
||||
self: &Arc<Self>,
|
||||
import: &TimelineImport,
|
||||
) -> anyhow::Result<bool> {
|
||||
) -> anyhow::Result<Option<TimelineInfo>> {
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
@@ -4048,13 +4062,17 @@ impl Service {
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
targets.push((*tenant_shard_id, node.clone()));
|
||||
} else {
|
||||
return Ok(false);
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
targets
|
||||
};
|
||||
|
||||
if targets.is_empty() {
|
||||
anyhow::bail!("No shards found to finalize import for");
|
||||
}
|
||||
|
||||
let results = self
|
||||
.tenant_for_shards_api(
|
||||
targets,
|
||||
@@ -4070,10 +4088,17 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(results.into_iter().all(|res| match res {
|
||||
let all_active = results.iter().all(|res| match res {
|
||||
Ok(info) => info.state == TimelineState::Active,
|
||||
Err(_) => false,
|
||||
}))
|
||||
});
|
||||
|
||||
if all_active {
|
||||
// Both unwraps are validated above
|
||||
Ok(Some(results.into_iter().next().unwrap().unwrap()))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_archival_config(
|
||||
@@ -5181,7 +5206,8 @@ impl Service {
|
||||
}
|
||||
|
||||
// We don't expect any new_shard_count shards to exist here, but drop them just in case
|
||||
tenants.retain(|_id, s| s.shard.count != *new_shard_count);
|
||||
tenants
|
||||
.retain(|id, s| !(id.tenant_id == *tenant_id && s.shard.count == *new_shard_count));
|
||||
|
||||
detach_locations
|
||||
};
|
||||
@@ -8484,7 +8510,7 @@ impl Service {
|
||||
// By default, live migrations are generous about the wait time for getting
|
||||
// the secondary location up to speed. When draining, give up earlier in order
|
||||
// to not stall the operation when a cold secondary is encountered.
|
||||
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
|
||||
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
|
||||
@@ -8817,7 +8843,7 @@ impl Service {
|
||||
node_id: NodeId,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<(), OperationError> {
|
||||
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
|
||||
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
|
||||
|
||||
@@ -323,6 +323,42 @@ impl Service {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_info: TimelineInfo,
|
||||
) -> anyhow::Result<()> {
|
||||
const BACKOFF: Duration = Duration::from_secs(5);
|
||||
|
||||
loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
anyhow::bail!("Shut down requested while finalizing import");
|
||||
}
|
||||
|
||||
let res = self
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(_) => {
|
||||
tracing::info!("Timeline created on safekeepers");
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Failed to create timeline on safekeepers: {err}");
|
||||
tokio::select! {
|
||||
_ = self.cancel.cancelled() => {
|
||||
anyhow::bail!("Shut down requested while finalizing import");
|
||||
},
|
||||
_ = tokio::time::sleep(BACKOFF) => {}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Directly insert the timeline into the database without reconciling it with safekeepers.
|
||||
///
|
||||
/// Useful if the timeline already exists on the specified safekeepers,
|
||||
|
||||
@@ -165,16 +165,17 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
.head_object(&path, &CancellationToken::new())
|
||||
.await;
|
||||
|
||||
if response.is_err() {
|
||||
if let Err(e) = response {
|
||||
// Object is not present.
|
||||
let is_l0 = LayerMap::is_l0(layer.key_range(), layer.is_delta());
|
||||
|
||||
let msg = format!(
|
||||
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {})",
|
||||
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {}) with error: {}",
|
||||
layer,
|
||||
metadata.generation.get_suffix(),
|
||||
metadata.shard,
|
||||
is_l0,
|
||||
e,
|
||||
);
|
||||
|
||||
if is_l0 || ignore_error {
|
||||
|
||||
@@ -137,11 +137,10 @@ struct TenantRefAccumulator {
|
||||
impl TenantRefAccumulator {
|
||||
fn update(&mut self, ttid: TenantShardTimelineId, index_part: &IndexPart) {
|
||||
let this_shard_idx = ttid.tenant_shard_id.to_index();
|
||||
(*self
|
||||
.shards_seen
|
||||
self.shards_seen
|
||||
.entry(ttid.tenant_shard_id.tenant_id)
|
||||
.or_default())
|
||||
.insert(this_shard_idx);
|
||||
.or_default()
|
||||
.insert(this_shard_idx);
|
||||
|
||||
let mut ancestor_refs = Vec::new();
|
||||
for (layer_name, layer_metadata) in &index_part.layer_metadata {
|
||||
@@ -767,10 +766,13 @@ pub async fn pageserver_physical_gc(
|
||||
stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id).await?,
|
||||
);
|
||||
Ok(try_stream! {
|
||||
let mut cnt = 0;
|
||||
while let Some(ttid_res) = timelines.next().await {
|
||||
let ttid = ttid_res?;
|
||||
cnt += 1;
|
||||
yield (ttid, tenant_manifest_arc.clone());
|
||||
}
|
||||
tracing::info!(%tenant_shard_id, "Found {} timelines", cnt);
|
||||
})
|
||||
}
|
||||
});
|
||||
@@ -790,6 +792,7 @@ pub async fn pageserver_physical_gc(
|
||||
&accumulator,
|
||||
tenant_manifest_arc,
|
||||
)
|
||||
.instrument(info_span!("gc_timeline", %ttid))
|
||||
});
|
||||
let timelines = timelines.try_buffered(CONCURRENCY);
|
||||
let mut timelines = std::pin::pin!(timelines);
|
||||
|
||||
@@ -153,7 +153,10 @@ pub async fn scan_pageserver_metadata(
|
||||
const CONCURRENCY: usize = 32;
|
||||
|
||||
// Generate a stream of TenantTimelineId
|
||||
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
|
||||
let timelines = tenants.map_ok(|t| {
|
||||
tracing::info!("Found tenant: {}", t);
|
||||
stream_tenant_timelines(&remote_client, &target, t)
|
||||
});
|
||||
let timelines = timelines.try_buffered(CONCURRENCY);
|
||||
let timelines = timelines.try_flatten();
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ pub struct SnapshotDownloader {
|
||||
remote_client: GenericRemoteStorage,
|
||||
#[allow(dead_code)]
|
||||
target: RootTarget,
|
||||
bucket_config: BucketConfig,
|
||||
tenant_id: TenantId,
|
||||
output_path: Utf8PathBuf,
|
||||
concurrency: usize,
|
||||
@@ -43,7 +42,6 @@ impl SnapshotDownloader {
|
||||
Ok(Self {
|
||||
remote_client,
|
||||
target,
|
||||
bucket_config,
|
||||
tenant_id,
|
||||
output_path,
|
||||
concurrency,
|
||||
@@ -218,11 +216,9 @@ impl SnapshotDownloader {
|
||||
}
|
||||
|
||||
pub async fn download(&self) -> anyhow::Result<()> {
|
||||
let (remote_client, target) =
|
||||
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
|
||||
// Generate a stream of TenantShardId
|
||||
let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
|
||||
let shards =
|
||||
stream_tenant_shards(&self.remote_client, &self.target, self.tenant_id).await?;
|
||||
let shards: Vec<TenantShardId> = shards.try_collect().await?;
|
||||
|
||||
// Only read from shards that have the highest count: avoids redundantly downloading
|
||||
@@ -240,7 +236,8 @@ impl SnapshotDownloader {
|
||||
|
||||
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
|
||||
// Generate a stream of TenantTimelineId
|
||||
let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;
|
||||
let timelines =
|
||||
stream_tenant_timelines(&self.remote_client, &self.target, shard).await?;
|
||||
|
||||
// Generate a stream of S3TimelineBlobData
|
||||
async fn load_timeline_index(
|
||||
@@ -251,8 +248,8 @@ impl SnapshotDownloader {
|
||||
let data = list_timeline_blobs(remote_client, ttid, target).await?;
|
||||
Ok((ttid, data))
|
||||
}
|
||||
let timelines =
|
||||
timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid));
|
||||
let timelines = timelines
|
||||
.map_ok(|ttid| load_timeline_index(&self.remote_client, &self.target, ttid));
|
||||
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
|
||||
|
||||
while let Some(i) = timelines.next().await {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import urllib.parse
|
||||
from enum import StrEnum
|
||||
from typing import TYPE_CHECKING, final
|
||||
|
||||
import requests
|
||||
@@ -9,11 +10,23 @@ from requests.auth import AuthBase
|
||||
from typing_extensions import override
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from requests import PreparedRequest
|
||||
|
||||
|
||||
COMPUTE_AUDIENCE = "compute"
|
||||
"""
|
||||
The value to place in the `aud` claim.
|
||||
"""
|
||||
|
||||
|
||||
@final
|
||||
class ComputeClaimsScope(StrEnum):
|
||||
ADMIN = "admin"
|
||||
|
||||
|
||||
@final
|
||||
class BearerAuth(AuthBase):
|
||||
"""
|
||||
@@ -50,6 +63,35 @@ class EndpointHttpClient(requests.Session):
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
def prewarm_lfc_status(self) -> dict[str, str]:
|
||||
res = self.get(f"http://localhost:{self.external_port}/lfc/prewarm")
|
||||
res.raise_for_status()
|
||||
json: dict[str, str] = res.json()
|
||||
return json
|
||||
|
||||
def prewarm_lfc(self):
|
||||
self.post(f"http://localhost:{self.external_port}/lfc/prewarm").raise_for_status()
|
||||
|
||||
def prewarmed():
|
||||
json = self.prewarm_lfc_status()
|
||||
status, err = json["status"], json.get("error")
|
||||
assert status == "completed", f"{status}, error {err}"
|
||||
|
||||
wait_until(prewarmed)
|
||||
|
||||
def offload_lfc(self):
|
||||
url = f"http://localhost:{self.external_port}/lfc/offload"
|
||||
self.post(url).raise_for_status()
|
||||
|
||||
def offloaded():
|
||||
res = self.get(url)
|
||||
res.raise_for_status()
|
||||
json = res.json()
|
||||
status, err = json["status"], json.get("error")
|
||||
assert status == "completed", f"{status}, error {err}"
|
||||
|
||||
wait_until(offloaded)
|
||||
|
||||
def database_schema(self, database: str):
|
||||
res = self.get(
|
||||
f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}",
|
||||
|
||||
@@ -21,6 +21,7 @@ if TYPE_CHECKING:
|
||||
Any,
|
||||
)
|
||||
|
||||
from fixtures.endpoint.http import ComputeClaimsScope
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
@@ -535,12 +536,16 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
def endpoint_generate_jwt(self, endpoint_id: str) -> str:
|
||||
def endpoint_generate_jwt(
|
||||
self, endpoint_id: str, scope: ComputeClaimsScope | None = None
|
||||
) -> str:
|
||||
"""
|
||||
Generate a JWT for making requests to the endpoint's external HTTP
|
||||
server.
|
||||
"""
|
||||
args = ["endpoint", "generate-jwt", endpoint_id]
|
||||
if scope:
|
||||
args += ["--scope", str(scope)]
|
||||
|
||||
cmd = self.raw_cli(args)
|
||||
cmd.check_returncode()
|
||||
@@ -552,7 +557,7 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
endpoint_id: str,
|
||||
safekeepers_generation: int | None = None,
|
||||
safekeepers: list[int] | None = None,
|
||||
remote_ext_config: str | None = None,
|
||||
remote_ext_base_url: str | None = None,
|
||||
pageserver_id: int | None = None,
|
||||
allow_multiple: bool = False,
|
||||
create_test_user: bool = False,
|
||||
@@ -567,8 +572,8 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
extra_env_vars = env or {}
|
||||
if basebackup_request_tries is not None:
|
||||
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
|
||||
if remote_ext_config is not None:
|
||||
args.extend(["--remote-ext-config", remote_ext_config])
|
||||
if remote_ext_base_url is not None:
|
||||
args.extend(["--remote-ext-base-url", remote_ext_base_url])
|
||||
|
||||
if safekeepers_generation is not None:
|
||||
args.extend(["--safekeepers-generation", str(safekeepers_generation)])
|
||||
|
||||
@@ -51,7 +51,7 @@ from fixtures.common_types import (
|
||||
TimelineId,
|
||||
)
|
||||
from fixtures.compute_migrations import NUM_COMPUTE_MIGRATIONS
|
||||
from fixtures.endpoint.http import EndpointHttpClient
|
||||
from fixtures.endpoint.http import ComputeClaimsScope, EndpointHttpClient
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
|
||||
from fixtures.neon_cli import NeonLocalCli, Pagectl
|
||||
@@ -1185,7 +1185,9 @@ class NeonEnv:
|
||||
"broker": {},
|
||||
"safekeepers": [],
|
||||
"pageservers": [],
|
||||
"endpoint_storage": {"port": self.port_distributor.get_port()},
|
||||
"endpoint_storage": {
|
||||
"listen_addr": f"127.0.0.1:{self.port_distributor.get_port()}",
|
||||
},
|
||||
"generate_local_ssl_certs": self.generate_local_ssl_certs,
|
||||
}
|
||||
|
||||
@@ -1297,13 +1299,6 @@ class NeonEnv:
|
||||
for key, value in override.items():
|
||||
ps_cfg[key] = value
|
||||
|
||||
if self.pageserver_virtual_file_io_mode is not None:
|
||||
# TODO(christian): https://github.com/neondatabase/neon/issues/11598
|
||||
if not config.test_may_use_compatibility_snapshot_binaries:
|
||||
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
|
||||
else:
|
||||
log.info("ignoring virtual_file_io_mode parametrization for compatibility test")
|
||||
|
||||
if self.pageserver_wal_receiver_protocol is not None:
|
||||
key, value = PageserverWalReceiverProtocol.to_config_key_value(
|
||||
self.pageserver_wal_receiver_protocol
|
||||
@@ -1407,30 +1402,6 @@ class NeonEnv:
|
||||
for f in futs:
|
||||
f.result()
|
||||
|
||||
# Last step: register safekeepers at the storage controller
|
||||
if (
|
||||
self.storage_controller_config is not None
|
||||
and self.storage_controller_config.get("timelines_onto_safekeepers") is True
|
||||
):
|
||||
for sk_id, sk in enumerate(self.safekeepers):
|
||||
# 0 is an invalid safekeeper id
|
||||
sk_id = sk_id + 1
|
||||
body = {
|
||||
"id": sk_id,
|
||||
"created_at": "2023-10-25T09:11:25Z",
|
||||
"updated_at": "2024-08-28T11:32:43Z",
|
||||
"region_id": "aws-us-east-2",
|
||||
"host": "127.0.0.1",
|
||||
"port": sk.port.pg,
|
||||
"http_port": sk.port.http,
|
||||
"https_port": None,
|
||||
"version": 5957,
|
||||
"availability_zone_id": f"us-east-2b-{sk_id}",
|
||||
}
|
||||
|
||||
self.storage_controller.on_safekeeper_deploy(sk_id, body)
|
||||
self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
|
||||
|
||||
self.endpoint_storage.start(timeout_in_seconds=timeout_in_seconds)
|
||||
|
||||
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
|
||||
@@ -3864,7 +3835,7 @@ class NeonAuthBroker:
|
||||
external_http_port: int,
|
||||
auth_backend: NeonAuthBroker.ProxyV1,
|
||||
):
|
||||
self.domain = "apiauth.local.neon.build" # resolves to 127.0.0.1
|
||||
self.domain = "local.neon.build" # resolves to 127.0.0.1
|
||||
self.host = "127.0.0.1"
|
||||
self.http_port = http_port
|
||||
self.external_http_port = external_http_port
|
||||
@@ -3881,7 +3852,7 @@ class NeonAuthBroker:
|
||||
# generate key of it doesn't exist
|
||||
crt_path = self.test_output_dir / "proxy.crt"
|
||||
key_path = self.test_output_dir / "proxy.key"
|
||||
generate_proxy_tls_certs("apiauth.local.neon.build", key_path, crt_path)
|
||||
generate_proxy_tls_certs(f"apiauth.{self.domain}", key_path, crt_path)
|
||||
|
||||
args = [
|
||||
str(self.neon_binpath / "proxy"),
|
||||
@@ -3925,10 +3896,10 @@ class NeonAuthBroker:
|
||||
|
||||
log.info(f"Executing http query: {query}")
|
||||
|
||||
connstr = f"postgresql://{user}@{self.domain}/postgres"
|
||||
connstr = f"postgresql://{user}@ep-foo-bar-1234.{self.domain}/postgres"
|
||||
async with httpx.AsyncClient(verify=str(self.test_output_dir / "proxy.crt")) as client:
|
||||
response = await client.post(
|
||||
f"https://{self.domain}:{self.external_http_port}/sql",
|
||||
f"https://apiauth.{self.domain}:{self.external_http_port}/sql",
|
||||
json={"query": query, "params": args},
|
||||
headers={
|
||||
"Neon-Connection-String": connstr,
|
||||
@@ -4218,13 +4189,13 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
self.config(config_lines)
|
||||
|
||||
self.__jwt = self.env.neon_cli.endpoint_generate_jwt(self.endpoint_id)
|
||||
self.__jwt = self.generate_jwt()
|
||||
|
||||
return self
|
||||
|
||||
def start(
|
||||
self,
|
||||
remote_ext_config: str | None = None,
|
||||
remote_ext_base_url: str | None = None,
|
||||
pageserver_id: int | None = None,
|
||||
safekeeper_generation: int | None = None,
|
||||
safekeepers: list[int] | None = None,
|
||||
@@ -4250,7 +4221,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
self.endpoint_id,
|
||||
safekeepers_generation=safekeeper_generation,
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
remote_ext_base_url=remote_ext_base_url,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
create_test_user=create_test_user,
|
||||
@@ -4265,6 +4236,14 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
return self
|
||||
|
||||
def generate_jwt(self, scope: ComputeClaimsScope | None = None) -> str:
|
||||
"""
|
||||
Generate a JWT for making requests to the endpoint's external HTTP
|
||||
server.
|
||||
"""
|
||||
assert self.endpoint_id is not None
|
||||
return self.env.neon_cli.endpoint_generate_jwt(self.endpoint_id, scope)
|
||||
|
||||
def endpoint_path(self) -> Path:
|
||||
"""Path to endpoint directory"""
|
||||
assert self.endpoint_id
|
||||
@@ -4457,7 +4436,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
hot_standby: bool = False,
|
||||
lsn: Lsn | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
remote_ext_config: str | None = None,
|
||||
remote_ext_base_url: str | None = None,
|
||||
pageserver_id: int | None = None,
|
||||
allow_multiple: bool = False,
|
||||
basebackup_request_tries: int | None = None,
|
||||
@@ -4476,7 +4455,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
).start(
|
||||
remote_ext_config=remote_ext_config,
|
||||
remote_ext_base_url=remote_ext_base_url,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
basebackup_request_tries=basebackup_request_tries,
|
||||
@@ -4560,7 +4539,7 @@ class EndpointFactory:
|
||||
lsn: Lsn | None = None,
|
||||
hot_standby: bool = False,
|
||||
config_lines: list[str] | None = None,
|
||||
remote_ext_config: str | None = None,
|
||||
remote_ext_base_url: str | None = None,
|
||||
pageserver_id: int | None = None,
|
||||
basebackup_request_tries: int | None = None,
|
||||
) -> Endpoint:
|
||||
@@ -4580,7 +4559,7 @@ class EndpointFactory:
|
||||
hot_standby=hot_standby,
|
||||
config_lines=config_lines,
|
||||
lsn=lsn,
|
||||
remote_ext_config=remote_ext_config,
|
||||
remote_ext_base_url=remote_ext_base_url,
|
||||
pageserver_id=pageserver_id,
|
||||
basebackup_request_tries=basebackup_request_tries,
|
||||
)
|
||||
@@ -4634,7 +4613,10 @@ class EndpointFactory:
|
||||
return self
|
||||
|
||||
def new_replica(
|
||||
self, origin: Endpoint, endpoint_id: str, config_lines: list[str] | None = None
|
||||
self,
|
||||
origin: Endpoint,
|
||||
endpoint_id: str | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
):
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
@@ -4650,7 +4632,10 @@ class EndpointFactory:
|
||||
)
|
||||
|
||||
def new_replica_start(
|
||||
self, origin: Endpoint, endpoint_id: str, config_lines: list[str] | None = None
|
||||
self,
|
||||
origin: Endpoint,
|
||||
endpoint_id: str | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
):
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
@@ -5467,6 +5452,13 @@ def wait_for_last_flush_lsn(
|
||||
|
||||
if last_flush_lsn is None:
|
||||
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
# The last_flush_lsn may not correspond to a record boundary.
|
||||
# For example, if the compute flushed WAL on a page boundary,
|
||||
# the remaining part of the record might not be flushed for a long time.
|
||||
# This would prevent the pageserver from reaching last_flush_lsn promptly.
|
||||
# To ensure the rest of the record reaches the pageserver quickly,
|
||||
# we forcibly flush the WAL by using CHECKPOINT.
|
||||
endpoint.safe_psql("CHECKPOINT")
|
||||
|
||||
results = []
|
||||
for tenant_shard_id, pageserver in shards:
|
||||
|
||||
@@ -122,6 +122,10 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
|
||||
".*Call to node.*management API.*failed.*Timeout.*",
|
||||
".*Failed to update node .+ after heartbeat round.*error sending request for url.*",
|
||||
".*background_reconcile: failed to fetch top tenants:.*client error \\(Connect\\).*",
|
||||
# Many tests will take safekeepers offline
|
||||
".*Call to safekeeper.*management API.*failed.*receive body.*",
|
||||
".*Call to safekeeper.*management API.*failed.*ReceiveBody.*",
|
||||
".*Call to safekeeper.*management API.*failed.*Timeout.*",
|
||||
# Many tests will start up with a node offline
|
||||
".*startup_reconcile: Could not scan node.*",
|
||||
# Tests run in dev mode
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import math # Add this import
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
@@ -87,7 +88,10 @@ def test_cumulative_statistics_persistence(
|
||||
- insert additional tuples that by itself are not enough to trigger auto-vacuum but in combination with the previous tuples are
|
||||
- verify that autovacuum is triggered by the combination of tuples inserted before and after endpoint suspension
|
||||
"""
|
||||
project = neon_api.create_project(pg_version)
|
||||
project = neon_api.create_project(
|
||||
pg_version,
|
||||
f"Test cumulative statistics persistence, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}",
|
||||
)
|
||||
project_id = project["project"]["id"]
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
endpoint_id = project["endpoints"][0]["id"]
|
||||
|
||||
@@ -62,7 +62,9 @@ def test_ro_replica_lag(
|
||||
|
||||
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
|
||||
|
||||
project = neon_api.create_project(pg_version)
|
||||
project = neon_api.create_project(
|
||||
pg_version, f"Test readonly replica lag, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
|
||||
)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: %s", project_id)
|
||||
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
|
||||
@@ -195,7 +197,9 @@ def test_replication_start_stop(
|
||||
pgbench_duration = f"-T{2**num_replicas * configuration_test_time_sec}"
|
||||
error_occurred = False
|
||||
|
||||
project = neon_api.create_project(pg_version)
|
||||
project = neon_api.create_project(
|
||||
pg_version, f"Test replication start stop, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
|
||||
)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: %s", project_id)
|
||||
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
|
||||
|
||||
@@ -206,7 +206,7 @@ class NeonProject:
|
||||
self.neon_api = neon_api
|
||||
self.pg_bin = pg_bin
|
||||
proj = self.neon_api.create_project(
|
||||
pg_version, f"Automatic random API test {os.getenv('GITHUB_RUN_ID')}"
|
||||
pg_version, f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
|
||||
)
|
||||
self.id: str = proj["project"]["id"]
|
||||
self.name: str = proj["project"]["name"]
|
||||
|
||||
@@ -202,6 +202,8 @@ def test_pageserver_gc_compaction_preempt(
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
env.pageserver.allowed_errors.append(".*The timeline or pageserver is shutting down.*")
|
||||
env.pageserver.allowed_errors.append(".*flush task cancelled.*")
|
||||
env.pageserver.allowed_errors.append(".*failed to pipe.*")
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
@@ -544,3 +544,69 @@ def test_drop_role_with_table_privileges_from_non_neon_superuser(neon_simple_env
|
||||
)
|
||||
role = cursor.fetchone()
|
||||
assert role is None
|
||||
|
||||
|
||||
def test_db_with_custom_settings(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test that compute_ctl can work with databases that have some custom settings.
|
||||
For example, role=some_other_role, default_transaction_read_only=on,
|
||||
search_path=non_public_schema, statement_timeout=1 (1ms).
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
TEST_ROLE = "some_other_role"
|
||||
TEST_DB = "db_with_custom_settings"
|
||||
TEST_SCHEMA = "non_public_schema"
|
||||
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"databases": [
|
||||
{
|
||||
"name": TEST_DB,
|
||||
"owner": TEST_ROLE,
|
||||
}
|
||||
],
|
||||
"roles": [
|
||||
{
|
||||
"name": TEST_ROLE,
|
||||
}
|
||||
],
|
||||
},
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
endpoint.reconfigure()
|
||||
|
||||
with endpoint.cursor(dbname=TEST_DB) as cursor:
|
||||
cursor.execute(f"CREATE SCHEMA {TEST_SCHEMA}")
|
||||
cursor.execute(f"ALTER DATABASE {TEST_DB} SET role = {TEST_ROLE}")
|
||||
cursor.execute(f"ALTER DATABASE {TEST_DB} SET default_transaction_read_only = on")
|
||||
cursor.execute(f"ALTER DATABASE {TEST_DB} SET search_path = {TEST_SCHEMA}")
|
||||
cursor.execute(f"ALTER DATABASE {TEST_DB} SET statement_timeout = 1")
|
||||
|
||||
with endpoint.cursor(dbname=TEST_DB) as cursor:
|
||||
cursor.execute("SELECT current_role")
|
||||
role = cursor.fetchone()
|
||||
assert role is not None
|
||||
assert role[0] == TEST_ROLE
|
||||
|
||||
cursor.execute("SHOW default_transaction_read_only")
|
||||
default_transaction_read_only = cursor.fetchone()
|
||||
assert default_transaction_read_only is not None
|
||||
assert default_transaction_read_only[0] == "on"
|
||||
|
||||
cursor.execute("SHOW search_path")
|
||||
search_path = cursor.fetchone()
|
||||
assert search_path is not None
|
||||
assert search_path[0] == TEST_SCHEMA
|
||||
|
||||
# Do not check statement_timeout, because we force it to 2min
|
||||
# in `endpoint.cursor()` fixture.
|
||||
|
||||
endpoint.reconfigure()
|
||||
|
||||
78
test_runner/regress/test_compute_http.py
Normal file
78
test_runner/regress/test_compute_http.py
Normal file
@@ -0,0 +1,78 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from http.client import FORBIDDEN, UNAUTHORIZED
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import jwt
|
||||
import pytest
|
||||
from fixtures.endpoint.http import COMPUTE_AUDIENCE, ComputeClaimsScope, EndpointHttpClient
|
||||
from fixtures.utils import run_only_on_default_postgres
|
||||
from requests import RequestException
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
@run_only_on_default_postgres("The code path being tested is not dependent on Postgres version")
|
||||
def test_compute_no_scope_claim(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test that if the JWT scope is not admin and no compute_id is specified,
|
||||
the external HTTP server returns a 403 Forbidden error.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
# Encode nothing in the token
|
||||
token = jwt.encode({}, env.auth_keys.priv, algorithm="EdDSA")
|
||||
|
||||
# Create an admin-scoped HTTP client
|
||||
client = EndpointHttpClient(
|
||||
external_port=endpoint.external_http_port,
|
||||
internal_port=endpoint.internal_http_port,
|
||||
jwt=token,
|
||||
)
|
||||
|
||||
try:
|
||||
client.status()
|
||||
pytest.fail("Exception should have been raised")
|
||||
except RequestException as e:
|
||||
assert e.response is not None
|
||||
assert e.response.status_code == FORBIDDEN
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"audience",
|
||||
(COMPUTE_AUDIENCE, "invalid", None),
|
||||
ids=["with_audience", "with_invalid_audience", "without_audience"],
|
||||
)
|
||||
@run_only_on_default_postgres("The code path being tested is not dependent on Postgres version")
|
||||
def test_compute_admin_scope_claim(neon_simple_env: NeonEnv, audience: str | None):
|
||||
"""
|
||||
Test that an admin-scoped JWT can access the compute's external HTTP server
|
||||
without the compute_id being specified in the claims.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
data: dict[str, str | list[str]] = {"scope": str(ComputeClaimsScope.ADMIN)}
|
||||
if audience:
|
||||
data["aud"] = [audience]
|
||||
|
||||
token = jwt.encode(data, env.auth_keys.priv, algorithm="EdDSA")
|
||||
|
||||
# Create an admin-scoped HTTP client
|
||||
client = EndpointHttpClient(
|
||||
external_port=endpoint.external_http_port,
|
||||
internal_port=endpoint.internal_http_port,
|
||||
jwt=token,
|
||||
)
|
||||
|
||||
try:
|
||||
client.status()
|
||||
if audience != COMPUTE_AUDIENCE:
|
||||
pytest.fail("Exception should have been raised")
|
||||
except RequestException as e:
|
||||
assert e.response is not None
|
||||
assert e.response.status_code == UNAUTHORIZED
|
||||
@@ -221,7 +221,7 @@ def test_remote_extensions(
|
||||
|
||||
endpoint.create_remote_extension_spec(spec)
|
||||
|
||||
endpoint.start(remote_ext_config=extensions_endpoint)
|
||||
endpoint.start(remote_ext_base_url=extensions_endpoint)
|
||||
|
||||
with endpoint.connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -249,7 +249,7 @@ def test_remote_extensions(
|
||||
# Remove the extension files to force a redownload of the extension.
|
||||
extension.remove(test_output_dir, pg_version)
|
||||
|
||||
endpoint.start(remote_ext_config=extensions_endpoint)
|
||||
endpoint.start(remote_ext_base_url=extensions_endpoint)
|
||||
|
||||
# Test that ALTER EXTENSION UPDATE statements also fetch remote extensions.
|
||||
with endpoint.connect() as conn:
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user