Compare commits

..

10 Commits

Author SHA1 Message Date
Alex Chi Z
ab7e5fbf95 feat(pageserver): add PostHog config section
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-07 14:34:21 +08:00
Tristan Partin
0ef6851219 Make the audience claim in compute JWTs a vector (#11845)
According to RFC 7519, `aud` is generally an array of StringOrURI, but
in special cases may be a single StringOrURI value. To accomodate future
control plane work where a single token may work for multiple services,
make the claim a vector.

Link: https://www.rfc-editor.org/rfc/rfc7519#section-4.1.3

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-05-06 22:19:15 +00:00
Mikhail
5c356c63eb endpoint_storage compute_ctl integration (#11550)
Add `/lfc/(prewarm|offload)` routes to `compute_ctl` which interact with
endpoint storage.

Add `prewarm_lfc_on_startup` spec option which, if enabled, downloads
LFC prewarm data on compute startup.

Resolves: https://github.com/neondatabase/cloud/issues/26343
2025-05-06 22:02:12 +00:00
Suhas Thalanki
384e3df2ad fix: pinned anon extension to v2.1.0 (#11844)
## Problem

Currently the setup for `anon` v2 in the compute image downloads the
latest version of the extension. This can be problematic as on a compute
start/restart it can download a version that is newer than what we have
tested and potentially break things, hence not giving us the ability to
control when the extension is updated.

We were also using `v2.2.0`, which is not ready for production yet and
has been clarified by the maintainer.

Additional context:
https://gitlab.com/dalibo/postgresql_anonymizer/-/issues/530

## Summary of changes

Changed the URL from which we download the `anon` extension to point to
`v2.1.0` instead of `latest`.
2025-05-06 21:52:15 +00:00
Tristan Partin
f9b3a2e059 Add scoping to compute_ctl JWT claims (#11639)
Currently we only have an admin scope which allows a user to bypass the
compute_id check. When the admin scope is provided, validate the
audience of the JWT to be "compute".

Closes: https://github.com/neondatabase/cloud/issues/27614

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-05-06 19:51:10 +00:00
Jakub Kołodziejczak
79ee78ea32 feat(compute): enable audit logs for pg_session_jwt extension (#11829)
related to https://github.com/neondatabase/cloud/issues/28480
related to https://github.com/neondatabase/pg_session_jwt/pull/36

cc @MihaiBojin @conradludgate @lneves12
2025-05-06 15:18:50 +00:00
Erik Grinaker
0e0ad073bf storcon: fix split aborts removing other tenants (#11837)
## Problem

When aborting a split, the code accidentally removes all other tenant
shards from the in-memory map that have the same shard count as the
aborted split, causing "tenant not found" errors. It will recover on a
storcon restart, when it loads the persisted state. This issue has been
present for at least a year.

Resolves https://github.com/neondatabase/cloud/issues/28589.

## Summary of changes

Only remove shards belonging to the relevant tenant when aborting a
split.

Also adds a regression test.
2025-05-06 13:57:34 +00:00
Alex Chi Z.
6827f2f58c fix(pageserver): only keep iter_with_options API, improve docs in gc-compact (#11804)
## Problem

Address comments in https://github.com/neondatabase/neon/pull/11709

## Summary of changes

- remove `iter` API, users always need to specify buffer size depending
on the expected memory usage.
- several doc improvements

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2025-05-06 12:27:16 +00:00
Peter Bendel
c82e363ed9 cleanup orphan projects created by python tests, too (#11836)
## Problem

- some projects are created during GitHub workflows but not by action
project_create but by python test scripts.

If the python test fails the project is not deleted

## Summary of changes

- make sure we cleanup those python created projects a few days after
they are no longer used, too
2025-05-06 12:26:13 +00:00
Alexander Bayandin
50dc2fae77 compute-node.Dockerfile: remove layer with duplicated name (#11807)
## Problem

Two `rust-extensions-build-pgrx14` layers were added independently in
two different PRs, and the layers are exactly the same

## Summary of changes
- Remove one of `rust-extensions-build-pgrx14` layers
2025-05-06 10:52:21 +00:00
54 changed files with 1077 additions and 594 deletions

View File

@@ -64,7 +64,7 @@ jobs:
env:
ORG_ID: org-solitary-dew-09443886
LIMIT: 100
SEARCH: "Created by actions/neon-project-create; GITHUB_RUN_ID"
SEARCH: "GITHUB_RUN_ID="
BASE_URL: https://console-stage.neon.build/api/v2
DRY_RUN: "false" # Set to "true" to just test out the workflow

2
Cargo.lock generated
View File

@@ -1284,6 +1284,7 @@ name = "compute_tools"
version = "0.1.0"
dependencies = [
"anyhow",
"async-compression",
"aws-config",
"aws-sdk-kms",
"aws-sdk-s3",
@@ -1420,6 +1421,7 @@ dependencies = [
"clap",
"comfy-table",
"compute_api",
"endpoint_storage",
"futures",
"http-utils",
"humantime",

View File

@@ -243,6 +243,7 @@ azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rus
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
http-utils = { version = "0.1", path = "./libs/http-utils/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver = { path = "./pageserver" }

View File

@@ -1084,23 +1084,12 @@ RUN cargo install --locked --version 0.12.9 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
#
# Layer "rust extensions pgrx14"
#
#########################################################################################
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx14
ARG PG_VERSION
RUN cargo install --locked --version 0.14.1 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
#
# Layer "rust extensions pgrx14"
#
# Version 14 is now required by a few
# Version 14 is now required by a few
# This layer should be used as a base for new pgrx extensions,
# and eventually get merged with `rust-extensions-build`
#
@@ -1333,8 +1322,8 @@ ARG PG_VERSION
# Do not update without approve from proxy team
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
WORKDIR /ext-src
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.0.tar.gz -O pg_session_jwt.tar.gz && \
echo "19be2dc0b3834d643706ed430af998bb4c2cdf24b3c45e7b102bb3a550e8660c pg_session_jwt.tar.gz" | sha256sum --check && \
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.1.tar.gz -O pg_session_jwt.tar.gz && \
echo "62fec9e472cb805c53ba24a0765afdb8ea2720cfc03ae7813e61687b36d1b0ad pg_session_jwt.tar.gz" | sha256sum --check && \
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/version = "0.12.6"/version = "0.12.9"/g' pgrx-tests/Cargo.toml && \
@@ -1362,7 +1351,8 @@ COPY compute/patches/anon_v2.patch .
# This is an experimental extension, never got to real production.
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/latest/postgresql_anonymizer-latest.tar.gz -O pg_anon.tar.gz && \
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/2.1.0/postgresql_anonymizer-latest.tar.gz -O pg_anon.tar.gz && \
echo "48e7f5ae2f1ca516df3da86c5c739d48dd780a4e885705704ccaad0faa89d6c0 pg_anon.tar.gz" | sha256sum --check && \
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt && \
sed -i 's/pgrx = "0.14.1"/pgrx = { version = "=0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \

View File

@@ -10,6 +10,7 @@ default = []
testing = ["fail/failpoints"]
[dependencies]
async-compression.workspace = true
base64.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true

View File

@@ -1,17 +1,10 @@
use std::collections::HashMap;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus};
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
LfcPrewarmState,
};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
};
@@ -25,6 +18,16 @@ use postgres;
use postgres::NoTls;
use postgres::error::SqlState;
use remote_storage::{DownloadError, RemotePath};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::spawn;
use tracing::{Instrument, debug, error, info, instrument, warn};
use utils::id::{TenantId, TimelineId};
@@ -150,6 +153,9 @@ pub struct ComputeState {
/// set up the span relationship ourselves.
pub startup_span: Option<tracing::span::Span>,
pub lfc_prewarm_state: LfcPrewarmState,
pub lfc_offload_state: LfcOffloadState,
pub metrics: ComputeMetrics,
}
@@ -163,6 +169,8 @@ impl ComputeState {
pspec: None,
startup_span: None,
metrics: ComputeMetrics::default(),
lfc_prewarm_state: LfcPrewarmState::default(),
lfc_offload_state: LfcOffloadState::default(),
}
}
@@ -198,6 +206,8 @@ pub struct ParsedSpec {
pub pageserver_connstr: String,
pub safekeeper_connstrings: Vec<String>,
pub storage_auth_token: Option<String>,
pub endpoint_storage_addr: Option<SocketAddr>,
pub endpoint_storage_token: Option<String>,
}
impl TryFrom<ComputeSpec> for ParsedSpec {
@@ -251,6 +261,18 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
.or(Err("invalid timeline id"))?
};
let endpoint_storage_addr: Option<SocketAddr> = spec
.endpoint_storage_addr
.clone()
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_addr"))
.unwrap_or_default()
.parse()
.ok();
let endpoint_storage_token = spec
.endpoint_storage_token
.clone()
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_token"));
Ok(ParsedSpec {
spec,
pageserver_connstr,
@@ -258,6 +280,8 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
storage_auth_token,
tenant_id,
timeline_id,
endpoint_storage_addr,
endpoint_storage_token,
})
}
}
@@ -736,6 +760,9 @@ impl ComputeNode {
// Log metrics so that we can search for slow operations in logs
info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished");
if pspec.spec.prewarm_lfc_on_startup {
self.prewarm_lfc();
}
Ok(())
}

View File

@@ -0,0 +1,202 @@
use crate::compute::ComputeNode;
use anyhow::{Context, Result, bail};
use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
use compute_api::responses::LfcOffloadState;
use compute_api::responses::LfcPrewarmState;
use http::StatusCode;
use reqwest::Client;
use std::sync::Arc;
use tokio::{io::AsyncReadExt, spawn};
use tracing::{error, info};
#[derive(serde::Serialize, Default)]
pub struct LfcPrewarmStateWithProgress {
#[serde(flatten)]
base: LfcPrewarmState,
total: i32,
prewarmed: i32,
skipped: i32,
}
/// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
struct EndpointStoragePair {
url: String,
token: String,
}
const KEY: &str = "lfc_state";
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
type Error = anyhow::Error;
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
bail!("pspec.endpoint_id missing")
};
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
bail!("pspec.endpoint_storage_addr missing")
};
let tenant_id = pspec.tenant_id;
let timeline_id = pspec.timeline_id;
let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
let Some(ref token) = pspec.endpoint_storage_token else {
bail!("pspec.endpoint_storage_token missing")
};
let token = token.clone();
Ok(EndpointStoragePair { url, token })
}
}
impl ComputeNode {
// If prewarm failed, we want to get overall number of segments as well as done ones.
// However, this function should be reliable even if querying postgres failed.
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
info!("requesting LFC prewarm state from postgres");
let mut state = LfcPrewarmStateWithProgress::default();
{
state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
}
let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
Ok(client) => client,
Err(err) => {
error!(%err, "connecting to postgres");
return state;
}
};
let row = match client
.query_one("select * from get_prewarm_info()", &[])
.await
{
Ok(row) => row,
Err(err) => {
error!(%err, "querying LFC prewarm status");
return state;
}
};
state.total = row.try_get(0).unwrap_or_default();
state.prewarmed = row.try_get(1).unwrap_or_default();
state.skipped = row.try_get(2).unwrap_or_default();
state
}
pub fn lfc_offload_state(&self) -> LfcOffloadState {
self.state.lock().unwrap().lfc_offload_state.clone()
}
/// Returns false if there is a prewarm request ongoing, true otherwise
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
crate::metrics::LFC_PREWARM_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
if let LfcPrewarmState::Prewarming =
std::mem::replace(state, LfcPrewarmState::Prewarming)
{
return false;
}
}
let cloned = self.clone();
spawn(async move {
let Err(err) = cloned.prewarm_impl().await else {
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
return;
};
error!(%err);
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
error: err.to_string(),
};
});
true
}
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
let state = self.state.lock().unwrap();
state.pspec.as_ref().unwrap().try_into()
}
async fn prewarm_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(token);
let res = request.send().await.context("querying endpoint storage")?;
let status = res.status();
if status != StatusCode::OK {
bail!("{status} querying endpoint storage")
}
let mut uncompressed = Vec::new();
let lfc_state = res
.bytes()
.await
.context("getting request body from endpoint storage")?;
ZstdDecoder::new(lfc_state.iter().as_slice())
.read_to_end(&mut uncompressed)
.await
.context("decoding LFC state")?;
let uncompressed_len = uncompressed.len();
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres");
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select prewarm_local_cache($1)", &[&uncompressed])
.await
.context("loading LFC state into postgres")
.map(|_| ())
}
/// Returns false if there is an offload request ongoing, true otherwise
pub fn offload_lfc(self: &Arc<Self>) -> bool {
crate::metrics::LFC_OFFLOAD_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap().lfc_offload_state;
if let LfcOffloadState::Offloading =
std::mem::replace(state, LfcOffloadState::Offloading)
{
return false;
}
}
let cloned = self.clone();
spawn(async move {
let Err(err) = cloned.offload_lfc_impl().await else {
cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
return;
};
error!(%err);
cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: err.to_string(),
};
});
true
}
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
info!(%url, "requesting LFC state from postgres");
let mut compressed = Vec::new();
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select get_local_cache_state()", &[])
.await
.context("querying LFC state")?
.try_get::<usize, &[u8]>(0)
.context("deserializing LFC state")
.map(ZstdEncoder::new)?
.read_to_end(&mut compressed)
.await
.context("compressing LFC state")?;
let compressed_len = compressed.len();
info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
let request = Client::new().put(url).bearer_auth(token).body(compressed);
match request.send().await {
Ok(res) if res.status() == StatusCode::OK => Ok(()),
Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()),
Err(err) => Err(err).context("writing to endpoint storage"),
}
}
}

View File

@@ -223,6 +223,9 @@ pub fn write_postgres_conf(
// TODO: tune this after performance testing
writeln!(file, "pgaudit.log_rotation_age=5")?;
// Enable audit logs for pg_session_jwt extension
writeln!(file, "pg_session_jwt.audit_log=on")?;
// Add audit shared_preload_libraries, if they are not present.
//
// The caller who sets the flag is responsible for ensuring that the necessary

View File

@@ -1,12 +1,10 @@
use std::collections::HashSet;
use anyhow::{Result, anyhow};
use axum::{RequestExt, body::Body};
use axum_extra::{
TypedHeader,
headers::{Authorization, authorization::Bearer},
};
use compute_api::requests::ComputeClaims;
use compute_api::requests::{COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope};
use futures::future::BoxFuture;
use http::{Request, Response, StatusCode};
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
@@ -25,13 +23,14 @@ pub(in crate::http) struct Authorize {
impl Authorize {
pub fn new(compute_id: String, jwks: JwkSet) -> Self {
let mut validation = Validation::new(Algorithm::EdDSA);
// Nothing is currently required
validation.required_spec_claims = HashSet::new();
validation.validate_exp = true;
// Unused by the control plane
validation.validate_aud = false;
// Unused by the control plane
validation.validate_nbf = false;
// Unused by the control plane
validation.validate_aud = false;
validation.set_audience(&[COMPUTE_AUDIENCE]);
// Nothing is currently required
validation.set_required_spec_claims(&[] as &[&str; 0]);
Self {
compute_id,
@@ -64,11 +63,47 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
Err(e) => return Err(JsonResponse::error(StatusCode::UNAUTHORIZED, e)),
};
if data.claims.compute_id != compute_id {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"invalid compute ID in authorization token claims",
));
match data.claims.scope {
// TODO: We should validate audience for every token, but
// instead of this ad-hoc validation, we should turn
// [`Validation::validate_aud`] on. This is merely a stopgap
// while we roll out `aud` deployment. We return a 401
// Unauthorized because when we eventually do use
// [`Validation`], we will hit the above `Err` match arm which
// returns 401 Unauthorized.
Some(ComputeClaimsScope::Admin) => {
let Some(ref audience) = data.claims.audience else {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"missing audience in authorization token claims",
));
};
if !audience.iter().any(|a| a == COMPUTE_AUDIENCE) {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"invalid audience in authorization token claims",
));
}
}
// If the scope is not [`ComputeClaimsScope::Admin`], then we
// must validate the compute_id
_ => {
let Some(ref claimed_compute_id) = data.claims.compute_id else {
return Err(JsonResponse::error(
StatusCode::FORBIDDEN,
"missing compute_id in authorization token claims",
));
};
if *claimed_compute_id != compute_id {
return Err(JsonResponse::error(
StatusCode::FORBIDDEN,
"invalid compute ID in authorization token claims",
));
}
}
}
// Make claims available to any subsequent middleware or request

View File

@@ -0,0 +1,39 @@
use crate::compute_prewarm::LfcPrewarmStateWithProgress;
use crate::http::JsonResponse;
use axum::response::{IntoResponse, Response};
use axum::{Json, http::StatusCode};
use compute_api::responses::LfcOffloadState;
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmStateWithProgress> {
Json(compute.lfc_prewarm_state().await)
}
// Following functions are marked async for axum, as it's more convenient than wrapping these
// in async lambdas at call site
pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadState> {
Json(compute.lfc_offload_state())
}
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
if compute.prewarm_lfc() {
StatusCode::ACCEPTED.into_response()
} else {
JsonResponse::error(
StatusCode::TOO_MANY_REQUESTS,
"Multiple requests for prewarm are not allowed",
)
}
}
pub(in crate::http) async fn offload(compute: Compute) -> Response {
if compute.offload_lfc() {
StatusCode::ACCEPTED.into_response()
} else {
JsonResponse::error(
StatusCode::TOO_MANY_REQUESTS,
"Multiple requests for prewarm offload are not allowed",
)
}
}

View File

@@ -11,6 +11,7 @@ pub(in crate::http) mod extensions;
pub(in crate::http) mod failpoints;
pub(in crate::http) mod grants;
pub(in crate::http) mod insights;
pub(in crate::http) mod lfc;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod status;

View File

@@ -23,7 +23,7 @@ use super::{
middleware::authorize::Authorize,
routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
grants, insights, metrics, metrics_json, status, terminate,
grants, insights, lfc, metrics, metrics_json, status, terminate,
},
};
use crate::compute::ComputeNode;
@@ -85,6 +85,8 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))

View File

@@ -11,6 +11,7 @@ pub mod http;
pub mod logger;
pub mod catalog;
pub mod compute;
pub mod compute_prewarm;
pub mod disk_quota;
pub mod extension_server;
pub mod installed_extensions;

View File

@@ -1,7 +1,7 @@
use metrics::core::{AtomicF64, AtomicU64, Collector, GenericCounter, GenericGauge};
use metrics::proto::MetricFamily;
use metrics::{
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
IntCounter, IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec,
};
use once_cell::sync::Lazy;
@@ -97,6 +97,24 @@ pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::
.expect("failed to define a metric")
});
/// Needed as neon.file_cache_prewarm_batch == 0 doesn't mean we never tried to prewarm.
/// On the other hand, LFC_PREWARMED_PAGES is excessive as we can GET /lfc/prewarm
pub(crate) static LFC_PREWARM_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_prewarm_requests_total",
"Total number of LFC prewarm requests made by compute_ctl",
)
.expect("failed to define a metric")
});
pub(crate) static LFC_OFFLOAD_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_offload_requests_total",
"Total number of LFC offload requests made by compute_ctl",
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
@@ -106,5 +124,7 @@ pub fn collect() -> Vec<MetricFamily> {
metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
metrics.extend(LFC_PREWARM_REQUESTS.collect());
metrics.extend(LFC_OFFLOAD_REQUESTS.collect());
metrics
}

View File

@@ -30,6 +30,7 @@ mod pg_helpers_tests {
r#"fsync = off
wal_level = logical
hot_standby = on
prewarm_lfc_on_startup = off
neon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501'
wal_log_hints = on
log_connections = on

View File

@@ -41,7 +41,7 @@ storage_broker.workspace = true
http-utils.workspace = true
utils.workspace = true
whoami.workspace = true
endpoint_storage.workspace = true
compute_api.workspace = true
workspace_hack.workspace = true
tracing.workspace = true

View File

@@ -16,10 +16,11 @@ use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::requests::ComputeClaimsScope;
use compute_api::spec::ComputeMode;
use control_plane::broker::StorageBroker;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
@@ -705,6 +706,9 @@ struct EndpointStopCmdArgs {
struct EndpointGenerateJwtCmdArgs {
#[clap(help = "Postgres endpoint id")]
endpoint_id: String,
#[clap(short = 's', long, help = "Scope to generate the JWT with", value_parser = ComputeClaimsScope::from_str)]
scope: Option<ComputeClaimsScope>,
}
#[derive(clap::Subcommand)]
@@ -1018,7 +1022,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
})
.collect(),
endpoint_storage: EndpointStorageConf {
port: ENDPOINT_STORAGE_DEFAULT_PORT,
listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
},
pg_distrib_dir: None,
neon_distrib_dir: None,
@@ -1484,10 +1488,25 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
None
};
let exp = (std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?
+ Duration::from_secs(86400))
.as_secs();
let claims = endpoint_storage::claims::EndpointStorageClaims {
tenant_id: endpoint.tenant_id,
timeline_id: endpoint.timeline_id,
endpoint_id: endpoint_id.to_string(),
exp,
};
let endpoint_storage_token = env.generate_auth_token(&claims)?;
let endpoint_storage_addr = env.endpoint_storage.listen_addr.to_string();
println!("Starting existing endpoint {endpoint_id}...");
endpoint
.start(
&auth_token,
endpoint_storage_token,
endpoint_storage_addr,
safekeepers_generation,
safekeepers,
pageservers,
@@ -1540,12 +1559,16 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
endpoint.stop(&args.mode, args.destroy)?;
}
EndpointCmd::GenerateJwt(args) => {
let endpoint_id = &args.endpoint_id;
let endpoint = cplane
.endpoints
.get(endpoint_id)
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let jwt = endpoint.generate_jwt()?;
let endpoint = {
let endpoint_id = &args.endpoint_id;
cplane
.endpoints
.get(endpoint_id)
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?
};
let jwt = endpoint.generate_jwt(args.scope)?;
print!("{jwt}");
}

View File

@@ -45,7 +45,9 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow, bail};
use compute_api::requests::{ComputeClaims, ConfigurationRequest};
use compute_api::requests::{
COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
};
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
};
@@ -630,9 +632,17 @@ impl Endpoint {
}
/// Generate a JWT with the correct claims.
pub fn generate_jwt(&self) -> Result<String> {
pub fn generate_jwt(&self, scope: Option<ComputeClaimsScope>) -> Result<String> {
self.env.generate_auth_token(&ComputeClaims {
compute_id: self.endpoint_id.clone(),
audience: match scope {
Some(ComputeClaimsScope::Admin) => Some(vec![COMPUTE_AUDIENCE.to_owned()]),
_ => None,
},
compute_id: match scope {
Some(ComputeClaimsScope::Admin) => None,
_ => Some(self.endpoint_id.clone()),
},
scope,
})
}
@@ -640,6 +650,8 @@ impl Endpoint {
pub async fn start(
&self,
auth_token: &Option<String>,
endpoint_storage_token: String,
endpoint_storage_addr: String,
safekeepers_generation: Option<SafekeeperGeneration>,
safekeepers: Vec<NodeId>,
pageservers: Vec<(Host, u16)>,
@@ -733,6 +745,9 @@ impl Endpoint {
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
audit_log_level: ComputeAudit::Disabled,
logs_export_host: None::<String>,
endpoint_storage_addr: Some(endpoint_storage_addr),
endpoint_storage_token: Some(endpoint_storage_token),
prewarm_lfc_on_startup: false,
};
// this strange code is needed to support respec() in tests
@@ -903,7 +918,7 @@ impl Endpoint {
self.external_http_address.port()
),
)
.bearer_auth(self.generate_jwt()?)
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
.send()
.await?;
@@ -980,7 +995,7 @@ impl Endpoint {
self.external_http_address.port()
))
.header(CONTENT_TYPE.as_str(), "application/json")
.bearer_auth(self.generate_jwt()?)
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
.body(
serde_json::to_string(&ConfigurationRequest {
spec,

View File

@@ -3,17 +3,19 @@ use crate::local_env::LocalEnv;
use anyhow::{Context, Result};
use camino::Utf8PathBuf;
use std::io::Write;
use std::net::SocketAddr;
use std::time::Duration;
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage";
pub const ENDPOINT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub const ENDPOINT_STORAGE_DEFAULT_ADDR: SocketAddr =
SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9993);
pub struct EndpointStorage {
pub bin: Utf8PathBuf,
pub data_dir: Utf8PathBuf,
pub pemfile: Utf8PathBuf,
pub port: u16,
pub addr: SocketAddr,
}
impl EndpointStorage {
@@ -22,7 +24,7 @@ impl EndpointStorage {
bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(),
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
port: env.endpoint_storage.port,
addr: env.endpoint_storage.listen_addr,
}
}
@@ -31,7 +33,7 @@ impl EndpointStorage {
}
fn listen_addr(&self) -> Utf8PathBuf {
format!("127.0.0.1:{}", self.port).into()
format!("{}:{}", self.addr.ip(), self.addr.port()).into()
}
pub fn init(&self) -> Result<()> {

View File

@@ -20,7 +20,9 @@ use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::broker::StorageBroker;
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
use crate::endpoint_storage::{
ENDPOINT_STORAGE_DEFAULT_ADDR, ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage,
};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -151,10 +153,10 @@ pub struct NeonLocalInitConf {
pub generate_local_ssl_certs: bool,
}
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct EndpointStorageConf {
pub port: u16,
pub listen_addr: SocketAddr,
}
/// Broker config for cluster internal communication.
@@ -241,6 +243,14 @@ impl Default for NeonStorageControllerConf {
}
}
impl Default for EndpointStorageConf {
fn default() -> Self {
Self {
listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
}
}
}
impl NeonBroker {
pub fn client_url(&self) -> Url {
let url = if let Some(addr) = self.listen_https_addr {

View File

@@ -12,6 +12,7 @@ ERROR: invalid JWT encoding
-- Test creating a session with an expired JWT
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjE3NDI1NjQ0MzIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MjQyNDIsInN1YiI6InVzZXIxMjMifQ.A6FwKuaSduHB9O7Gz37g0uoD_U9qVS0JNtT7YABGVgB7HUD1AMFc9DeyhNntWBqncg8k5brv-hrNTuUh5JYMAw');
ERROR: Token used after it has expired
DETAIL: exp=1742564432
-- Test creating a session with a valid JWT
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjQ4OTYxNjQyNTIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MzQzNDMsInN1YiI6InVzZXIxMjMifQ.2TXVgjb6JSUq6_adlvp-m_SdOxZSyGS30RS9TLB0xu2N83dMSs2NybwE1NMU8Fb0tcAZR_ET7M2rSxbTrphfCg');
jwt_session_init

View File

@@ -343,7 +343,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
fn token() -> String {
let claims = endpoint_storage::Claims {
let claims = endpoint_storage::claims::EndpointStorageClaims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
@@ -489,16 +489,8 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
}
fn delete_prefix_token(uri: &str) -> String {
use serde::Serialize;
let parts = uri.split("/").collect::<Vec<&str>>();
#[derive(Serialize)]
struct PrefixClaims {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<endpoint_storage::EndpointId>,
exp: u64,
}
let claims = PrefixClaims {
let claims = endpoint_storage::claims::DeletePrefixClaims {
tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
endpoint_id: parts.get(3).map(ToString::to_string),

View File

@@ -0,0 +1,52 @@
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use utils::id::{EndpointId, TenantId, TimelineId};
/// Claims to add, remove, or retrieve endpoint data. Used by compute_ctl
#[derive(Deserialize, Serialize, PartialEq)]
pub struct EndpointStorageClaims {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub endpoint_id: EndpointId,
pub exp: u64,
}
/// Claims to remove tenant, timeline, or endpoint data. Used by control plane
#[derive(Deserialize, Serialize, PartialEq)]
pub struct DeletePrefixClaims {
pub tenant_id: TenantId,
/// None when tenant is deleted (endpoint_id is also None in this case)
pub timeline_id: Option<TimelineId>,
/// None when timeline is deleted
pub endpoint_id: Option<EndpointId>,
pub exp: u64,
}
impl Display for EndpointStorageClaims {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"EndpointClaims(tenant_id={} timeline_id={} endpoint_id={} exp={})",
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
)
}
}
impl Display for DeletePrefixClaims {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"DeletePrefixClaims(tenant_id={} timeline_id={} endpoint_id={}, exp={})",
self.tenant_id,
self.timeline_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string()),
self.endpoint_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string()),
self.exp
)
}
}

View File

@@ -1,3 +1,5 @@
pub mod claims;
use crate::claims::{DeletePrefixClaims, EndpointStorageClaims};
use anyhow::Result;
use axum::extract::{FromRequestParts, Path};
use axum::response::{IntoResponse, Response};
@@ -13,7 +15,7 @@ use std::result::Result as StdResult;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use utils::id::{TenantId, TimelineId};
use utils::id::{EndpointId, TenantId, TimelineId};
// simplified version of utils::auth::JwtAuth
pub struct JwtAuth {
@@ -79,26 +81,6 @@ pub struct Storage {
pub max_upload_file_limit: usize,
}
pub type EndpointId = String; // If needed, reuse small string from proxy/src/types.rc
#[derive(Deserialize, Serialize, PartialEq)]
pub struct Claims {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub endpoint_id: EndpointId,
pub exp: u64,
}
impl Display for Claims {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Claims(tenant_id {} timeline_id {} endpoint_id {} exp {})",
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
)
}
}
#[derive(Deserialize, Serialize)]
struct KeyRequest {
tenant_id: TenantId,
@@ -107,6 +89,13 @@ struct KeyRequest {
path: String,
}
#[derive(Deserialize, Serialize, PartialEq)]
struct PrefixKeyRequest {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<EndpointId>,
}
#[derive(Debug, PartialEq)]
pub struct S3Path {
pub path: RemotePath,
@@ -165,7 +154,7 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
.extract::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|e| bad_request(e, "invalid token"))?;
let claims: Claims = state
let claims: EndpointStorageClaims = state
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "decoding token"))?;
@@ -178,7 +167,7 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
path.endpoint_id.clone()
};
let route = Claims {
let route = EndpointStorageClaims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id,
@@ -193,38 +182,13 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
}
}
#[derive(Deserialize, Serialize, PartialEq)]
pub struct PrefixKeyPath {
pub tenant_id: TenantId,
pub timeline_id: Option<TimelineId>,
pub endpoint_id: Option<EndpointId>,
}
impl Display for PrefixKeyPath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"PrefixKeyPath(tenant_id {} timeline_id {} endpoint_id {})",
self.tenant_id,
self.timeline_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string()),
self.endpoint_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string())
)
}
}
#[derive(Debug, PartialEq)]
pub struct PrefixS3Path {
pub path: RemotePath,
}
impl From<&PrefixKeyPath> for PrefixS3Path {
fn from(path: &PrefixKeyPath) -> Self {
impl From<&DeletePrefixClaims> for PrefixS3Path {
fn from(path: &DeletePrefixClaims) -> Self {
let timeline_id = path
.timeline_id
.as_ref()
@@ -250,21 +214,27 @@ impl FromRequestParts<Arc<Storage>> for PrefixS3Path {
state: &Arc<Storage>,
) -> Result<Self, Self::Rejection> {
let Path(path) = parts
.extract::<Path<PrefixKeyPath>>()
.extract::<Path<PrefixKeyRequest>>()
.await
.map_err(|e| bad_request(e, "invalid route"))?;
let TypedHeader(Authorization(bearer)) = parts
.extract::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|e| bad_request(e, "invalid token"))?;
let claims: PrefixKeyPath = state
let claims: DeletePrefixClaims = state
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "invalid token"))?;
if path != claims {
return Err(unauthorized(path, claims));
let route = DeletePrefixClaims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id: path.endpoint_id,
exp: claims.exp,
};
if route != claims {
return Err(unauthorized(route, claims));
}
Ok((&path).into())
Ok((&route).into())
}
}
@@ -297,7 +267,7 @@ mod tests {
#[test]
fn s3_path() {
let auth = Claims {
let auth = EndpointStorageClaims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
@@ -327,10 +297,11 @@ mod tests {
#[test]
fn prefix_s3_path() {
let mut path = PrefixKeyPath {
let mut path = DeletePrefixClaims {
tenant_id: TENANT_ID,
timeline_id: None,
endpoint_id: None,
exp: 0,
};
let prefix_path = |s: String| RemotePath::from_string(&s).unwrap();
assert_eq!(

View File

@@ -1,16 +1,58 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use std::str::FromStr;
use serde::{Deserialize, Serialize};
use crate::privilege::Privilege;
use crate::responses::ComputeCtlConfig;
use crate::spec::{ComputeSpec, ExtVersion, PgIdent};
/// The value to place in the [`ComputeClaims::audience`] claim.
pub static COMPUTE_AUDIENCE: &str = "compute";
/// Available scopes for a compute's JWT.
#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ComputeClaimsScope {
/// An admin-scoped token allows access to all of `compute_ctl`'s authorized
/// facilities.
Admin,
}
impl FromStr for ComputeClaimsScope {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"admin" => Ok(ComputeClaimsScope::Admin),
_ => Err(anyhow::anyhow!("invalid compute claims scope \"{s}\"")),
}
}
}
/// When making requests to the `compute_ctl` external HTTP server, the client
/// must specify a set of claims in `Authorization` header JWTs such that
/// `compute_ctl` can authorize the request.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename = "snake_case")]
pub struct ComputeClaims {
pub compute_id: String,
/// The compute ID that will validate the token. The only case in which this
/// can be [`None`] is if [`Self::scope`] is
/// [`ComputeClaimsScope::Admin`].
pub compute_id: Option<String>,
/// The scope of what the token authorizes.
pub scope: Option<ComputeClaimsScope>,
/// The recipient the token is intended for.
///
/// See [RFC 7519](https://www.rfc-editor.org/rfc/rfc7519#section-4.1.3) for
/// more information.
///
/// TODO: Remove the [`Option`] wrapper when control plane learns to send
/// the claim.
#[serde(rename = "aud")]
pub audience: Option<Vec<String>>,
}
/// Request of the /configure API

View File

@@ -46,6 +46,30 @@ pub struct ExtensionInstallResponse {
pub version: ExtVersion,
}
#[derive(Serialize, Default, Debug, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcPrewarmState {
#[default]
NotPrewarmed,
Prewarming,
Completed,
Failed {
error: String,
},
}
#[derive(Serialize, Default, Debug, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcOffloadState {
#[default]
NotOffloaded,
Offloading,
Completed,
Failed {
error: String,
},
}
/// Response of the /status API
#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "snake_case")]

View File

@@ -172,6 +172,15 @@ pub struct ComputeSpec {
/// Hostname and the port of the otel collector. Leave empty to disable Postgres logs forwarding.
/// Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:10514
pub logs_export_host: Option<String>,
/// Address of endpoint storage service
pub endpoint_storage_addr: Option<String>,
/// JWT for authorizing requests to endpoint storage service
pub endpoint_storage_token: Option<String>,
/// If true, download LFC state from endpoint_storage and pass it to Postgres on startup
#[serde(default)]
pub prewarm_lfc_on_startup: bool,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.

View File

@@ -84,6 +84,11 @@
"value": "on",
"vartype": "bool"
},
{
"name": "prewarm_lfc_on_startup",
"value": "off",
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501",

View File

@@ -43,6 +43,21 @@ pub struct NodeMetadata {
pub other: HashMap<String, serde_json::Value>,
}
/// PostHog integration config
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
/// PostHog project ID
project_id: String,
/// Server-side (private) API key
server_api_key: String,
/// Client-side (public) API key
client_api_key: String,
/// Private API URL
private_api_url: String,
/// Public API URL
public_api_url: String,
}
/// `pageserver.toml`
///
/// We use serde derive with `#[serde(default)]` to generate a deserializer
@@ -182,6 +197,8 @@ pub struct ConfigToml {
pub tracing: Option<Tracing>,
pub enable_tls_page_service_api: bool,
pub dev_mode: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub posthog_config: Option<PostHogConfig>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -659,6 +676,7 @@ impl Default for ConfigToml {
tracing: None,
enable_tls_page_service_api: false,
dev_mode: false,
posthog_config: None,
}
}
}

View File

@@ -295,6 +295,9 @@ pub struct TenantId(Id);
id_newtype!(TenantId);
/// If needed, reuse small string from proxy/src/types.rc
pub type EndpointId = String;
// A pair uniquely identifying Neon instance.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TenantTimelineId {

View File

@@ -14,7 +14,7 @@ use std::time::Duration;
use anyhow::{Context, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes, PostHogConfig};
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pem::Pem;
@@ -230,6 +230,9 @@ pub struct PageServerConf {
/// such as authentication requirements for HTTP and PostgreSQL APIs.
/// This is insecure and should only be used in development environments.
pub dev_mode: bool,
/// PostHog integration config
pub posthog_config: Option<PostHogConfig>,
}
/// Token for authentication to safekeepers
@@ -404,6 +407,7 @@ impl PageServerConf {
tracing,
enable_tls_page_service_api,
dev_mode,
posthog_config,
} = config_toml;
let mut conf = PageServerConf {
@@ -513,6 +517,7 @@ impl PageServerConf {
}
None => Vec::new(),
},
posthog_config,
};
// ------------------------------------------------------------

View File

@@ -2469,7 +2469,6 @@ async fn timeline_checkpoint_handler(
.map_err(|e|
match e {
CompactionError::ShuttingDown => ApiError::ShuttingDown,
CompactionError::Cancelled => ApiError::ShuttingDown,
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::Other(e) => ApiError::InternalServerError(e),

View File

@@ -3198,7 +3198,6 @@ impl TenantShard {
match err {
err if err.is_cancel() => {}
CompactionError::ShuttingDown => (),
CompactionError::Cancelled => (),
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
// shouldn't block compaction.
CompactionError::Offload(_) => {}

View File

@@ -90,18 +90,10 @@ impl Header {
}
}
#[derive(Debug, thiserror::Error)]
pub enum BlobWriterError {
#[error("cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(Debug, thiserror::Error)]
pub enum WriteBlobError {
#[error(transparent)]
Flush(BlobWriterError),
Flush(FlushTaskError),
#[error("blob too large ({len} bytes)")]
BlobTooLarge { len: usize },
#[error(transparent)]
@@ -246,16 +238,14 @@ where
cancel: CancellationToken,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> Result<Self, BlobWriterError> {
let gate_token = gate.enter().map_err(|_| BlobWriterError::Cancelled)?;
) -> anyhow::Result<Self> {
Ok(Self {
io_buf: Some(BytesMut::new()),
writer: BufferedWriter::new(
file,
start_offset,
|| IoBufferMut::with_capacity(Self::CAPACITY),
gate_token,
gate.enter()?,
cancel,
ctx,
flush_task_span,
@@ -275,16 +265,13 @@ where
&mut self,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), BlobWriterError>) {
) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
let res = self
.writer
// TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
// Can remove all the complexity around owned buffers upstack
.write_buffered_borrowed(&src_buf, ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
})
.map(|len| {
self.offset += len as u64;
});
@@ -431,10 +418,8 @@ where
self,
mode: BufferedWriterShutdownMode,
ctx: &RequestContext,
) -> Result<W, BlobWriterError> {
let (_, file) = self.writer.shutdown(mode, ctx).await.map_err(|e| match e {
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
})?;
) -> Result<W, FlushTaskError> {
let (_, file) = self.writer.shutdown(mode, ctx).await?;
Ok(file)
}
}
@@ -482,11 +467,8 @@ pub(crate) mod tests {
.await?,
gate.enter()?,
);
let mut wtr = BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test"))
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
let mut wtr =
BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr
@@ -508,11 +490,7 @@ pub(crate) mod tests {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
.await?;
file.disarm_into_inner()
};
Ok((temp_dir, pathbuf, offsets))

View File

@@ -19,14 +19,6 @@ use crate::context::RequestContext;
use crate::tenant::Timeline;
use crate::tenant::storage_layer::Layer;
#[derive(Debug, thiserror::Error)]
pub enum BatchSplitWriterError {
#[error("cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
}
pub(crate) enum BatchWriterResult {
Produced(ResidentLayer),
Discarded(PersistentLayerKey),
@@ -105,7 +97,7 @@ impl BatchLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> Result<Vec<ResidentLayer>, BatchSplitWriterError> {
) -> anyhow::Result<Vec<ResidentLayer>> {
let res = self
.finish_with_discard_fn(tline, ctx, |_| async { false })
.await?;
@@ -123,7 +115,7 @@ impl BatchLayerWriter {
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
@@ -147,14 +139,14 @@ impl BatchLayerWriter {
generated_layers.push(BatchWriterResult::Discarded(layer_key));
} else {
let res = match inner {
LayerWriterWrapper::Delta(writer) => writer
.finish(layer_key.key_range.end, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
LayerWriterWrapper::Image(writer) => writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
LayerWriterWrapper::Delta(writer) => {
writer.finish(layer_key.key_range.end, ctx).await
}
LayerWriterWrapper::Image(writer) => {
writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
}
};
let layer = match res {
Ok((desc, path)) => {
@@ -163,7 +155,7 @@ impl BatchLayerWriter {
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(BatchSplitWriterError::Other(e));
return Err(e);
}
}
}
@@ -243,7 +235,7 @@ impl<'a> SplitImageLayerWriter<'a> {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), BatchSplitWriterError> {
) -> anyhow::Result<()> {
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
@@ -261,8 +253,7 @@ impl<'a> SplitImageLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
.await?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.batches.add_unfinished_image_writer(
prev_image_writer,
@@ -271,10 +262,7 @@ impl<'a> SplitImageLayerWriter<'a> {
);
self.start_key = key;
}
self.inner
.put_image(key, img, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
self.inner.put_image(key, img, ctx).await
}
pub(crate) async fn finish_with_discard_fn<D, F>(
@@ -283,7 +271,7 @@ impl<'a> SplitImageLayerWriter<'a> {
ctx: &RequestContext,
end_key: Key,
discard_fn: D,
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
@@ -303,7 +291,7 @@ impl<'a> SplitImageLayerWriter<'a> {
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
) -> anyhow::Result<Vec<BatchWriterResult>> {
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
.await
}
@@ -358,7 +346,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> Result<(), BatchSplitWriterError> {
) -> anyhow::Result<()> {
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
//
@@ -378,8 +366,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?,
.await?,
));
}
let (_, inner) = self.inner.as_mut().unwrap();
@@ -399,8 +386,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
.await?;
let (start_key, prev_delta_writer) =
self.inner.replace((key, next_delta_writer)).unwrap();
self.batches.add_unfinished_delta_writer(
@@ -410,19 +396,16 @@ impl<'a> SplitDeltaLayerWriter<'a> {
);
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
return Err(BatchSplitWriterError::Other(anyhow::anyhow!(
anyhow::bail!(
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
key,
inner.estimated_size()
)));
);
}
}
self.last_key_written = key;
let (_, inner) = self.inner.as_mut().unwrap();
inner
.put_value(key, lsn, val, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
inner.put_value(key, lsn, val, ctx).await
}
pub(crate) async fn finish_with_discard_fn<D, F>(
@@ -430,7 +413,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
@@ -456,7 +439,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
) -> anyhow::Result<Vec<BatchWriterResult>> {
self.finish_with_discard_fn(tline, ctx, |_| async { false })
.await
}

View File

@@ -35,9 +35,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use crate::tenant::blob_io::BlobWriterError;
use anyhow::{Context, Result, bail};
use anyhow::{Context, Result, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
@@ -78,7 +76,7 @@ use crate::tenant::vectored_blob_io::{
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -450,11 +448,7 @@ impl DeltaLayerWriterInner {
cancel,
ctx,
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
)?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -483,12 +477,15 @@ impl DeltaLayerWriterInner {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> Result<(), DeltaLayerWriterError> {
let val_ser =
Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
) -> anyhow::Result<()> {
let (_, res) = self
.put_value_bytes(key, lsn, val_ser.slice_len(), val.will_init(), ctx)
.put_value_bytes(
key,
lsn,
Value::ser(&val)?.slice_len(),
val.will_init(),
ctx,
)
.await;
res
}
@@ -500,46 +497,25 @@ impl DeltaLayerWriterInner {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBuf + Send,
{
if self.lsn_range.start > lsn {
return (
val,
Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
"lsn_start={}, lsn={}",
self.lsn_range.start,
lsn
))),
);
}
assert!(
self.lsn_range.start <= lsn,
"lsn_start={}, lsn={}",
self.lsn_range.start,
lsn
);
// We don't want to use compression in delta layer creation
let compression = ImageCompressionAlgorithm::Disabled;
let (val, res) = self
.blob_writer
.write_blob_maybe_compressed(val, ctx, compression)
.await;
let off = match res {
Ok((off, _)) => off,
Err(e) => {
return (
val,
Err(match e {
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
crate::tenant::blob_io::BlobWriterError::Cancelled => {
DeltaLayerWriterError::Cancelled
}
crate::tenant::blob_io::BlobWriterError::Other(err) => {
DeltaLayerWriterError::Other(err)
}
},
other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)),
}),
);
}
Err(e) => return (val, Err(anyhow::anyhow!(e))),
};
let blob_ref = BlobRef::new(off, will_init);
@@ -549,10 +525,7 @@ impl DeltaLayerWriterInner {
self.num_keys += 1;
(
val,
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e))),
)
(val, res.map_err(|e| anyhow::anyhow!(e)))
}
fn size(&self) -> u64 {
@@ -566,7 +539,7 @@ impl DeltaLayerWriterInner {
self,
key_end: Key,
ctx: &RequestContext,
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
let file = self
@@ -575,24 +548,17 @@ impl DeltaLayerWriterInner {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled,
BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err),
})?;
.await?;
// Write out the index
let (index_root_blk, block_buf) = self
.tree
.finish()
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let (index_root_blk, block_buf) = self.tree.finish()?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
res?;
offset += PAGE_SZ as u64;
}
assert!(self.lsn_range.start < self.lsn_range.end);
@@ -609,27 +575,24 @@ impl DeltaLayerWriterInner {
};
// Writes summary at the first block (offset 0).
let buf = summary
.ser_into_page()
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
res?;
let metadata = file
.metadata()
.await
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
.context("get file metadata to determine size")?;
// 5GB limit for objects without multipart upload (which we don't want to use)
// Make it a little bit below to account for differing GB units
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
if metadata.len() > S3_UPLOAD_LIMIT {
return Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path(),
metadata.len()
)));
}
ensure!(
metadata.len() <= S3_UPLOAD_LIMIT,
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path(),
metadata.len()
);
// Note: Because we opened the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
@@ -646,7 +609,7 @@ impl DeltaLayerWriterInner {
// fsync the file
file.sync_all()
.await
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
.maybe_fatal_err("delta_layer sync_all")?;
trace!("created delta layer {}", self.path);
@@ -731,7 +694,7 @@ impl DeltaLayerWriter {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> Result<(), DeltaLayerWriterError> {
) -> anyhow::Result<()> {
self.inner
.as_mut()
.unwrap()
@@ -746,7 +709,7 @@ impl DeltaLayerWriter {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBuf + Send,
{
@@ -768,7 +731,7 @@ impl DeltaLayerWriter {
mut self,
key_end: Key,
ctx: &RequestContext,
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
self.inner.take().unwrap().finish(key_end, ctx).await
}
@@ -782,14 +745,6 @@ impl DeltaLayerWriter {
}
}
#[derive(Debug, thiserror::Error)]
pub enum DeltaLayerWriterError {
#[error("cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
@@ -800,7 +755,7 @@ pub enum RewriteSummaryError {
impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::Error::new(e))
Self::Other(anyhow::anyhow!(e))
}
}
@@ -1486,14 +1441,6 @@ impl DeltaLayerInner {
offset
}
pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
self.iter_with_options(
ctx,
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
)
}
pub fn iter_with_options<'a>(
&'a self,
ctx: &'a RequestContext,
@@ -1679,7 +1626,6 @@ pub(crate) mod test {
use crate::tenant::disk_btree::tests::TestDisk;
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::{TenantShard, Timeline};
/// Construct an index for a fictional delta layer and and then
@@ -2356,8 +2302,7 @@ pub(crate) mod test {
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
// Test if the batch size is correctly determined
let mut iter = delta_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
let mut num_items = 0;
for _ in 0..3 {
iter.next_batch().await.unwrap();
@@ -2374,8 +2319,7 @@ pub(crate) mod test {
iter.key_values_batch.clear();
}
// Test if the result is correct
let mut iter = delta_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
assert_delta_iter_equal(&mut iter, &test_deltas).await;
}
}

View File

@@ -157,7 +157,7 @@ mod tests {
.await
.unwrap();
let merge_iter = MergeIterator::create(
let merge_iter = MergeIterator::create_for_testing(
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
&[],
&ctx,
@@ -182,7 +182,7 @@ mod tests {
result.extend(test_deltas1[90..100].iter().cloned());
assert_filter_iter_equal(&mut filter_iter, &result).await;
let merge_iter = MergeIterator::create(
let merge_iter = MergeIterator::create_for_testing(
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
&[],
&ctx,

View File

@@ -33,9 +33,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use crate::tenant::blob_io::BlobWriterError;
use anyhow::{Context, Result, bail};
use anyhow::{Context, Result, bail, ensure};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
@@ -76,7 +74,7 @@ use crate::tenant::vectored_blob_io::{
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -342,14 +340,6 @@ impl ImageLayer {
}
}
#[derive(Debug, thiserror::Error)]
pub enum ImageLayerWriterError {
#[error("flush task cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
@@ -694,14 +684,6 @@ impl ImageLayerInner {
}
}
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
self.iter_with_options(
ctx,
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
)
}
pub(crate) fn iter_with_options<'a>(
&'a self,
ctx: &'a RequestContext,
@@ -824,11 +806,7 @@ impl ImageLayerWriterInner {
cancel,
ctx,
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
)?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -864,13 +842,8 @@ impl ImageLayerWriterInner {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), ImageLayerWriterError> {
if !self.key_range.contains(&key) {
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
"key not in range"
)));
}
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let compression = self.conf.image_compression;
let uncompressed_len = img.len() as u64;
self.uncompressed_bytes += uncompressed_len;
@@ -880,18 +853,7 @@ impl ImageLayerWriterInner {
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let (off, compression_info) = res.map_err(|e| match e {
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
crate::tenant::blob_io::BlobWriterError::Cancelled => {
ImageLayerWriterError::Cancelled
}
crate::tenant::blob_io::BlobWriterError::Other(err) => {
ImageLayerWriterError::Other(err)
}
},
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
})?;
let (off, compression_info) = res?;
if compression_info.compressed_size.is_some() {
// The image has been considered for compression at least
self.uncompressed_bytes_eligible += uncompressed_len;
@@ -903,9 +865,7 @@ impl ImageLayerWriterInner {
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree
.append(&keybuf, off)
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
self.tree.append(&keybuf, off)?;
#[cfg(feature = "testing")]
{
@@ -925,12 +885,8 @@ impl ImageLayerWriterInner {
key: Key,
raw_with_header: Bytes,
ctx: &RequestContext,
) -> Result<(), ImageLayerWriterError> {
if !self.key_range.contains(&key) {
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
"key not in range"
)));
}
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
// NB: we don't update the (un)compressed metrics, since we can't determine them without
// decompressing the image. This seems okay.
@@ -940,23 +896,11 @@ impl ImageLayerWriterInner {
.blob_writer
.write_blob_raw(raw_with_header.slice_len(), ctx)
.await;
let offset = res.map_err(|e| match e {
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
crate::tenant::blob_io::BlobWriterError::Cancelled => {
ImageLayerWriterError::Cancelled
}
crate::tenant::blob_io::BlobWriterError::Other(err) => {
ImageLayerWriterError::Other(err)
}
},
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
})?;
let offset = res?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree
.append(&keybuf, offset)
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
self.tree.append(&keybuf, offset)?;
#[cfg(feature = "testing")]
{
@@ -973,7 +917,7 @@ impl ImageLayerWriterInner {
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
// Calculate compression ratio
@@ -996,24 +940,17 @@ impl ImageLayerWriterInner {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
})?;
.await?;
// Write out the index
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
let (index_root_blk, block_buf) = self
.tree
.finish()
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
let (index_root_blk, block_buf) = self.tree.finish()?;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer?
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
res?;
offset += PAGE_SZ as u64;
}
@@ -1036,18 +973,14 @@ impl ImageLayerWriterInner {
};
// Writes summary at the first block (offset 0).
let buf = summary
.ser_into_page()
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
res?;
let metadata = file.metadata().await.map_err(|e| {
ImageLayerWriterError::Other(anyhow::anyhow!(
"get metadata to determine file size: {}",
e
))
})?;
let metadata = file
.metadata()
.await
.context("get metadata to determine file size")?;
let desc = PersistentLayerDesc::new_img(
self.tenant_shard_id,
@@ -1070,9 +1003,9 @@ impl ImageLayerWriterInner {
// set inner.file here. The first read will have to re-open it.
// fsync the file
file.sync_all().await.map_err(|e| {
ImageLayerWriterError::Other(anyhow::anyhow!("image_layer sync_all: {}", e))
})?;
file.sync_all()
.await
.maybe_fatal_err("image_layer sync_all")?;
trace!("created image layer {}", self.path);
@@ -1152,7 +1085,7 @@ impl ImageLayerWriter {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), ImageLayerWriterError> {
) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}
@@ -1167,7 +1100,7 @@ impl ImageLayerWriter {
key: Key,
raw_with_header: Bytes,
ctx: &RequestContext,
) -> Result<(), ImageLayerWriterError> {
) -> anyhow::Result<()> {
self.inner
.as_mut()
.unwrap()
@@ -1191,7 +1124,7 @@ impl ImageLayerWriter {
pub(crate) async fn finish(
mut self,
ctx: &RequestContext,
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
self.inner.take().unwrap().finish(ctx, None).await
}
@@ -1200,7 +1133,7 @@ impl ImageLayerWriter {
mut self,
end_key: Key,
ctx: &RequestContext,
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
self.inner.take().unwrap().finish(ctx, Some(end_key)).await
}
}
@@ -1299,7 +1232,6 @@ mod test {
use crate::context::RequestContext;
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::{TenantShard, Timeline};
#[tokio::test]
@@ -1566,8 +1498,7 @@ mod test {
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
// Test if the batch size is correctly determined
let mut iter = img_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
let mut num_items = 0;
for _ in 0..3 {
iter.next_batch().await.unwrap();
@@ -1584,8 +1515,7 @@ mod test {
iter.key_values_batch.clear();
}
// Test if the result is correct
let mut iter = img_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
}
}

View File

@@ -26,7 +26,6 @@ use utils::lsn::Lsn;
use utils::vec_map::VecMap;
use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
use super::delta_layer::DeltaLayerWriterError;
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64, u64_to_usize};
use crate::config::PageServerConf;
@@ -582,17 +581,7 @@ impl InMemoryLayer {
estimated_in_mem_size: AtomicU64::new(0),
})
}
}
#[derive(Debug, thiserror::Error)]
pub enum InMemoryLayerError {
#[error("flush task cancelled")]
Cancelled,
#[error(transparent)]
Other(anyhow::Error),
}
impl InMemoryLayer {
/// Write path.
///
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
@@ -602,7 +591,7 @@ impl InMemoryLayer {
&self,
serialized_batch: SerializedValueBatch,
ctx: &RequestContext,
) -> Result<(), InMemoryLayerError> {
) -> anyhow::Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
@@ -616,11 +605,7 @@ impl InMemoryLayer {
} = serialized_batch;
// Write the batch to the file
inner
.file
.write_raw(&raw, ctx)
.await
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
let expected_new_len = base_offset
@@ -652,8 +637,7 @@ impl InMemoryLayer {
batch_offset,
len,
will_init,
})
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
})?;
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
@@ -810,25 +794,14 @@ impl InMemoryLayer {
ctx,
)
.await;
res.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
DeltaLayerWriterError::Other(err) => err,
})?;
res?;
}
}
}
}
// MAX is used here because we identify L0 layers by full key range
let (desc, path) = delta_layer_writer
.finish(Key::MAX, ctx)
.await
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
DeltaLayerWriterError::Other(err) => err,
})?;
let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
//

View File

@@ -19,14 +19,6 @@ pub(crate) enum LayerRef<'a> {
}
impl<'a> LayerRef<'a> {
#[allow(dead_code)]
fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
match self {
Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
}
}
fn iter_with_options(
self,
ctx: &'a RequestContext,
@@ -322,6 +314,28 @@ impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
}
impl<'a> MergeIterator<'a> {
#[cfg(test)]
pub(crate) fn create_for_testing(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
) -> Self {
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
}
/// Create a new merge iterator with custom options.
///
/// Adjust `max_read_size` and `max_batch_size` to trade memory usage for performance. The size should scale
/// with the number of layers to compact. If there are a lot of layers, consider reducing the values, so that
/// the buffer does not take too much memory.
///
/// The default options for L0 compactions are:
/// - max_read_size: 1024 * 8192 (8MB)
/// - max_batch_size: 1024
///
/// The default options for gc-compaction are:
/// - max_read_size: 128 * 8192 (1MB)
/// - max_batch_size: 128
pub fn create_with_options(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
@@ -351,14 +365,6 @@ impl<'a> MergeIterator<'a> {
}
}
pub fn create(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
) -> Self {
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
}
pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
while let Some(mut iter) = self.heap.peek_mut() {
if !iter.is_loaded() {
@@ -477,7 +483,7 @@ mod tests {
let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
.await
.unwrap();
let mut merge_iter = MergeIterator::create(
let mut merge_iter = MergeIterator::create_for_testing(
&[
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
@@ -549,7 +555,7 @@ mod tests {
let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
.await
.unwrap();
let mut merge_iter = MergeIterator::create(
let mut merge_iter = MergeIterator::create_for_testing(
&[
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
@@ -670,7 +676,7 @@ mod tests {
// Test with different layer order for MergeIterator::create to ensure the order
// is stable.
let mut merge_iter = MergeIterator::create(
let mut merge_iter = MergeIterator::create_for_testing(
&[
resident_layer_4.get_as_delta(&ctx).await.unwrap(),
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
@@ -682,7 +688,7 @@ mod tests {
);
assert_merge_iter_equal(&mut merge_iter, &expect).await;
let mut merge_iter = MergeIterator::create(
let mut merge_iter = MergeIterator::create_for_testing(
&[
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
resident_layer_4.get_as_delta(&ctx).await.unwrap(),

View File

@@ -300,7 +300,6 @@ pub(crate) fn log_compaction_error(
let level = match err {
e if e.is_cancel() => return,
ShuttingDown => return,
Cancelled => return,
Offload(_) => Level::ERROR,
AlreadyRunning(_) => Level::ERROR,
CollectKeySpaceError(_) => Level::ERROR,

View File

@@ -119,8 +119,6 @@ use crate::tenant::gc_result::GcResult;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerError;
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
use crate::tenant::storage_layer::{
AsLayerDesc, BatchLayerWriter, DeltaLayerWriter, EvictionError, ImageLayerName,
@@ -775,21 +773,6 @@ impl From<layer_manager::Shutdown> for CreateImageLayersError {
}
}
impl From<crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError>
for CreateImageLayersError
{
fn from(err: crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError) -> Self {
match err {
crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError::Cancelled => {
Self::Cancelled
}
crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError::Other(err) => {
Self::Other(err)
}
}
}
}
#[derive(thiserror::Error, Debug, Clone)]
pub(crate) enum FlushLayerError {
/// Timeline cancellation token was cancelled
@@ -2058,9 +2041,6 @@ impl Timeline {
Err(CompactionError::ShuttingDown) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::Cancelled) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::AlreadyRunning(_)) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
@@ -5252,17 +5232,7 @@ impl Timeline {
};
// Write all the keys we just read into our new image layer.
image_layer_writer
.put_image(img_key, img, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => CreateImageLayersError::Other(
anyhow::anyhow!("flush task cancelled"),
),
ImageLayerWriterError::Other(err) => {
CreateImageLayersError::Other(err)
}
})?;
image_layer_writer.put_image(img_key, img, ctx).await?;
wrote_keys = true;
}
}
@@ -5359,15 +5329,7 @@ impl Timeline {
// TODO: split image layers to avoid too large layer files. Too large image files are not handled
// on the normal data path either.
image_layer_writer
.put_image(k, v, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled"))
}
ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err),
})?;
image_layer_writer.put_image(k, v, ctx).await?;
}
if wrote_any_image {
@@ -5881,8 +5843,6 @@ pub(crate) enum CompactionError {
Other(anyhow::Error),
#[error("Compaction already running: {0}")]
AlreadyRunning(&'static str),
#[error("cancelled")]
Cancelled,
}
impl CompactionError {
@@ -5897,7 +5857,6 @@ impl CompactionError {
PageReconstructError::Cancelled
))
| Self::Offload(OffloadError::Cancelled)
| Self::Cancelled
)
}
@@ -6963,22 +6922,9 @@ impl Timeline {
)
.await?;
for (key, img) in images {
image_layer_writer
.put_image(key, img, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
ImageLayerWriterError::Other(err) => err,
})?;
image_layer_writer.put_image(key, img, ctx).await?;
}
let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
ImageLayerWriterError::Other(err) => err,
})?;
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created image layer {}", image_layer.local_path());
{
@@ -7432,10 +7378,7 @@ impl TimelineWriter<'_> {
state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
}
res.map_err(|e| match e {
InMemoryLayerError::Cancelled => anyhow::anyhow!("flush task cancelled"),
InMemoryLayerError::Other(err) => err,
})
res
}
#[cfg(test)]

View File

@@ -1547,7 +1547,7 @@ impl Timeline {
ctx,
)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.map_err(CompactionError::Other)?;
// Safety of layer rewrites:
// - We are writing to a different local file path than we are reading from, so the old Layer
@@ -1572,7 +1572,7 @@ impl Timeline {
let (desc, path) = image_layer_writer
.finish(ctx)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.map_err(CompactionError::Other)?;
let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
@@ -1994,7 +1994,13 @@ impl Timeline {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
MergeIterator::create_with_options(
&deltas,
&[],
ctx,
1024 * 8192, /* 8 MiB buffer per layer iterator */
1024,
)
};
// This iterator walks through all keys and is needed to calculate size used by each key
@@ -2140,7 +2146,7 @@ impl Timeline {
.unwrap()
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.map_err(CompactionError::Other)?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
@@ -2199,7 +2205,7 @@ impl Timeline {
.unwrap()
.put_value(key, lsn, value, ctx)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.map_err(CompactionError::Other)?;
} else {
let owner = self.shard_identity.get_shard_number(&key);
@@ -2217,7 +2223,7 @@ impl Timeline {
let (desc, path) = writer
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.map_err(CompactionError::Other)?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
new_layers.push(new_delta);
@@ -2828,7 +2834,7 @@ impl Timeline {
Ok(())
}
/// Check if the memory usage is within the limit.
/// Check to bail out of gc compaction early if it would use too much memory.
async fn check_memory_usage(
self: &Arc<Self>,
layer_selection: &[Layer],
@@ -2841,7 +2847,8 @@ impl Timeline {
let layer_desc = layer.layer_desc();
if layer_desc.is_delta() {
// Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
// Multiply the layer size so that tests can pass.
// Scale it by target_layer_size_bytes so that tests can pass (some tests, e.g., `test_pageserver_gc_compaction_preempt
// use 3MB layer size and we need to account for that).
estimated_memory_usage_mb +=
3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
num_delta_layers += 1;
@@ -3682,7 +3689,8 @@ impl Timeline {
let (desc, path) = delta_writer_before
.finish(job_desc.compaction_key_range.start, ctx)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.context("failed to finish delta layer writer")
.map_err(CompactionError::Other)?;
let layer = Layer::finish_creating(self.conf, self, desc, &path)
.context("failed to finish creating delta layer")
.map_err(CompactionError::Other)?;
@@ -3692,7 +3700,8 @@ impl Timeline {
let (desc, path) = delta_writer_after
.finish(key.key_range.end, ctx)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.context("failed to finish delta layer writer")
.map_err(CompactionError::Other)?;
let layer = Layer::finish_creating(self.conf, self, desc, &path)
.context("failed to finish creating delta layer")
.map_err(CompactionError::Other)?;
@@ -3711,7 +3720,8 @@ impl Timeline {
writer
.finish_with_discard_fn(self, ctx, end_key, discard)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
.context("failed to finish image layer writer")
.map_err(CompactionError::Other)?
} else {
drop(writer);
Vec::new()
@@ -3724,7 +3734,8 @@ impl Timeline {
delta_layer_writer
.finish_with_discard_fn(self, ctx, discard)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
.context("failed to finish delta layer writer")
.map_err(CompactionError::Other)?
} else {
drop(delta_layer_writer);
Vec::new()
@@ -4249,10 +4260,7 @@ impl TimelineAdaptor {
unfinished_image_layer,
} = outcome
{
let (desc, path) = unfinished_image_layer
.finish(ctx)
.await
.map_err(|e| CreateImageLayersError::Other(anyhow::anyhow!(e)))?;
let (desc, path) = unfinished_image_layer.finish(ctx).await?;
let image_layer =
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
self.new_images.push(image_layer);

View File

@@ -1,7 +1,6 @@
use std::collections::HashSet;
use std::sync::Arc;
use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
use anyhow::Context;
use bytes::Bytes;
use http_utils::error::ApiError;
@@ -817,10 +816,7 @@ async fn copy_lsn_prefix(
let (desc, path) = writer
.finish(reused_highest_key, ctx)
.await
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => Error::ShuttingDown,
DeltaLayerWriterError::Other(err) => Error::Prepare(err),
})?;
.map_err(Error::Prepare)?;
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
.map_err(Error::Prepare)?;

View File

@@ -41,7 +41,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::Metrics;
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
pub(crate) const EXT_VERSION: &str = "0.3.0";
pub(crate) const EXT_VERSION: &str = "0.3.1";
pub(crate) const EXT_SCHEMA: &str = "auth";
#[derive(Clone)]

View File

@@ -5181,7 +5181,8 @@ impl Service {
}
// We don't expect any new_shard_count shards to exist here, but drop them just in case
tenants.retain(|_id, s| s.shard.count != *new_shard_count);
tenants
.retain(|id, s| !(id.tenant_id == *tenant_id && s.shard.count == *new_shard_count));
detach_locations
};

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import urllib.parse
from enum import StrEnum
from typing import TYPE_CHECKING, final
import requests
@@ -9,11 +10,23 @@ from requests.auth import AuthBase
from typing_extensions import override
from fixtures.log_helper import log
from fixtures.utils import wait_until
if TYPE_CHECKING:
from requests import PreparedRequest
COMPUTE_AUDIENCE = "compute"
"""
The value to place in the `aud` claim.
"""
@final
class ComputeClaimsScope(StrEnum):
ADMIN = "admin"
@final
class BearerAuth(AuthBase):
"""
@@ -50,6 +63,35 @@ class EndpointHttpClient(requests.Session):
res.raise_for_status()
return res.json()
def prewarm_lfc_status(self) -> dict[str, str]:
res = self.get(f"http://localhost:{self.external_port}/lfc/prewarm")
res.raise_for_status()
json: dict[str, str] = res.json()
return json
def prewarm_lfc(self):
self.post(f"http://localhost:{self.external_port}/lfc/prewarm").raise_for_status()
def prewarmed():
json = self.prewarm_lfc_status()
status, err = json["status"], json.get("error")
assert status == "completed", f"{status}, error {err}"
wait_until(prewarmed)
def offload_lfc(self):
url = f"http://localhost:{self.external_port}/lfc/offload"
self.post(url).raise_for_status()
def offloaded():
res = self.get(url)
res.raise_for_status()
json = res.json()
status, err = json["status"], json.get("error")
assert status == "completed", f"{status}, error {err}"
wait_until(offloaded)
def database_schema(self, database: str):
res = self.get(
f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}",

View File

@@ -21,6 +21,7 @@ if TYPE_CHECKING:
Any,
)
from fixtures.endpoint.http import ComputeClaimsScope
from fixtures.pg_version import PgVersion
@@ -535,12 +536,16 @@ class NeonLocalCli(AbstractNeonCli):
res.check_returncode()
return res
def endpoint_generate_jwt(self, endpoint_id: str) -> str:
def endpoint_generate_jwt(
self, endpoint_id: str, scope: ComputeClaimsScope | None = None
) -> str:
"""
Generate a JWT for making requests to the endpoint's external HTTP
server.
"""
args = ["endpoint", "generate-jwt", endpoint_id]
if scope:
args += ["--scope", str(scope)]
cmd = self.raw_cli(args)
cmd.check_returncode()

View File

@@ -51,7 +51,7 @@ from fixtures.common_types import (
TimelineId,
)
from fixtures.compute_migrations import NUM_COMPUTE_MIGRATIONS
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.endpoint.http import ComputeClaimsScope, EndpointHttpClient
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.neon_cli import NeonLocalCli, Pagectl
@@ -1185,7 +1185,9 @@ class NeonEnv:
"broker": {},
"safekeepers": [],
"pageservers": [],
"endpoint_storage": {"port": self.port_distributor.get_port()},
"endpoint_storage": {
"listen_addr": f"127.0.0.1:{self.port_distributor.get_port()}",
},
"generate_local_ssl_certs": self.generate_local_ssl_certs,
}
@@ -4218,7 +4220,7 @@ class Endpoint(PgProtocol, LogUtils):
self.config(config_lines)
self.__jwt = self.env.neon_cli.endpoint_generate_jwt(self.endpoint_id)
self.__jwt = self.generate_jwt()
return self
@@ -4265,6 +4267,14 @@ class Endpoint(PgProtocol, LogUtils):
return self
def generate_jwt(self, scope: ComputeClaimsScope | None = None) -> str:
"""
Generate a JWT for making requests to the endpoint's external HTTP
server.
"""
assert self.endpoint_id is not None
return self.env.neon_cli.endpoint_generate_jwt(self.endpoint_id, scope)
def endpoint_path(self) -> Path:
"""Path to endpoint directory"""
assert self.endpoint_id

View File

@@ -206,7 +206,7 @@ class NeonProject:
self.neon_api = neon_api
self.pg_bin = pg_bin
proj = self.neon_api.create_project(
pg_version, f"Automatic random API test {os.getenv('GITHUB_RUN_ID')}"
pg_version, f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
)
self.id: str = proj["project"]["id"]
self.name: str = proj["project"]["name"]

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
from http.client import FORBIDDEN, UNAUTHORIZED
from typing import TYPE_CHECKING
import jwt
import pytest
from fixtures.endpoint.http import COMPUTE_AUDIENCE, ComputeClaimsScope, EndpointHttpClient
from fixtures.utils import run_only_on_default_postgres
from requests import RequestException
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
@run_only_on_default_postgres("The code path being tested is not dependent on Postgres version")
def test_compute_no_scope_claim(neon_simple_env: NeonEnv):
"""
Test that if the JWT scope is not admin and no compute_id is specified,
the external HTTP server returns a 403 Forbidden error.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
# Encode nothing in the token
token = jwt.encode({}, env.auth_keys.priv, algorithm="EdDSA")
# Create an admin-scoped HTTP client
client = EndpointHttpClient(
external_port=endpoint.external_http_port,
internal_port=endpoint.internal_http_port,
jwt=token,
)
try:
client.status()
pytest.fail("Exception should have been raised")
except RequestException as e:
assert e.response is not None
assert e.response.status_code == FORBIDDEN
@pytest.mark.parametrize(
"audience",
(COMPUTE_AUDIENCE, "invalid", None),
ids=["with_audience", "with_invalid_audience", "without_audience"],
)
@run_only_on_default_postgres("The code path being tested is not dependent on Postgres version")
def test_compute_admin_scope_claim(neon_simple_env: NeonEnv, audience: str | None):
"""
Test that an admin-scoped JWT can access the compute's external HTTP server
without the compute_id being specified in the claims.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
data: dict[str, str | list[str]] = {"scope": str(ComputeClaimsScope.ADMIN)}
if audience:
data["aud"] = [audience]
token = jwt.encode(data, env.auth_keys.priv, algorithm="EdDSA")
# Create an admin-scoped HTTP client
client = EndpointHttpClient(
external_port=endpoint.external_http_port,
internal_port=endpoint.internal_http_port,
jwt=token,
)
try:
client.status()
if audience != COMPUTE_AUDIENCE:
pytest.fail("Exception should have been raised")
except RequestException as e:
assert e.response is not None
assert e.response.status_code == UNAUTHORIZED

View File

@@ -4,10 +4,12 @@ import pytest
from aiohttp import ClientSession
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import run_only_on_default_postgres
from jwcrypto import jwk, jwt
@pytest.mark.asyncio
@run_only_on_default_postgres("test doesn't use postgres")
async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
"""
Inserts, retrieves, and deletes test file using a JWT token
@@ -35,7 +37,6 @@ async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv)
key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key"
headers = {"Authorization": f"Bearer {token}"}
log.info(f"cache key url {key}")
log.info(f"token {token}")
async with ClientSession(headers=headers) as session:
async with session.get(key) as res:

View File

@@ -1,11 +1,24 @@
import random
import threading
import time
from enum import Enum
import pytest
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import USE_LFC
from prometheus_client.parser import text_string_to_metric_families as prom_parse_impl
class LfcQueryMethod(Enum):
COMPUTE_CTL = False
POSTGRES = True
PREWARM_LABEL = "compute_ctl_lfc_prewarm_requests_total"
OFFLOAD_LABEL = "compute_ctl_lfc_offload_requests_total"
QUERY_OPTIONS = LfcQueryMethod.POSTGRES, LfcQueryMethod.COMPUTE_CTL
def check_pinned_entries(cur):
@@ -19,11 +32,20 @@ def check_pinned_entries(cur):
assert n_pinned == 0
def prom_parse(client: EndpointHttpClient) -> dict[str, float]:
return {
sample.name: sample.value
for family in prom_parse_impl(client.metrics())
for sample in family.samples
if sample.name in (PREWARM_LABEL, OFFLOAD_LABEL)
}
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm(neon_simple_env: NeonEnv):
@pytest.mark.parametrize("query", QUERY_OPTIONS, ids=["postgres", "compute-ctl"])
def test_lfc_prewarm(neon_simple_env: NeonEnv, query: LfcQueryMethod):
env = neon_simple_env
n_records = 1000000
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
@@ -34,30 +56,57 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv):
"neon.file_cache_prewarm_limit=1000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create extension neon version '1.6'")
pg_cur.execute("create database lfc")
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
log.info(f"Inserting {n_records} rows")
lfc_cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
lfc_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
log.info(f"Inserted {n_records} rows")
http_client = endpoint.http_client()
if query is LfcQueryMethod.COMPUTE_CTL:
status = http_client.prewarm_lfc_status()
assert status["status"] == "not_prewarmed"
assert "error" not in status
http_client.offload_lfc()
assert http_client.prewarm_lfc_status()["status"] == "not_prewarmed"
assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0}
else:
pg_cur.execute("select get_local_cache_state()")
lfc_state = pg_cur.fetchall()[0][0]
endpoint.stop()
endpoint.start()
conn = endpoint.connect()
cur = conn.cursor()
time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version
cur.execute("alter extension neon update to '1.6'")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
# wait until compute_ctl completes downgrade of extension to default version
time.sleep(1)
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("alter extension neon update to '1.6'")
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = cur.fetchall()[0][0]
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
if query is LfcQueryMethod.COMPUTE_CTL:
http_client.prewarm_lfc()
else:
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
pg_cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = pg_cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}")
cur.execute("select * from get_prewarm_info()")
prewarm_info = cur.fetchall()[0]
pg_cur.execute("select * from get_prewarm_info()")
prewarm_info = pg_cur.fetchall()[0]
log.info(f"Prewarm info: {prewarm_info}")
log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%")
total, prewarmed, skipped, _ = prewarm_info
progress = (prewarmed + skipped) * 100 // total
log.info(f"Prewarm progress: {progress}%")
assert lfc_used_pages > 10000
assert (
@@ -66,18 +115,23 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv):
and prewarm_info[0] == prewarm_info[1] + prewarm_info[2]
)
cur.execute("select sum(pk) from t")
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
lfc_cur.execute("select sum(pk) from t")
assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
check_pinned_entries(cur)
check_pinned_entries(pg_cur)
desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped}
if query is LfcQueryMethod.COMPUTE_CTL:
assert http_client.prewarm_lfc_status() == desired
assert prom_parse(http_client) == {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1}
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
@pytest.mark.parametrize("query", QUERY_OPTIONS, ids=["postgres", "compute-ctl"])
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMethod):
env = neon_simple_env
n_records = 10000
n_threads = 4
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
@@ -87,40 +141,58 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
"neon.file_cache_prewarm_limit=1000000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute(
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create extension neon version '1.6'")
pg_cur.execute("CREATE DATABASE lfc")
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
lfc_cur.execute(
"create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 1000)) with (fillfactor=10)"
)
cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
log.info(f"Inserting {n_records} rows")
lfc_cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
log.info(f"Inserted {n_records} rows")
http_client = endpoint.http_client()
if query is LfcQueryMethod.COMPUTE_CTL:
http_client.offload_lfc()
else:
pg_cur.execute("select get_local_cache_state()")
lfc_state = pg_cur.fetchall()[0][0]
running = True
n_prewarms = 0
def workload():
conn = endpoint.connect()
cur = conn.cursor()
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
n_transfers = 0
while running:
src = random.randint(1, n_records)
dst = random.randint(1, n_records)
cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
lfc_cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
lfc_cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
n_transfers += 1
log.info(f"Number of transfers: {n_transfers}")
def prewarm():
conn = endpoint.connect()
cur = conn.cursor()
n_prewarms = 0
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
while running:
cur.execute("alter system set neon.file_cache_size_limit='1MB'")
cur.execute("select pg_reload_conf()")
cur.execute("alter system set neon.file_cache_size_limit='1GB'")
cur.execute("select pg_reload_conf()")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
pg_cur.execute("alter system set neon.file_cache_size_limit='1MB'")
pg_cur.execute("select pg_reload_conf()")
pg_cur.execute("alter system set neon.file_cache_size_limit='1GB'")
pg_cur.execute("select pg_reload_conf()")
if query is LfcQueryMethod.COMPUTE_CTL:
http_client.prewarm_lfc()
else:
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
nonlocal n_prewarms
n_prewarms += 1
log.info(f"Number of prewarms: {n_prewarms}")
@@ -140,8 +212,10 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
t.join()
prewarm_thread.join()
cur.execute("select sum(balance) from accounts")
total_balance = cur.fetchall()[0][0]
lfc_cur.execute("select sum(balance) from accounts")
total_balance = lfc_cur.fetchall()[0][0]
assert total_balance == 0
check_pinned_entries(cur)
check_pinned_entries(pg_cur)
if query is LfcQueryMethod.COMPUTE_CTL:
assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: n_prewarms}

View File

@@ -1334,6 +1334,13 @@ def test_sharding_split_failures(
tenant_id, timeline_id, shard_count=initial_shard_count, placement_policy='{"Attached":1}'
)
# Create bystander tenants with various shard counts. They should not be affected by the aborted
# splits. Regression test for https://github.com/neondatabase/cloud/issues/28589.
bystanders = {} # id → shard_count
for bystander_shard_count in [1, 2, 4, 8]:
id, _ = env.create_tenant(shard_count=bystander_shard_count)
bystanders[id] = bystander_shard_count
env.storage_controller.allowed_errors.extend(
[
# All split failures log a warning when then enqueue the abort operation
@@ -1394,6 +1401,8 @@ def test_sharding_split_failures(
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
if tenant_shard_id.tenant_id != tenant_id:
continue # skip bystanders
log.info(f"Shard {tenant_shard_id} seen on node {ps.id} in mode {loc[1]['mode']}")
assert tenant_shard_id.shard_count == initial_shard_count
if loc[1]["mode"] == "Secondary":
@@ -1414,6 +1423,8 @@ def test_sharding_split_failures(
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
if tenant_shard_id.tenant_id != tenant_id:
continue # skip bystanders
log.info(f"Shard {tenant_shard_id} seen on node {ps.id} in mode {loc[1]['mode']}")
assert tenant_shard_id.shard_count == split_shard_count
if loc[1]["mode"] == "Secondary":
@@ -1496,6 +1507,12 @@ def test_sharding_split_failures(
# the scheduler reaches an idle state
env.storage_controller.reconcile_until_idle(timeout_secs=30)
# Check that all bystanders are still around.
for bystander_id, bystander_shard_count in bystanders.items():
response = env.storage_controller.tenant_describe(bystander_id)
assert TenantId(response["tenant_id"]) == bystander_id
assert len(response["shards"]) == bystander_shard_count
env.storage_controller.consistency_check()