mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 15:10:44 +00:00
Compare commits
10 Commits
devin/1746
...
skyzh/add-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ab7e5fbf95 | ||
|
|
0ef6851219 | ||
|
|
5c356c63eb | ||
|
|
384e3df2ad | ||
|
|
f9b3a2e059 | ||
|
|
79ee78ea32 | ||
|
|
0e0ad073bf | ||
|
|
6827f2f58c | ||
|
|
c82e363ed9 | ||
|
|
50dc2fae77 |
2
.github/workflows/benchmarking.yml
vendored
2
.github/workflows/benchmarking.yml
vendored
@@ -64,7 +64,7 @@ jobs:
|
||||
env:
|
||||
ORG_ID: org-solitary-dew-09443886
|
||||
LIMIT: 100
|
||||
SEARCH: "Created by actions/neon-project-create; GITHUB_RUN_ID"
|
||||
SEARCH: "GITHUB_RUN_ID="
|
||||
BASE_URL: https://console-stage.neon.build/api/v2
|
||||
DRY_RUN: "false" # Set to "true" to just test out the workflow
|
||||
|
||||
|
||||
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1284,6 +1284,7 @@ name = "compute_tools"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-compression",
|
||||
"aws-config",
|
||||
"aws-sdk-kms",
|
||||
"aws-sdk-s3",
|
||||
@@ -1420,6 +1421,7 @@ dependencies = [
|
||||
"clap",
|
||||
"comfy-table",
|
||||
"compute_api",
|
||||
"endpoint_storage",
|
||||
"futures",
|
||||
"http-utils",
|
||||
"humantime",
|
||||
|
||||
@@ -243,6 +243,7 @@ azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rus
|
||||
## Local libraries
|
||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
|
||||
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
|
||||
http-utils = { version = "0.1", path = "./libs/http-utils/" }
|
||||
metrics = { version = "0.1", path = "./libs/metrics/" }
|
||||
pageserver = { path = "./pageserver" }
|
||||
|
||||
@@ -1084,23 +1084,12 @@ RUN cargo install --locked --version 0.12.9 cargo-pgrx && \
|
||||
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
|
||||
|
||||
USER root
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions pgrx14"
|
||||
#
|
||||
#########################################################################################
|
||||
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx14
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN cargo install --locked --version 0.14.1 cargo-pgrx && \
|
||||
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
|
||||
|
||||
USER root
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions pgrx14"
|
||||
#
|
||||
# Version 14 is now required by a few
|
||||
# Version 14 is now required by a few
|
||||
# This layer should be used as a base for new pgrx extensions,
|
||||
# and eventually get merged with `rust-extensions-build`
|
||||
#
|
||||
@@ -1333,8 +1322,8 @@ ARG PG_VERSION
|
||||
# Do not update without approval from the proxy team
|
||||
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
|
||||
WORKDIR /ext-src
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.0.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "19be2dc0b3834d643706ed430af998bb4c2cdf24b3c45e7b102bb3a550e8660c pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.1.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "62fec9e472cb805c53ba24a0765afdb8ea2720cfc03ae7813e61687b36d1b0ad pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/version = "0.12.6"/version = "0.12.9"/g' pgrx-tests/Cargo.toml && \
|
||||
@@ -1362,7 +1351,8 @@ COPY compute/patches/anon_v2.patch .
|
||||
# This is an experimental extension, never got to real production.
|
||||
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
|
||||
ENV PATH="/usr/local/pgsql/bin/:$PATH"
|
||||
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/latest/postgresql_anonymizer-latest.tar.gz -O pg_anon.tar.gz && \
|
||||
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/2.1.0/postgresql_anonymizer-latest.tar.gz -O pg_anon.tar.gz && \
|
||||
echo "48e7f5ae2f1ca516df3da86c5c739d48dd780a4e885705704ccaad0faa89d6c0 pg_anon.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C . && \
|
||||
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt && \
|
||||
sed -i 's/pgrx = "0.14.1"/pgrx = { version = "=0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
|
||||
@@ -10,6 +10,7 @@ default = []
|
||||
testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
async-compression.workspace = true
|
||||
base64.workspace = true
|
||||
aws-config.workspace = true
|
||||
aws-sdk-s3.workspace = true
|
||||
|
||||
@@ -1,17 +1,10 @@
|
||||
use std::collections::HashMap;
|
||||
use std::os::unix::fs::{PermissionsExt, symlink};
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, Condvar, Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{env, fs};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use compute_api::privilege::Privilege;
|
||||
use compute_api::responses::{ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus};
|
||||
use compute_api::responses::{
|
||||
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
|
||||
LfcPrewarmState,
|
||||
};
|
||||
use compute_api::spec::{
|
||||
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
|
||||
};
|
||||
@@ -25,6 +18,16 @@ use postgres;
|
||||
use postgres::NoTls;
|
||||
use postgres::error::SqlState;
|
||||
use remote_storage::{DownloadError, RemotePath};
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::os::unix::fs::{PermissionsExt, symlink};
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, Condvar, Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{env, fs};
|
||||
use tokio::spawn;
|
||||
use tracing::{Instrument, debug, error, info, instrument, warn};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -150,6 +153,9 @@ pub struct ComputeState {
|
||||
/// set up the span relationship ourselves.
|
||||
pub startup_span: Option<tracing::span::Span>,
|
||||
|
||||
pub lfc_prewarm_state: LfcPrewarmState,
|
||||
pub lfc_offload_state: LfcOffloadState,
|
||||
|
||||
pub metrics: ComputeMetrics,
|
||||
}
|
||||
|
||||
@@ -163,6 +169,8 @@ impl ComputeState {
|
||||
pspec: None,
|
||||
startup_span: None,
|
||||
metrics: ComputeMetrics::default(),
|
||||
lfc_prewarm_state: LfcPrewarmState::default(),
|
||||
lfc_offload_state: LfcOffloadState::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,6 +206,8 @@ pub struct ParsedSpec {
|
||||
pub pageserver_connstr: String,
|
||||
pub safekeeper_connstrings: Vec<String>,
|
||||
pub storage_auth_token: Option<String>,
|
||||
pub endpoint_storage_addr: Option<SocketAddr>,
|
||||
pub endpoint_storage_token: Option<String>,
|
||||
}
|
||||
|
||||
impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
@@ -251,6 +261,18 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
.or(Err("invalid timeline id"))?
|
||||
};
|
||||
|
||||
let endpoint_storage_addr: Option<SocketAddr> = spec
|
||||
.endpoint_storage_addr
|
||||
.clone()
|
||||
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_addr"))
|
||||
.unwrap_or_default()
|
||||
.parse()
|
||||
.ok();
|
||||
let endpoint_storage_token = spec
|
||||
.endpoint_storage_token
|
||||
.clone()
|
||||
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_token"));
|
||||
|
||||
Ok(ParsedSpec {
|
||||
spec,
|
||||
pageserver_connstr,
|
||||
@@ -258,6 +280,8 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
storage_auth_token,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
endpoint_storage_addr,
|
||||
endpoint_storage_token,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -736,6 +760,9 @@ impl ComputeNode {
|
||||
// Log metrics so that we can search for slow operations in logs
|
||||
info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished");
|
||||
|
||||
if pspec.spec.prewarm_lfc_on_startup {
|
||||
self.prewarm_lfc();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
202
compute_tools/src/compute_prewarm.rs
Normal file
202
compute_tools/src/compute_prewarm.rs
Normal file
@@ -0,0 +1,202 @@
|
||||
use crate::compute::ComputeNode;
|
||||
use anyhow::{Context, Result, bail};
|
||||
use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
|
||||
use compute_api::responses::LfcOffloadState;
|
||||
use compute_api::responses::LfcPrewarmState;
|
||||
use http::StatusCode;
|
||||
use reqwest::Client;
|
||||
use std::sync::Arc;
|
||||
use tokio::{io::AsyncReadExt, spawn};
|
||||
use tracing::{error, info};
|
||||
|
||||
#[derive(serde::Serialize, Default)]
|
||||
pub struct LfcPrewarmStateWithProgress {
|
||||
#[serde(flatten)]
|
||||
base: LfcPrewarmState,
|
||||
total: i32,
|
||||
prewarmed: i32,
|
||||
skipped: i32,
|
||||
}
|
||||
|
||||
/// Everything needed to talk to endpoint storage about the LFC state:
/// the full object URL and the bearer token that authorizes the request.
struct EndpointStoragePair {
    /// Full URL of the LFC state object.
    url: String,
    /// Bearer token sent with every request to `url`.
    token: String,
}
|
||||
|
||||
/// Final path segment of the endpoint storage URL under which the LFC state is kept.
const KEY: &str = "lfc_state";
|
||||
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
|
||||
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
|
||||
bail!("pspec.endpoint_id missing")
|
||||
};
|
||||
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
|
||||
bail!("pspec.endpoint_storage_addr missing")
|
||||
};
|
||||
let tenant_id = pspec.tenant_id;
|
||||
let timeline_id = pspec.timeline_id;
|
||||
|
||||
let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
|
||||
let Some(ref token) = pspec.endpoint_storage_token else {
|
||||
bail!("pspec.endpoint_storage_token missing")
|
||||
};
|
||||
let token = token.clone();
|
||||
Ok(EndpointStoragePair { url, token })
|
||||
}
|
||||
}
|
||||
|
||||
impl ComputeNode {
|
||||
// If prewarm failed, we want to get overall number of segments as well as done ones.
|
||||
// However, this function should be reliable even if querying postgres failed.
|
||||
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
|
||||
info!("requesting LFC prewarm state from postgres");
|
||||
let mut state = LfcPrewarmStateWithProgress::default();
|
||||
{
|
||||
state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
|
||||
}
|
||||
|
||||
let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
|
||||
Ok(client) => client,
|
||||
Err(err) => {
|
||||
error!(%err, "connecting to postgres");
|
||||
return state;
|
||||
}
|
||||
};
|
||||
let row = match client
|
||||
.query_one("select * from get_prewarm_info()", &[])
|
||||
.await
|
||||
{
|
||||
Ok(row) => row,
|
||||
Err(err) => {
|
||||
error!(%err, "querying LFC prewarm status");
|
||||
return state;
|
||||
}
|
||||
};
|
||||
state.total = row.try_get(0).unwrap_or_default();
|
||||
state.prewarmed = row.try_get(1).unwrap_or_default();
|
||||
state.skipped = row.try_get(2).unwrap_or_default();
|
||||
state
|
||||
}
|
||||
|
||||
pub fn lfc_offload_state(&self) -> LfcOffloadState {
|
||||
self.state.lock().unwrap().lfc_offload_state.clone()
|
||||
}
|
||||
|
||||
/// Returns false if there is a prewarm request ongoing, true otherwise
|
||||
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
|
||||
crate::metrics::LFC_PREWARM_REQUESTS.inc();
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
|
||||
if let LfcPrewarmState::Prewarming =
|
||||
std::mem::replace(state, LfcPrewarmState::Prewarming)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
let cloned = self.clone();
|
||||
spawn(async move {
|
||||
let Err(err) = cloned.prewarm_impl().await else {
|
||||
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
|
||||
return;
|
||||
};
|
||||
error!(%err);
|
||||
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
|
||||
error: err.to_string(),
|
||||
};
|
||||
});
|
||||
true
|
||||
}
|
||||
|
||||
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
|
||||
let state = self.state.lock().unwrap();
|
||||
state.pspec.as_ref().unwrap().try_into()
|
||||
}
|
||||
|
||||
async fn prewarm_impl(&self) -> Result<()> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
|
||||
info!(%url, "requesting LFC state from endpoint storage");
|
||||
|
||||
let request = Client::new().get(&url).bearer_auth(token);
|
||||
let res = request.send().await.context("querying endpoint storage")?;
|
||||
let status = res.status();
|
||||
if status != StatusCode::OK {
|
||||
bail!("{status} querying endpoint storage")
|
||||
}
|
||||
|
||||
let mut uncompressed = Vec::new();
|
||||
let lfc_state = res
|
||||
.bytes()
|
||||
.await
|
||||
.context("getting request body from endpoint storage")?;
|
||||
ZstdDecoder::new(lfc_state.iter().as_slice())
|
||||
.read_to_end(&mut uncompressed)
|
||||
.await
|
||||
.context("decoding LFC state")?;
|
||||
let uncompressed_len = uncompressed.len();
|
||||
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres");
|
||||
|
||||
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
.await
|
||||
.context("connecting to postgres")?
|
||||
.query_one("select prewarm_local_cache($1)", &[&uncompressed])
|
||||
.await
|
||||
.context("loading LFC state into postgres")
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
/// Returns false if there is an offload request ongoing, true otherwise
|
||||
pub fn offload_lfc(self: &Arc<Self>) -> bool {
|
||||
crate::metrics::LFC_OFFLOAD_REQUESTS.inc();
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap().lfc_offload_state;
|
||||
if let LfcOffloadState::Offloading =
|
||||
std::mem::replace(state, LfcOffloadState::Offloading)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
let cloned = self.clone();
|
||||
spawn(async move {
|
||||
let Err(err) = cloned.offload_lfc_impl().await else {
|
||||
cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
|
||||
return;
|
||||
};
|
||||
error!(%err);
|
||||
cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
|
||||
error: err.to_string(),
|
||||
};
|
||||
});
|
||||
true
|
||||
}
|
||||
|
||||
async fn offload_lfc_impl(&self) -> Result<()> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
|
||||
info!(%url, "requesting LFC state from postgres");
|
||||
|
||||
let mut compressed = Vec::new();
|
||||
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
.await
|
||||
.context("connecting to postgres")?
|
||||
.query_one("select get_local_cache_state()", &[])
|
||||
.await
|
||||
.context("querying LFC state")?
|
||||
.try_get::<usize, &[u8]>(0)
|
||||
.context("deserializing LFC state")
|
||||
.map(ZstdEncoder::new)?
|
||||
.read_to_end(&mut compressed)
|
||||
.await
|
||||
.context("compressing LFC state")?;
|
||||
let compressed_len = compressed.len();
|
||||
info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
|
||||
|
||||
let request = Client::new().put(url).bearer_auth(token).body(compressed);
|
||||
match request.send().await {
|
||||
Ok(res) if res.status() == StatusCode::OK => Ok(()),
|
||||
Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()),
|
||||
Err(err) => Err(err).context("writing to endpoint storage"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -223,6 +223,9 @@ pub fn write_postgres_conf(
|
||||
// TODO: tune this after performance testing
|
||||
writeln!(file, "pgaudit.log_rotation_age=5")?;
|
||||
|
||||
// Enable audit logs for pg_session_jwt extension
|
||||
writeln!(file, "pg_session_jwt.audit_log=on")?;
|
||||
|
||||
// Add audit shared_preload_libraries, if they are not present.
|
||||
//
|
||||
// The caller who sets the flag is responsible for ensuring that the necessary
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use anyhow::{Result, anyhow};
|
||||
use axum::{RequestExt, body::Body};
|
||||
use axum_extra::{
|
||||
TypedHeader,
|
||||
headers::{Authorization, authorization::Bearer},
|
||||
};
|
||||
use compute_api::requests::ComputeClaims;
|
||||
use compute_api::requests::{COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope};
|
||||
use futures::future::BoxFuture;
|
||||
use http::{Request, Response, StatusCode};
|
||||
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
|
||||
@@ -25,13 +23,14 @@ pub(in crate::http) struct Authorize {
|
||||
impl Authorize {
|
||||
pub fn new(compute_id: String, jwks: JwkSet) -> Self {
|
||||
let mut validation = Validation::new(Algorithm::EdDSA);
|
||||
// Nothing is currently required
|
||||
validation.required_spec_claims = HashSet::new();
|
||||
validation.validate_exp = true;
|
||||
// Unused by the control plane
|
||||
validation.validate_aud = false;
|
||||
// Unused by the control plane
|
||||
validation.validate_nbf = false;
|
||||
// Unused by the control plane
|
||||
validation.validate_aud = false;
|
||||
validation.set_audience(&[COMPUTE_AUDIENCE]);
|
||||
// Nothing is currently required
|
||||
validation.set_required_spec_claims(&[] as &[&str; 0]);
|
||||
|
||||
Self {
|
||||
compute_id,
|
||||
@@ -64,11 +63,47 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
Err(e) => return Err(JsonResponse::error(StatusCode::UNAUTHORIZED, e)),
|
||||
};
|
||||
|
||||
if data.claims.compute_id != compute_id {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"invalid compute ID in authorization token claims",
|
||||
));
|
||||
match data.claims.scope {
|
||||
// TODO: We should validate audience for every token, but
|
||||
// instead of this ad-hoc validation, we should turn
|
||||
// [`Validation::validate_aud`] on. This is merely a stopgap
|
||||
// while we roll out `aud` deployment. We return a 401
|
||||
// Unauthorized because when we eventually do use
|
||||
// [`Validation`], we will hit the above `Err` match arm which
|
||||
// returns 401 Unauthorized.
|
||||
Some(ComputeClaimsScope::Admin) => {
|
||||
let Some(ref audience) = data.claims.audience else {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"missing audience in authorization token claims",
|
||||
));
|
||||
};
|
||||
|
||||
if !audience.iter().any(|a| a == COMPUTE_AUDIENCE) {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"invalid audience in authorization token claims",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// If the scope is not [`ComputeClaimsScope::Admin`], then we
|
||||
// must validate the compute_id
|
||||
_ => {
|
||||
let Some(ref claimed_compute_id) = data.claims.compute_id else {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::FORBIDDEN,
|
||||
"missing compute_id in authorization token claims",
|
||||
));
|
||||
};
|
||||
|
||||
if *claimed_compute_id != compute_id {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::FORBIDDEN,
|
||||
"invalid compute ID in authorization token claims",
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Make claims available to any subsequent middleware or request
|
||||
|
||||
39
compute_tools/src/http/routes/lfc.rs
Normal file
39
compute_tools/src/http/routes/lfc.rs
Normal file
@@ -0,0 +1,39 @@
|
||||
use crate::compute_prewarm::LfcPrewarmStateWithProgress;
|
||||
use crate::http::JsonResponse;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{Json, http::StatusCode};
|
||||
use compute_api::responses::LfcOffloadState;
|
||||
/// Shorthand for the axum state extractor that carries the shared `ComputeNode`.
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
|
||||
|
||||
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmStateWithProgress> {
|
||||
Json(compute.lfc_prewarm_state().await)
|
||||
}
|
||||
|
||||
// The following functions are marked async for axum, as that is more convenient than
// wrapping them in async closures at the call site
|
||||
|
||||
pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadState> {
|
||||
Json(compute.lfc_offload_state())
|
||||
}
|
||||
|
||||
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
|
||||
if compute.prewarm_lfc() {
|
||||
StatusCode::ACCEPTED.into_response()
|
||||
} else {
|
||||
JsonResponse::error(
|
||||
StatusCode::TOO_MANY_REQUESTS,
|
||||
"Multiple requests for prewarm are not allowed",
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(in crate::http) async fn offload(compute: Compute) -> Response {
|
||||
if compute.offload_lfc() {
|
||||
StatusCode::ACCEPTED.into_response()
|
||||
} else {
|
||||
JsonResponse::error(
|
||||
StatusCode::TOO_MANY_REQUESTS,
|
||||
"Multiple requests for prewarm offload are not allowed",
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,7 @@ pub(in crate::http) mod extensions;
|
||||
pub(in crate::http) mod failpoints;
|
||||
pub(in crate::http) mod grants;
|
||||
pub(in crate::http) mod insights;
|
||||
pub(in crate::http) mod lfc;
|
||||
pub(in crate::http) mod metrics;
|
||||
pub(in crate::http) mod metrics_json;
|
||||
pub(in crate::http) mod status;
|
||||
|
||||
@@ -23,7 +23,7 @@ use super::{
|
||||
middleware::authorize::Authorize,
|
||||
routes::{
|
||||
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
|
||||
grants, insights, metrics, metrics_json, status, terminate,
|
||||
grants, insights, lfc, metrics, metrics_json, status, terminate,
|
||||
},
|
||||
};
|
||||
use crate::compute::ComputeNode;
|
||||
@@ -85,6 +85,8 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
|
||||
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
|
||||
|
||||
let authenticated_router = Router::<Arc<ComputeNode>>::new()
|
||||
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
|
||||
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
|
||||
.route("/check_writability", post(check_writability::is_writable))
|
||||
.route("/configure", post(configure::configure))
|
||||
.route("/database_schema", get(database_schema::get_schema_dump))
|
||||
|
||||
@@ -11,6 +11,7 @@ pub mod http;
|
||||
pub mod logger;
|
||||
pub mod catalog;
|
||||
pub mod compute;
|
||||
pub mod compute_prewarm;
|
||||
pub mod disk_quota;
|
||||
pub mod extension_server;
|
||||
pub mod installed_extensions;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use metrics::core::{AtomicF64, AtomicU64, Collector, GenericCounter, GenericGauge};
|
||||
use metrics::proto::MetricFamily;
|
||||
use metrics::{
|
||||
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
|
||||
IntCounter, IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
|
||||
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -97,6 +97,24 @@ pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
/// Needed as neon.file_cache_prewarm_batch == 0 doesn't mean we never tried to prewarm.
|
||||
/// On the other hand, LFC_PREWARMED_PAGES is excessive as we can GET /lfc/prewarm
|
||||
pub(crate) static LFC_PREWARM_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"compute_ctl_lfc_prewarm_requests_total",
|
||||
"Total number of LFC prewarm requests made by compute_ctl",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static LFC_OFFLOAD_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"compute_ctl_lfc_offload_requests_total",
|
||||
"Total number of LFC offload requests made by compute_ctl",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub fn collect() -> Vec<MetricFamily> {
|
||||
let mut metrics = COMPUTE_CTL_UP.collect();
|
||||
metrics.extend(INSTALLED_EXTENSIONS.collect());
|
||||
@@ -106,5 +124,7 @@ pub fn collect() -> Vec<MetricFamily> {
|
||||
metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
|
||||
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
|
||||
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
|
||||
metrics.extend(LFC_PREWARM_REQUESTS.collect());
|
||||
metrics.extend(LFC_OFFLOAD_REQUESTS.collect());
|
||||
metrics
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ mod pg_helpers_tests {
|
||||
r#"fsync = off
|
||||
wal_level = logical
|
||||
hot_standby = on
|
||||
prewarm_lfc_on_startup = off
|
||||
neon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501'
|
||||
wal_log_hints = on
|
||||
log_connections = on
|
||||
|
||||
@@ -41,7 +41,7 @@ storage_broker.workspace = true
|
||||
http-utils.workspace = true
|
||||
utils.workspace = true
|
||||
whoami.workspace = true
|
||||
|
||||
endpoint_storage.workspace = true
|
||||
compute_api.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
@@ -16,10 +16,11 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result, anyhow, bail};
|
||||
use clap::Parser;
|
||||
use compute_api::requests::ComputeClaimsScope;
|
||||
use compute_api::spec::ComputeMode;
|
||||
use control_plane::broker::StorageBroker;
|
||||
use control_plane::endpoint::ComputeControlPlane;
|
||||
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
|
||||
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
|
||||
use control_plane::local_env;
|
||||
use control_plane::local_env::{
|
||||
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
|
||||
@@ -705,6 +706,9 @@ struct EndpointStopCmdArgs {
|
||||
struct EndpointGenerateJwtCmdArgs {
|
||||
#[clap(help = "Postgres endpoint id")]
|
||||
endpoint_id: String,
|
||||
|
||||
#[clap(short = 's', long, help = "Scope to generate the JWT with", value_parser = ComputeClaimsScope::from_str)]
|
||||
scope: Option<ComputeClaimsScope>,
|
||||
}
|
||||
|
||||
#[derive(clap::Subcommand)]
|
||||
@@ -1018,7 +1022,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
})
|
||||
.collect(),
|
||||
endpoint_storage: EndpointStorageConf {
|
||||
port: ENDPOINT_STORAGE_DEFAULT_PORT,
|
||||
listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
|
||||
},
|
||||
pg_distrib_dir: None,
|
||||
neon_distrib_dir: None,
|
||||
@@ -1484,10 +1488,25 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
None
|
||||
};
|
||||
|
||||
let exp = (std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?
|
||||
+ Duration::from_secs(86400))
|
||||
.as_secs();
|
||||
let claims = endpoint_storage::claims::EndpointStorageClaims {
|
||||
tenant_id: endpoint.tenant_id,
|
||||
timeline_id: endpoint.timeline_id,
|
||||
endpoint_id: endpoint_id.to_string(),
|
||||
exp,
|
||||
};
|
||||
|
||||
let endpoint_storage_token = env.generate_auth_token(&claims)?;
|
||||
let endpoint_storage_addr = env.endpoint_storage.listen_addr.to_string();
|
||||
|
||||
println!("Starting existing endpoint {endpoint_id}...");
|
||||
endpoint
|
||||
.start(
|
||||
&auth_token,
|
||||
endpoint_storage_token,
|
||||
endpoint_storage_addr,
|
||||
safekeepers_generation,
|
||||
safekeepers,
|
||||
pageservers,
|
||||
@@ -1540,12 +1559,16 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
endpoint.stop(&args.mode, args.destroy)?;
|
||||
}
|
||||
EndpointCmd::GenerateJwt(args) => {
|
||||
let endpoint_id = &args.endpoint_id;
|
||||
let endpoint = cplane
|
||||
.endpoints
|
||||
.get(endpoint_id)
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
|
||||
let jwt = endpoint.generate_jwt()?;
|
||||
let endpoint = {
|
||||
let endpoint_id = &args.endpoint_id;
|
||||
|
||||
cplane
|
||||
.endpoints
|
||||
.get(endpoint_id)
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?
|
||||
};
|
||||
|
||||
let jwt = endpoint.generate_jwt(args.scope)?;
|
||||
|
||||
print!("{jwt}");
|
||||
}
|
||||
|
||||
@@ -45,7 +45,9 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::{Context, Result, anyhow, bail};
|
||||
use compute_api::requests::{ComputeClaims, ConfigurationRequest};
|
||||
use compute_api::requests::{
|
||||
COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
|
||||
};
|
||||
use compute_api::responses::{
|
||||
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
|
||||
};
|
||||
@@ -630,9 +632,17 @@ impl Endpoint {
|
||||
}
|
||||
|
||||
/// Generate a JWT with the correct claims.
|
||||
pub fn generate_jwt(&self) -> Result<String> {
|
||||
pub fn generate_jwt(&self, scope: Option<ComputeClaimsScope>) -> Result<String> {
|
||||
self.env.generate_auth_token(&ComputeClaims {
|
||||
compute_id: self.endpoint_id.clone(),
|
||||
audience: match scope {
|
||||
Some(ComputeClaimsScope::Admin) => Some(vec![COMPUTE_AUDIENCE.to_owned()]),
|
||||
_ => None,
|
||||
},
|
||||
compute_id: match scope {
|
||||
Some(ComputeClaimsScope::Admin) => None,
|
||||
_ => Some(self.endpoint_id.clone()),
|
||||
},
|
||||
scope,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -640,6 +650,8 @@ impl Endpoint {
|
||||
pub async fn start(
|
||||
&self,
|
||||
auth_token: &Option<String>,
|
||||
endpoint_storage_token: String,
|
||||
endpoint_storage_addr: String,
|
||||
safekeepers_generation: Option<SafekeeperGeneration>,
|
||||
safekeepers: Vec<NodeId>,
|
||||
pageservers: Vec<(Host, u16)>,
|
||||
@@ -733,6 +745,9 @@ impl Endpoint {
|
||||
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
|
||||
audit_log_level: ComputeAudit::Disabled,
|
||||
logs_export_host: None::<String>,
|
||||
endpoint_storage_addr: Some(endpoint_storage_addr),
|
||||
endpoint_storage_token: Some(endpoint_storage_token),
|
||||
prewarm_lfc_on_startup: false,
|
||||
};
|
||||
|
||||
// this strange code is needed to support respec() in tests
|
||||
@@ -903,7 +918,7 @@ impl Endpoint {
|
||||
self.external_http_address.port()
|
||||
),
|
||||
)
|
||||
.bearer_auth(self.generate_jwt()?)
|
||||
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
@@ -980,7 +995,7 @@ impl Endpoint {
|
||||
self.external_http_address.port()
|
||||
))
|
||||
.header(CONTENT_TYPE.as_str(), "application/json")
|
||||
.bearer_auth(self.generate_jwt()?)
|
||||
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
|
||||
.body(
|
||||
serde_json::to_string(&ConfigurationRequest {
|
||||
spec,
|
||||
|
||||
@@ -3,17 +3,19 @@ use crate::local_env::LocalEnv;
|
||||
use anyhow::{Context, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use std::io::Write;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Directory within .neon which will be used by default for LocalFs remote storage.
|
||||
pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage";
|
||||
pub const ENDPOINT_STORAGE_DEFAULT_PORT: u16 = 9993;
|
||||
pub const ENDPOINT_STORAGE_DEFAULT_ADDR: SocketAddr =
|
||||
SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9993);
|
||||
|
||||
pub struct EndpointStorage {
|
||||
pub bin: Utf8PathBuf,
|
||||
pub data_dir: Utf8PathBuf,
|
||||
pub pemfile: Utf8PathBuf,
|
||||
pub port: u16,
|
||||
pub addr: SocketAddr,
|
||||
}
|
||||
|
||||
impl EndpointStorage {
|
||||
@@ -22,7 +24,7 @@ impl EndpointStorage {
|
||||
bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(),
|
||||
data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(),
|
||||
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
|
||||
port: env.endpoint_storage.port,
|
||||
addr: env.endpoint_storage.listen_addr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +33,7 @@ impl EndpointStorage {
|
||||
}
|
||||
|
||||
fn listen_addr(&self) -> Utf8PathBuf {
|
||||
format!("127.0.0.1:{}", self.port).into()
|
||||
format!("{}:{}", self.addr.ip(), self.addr.port()).into()
|
||||
}
|
||||
|
||||
pub fn init(&self) -> Result<()> {
|
||||
|
||||
@@ -20,7 +20,9 @@ use utils::auth::encode_from_key_file;
|
||||
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
use crate::broker::StorageBroker;
|
||||
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
|
||||
use crate::endpoint_storage::{
|
||||
ENDPOINT_STORAGE_DEFAULT_ADDR, ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage,
|
||||
};
|
||||
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
|
||||
use crate::safekeeper::SafekeeperNode;
|
||||
|
||||
@@ -151,10 +153,10 @@ pub struct NeonLocalInitConf {
|
||||
pub generate_local_ssl_certs: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct EndpointStorageConf {
|
||||
pub port: u16,
|
||||
pub listen_addr: SocketAddr,
|
||||
}
|
||||
|
||||
/// Broker config for cluster internal communication.
|
||||
@@ -241,6 +243,14 @@ impl Default for NeonStorageControllerConf {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EndpointStorageConf {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NeonBroker {
|
||||
pub fn client_url(&self) -> Url {
|
||||
let url = if let Some(addr) = self.listen_https_addr {
|
||||
|
||||
@@ -12,6 +12,7 @@ ERROR: invalid JWT encoding
|
||||
-- Test creating a session with an expired JWT
|
||||
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjE3NDI1NjQ0MzIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MjQyNDIsInN1YiI6InVzZXIxMjMifQ.A6FwKuaSduHB9O7Gz37g0uoD_U9qVS0JNtT7YABGVgB7HUD1AMFc9DeyhNntWBqncg8k5brv-hrNTuUh5JYMAw');
|
||||
ERROR: Token used after it has expired
|
||||
DETAIL: exp=1742564432
|
||||
-- Test creating a session with a valid JWT
|
||||
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjQ4OTYxNjQyNTIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MzQzNDMsInN1YiI6InVzZXIxMjMifQ.2TXVgjb6JSUq6_adlvp-m_SdOxZSyGS30RS9TLB0xu2N83dMSs2NybwE1NMU8Fb0tcAZR_ET7M2rSxbTrphfCg');
|
||||
jwt_session_init
|
||||
|
||||
@@ -343,7 +343,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
|
||||
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
|
||||
fn token() -> String {
|
||||
let claims = endpoint_storage::Claims {
|
||||
let claims = endpoint_storage::claims::EndpointStorageClaims {
|
||||
tenant_id: TENANT_ID,
|
||||
timeline_id: TIMELINE_ID,
|
||||
endpoint_id: ENDPOINT_ID.into(),
|
||||
@@ -489,16 +489,8 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
}
|
||||
|
||||
fn delete_prefix_token(uri: &str) -> String {
|
||||
use serde::Serialize;
|
||||
let parts = uri.split("/").collect::<Vec<&str>>();
|
||||
#[derive(Serialize)]
|
||||
struct PrefixClaims {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
endpoint_id: Option<endpoint_storage::EndpointId>,
|
||||
exp: u64,
|
||||
}
|
||||
let claims = PrefixClaims {
|
||||
let claims = endpoint_storage::claims::DeletePrefixClaims {
|
||||
tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
|
||||
timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
|
||||
endpoint_id: parts.get(3).map(ToString::to_string),
|
||||
|
||||
52
endpoint_storage/src/claims.rs
Normal file
52
endpoint_storage/src/claims.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Display;
|
||||
use utils::id::{EndpointId, TenantId, TimelineId};
|
||||
|
||||
/// Claims to add, remove, or retrieve endpoint data. Used by compute_ctl
|
||||
#[derive(Deserialize, Serialize, PartialEq)]
|
||||
pub struct EndpointStorageClaims {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub endpoint_id: EndpointId,
|
||||
pub exp: u64,
|
||||
}
|
||||
|
||||
/// Claims to remove tenant, timeline, or endpoint data. Used by control plane
|
||||
#[derive(Deserialize, Serialize, PartialEq)]
|
||||
pub struct DeletePrefixClaims {
|
||||
pub tenant_id: TenantId,
|
||||
/// None when tenant is deleted (endpoint_id is also None in this case)
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
/// None when timeline is deleted
|
||||
pub endpoint_id: Option<EndpointId>,
|
||||
pub exp: u64,
|
||||
}
|
||||
|
||||
impl Display for EndpointStorageClaims {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"EndpointClaims(tenant_id={} timeline_id={} endpoint_id={} exp={})",
|
||||
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for DeletePrefixClaims {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"DeletePrefixClaims(tenant_id={} timeline_id={} endpoint_id={}, exp={})",
|
||||
self.tenant_id,
|
||||
self.timeline_id
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or("".to_string()),
|
||||
self.endpoint_id
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or("".to_string()),
|
||||
self.exp
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,5 @@
|
||||
pub mod claims;
|
||||
use crate::claims::{DeletePrefixClaims, EndpointStorageClaims};
|
||||
use anyhow::Result;
|
||||
use axum::extract::{FromRequestParts, Path};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
@@ -13,7 +15,7 @@ use std::result::Result as StdResult;
|
||||
use std::sync::Arc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::id::{EndpointId, TenantId, TimelineId};
|
||||
|
||||
// simplified version of utils::auth::JwtAuth
|
||||
pub struct JwtAuth {
|
||||
@@ -79,26 +81,6 @@ pub struct Storage {
|
||||
pub max_upload_file_limit: usize,
|
||||
}
|
||||
|
||||
pub type EndpointId = String; // If needed, reuse small string from proxy/src/types.rc
|
||||
|
||||
#[derive(Deserialize, Serialize, PartialEq)]
|
||||
pub struct Claims {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub endpoint_id: EndpointId,
|
||||
pub exp: u64,
|
||||
}
|
||||
|
||||
impl Display for Claims {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Claims(tenant_id {} timeline_id {} endpoint_id {} exp {})",
|
||||
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
struct KeyRequest {
|
||||
tenant_id: TenantId,
|
||||
@@ -107,6 +89,13 @@ struct KeyRequest {
|
||||
path: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, PartialEq)]
|
||||
struct PrefixKeyRequest {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
endpoint_id: Option<EndpointId>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct S3Path {
|
||||
pub path: RemotePath,
|
||||
@@ -165,7 +154,7 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
|
||||
.extract::<TypedHeader<Authorization<Bearer>>>()
|
||||
.await
|
||||
.map_err(|e| bad_request(e, "invalid token"))?;
|
||||
let claims: Claims = state
|
||||
let claims: EndpointStorageClaims = state
|
||||
.auth
|
||||
.decode(bearer.token())
|
||||
.map_err(|e| bad_request(e, "decoding token"))?;
|
||||
@@ -178,7 +167,7 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
|
||||
path.endpoint_id.clone()
|
||||
};
|
||||
|
||||
let route = Claims {
|
||||
let route = EndpointStorageClaims {
|
||||
tenant_id: path.tenant_id,
|
||||
timeline_id: path.timeline_id,
|
||||
endpoint_id,
|
||||
@@ -193,38 +182,13 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, PartialEq)]
|
||||
pub struct PrefixKeyPath {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
pub endpoint_id: Option<EndpointId>,
|
||||
}
|
||||
|
||||
impl Display for PrefixKeyPath {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"PrefixKeyPath(tenant_id {} timeline_id {} endpoint_id {})",
|
||||
self.tenant_id,
|
||||
self.timeline_id
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or("".to_string()),
|
||||
self.endpoint_id
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or("".to_string())
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct PrefixS3Path {
|
||||
pub path: RemotePath,
|
||||
}
|
||||
|
||||
impl From<&PrefixKeyPath> for PrefixS3Path {
|
||||
fn from(path: &PrefixKeyPath) -> Self {
|
||||
impl From<&DeletePrefixClaims> for PrefixS3Path {
|
||||
fn from(path: &DeletePrefixClaims) -> Self {
|
||||
let timeline_id = path
|
||||
.timeline_id
|
||||
.as_ref()
|
||||
@@ -250,21 +214,27 @@ impl FromRequestParts<Arc<Storage>> for PrefixS3Path {
|
||||
state: &Arc<Storage>,
|
||||
) -> Result<Self, Self::Rejection> {
|
||||
let Path(path) = parts
|
||||
.extract::<Path<PrefixKeyPath>>()
|
||||
.extract::<Path<PrefixKeyRequest>>()
|
||||
.await
|
||||
.map_err(|e| bad_request(e, "invalid route"))?;
|
||||
let TypedHeader(Authorization(bearer)) = parts
|
||||
.extract::<TypedHeader<Authorization<Bearer>>>()
|
||||
.await
|
||||
.map_err(|e| bad_request(e, "invalid token"))?;
|
||||
let claims: PrefixKeyPath = state
|
||||
let claims: DeletePrefixClaims = state
|
||||
.auth
|
||||
.decode(bearer.token())
|
||||
.map_err(|e| bad_request(e, "invalid token"))?;
|
||||
if path != claims {
|
||||
return Err(unauthorized(path, claims));
|
||||
let route = DeletePrefixClaims {
|
||||
tenant_id: path.tenant_id,
|
||||
timeline_id: path.timeline_id,
|
||||
endpoint_id: path.endpoint_id,
|
||||
exp: claims.exp,
|
||||
};
|
||||
if route != claims {
|
||||
return Err(unauthorized(route, claims));
|
||||
}
|
||||
Ok((&path).into())
|
||||
Ok((&route).into())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -297,7 +267,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn s3_path() {
|
||||
let auth = Claims {
|
||||
let auth = EndpointStorageClaims {
|
||||
tenant_id: TENANT_ID,
|
||||
timeline_id: TIMELINE_ID,
|
||||
endpoint_id: ENDPOINT_ID.into(),
|
||||
@@ -327,10 +297,11 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn prefix_s3_path() {
|
||||
let mut path = PrefixKeyPath {
|
||||
let mut path = DeletePrefixClaims {
|
||||
tenant_id: TENANT_ID,
|
||||
timeline_id: None,
|
||||
endpoint_id: None,
|
||||
exp: 0,
|
||||
};
|
||||
let prefix_path = |s: String| RemotePath::from_string(&s).unwrap();
|
||||
assert_eq!(
|
||||
|
||||
@@ -1,16 +1,58 @@
|
||||
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
|
||||
use std::str::FromStr;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::privilege::Privilege;
|
||||
use crate::responses::ComputeCtlConfig;
|
||||
use crate::spec::{ComputeSpec, ExtVersion, PgIdent};
|
||||
|
||||
/// The value to place in the [`ComputeClaims::audience`] claim.
|
||||
pub static COMPUTE_AUDIENCE: &str = "compute";
|
||||
|
||||
/// Available scopes for a compute's JWT.
|
||||
#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ComputeClaimsScope {
|
||||
/// An admin-scoped token allows access to all of `compute_ctl`'s authorized
|
||||
/// facilities.
|
||||
Admin,
|
||||
}
|
||||
|
||||
impl FromStr for ComputeClaimsScope {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"admin" => Ok(ComputeClaimsScope::Admin),
|
||||
_ => Err(anyhow::anyhow!("invalid compute claims scope \"{s}\"")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// When making requests to the `compute_ctl` external HTTP server, the client
|
||||
/// must specify a set of claims in `Authorization` header JWTs such that
|
||||
/// `compute_ctl` can authorize the request.
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename = "snake_case")]
|
||||
pub struct ComputeClaims {
|
||||
pub compute_id: String,
|
||||
/// The compute ID that will validate the token. The only case in which this
|
||||
/// can be [`None`] is if [`Self::scope`] is
|
||||
/// [`ComputeClaimsScope::Admin`].
|
||||
pub compute_id: Option<String>,
|
||||
|
||||
/// The scope of what the token authorizes.
|
||||
pub scope: Option<ComputeClaimsScope>,
|
||||
|
||||
/// The recipient the token is intended for.
|
||||
///
|
||||
/// See [RFC 7519](https://www.rfc-editor.org/rfc/rfc7519#section-4.1.3) for
|
||||
/// more information.
|
||||
///
|
||||
/// TODO: Remove the [`Option`] wrapper when control plane learns to send
|
||||
/// the claim.
|
||||
#[serde(rename = "aud")]
|
||||
pub audience: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
/// Request of the /configure API
|
||||
|
||||
@@ -46,6 +46,30 @@ pub struct ExtensionInstallResponse {
|
||||
pub version: ExtVersion,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Debug, Clone)]
|
||||
#[serde(tag = "status", rename_all = "snake_case")]
|
||||
pub enum LfcPrewarmState {
|
||||
#[default]
|
||||
NotPrewarmed,
|
||||
Prewarming,
|
||||
Completed,
|
||||
Failed {
|
||||
error: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Debug, Clone)]
|
||||
#[serde(tag = "status", rename_all = "snake_case")]
|
||||
pub enum LfcOffloadState {
|
||||
#[default]
|
||||
NotOffloaded,
|
||||
Offloading,
|
||||
Completed,
|
||||
Failed {
|
||||
error: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// Response of the /status API
|
||||
#[derive(Serialize, Debug, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
|
||||
@@ -172,6 +172,15 @@ pub struct ComputeSpec {
|
||||
/// Hostname and the port of the otel collector. Leave empty to disable Postgres logs forwarding.
|
||||
/// Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:10514
|
||||
pub logs_export_host: Option<String>,
|
||||
|
||||
/// Address of endpoint storage service
|
||||
pub endpoint_storage_addr: Option<String>,
|
||||
/// JWT for authorizing requests to endpoint storage service
|
||||
pub endpoint_storage_token: Option<String>,
|
||||
|
||||
/// If true, download LFC state from endpoint_storage and pass it to Postgres on startup
|
||||
#[serde(default)]
|
||||
pub prewarm_lfc_on_startup: bool,
|
||||
}
|
||||
|
||||
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
|
||||
|
||||
@@ -84,6 +84,11 @@
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "prewarm_lfc_on_startup",
|
||||
"value": "off",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "neon.safekeepers",
|
||||
"value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501",
|
||||
|
||||
@@ -43,6 +43,21 @@ pub struct NodeMetadata {
|
||||
pub other: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
/// PostHog integration config
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct PostHogConfig {
|
||||
/// PostHog project ID
|
||||
project_id: String,
|
||||
/// Server-side (private) API key
|
||||
server_api_key: String,
|
||||
/// Client-side (public) API key
|
||||
client_api_key: String,
|
||||
/// Private API URL
|
||||
private_api_url: String,
|
||||
/// Public API URL
|
||||
public_api_url: String,
|
||||
}
|
||||
|
||||
/// `pageserver.toml`
|
||||
///
|
||||
/// We use serde derive with `#[serde(default)]` to generate a deserializer
|
||||
@@ -182,6 +197,8 @@ pub struct ConfigToml {
|
||||
pub tracing: Option<Tracing>,
|
||||
pub enable_tls_page_service_api: bool,
|
||||
pub dev_mode: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub posthog_config: Option<PostHogConfig>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -659,6 +676,7 @@ impl Default for ConfigToml {
|
||||
tracing: None,
|
||||
enable_tls_page_service_api: false,
|
||||
dev_mode: false,
|
||||
posthog_config: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -295,6 +295,9 @@ pub struct TenantId(Id);
|
||||
|
||||
id_newtype!(TenantId);
|
||||
|
||||
/// If needed, reuse small string from proxy/src/types.rc
|
||||
pub type EndpointId = String;
|
||||
|
||||
// A pair uniquely identifying Neon instance.
|
||||
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct TenantTimelineId {
|
||||
|
||||
@@ -14,7 +14,7 @@ use std::time::Duration;
|
||||
use anyhow::{Context, bail, ensure};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
|
||||
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes, PostHogConfig};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pem::Pem;
|
||||
@@ -230,6 +230,9 @@ pub struct PageServerConf {
|
||||
/// such as authentication requirements for HTTP and PostgreSQL APIs.
|
||||
/// This is insecure and should only be used in development environments.
|
||||
pub dev_mode: bool,
|
||||
|
||||
/// PostHog integration config
|
||||
pub posthog_config: Option<PostHogConfig>,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -404,6 +407,7 @@ impl PageServerConf {
|
||||
tracing,
|
||||
enable_tls_page_service_api,
|
||||
dev_mode,
|
||||
posthog_config,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -513,6 +517,7 @@ impl PageServerConf {
|
||||
}
|
||||
None => Vec::new(),
|
||||
},
|
||||
posthog_config,
|
||||
};
|
||||
|
||||
// ------------------------------------------------------------
|
||||
|
||||
@@ -2469,7 +2469,6 @@ async fn timeline_checkpoint_handler(
|
||||
.map_err(|e|
|
||||
match e {
|
||||
CompactionError::ShuttingDown => ApiError::ShuttingDown,
|
||||
CompactionError::Cancelled => ApiError::ShuttingDown,
|
||||
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::Other(e) => ApiError::InternalServerError(e),
|
||||
|
||||
@@ -3198,7 +3198,6 @@ impl TenantShard {
|
||||
match err {
|
||||
err if err.is_cancel() => {}
|
||||
CompactionError::ShuttingDown => (),
|
||||
CompactionError::Cancelled => (),
|
||||
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
|
||||
// shouldn't block compaction.
|
||||
CompactionError::Offload(_) => {}
|
||||
|
||||
@@ -90,18 +90,10 @@ impl Header {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum BlobWriterError {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum WriteBlobError {
|
||||
#[error(transparent)]
|
||||
Flush(BlobWriterError),
|
||||
Flush(FlushTaskError),
|
||||
#[error("blob too large ({len} bytes)")]
|
||||
BlobTooLarge { len: usize },
|
||||
#[error(transparent)]
|
||||
@@ -246,16 +238,14 @@ where
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
flush_task_span: tracing::Span,
|
||||
) -> Result<Self, BlobWriterError> {
|
||||
let gate_token = gate.enter().map_err(|_| BlobWriterError::Cancelled)?;
|
||||
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
io_buf: Some(BytesMut::new()),
|
||||
writer: BufferedWriter::new(
|
||||
file,
|
||||
start_offset,
|
||||
|| IoBufferMut::with_capacity(Self::CAPACITY),
|
||||
gate_token,
|
||||
gate.enter()?,
|
||||
cancel,
|
||||
ctx,
|
||||
flush_task_span,
|
||||
@@ -275,16 +265,13 @@ where
|
||||
&mut self,
|
||||
src_buf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<(), BlobWriterError>) {
|
||||
) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
|
||||
let res = self
|
||||
.writer
|
||||
// TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
|
||||
// Can remove all the complexity around owned buffers upstack
|
||||
.write_buffered_borrowed(&src_buf, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
|
||||
})
|
||||
.map(|len| {
|
||||
self.offset += len as u64;
|
||||
});
|
||||
@@ -431,10 +418,8 @@ where
|
||||
self,
|
||||
mode: BufferedWriterShutdownMode,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<W, BlobWriterError> {
|
||||
let (_, file) = self.writer.shutdown(mode, ctx).await.map_err(|e| match e {
|
||||
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
|
||||
})?;
|
||||
) -> Result<W, FlushTaskError> {
|
||||
let (_, file) = self.writer.shutdown(mode, ctx).await?;
|
||||
Ok(file)
|
||||
}
|
||||
}
|
||||
@@ -482,11 +467,8 @@ pub(crate) mod tests {
|
||||
.await?,
|
||||
gate.enter()?,
|
||||
);
|
||||
let mut wtr = BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test"))
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
BlobWriterError::Other(err) => err,
|
||||
})?;
|
||||
let mut wtr =
|
||||
BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
|
||||
for blob in blobs.iter() {
|
||||
let (_, res) = if compression {
|
||||
let res = wtr
|
||||
@@ -508,11 +490,7 @@ pub(crate) mod tests {
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
BlobWriterError::Other(err) => err,
|
||||
})?;
|
||||
.await?;
|
||||
file.disarm_into_inner()
|
||||
};
|
||||
Ok((temp_dir, pathbuf, offsets))
|
||||
|
||||
@@ -19,14 +19,6 @@ use crate::context::RequestContext;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum BatchSplitWriterError {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) enum BatchWriterResult {
|
||||
Produced(ResidentLayer),
|
||||
Discarded(PersistentLayerKey),
|
||||
@@ -105,7 +97,7 @@ impl BatchLayerWriter {
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<ResidentLayer>, BatchSplitWriterError> {
|
||||
) -> anyhow::Result<Vec<ResidentLayer>> {
|
||||
let res = self
|
||||
.finish_with_discard_fn(tline, ctx, |_| async { false })
|
||||
.await?;
|
||||
@@ -123,7 +115,7 @@ impl BatchLayerWriter {
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
discard_fn: D,
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>>
|
||||
where
|
||||
D: Fn(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
@@ -147,14 +139,14 @@ impl BatchLayerWriter {
|
||||
generated_layers.push(BatchWriterResult::Discarded(layer_key));
|
||||
} else {
|
||||
let res = match inner {
|
||||
LayerWriterWrapper::Delta(writer) => writer
|
||||
.finish(layer_key.key_range.end, ctx)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
|
||||
LayerWriterWrapper::Image(writer) => writer
|
||||
.finish_with_end_key(layer_key.key_range.end, ctx)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
|
||||
LayerWriterWrapper::Delta(writer) => {
|
||||
writer.finish(layer_key.key_range.end, ctx).await
|
||||
}
|
||||
LayerWriterWrapper::Image(writer) => {
|
||||
writer
|
||||
.finish_with_end_key(layer_key.key_range.end, ctx)
|
||||
.await
|
||||
}
|
||||
};
|
||||
let layer = match res {
|
||||
Ok((desc, path)) => {
|
||||
@@ -163,7 +155,7 @@ impl BatchLayerWriter {
|
||||
Err(e) => {
|
||||
tokio::fs::remove_file(&path).await.ok();
|
||||
clean_up_layers(generated_layers);
|
||||
return Err(BatchSplitWriterError::Other(e));
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -243,7 +235,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), BatchSplitWriterError> {
|
||||
) -> anyhow::Result<()> {
|
||||
// The current estimation is an upper bound of the space that the key/image could take
|
||||
// because we did not consider compression in this estimation. The resulting image layer
|
||||
// could be smaller than the target size.
|
||||
@@ -261,8 +253,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
.await?;
|
||||
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
|
||||
self.batches.add_unfinished_image_writer(
|
||||
prev_image_writer,
|
||||
@@ -271,10 +262,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
);
|
||||
self.start_key = key;
|
||||
}
|
||||
self.inner
|
||||
.put_image(key, img, ctx)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
|
||||
self.inner.put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish_with_discard_fn<D, F>(
|
||||
@@ -283,7 +271,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
discard_fn: D,
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>>
|
||||
where
|
||||
D: Fn(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
@@ -303,7 +291,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>> {
|
||||
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
|
||||
.await
|
||||
}
|
||||
@@ -358,7 +346,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), BatchSplitWriterError> {
|
||||
) -> anyhow::Result<()> {
|
||||
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
|
||||
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
|
||||
//
|
||||
@@ -378,8 +366,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?,
|
||||
.await?,
|
||||
));
|
||||
}
|
||||
let (_, inner) = self.inner.as_mut().unwrap();
|
||||
@@ -399,8 +386,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
.await?;
|
||||
let (start_key, prev_delta_writer) =
|
||||
self.inner.replace((key, next_delta_writer)).unwrap();
|
||||
self.batches.add_unfinished_delta_writer(
|
||||
@@ -410,19 +396,16 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
);
|
||||
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
|
||||
// We have to produce a very large file b/c a key is updated too often.
|
||||
return Err(BatchSplitWriterError::Other(anyhow::anyhow!(
|
||||
anyhow::bail!(
|
||||
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
|
||||
key,
|
||||
inner.estimated_size()
|
||||
)));
|
||||
);
|
||||
}
|
||||
}
|
||||
self.last_key_written = key;
|
||||
let (_, inner) = self.inner.as_mut().unwrap();
|
||||
inner
|
||||
.put_value(key, lsn, val, ctx)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
|
||||
inner.put_value(key, lsn, val, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish_with_discard_fn<D, F>(
|
||||
@@ -430,7 +413,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
discard_fn: D,
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>>
|
||||
where
|
||||
D: Fn(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
@@ -456,7 +439,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>> {
|
||||
self.finish_with_discard_fn(tline, ctx, |_| async { false })
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -35,9 +35,7 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use crate::tenant::blob_io::BlobWriterError;
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
use anyhow::{Context, Result, bail, ensure};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
@@ -78,7 +76,7 @@ use crate::tenant::vectored_blob_io::{
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
|
||||
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
|
||||
///
|
||||
@@ -450,11 +448,7 @@ impl DeltaLayerWriterInner {
|
||||
cancel,
|
||||
ctx,
|
||||
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
BlobWriterError::Other(err) => err,
|
||||
})?;
|
||||
)?;
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -483,12 +477,15 @@ impl DeltaLayerWriterInner {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), DeltaLayerWriterError> {
|
||||
let val_ser =
|
||||
Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
|
||||
) -> anyhow::Result<()> {
|
||||
let (_, res) = self
|
||||
.put_value_bytes(key, lsn, val_ser.slice_len(), val.will_init(), ctx)
|
||||
.put_value_bytes(
|
||||
key,
|
||||
lsn,
|
||||
Value::ser(&val)?.slice_len(),
|
||||
val.will_init(),
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
res
|
||||
}
|
||||
@@ -500,46 +497,25 @@ impl DeltaLayerWriterInner {
|
||||
val: FullSlice<Buf>,
|
||||
will_init: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
|
||||
) -> (FullSlice<Buf>, anyhow::Result<()>)
|
||||
where
|
||||
Buf: IoBuf + Send,
|
||||
{
|
||||
if self.lsn_range.start > lsn {
|
||||
return (
|
||||
val,
|
||||
Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
|
||||
"lsn_start={}, lsn={}",
|
||||
self.lsn_range.start,
|
||||
lsn
|
||||
))),
|
||||
);
|
||||
}
|
||||
|
||||
assert!(
|
||||
self.lsn_range.start <= lsn,
|
||||
"lsn_start={}, lsn={}",
|
||||
self.lsn_range.start,
|
||||
lsn
|
||||
);
|
||||
// We don't want to use compression in delta layer creation
|
||||
let compression = ImageCompressionAlgorithm::Disabled;
|
||||
let (val, res) = self
|
||||
.blob_writer
|
||||
.write_blob_maybe_compressed(val, ctx, compression)
|
||||
.await;
|
||||
|
||||
let off = match res {
|
||||
Ok((off, _)) => off,
|
||||
Err(e) => {
|
||||
return (
|
||||
val,
|
||||
Err(match e {
|
||||
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
|
||||
crate::tenant::blob_io::BlobWriterError::Cancelled => {
|
||||
DeltaLayerWriterError::Cancelled
|
||||
}
|
||||
crate::tenant::blob_io::BlobWriterError::Other(err) => {
|
||||
DeltaLayerWriterError::Other(err)
|
||||
}
|
||||
},
|
||||
other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
Err(e) => return (val, Err(anyhow::anyhow!(e))),
|
||||
};
|
||||
|
||||
let blob_ref = BlobRef::new(off, will_init);
|
||||
@@ -549,10 +525,7 @@ impl DeltaLayerWriterInner {
|
||||
|
||||
self.num_keys += 1;
|
||||
|
||||
(
|
||||
val,
|
||||
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e))),
|
||||
)
|
||||
(val, res.map_err(|e| anyhow::anyhow!(e)))
|
||||
}
|
||||
|
||||
fn size(&self) -> u64 {
|
||||
@@ -566,7 +539,7 @@ impl DeltaLayerWriterInner {
|
||||
self,
|
||||
key_end: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
|
||||
|
||||
let file = self
|
||||
@@ -575,24 +548,17 @@ impl DeltaLayerWriterInner {
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled,
|
||||
BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err),
|
||||
})?;
|
||||
.await?;
|
||||
|
||||
// Write out the index
|
||||
let (index_root_blk, block_buf) = self
|
||||
.tree
|
||||
.finish()
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
let (index_root_blk, block_buf) = self.tree.finish()?;
|
||||
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
|
||||
// Should we just replace BlockBuf::blocks with one big buffer?
|
||||
for buf in block_buf.blocks {
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
|
||||
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
res?;
|
||||
offset += PAGE_SZ as u64;
|
||||
}
|
||||
assert!(self.lsn_range.start < self.lsn_range.end);
|
||||
@@ -609,27 +575,24 @@ impl DeltaLayerWriterInner {
|
||||
};
|
||||
|
||||
// Writes summary at the first block (offset 0).
|
||||
let buf = summary
|
||||
.ser_into_page()
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
let buf = summary.ser_into_page()?;
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
|
||||
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
res?;
|
||||
|
||||
let metadata = file
|
||||
.metadata()
|
||||
.await
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
.context("get file metadata to determine size")?;
|
||||
|
||||
// 5GB limit for objects without multipart upload (which we don't want to use)
|
||||
// Make it a little bit below to account for differing GB units
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
|
||||
if metadata.len() > S3_UPLOAD_LIMIT {
|
||||
return Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
|
||||
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
|
||||
file.path(),
|
||||
metadata.len()
|
||||
)));
|
||||
}
|
||||
ensure!(
|
||||
metadata.len() <= S3_UPLOAD_LIMIT,
|
||||
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
|
||||
file.path(),
|
||||
metadata.len()
|
||||
);
|
||||
|
||||
// Note: Because we opened the file in write-only mode, we cannot
|
||||
// reuse the same VirtualFile for reading later. That's why we don't
|
||||
@@ -646,7 +609,7 @@ impl DeltaLayerWriterInner {
|
||||
// fsync the file
|
||||
file.sync_all()
|
||||
.await
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
.maybe_fatal_err("delta_layer sync_all")?;
|
||||
|
||||
trace!("created delta layer {}", self.path);
|
||||
|
||||
@@ -731,7 +694,7 @@ impl DeltaLayerWriter {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), DeltaLayerWriterError> {
|
||||
) -> anyhow::Result<()> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
@@ -746,7 +709,7 @@ impl DeltaLayerWriter {
|
||||
val: FullSlice<Buf>,
|
||||
will_init: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
|
||||
) -> (FullSlice<Buf>, anyhow::Result<()>)
|
||||
where
|
||||
Buf: IoBuf + Send,
|
||||
{
|
||||
@@ -768,7 +731,7 @@ impl DeltaLayerWriter {
|
||||
mut self,
|
||||
key_end: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
self.inner.take().unwrap().finish(key_end, ctx).await
|
||||
}
|
||||
|
||||
@@ -782,14 +745,6 @@ impl DeltaLayerWriter {
|
||||
}
|
||||
}
|
||||
|
||||
/// Error type returned by the delta layer writer's `put_value`, `put_value_bytes`
/// and `finish` methods.
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DeltaLayerWriterError {
|
||||
/// The operation was cancelled; rendered as `"cancelled"`.
/// The writer maps `BlobWriterError::Cancelled` to this variant.
#[error("cancelled")]
|
||||
Cancelled,
|
||||
/// Any other failure; `transparent` forwards `Display`/`source` to the wrapped `anyhow::Error`.
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteSummaryError {
|
||||
#[error("magic mismatch")]
|
||||
@@ -800,7 +755,7 @@ pub enum RewriteSummaryError {
|
||||
|
||||
/// Lets `?` convert a `std::io::Error` into a `RewriteSummaryError`.
impl From<std::io::Error> for RewriteSummaryError {
|
||||
/// Wraps the I/O error in an `anyhow::Error` and stores it in the `Other` variant.
fn from(e: std::io::Error) -> Self {
|
||||
Self::Other(anyhow::Error::new(e))
|
||||
Self::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1486,14 +1441,6 @@ impl DeltaLayerInner {
|
||||
offset
|
||||
}
|
||||
|
||||
/// Creates an iterator over this delta layer with default options.
///
/// Forwards to `iter_with_options` with `1024 * 8192` as the second argument and
/// `1024` as the third; callers that need other values call `iter_with_options` directly.
pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
|
||||
self.iter_with_options(
|
||||
ctx,
|
||||
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
|
||||
1024, // The default value. Unit tests might use a different value
|
||||
)
|
||||
}
|
||||
|
||||
pub fn iter_with_options<'a>(
|
||||
&'a self,
|
||||
ctx: &'a RequestContext,
|
||||
@@ -1679,7 +1626,6 @@ pub(crate) mod test {
|
||||
use crate::tenant::disk_btree::tests::TestDisk;
|
||||
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
|
||||
use crate::tenant::storage_layer::{Layer, ResidentLayer};
|
||||
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
|
||||
use crate::tenant::{TenantShard, Timeline};
|
||||
|
||||
/// Construct an index for a fictional delta layer and then
|
||||
@@ -2356,8 +2302,7 @@ pub(crate) mod test {
|
||||
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
|
||||
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
|
||||
// Test if the batch size is correctly determined
|
||||
let mut iter = delta_layer.iter(&ctx);
|
||||
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
|
||||
let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
|
||||
let mut num_items = 0;
|
||||
for _ in 0..3 {
|
||||
iter.next_batch().await.unwrap();
|
||||
@@ -2374,8 +2319,7 @@ pub(crate) mod test {
|
||||
iter.key_values_batch.clear();
|
||||
}
|
||||
// Test if the result is correct
|
||||
let mut iter = delta_layer.iter(&ctx);
|
||||
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
|
||||
let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
|
||||
assert_delta_iter_equal(&mut iter, &test_deltas).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -157,7 +157,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let merge_iter = MergeIterator::create(
|
||||
let merge_iter = MergeIterator::create_for_testing(
|
||||
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
|
||||
&[],
|
||||
&ctx,
|
||||
@@ -182,7 +182,7 @@ mod tests {
|
||||
result.extend(test_deltas1[90..100].iter().cloned());
|
||||
assert_filter_iter_equal(&mut filter_iter, &result).await;
|
||||
|
||||
let merge_iter = MergeIterator::create(
|
||||
let merge_iter = MergeIterator::create_for_testing(
|
||||
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
|
||||
&[],
|
||||
&ctx,
|
||||
|
||||
@@ -33,9 +33,7 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use crate::tenant::blob_io::BlobWriterError;
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
use anyhow::{Context, Result, bail, ensure};
|
||||
use bytes::Bytes;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
@@ -76,7 +74,7 @@ use crate::tenant::vectored_blob_io::{
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
|
||||
///
|
||||
@@ -342,14 +340,6 @@ impl ImageLayer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Error type returned by the image layer writer's `put_image` and `finish` methods.
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ImageLayerWriterError {
|
||||
/// The operation was cancelled; rendered as `"flush task cancelled"`.
/// The writer maps `BlobWriterError::Cancelled` to this variant.
#[error("flush task cancelled")]
|
||||
Cancelled,
|
||||
/// Any other failure; `transparent` forwards `Display`/`source` to the wrapped `anyhow::Error`.
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteSummaryError {
|
||||
#[error("magic mismatch")]
|
||||
@@ -694,14 +684,6 @@ impl ImageLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates an iterator over this image layer with default options.
///
/// Forwards to `iter_with_options` with `1024 * 8192` as the second argument and
/// `1024` as the third; callers that need other values call `iter_with_options` directly.
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
|
||||
self.iter_with_options(
|
||||
ctx,
|
||||
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
|
||||
1024, // The default value. Unit tests might use a different value
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn iter_with_options<'a>(
|
||||
&'a self,
|
||||
ctx: &'a RequestContext,
|
||||
@@ -824,11 +806,7 @@ impl ImageLayerWriterInner {
|
||||
cancel,
|
||||
ctx,
|
||||
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
BlobWriterError::Other(err) => err,
|
||||
})?;
|
||||
)?;
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -864,13 +842,8 @@ impl ImageLayerWriterInner {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
if !self.key_range.contains(&key) {
|
||||
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
|
||||
"key not in range"
|
||||
)));
|
||||
}
|
||||
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
let compression = self.conf.image_compression;
|
||||
let uncompressed_len = img.len() as u64;
|
||||
self.uncompressed_bytes += uncompressed_len;
|
||||
@@ -880,18 +853,7 @@ impl ImageLayerWriterInner {
|
||||
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
|
||||
.await;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let (off, compression_info) = res.map_err(|e| match e {
|
||||
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
|
||||
crate::tenant::blob_io::BlobWriterError::Cancelled => {
|
||||
ImageLayerWriterError::Cancelled
|
||||
}
|
||||
crate::tenant::blob_io::BlobWriterError::Other(err) => {
|
||||
ImageLayerWriterError::Other(err)
|
||||
}
|
||||
},
|
||||
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
|
||||
let (off, compression_info) = res?;
|
||||
if compression_info.compressed_size.is_some() {
|
||||
// The image has been considered for compression at least
|
||||
self.uncompressed_bytes_eligible += uncompressed_len;
|
||||
@@ -903,9 +865,7 @@ impl ImageLayerWriterInner {
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree
|
||||
.append(&keybuf, off)
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
self.tree.append(&keybuf, off)?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
@@ -925,12 +885,8 @@ impl ImageLayerWriterInner {
|
||||
key: Key,
|
||||
raw_with_header: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
if !self.key_range.contains(&key) {
|
||||
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
|
||||
"key not in range"
|
||||
)));
|
||||
}
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
|
||||
// NB: we don't update the (un)compressed metrics, since we can't determine them without
|
||||
// decompressing the image. This seems okay.
|
||||
@@ -940,23 +896,11 @@ impl ImageLayerWriterInner {
|
||||
.blob_writer
|
||||
.write_blob_raw(raw_with_header.slice_len(), ctx)
|
||||
.await;
|
||||
let offset = res.map_err(|e| match e {
|
||||
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
|
||||
crate::tenant::blob_io::BlobWriterError::Cancelled => {
|
||||
ImageLayerWriterError::Cancelled
|
||||
}
|
||||
crate::tenant::blob_io::BlobWriterError::Other(err) => {
|
||||
ImageLayerWriterError::Other(err)
|
||||
}
|
||||
},
|
||||
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
let offset = res?;
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree
|
||||
.append(&keybuf, offset)
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
self.tree.append(&keybuf, offset)?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
@@ -973,7 +917,7 @@ impl ImageLayerWriterInner {
|
||||
self,
|
||||
ctx: &RequestContext,
|
||||
end_key: Option<Key>,
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
|
||||
|
||||
// Calculate compression ratio
|
||||
@@ -996,24 +940,17 @@ impl ImageLayerWriterInner {
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
|
||||
BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
|
||||
})?;
|
||||
.await?;
|
||||
|
||||
// Write out the index
|
||||
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
|
||||
let (index_root_blk, block_buf) = self
|
||||
.tree
|
||||
.finish()
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
let (index_root_blk, block_buf) = self.tree.finish()?;
|
||||
|
||||
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
|
||||
// Should we just replace BlockBuf::blocks with one big buffer?
|
||||
for buf in block_buf.blocks {
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
|
||||
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
res?;
|
||||
offset += PAGE_SZ as u64;
|
||||
}
|
||||
|
||||
@@ -1036,18 +973,14 @@ impl ImageLayerWriterInner {
|
||||
};
|
||||
|
||||
// Writes summary at the first block (offset 0).
|
||||
let buf = summary
|
||||
.ser_into_page()
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
let buf = summary.ser_into_page()?;
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
|
||||
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
res?;
|
||||
|
||||
let metadata = file.metadata().await.map_err(|e| {
|
||||
ImageLayerWriterError::Other(anyhow::anyhow!(
|
||||
"get metadata to determine file size: {}",
|
||||
e
|
||||
))
|
||||
})?;
|
||||
let metadata = file
|
||||
.metadata()
|
||||
.await
|
||||
.context("get metadata to determine file size")?;
|
||||
|
||||
let desc = PersistentLayerDesc::new_img(
|
||||
self.tenant_shard_id,
|
||||
@@ -1070,9 +1003,9 @@ impl ImageLayerWriterInner {
|
||||
// set inner.file here. The first read will have to re-open it.
|
||||
|
||||
// fsync the file
|
||||
file.sync_all().await.map_err(|e| {
|
||||
ImageLayerWriterError::Other(anyhow::anyhow!("image_layer sync_all: {}", e))
|
||||
})?;
|
||||
file.sync_all()
|
||||
.await
|
||||
.maybe_fatal_err("image_layer sync_all")?;
|
||||
|
||||
trace!("created image layer {}", self.path);
|
||||
|
||||
@@ -1152,7 +1085,7 @@ impl ImageLayerWriter {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
) -> anyhow::Result<()> {
|
||||
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
@@ -1167,7 +1100,7 @@ impl ImageLayerWriter {
|
||||
key: Key,
|
||||
raw_with_header: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
) -> anyhow::Result<()> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
@@ -1191,7 +1124,7 @@ impl ImageLayerWriter {
|
||||
pub(crate) async fn finish(
|
||||
mut self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
self.inner.take().unwrap().finish(ctx, None).await
|
||||
}
|
||||
|
||||
@@ -1200,7 +1133,7 @@ impl ImageLayerWriter {
|
||||
mut self,
|
||||
end_key: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
self.inner.take().unwrap().finish(ctx, Some(end_key)).await
|
||||
}
|
||||
}
|
||||
@@ -1299,7 +1232,6 @@ mod test {
|
||||
use crate::context::RequestContext;
|
||||
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
|
||||
use crate::tenant::storage_layer::{Layer, ResidentLayer};
|
||||
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
|
||||
use crate::tenant::{TenantShard, Timeline};
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1566,8 +1498,7 @@ mod test {
|
||||
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
|
||||
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
|
||||
// Test if the batch size is correctly determined
|
||||
let mut iter = img_layer.iter(&ctx);
|
||||
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
|
||||
let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
|
||||
let mut num_items = 0;
|
||||
for _ in 0..3 {
|
||||
iter.next_batch().await.unwrap();
|
||||
@@ -1584,8 +1515,7 @@ mod test {
|
||||
iter.key_values_batch.clear();
|
||||
}
|
||||
// Test if the result is correct
|
||||
let mut iter = img_layer.iter(&ctx);
|
||||
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
|
||||
let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
|
||||
assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,6 @@ use utils::lsn::Lsn;
|
||||
use utils::vec_map::VecMap;
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
|
||||
|
||||
use super::delta_layer::DeltaLayerWriterError;
|
||||
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
|
||||
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64, u64_to_usize};
|
||||
use crate::config::PageServerConf;
|
||||
@@ -582,17 +581,7 @@ impl InMemoryLayer {
|
||||
estimated_in_mem_size: AtomicU64::new(0),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Error type returned by the `InMemoryLayer` write path.
#[derive(Debug, thiserror::Error)]
|
||||
pub enum InMemoryLayerError {
|
||||
/// The operation was cancelled; rendered as `"flush task cancelled"`.
#[error("flush task cancelled")]
|
||||
Cancelled,
|
||||
/// Any other failure; `transparent` forwards `Display`/`source` to the wrapped `anyhow::Error`.
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl InMemoryLayer {
|
||||
/// Write path.
|
||||
///
|
||||
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
|
||||
@@ -602,7 +591,7 @@ impl InMemoryLayer {
|
||||
&self,
|
||||
serialized_batch: SerializedValueBatch,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), InMemoryLayerError> {
|
||||
) -> anyhow::Result<()> {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
|
||||
@@ -616,11 +605,7 @@ impl InMemoryLayer {
|
||||
} = serialized_batch;
|
||||
|
||||
// Write the batch to the file
|
||||
inner
|
||||
.file
|
||||
.write_raw(&raw, ctx)
|
||||
.await
|
||||
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
|
||||
inner.file.write_raw(&raw, ctx).await?;
|
||||
let new_size = inner.file.len();
|
||||
|
||||
let expected_new_len = base_offset
|
||||
@@ -652,8 +637,7 @@ impl InMemoryLayer {
|
||||
batch_offset,
|
||||
len,
|
||||
will_init,
|
||||
})
|
||||
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
|
||||
})?;
|
||||
|
||||
let vec_map = inner.index.entry(key).or_default();
|
||||
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
|
||||
@@ -810,25 +794,14 @@ impl InMemoryLayer {
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
res.map_err(|e| match e {
|
||||
DeltaLayerWriterError::Cancelled => {
|
||||
anyhow::anyhow!("flush task cancelled")
|
||||
}
|
||||
DeltaLayerWriterError::Other(err) => err,
|
||||
})?;
|
||||
res?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MAX is used here because we identify L0 layers by full key range
|
||||
let (desc, path) = delta_layer_writer
|
||||
.finish(Key::MAX, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
DeltaLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
DeltaLayerWriterError::Other(err) => err,
|
||||
})?;
|
||||
let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
|
||||
|
||||
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()`.
|
||||
//
|
||||
|
||||
@@ -19,14 +19,6 @@ pub(crate) enum LayerRef<'a> {
|
||||
}
|
||||
|
||||
impl<'a> LayerRef<'a> {
|
||||
/// Builds a layer iterator with the wrapped layer's default options.
///
/// Dispatches on the variant: an image layer yields `LayerIterRef::Image`, a delta
/// layer yields `LayerIterRef::Delta`, each built by that layer's own `iter(ctx)`.
// `dead_code` is allowed here, presumably because nothing calls this method in
// some build configurations — TODO confirm.
#[allow(dead_code)]
|
||||
fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
|
||||
match self {
|
||||
Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
|
||||
Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
|
||||
}
|
||||
}
|
||||
|
||||
fn iter_with_options(
|
||||
self,
|
||||
ctx: &'a RequestContext,
|
||||
@@ -322,6 +314,28 @@ impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
|
||||
}
|
||||
|
||||
impl<'a> MergeIterator<'a> {
|
||||
/// Test-only convenience constructor for a merge iterator over the given layers.
///
/// Forwards to `create_with_options` with fixed options `1024 * 8192` and `1024`.
#[cfg(test)]
|
||||
pub(crate) fn create_for_testing(
|
||||
deltas: &[&'a DeltaLayerInner],
|
||||
images: &[&'a ImageLayerInner],
|
||||
ctx: &'a RequestContext,
|
||||
) -> Self {
|
||||
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
|
||||
}
|
||||
|
||||
/// Create a new merge iterator with custom options.
|
||||
///
|
||||
/// Adjust `max_read_size` and `max_batch_size` to trade memory usage for performance. The size should scale
|
||||
/// with the number of layers to compact. If there are a lot of layers, consider reducing the values, so that
|
||||
/// the buffer does not take too much memory.
|
||||
///
|
||||
/// The default options for L0 compactions are:
|
||||
/// - max_read_size: 1024 * 8192 (8MB)
|
||||
/// - max_batch_size: 1024
|
||||
///
|
||||
/// The default options for gc-compaction are:
|
||||
/// - max_read_size: 128 * 8192 (1MB)
|
||||
/// - max_batch_size: 128
|
||||
pub fn create_with_options(
|
||||
deltas: &[&'a DeltaLayerInner],
|
||||
images: &[&'a ImageLayerInner],
|
||||
@@ -351,14 +365,6 @@ impl<'a> MergeIterator<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a merge iterator over the given delta and image layers with default options.
///
/// Forwards to `create_with_options` with fixed options `1024 * 8192` and `1024`.
pub fn create(
|
||||
deltas: &[&'a DeltaLayerInner],
|
||||
images: &[&'a ImageLayerInner],
|
||||
ctx: &'a RequestContext,
|
||||
) -> Self {
|
||||
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
|
||||
}
|
||||
|
||||
pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
|
||||
while let Some(mut iter) = self.heap.peek_mut() {
|
||||
if !iter.is_loaded() {
|
||||
@@ -477,7 +483,7 @@ mod tests {
|
||||
let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut merge_iter = MergeIterator::create(
|
||||
let mut merge_iter = MergeIterator::create_for_testing(
|
||||
&[
|
||||
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
|
||||
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
|
||||
@@ -549,7 +555,7 @@ mod tests {
|
||||
let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut merge_iter = MergeIterator::create(
|
||||
let mut merge_iter = MergeIterator::create_for_testing(
|
||||
&[
|
||||
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
|
||||
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
|
||||
@@ -670,7 +676,7 @@ mod tests {
|
||||
// Test with different layer order for MergeIterator::create to ensure the order
|
||||
// is stable.
|
||||
|
||||
let mut merge_iter = MergeIterator::create(
|
||||
let mut merge_iter = MergeIterator::create_for_testing(
|
||||
&[
|
||||
resident_layer_4.get_as_delta(&ctx).await.unwrap(),
|
||||
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
|
||||
@@ -682,7 +688,7 @@ mod tests {
|
||||
);
|
||||
assert_merge_iter_equal(&mut merge_iter, &expect).await;
|
||||
|
||||
let mut merge_iter = MergeIterator::create(
|
||||
let mut merge_iter = MergeIterator::create_for_testing(
|
||||
&[
|
||||
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
|
||||
resident_layer_4.get_as_delta(&ctx).await.unwrap(),
|
||||
|
||||
@@ -300,7 +300,6 @@ pub(crate) fn log_compaction_error(
|
||||
let level = match err {
|
||||
e if e.is_cancel() => return,
|
||||
ShuttingDown => return,
|
||||
Cancelled => return,
|
||||
Offload(_) => Level::ERROR,
|
||||
AlreadyRunning(_) => Level::ERROR,
|
||||
CollectKeySpaceError(_) => Level::ERROR,
|
||||
|
||||
@@ -119,8 +119,6 @@ use crate::tenant::gc_result::GcResult;
|
||||
use crate::tenant::layer_map::LayerMap;
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
|
||||
use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerError;
|
||||
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, BatchLayerWriter, DeltaLayerWriter, EvictionError, ImageLayerName,
|
||||
@@ -775,21 +773,6 @@ impl From<layer_manager::Shutdown> for CreateImageLayersError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Lets `?` convert a `BatchSplitWriterError` into a `CreateImageLayersError`.
///
/// The mapping is variant-for-variant, so cancellation stays distinguishable from
/// other failures.
impl From<crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError>
|
||||
for CreateImageLayersError
|
||||
{
|
||||
/// Maps `Cancelled` to `Self::Cancelled` and `Other(err)` to `Self::Other(err)`.
fn from(err: crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError) -> Self {
|
||||
match err {
|
||||
crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError::Cancelled => {
|
||||
Self::Cancelled
|
||||
|
||||
}
|
||||
crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError::Other(err) => {
|
||||
Self::Other(err)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug, Clone)]
|
||||
pub(crate) enum FlushLayerError {
|
||||
/// Timeline cancellation token was cancelled
|
||||
@@ -2058,9 +2041,6 @@ impl Timeline {
|
||||
Err(CompactionError::ShuttingDown) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
Err(CompactionError::Cancelled) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
Err(CompactionError::AlreadyRunning(_)) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
@@ -5252,17 +5232,7 @@ impl Timeline {
|
||||
};
|
||||
|
||||
// Write all the keys we just read into our new image layer.
|
||||
image_layer_writer
|
||||
.put_image(img_key, img, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
ImageLayerWriterError::Cancelled => CreateImageLayersError::Other(
|
||||
anyhow::anyhow!("flush task cancelled"),
|
||||
),
|
||||
ImageLayerWriterError::Other(err) => {
|
||||
CreateImageLayersError::Other(err)
|
||||
}
|
||||
})?;
|
||||
image_layer_writer.put_image(img_key, img, ctx).await?;
|
||||
wrote_keys = true;
|
||||
}
|
||||
}
|
||||
@@ -5359,15 +5329,7 @@ impl Timeline {
|
||||
|
||||
// TODO: split image layers to avoid too large layer files. Too large image files are not handled
|
||||
// on the normal data path either.
|
||||
image_layer_writer
|
||||
.put_image(k, v, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
ImageLayerWriterError::Cancelled => {
|
||||
CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled"))
|
||||
}
|
||||
ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err),
|
||||
})?;
|
||||
image_layer_writer.put_image(k, v, ctx).await?;
|
||||
}
|
||||
|
||||
if wrote_any_image {
|
||||
@@ -5881,8 +5843,6 @@ pub(crate) enum CompactionError {
|
||||
Other(anyhow::Error),
|
||||
#[error("Compaction already running: {0}")]
|
||||
AlreadyRunning(&'static str),
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
@@ -5897,7 +5857,6 @@ impl CompactionError {
|
||||
PageReconstructError::Cancelled
|
||||
))
|
||||
| Self::Offload(OffloadError::Cancelled)
|
||||
| Self::Cancelled
|
||||
)
|
||||
}
|
||||
|
||||
@@ -6963,22 +6922,9 @@ impl Timeline {
|
||||
)
|
||||
.await?;
|
||||
for (key, img) in images {
|
||||
image_layer_writer
|
||||
.put_image(key, img, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
ImageLayerWriterError::Cancelled => {
|
||||
anyhow::anyhow!("flush task cancelled")
|
||||
}
|
||||
ImageLayerWriterError::Other(err) => err,
|
||||
})?;
|
||||
image_layer_writer.put_image(key, img, ctx).await?;
|
||||
}
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
|
||||
ImageLayerWriterError::Cancelled => {
|
||||
anyhow::anyhow!("flush task cancelled")
|
||||
}
|
||||
ImageLayerWriterError::Other(err) => err,
|
||||
})?;
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await?;
|
||||
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
info!("force created image layer {}", image_layer.local_path());
|
||||
{
|
||||
@@ -7432,10 +7378,7 @@ impl TimelineWriter<'_> {
|
||||
state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
|
||||
}
|
||||
|
||||
res.map_err(|e| match e {
|
||||
InMemoryLayerError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
InMemoryLayerError::Other(err) => err,
|
||||
})
|
||||
res
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1547,7 +1547,7 @@ impl Timeline {
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
// Safety of layer rewrites:
|
||||
// - We are writing to a different local file path than we are reading from, so the old Layer
|
||||
@@ -1572,7 +1572,7 @@ impl Timeline {
|
||||
let (desc, path) = image_layer_writer
|
||||
.finish(ctx)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.map_err(CompactionError::Other)?;
|
||||
let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.map_err(CompactionError::Other)?;
|
||||
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
|
||||
@@ -1994,7 +1994,13 @@ impl Timeline {
|
||||
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
|
||||
deltas.push(l);
|
||||
}
|
||||
MergeIterator::create(&deltas, &[], ctx)
|
||||
MergeIterator::create_with_options(
|
||||
&deltas,
|
||||
&[],
|
||||
ctx,
|
||||
1024 * 8192, /* 8 MiB buffer per layer iterator */
|
||||
1024,
|
||||
)
|
||||
};
|
||||
|
||||
// This iterator walks through all keys and is needed to calculate size used by each key
|
||||
@@ -2140,7 +2146,7 @@ impl Timeline {
|
||||
.unwrap()
|
||||
.finish(prev_key.unwrap().next(), ctx)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.map_err(CompactionError::Other)?;
|
||||
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
@@ -2199,7 +2205,7 @@ impl Timeline {
|
||||
.unwrap()
|
||||
.put_value(key, lsn, value, ctx)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.map_err(CompactionError::Other)?;
|
||||
} else {
|
||||
let owner = self.shard_identity.get_shard_number(&key);
|
||||
|
||||
@@ -2217,7 +2223,7 @@ impl Timeline {
|
||||
let (desc, path) = writer
|
||||
.finish(prev_key.unwrap().next(), ctx)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.map_err(CompactionError::Other)?;
|
||||
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.map_err(CompactionError::Other)?;
|
||||
new_layers.push(new_delta);
|
||||
@@ -2828,7 +2834,7 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if the memory usage is within the limit.
|
||||
/// Check to bail out of gc compaction early if it would use too much memory.
|
||||
async fn check_memory_usage(
|
||||
self: &Arc<Self>,
|
||||
layer_selection: &[Layer],
|
||||
@@ -2841,7 +2847,8 @@ impl Timeline {
|
||||
let layer_desc = layer.layer_desc();
|
||||
if layer_desc.is_delta() {
|
||||
// Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
|
||||
// Multiply the layer size so that tests can pass.
|
||||
// Scale it by target_layer_size_bytes so that tests can pass (some tests, e.g., `test_pageserver_gc_compaction_preempt`
|
||||
// use 3MB layer size and we need to account for that).
|
||||
estimated_memory_usage_mb +=
|
||||
3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
|
||||
num_delta_layers += 1;
|
||||
@@ -3682,7 +3689,8 @@ impl Timeline {
|
||||
let (desc, path) = delta_writer_before
|
||||
.finish(job_desc.compaction_key_range.start, ctx)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.context("failed to finish delta layer writer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
let layer = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.context("failed to finish creating delta layer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
@@ -3692,7 +3700,8 @@ impl Timeline {
|
||||
let (desc, path) = delta_writer_after
|
||||
.finish(key.key_range.end, ctx)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
.context("failed to finish delta layer writer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
let layer = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.context("failed to finish creating delta layer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
@@ -3711,7 +3720,8 @@ impl Timeline {
|
||||
writer
|
||||
.finish_with_discard_fn(self, ctx, end_key, discard)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
|
||||
.context("failed to finish image layer writer")
|
||||
.map_err(CompactionError::Other)?
|
||||
} else {
|
||||
drop(writer);
|
||||
Vec::new()
|
||||
@@ -3724,7 +3734,8 @@ impl Timeline {
|
||||
delta_layer_writer
|
||||
.finish_with_discard_fn(self, ctx, discard)
|
||||
.await
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
|
||||
.context("failed to finish delta layer writer")
|
||||
.map_err(CompactionError::Other)?
|
||||
} else {
|
||||
drop(delta_layer_writer);
|
||||
Vec::new()
|
||||
@@ -4249,10 +4260,7 @@ impl TimelineAdaptor {
|
||||
unfinished_image_layer,
|
||||
} = outcome
|
||||
{
|
||||
let (desc, path) = unfinished_image_layer
|
||||
.finish(ctx)
|
||||
.await
|
||||
.map_err(|e| CreateImageLayersError::Other(anyhow::anyhow!(e)))?;
|
||||
let (desc, path) = unfinished_image_layer.finish(ctx).await?;
|
||||
let image_layer =
|
||||
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
|
||||
self.new_images.push(image_layer);
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use http_utils::error::ApiError;
|
||||
@@ -817,10 +816,7 @@ async fn copy_lsn_prefix(
|
||||
let (desc, path) = writer
|
||||
.finish(reused_highest_key, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
DeltaLayerWriterError::Cancelled => Error::ShuttingDown,
|
||||
DeltaLayerWriterError::Other(err) => Error::Prepare(err),
|
||||
})?;
|
||||
.map_err(Error::Prepare)?;
|
||||
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
|
||||
.map_err(Error::Prepare)?;
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
|
||||
pub(crate) const EXT_VERSION: &str = "0.3.0";
|
||||
pub(crate) const EXT_VERSION: &str = "0.3.1";
|
||||
pub(crate) const EXT_SCHEMA: &str = "auth";
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -5181,7 +5181,8 @@ impl Service {
|
||||
}
|
||||
|
||||
// We don't expect any new_shard_count shards to exist here, but drop them just in case
|
||||
tenants.retain(|_id, s| s.shard.count != *new_shard_count);
|
||||
tenants
|
||||
.retain(|id, s| !(id.tenant_id == *tenant_id && s.shard.count == *new_shard_count));
|
||||
|
||||
detach_locations
|
||||
};
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import urllib.parse
|
||||
from enum import StrEnum
|
||||
from typing import TYPE_CHECKING, final
|
||||
|
||||
import requests
|
||||
@@ -9,11 +10,23 @@ from requests.auth import AuthBase
|
||||
from typing_extensions import override
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from requests import PreparedRequest
|
||||
|
||||
|
||||
COMPUTE_AUDIENCE = "compute"
|
||||
"""
|
||||
The value to place in the `aud` claim.
|
||||
"""
|
||||
|
||||
|
||||
@final
class ComputeClaimsScope(StrEnum):
    """
    Scope values for compute JWTs.

    The string form of a member is what the test fixtures pass to
    `endpoint generate-jwt --scope` and what the tests place in the
    token's `scope` claim.
    """

    ADMIN = "admin"
|
||||
|
||||
|
||||
@final
|
||||
class BearerAuth(AuthBase):
|
||||
"""
|
||||
@@ -50,6 +63,35 @@ class EndpointHttpClient(requests.Session):
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
def prewarm_lfc_status(self) -> dict[str, str]:
    """
    Fetch the LFC prewarm status from the endpoint's external HTTP server.

    Sends a GET to `/lfc/prewarm`, raises if the response carries an HTTP
    error status, and returns the decoded JSON body.
    """
    url = f"http://localhost:{self.external_port}/lfc/prewarm"
    response = self.get(url)
    response.raise_for_status()
    status: dict[str, str] = response.json()
    return status
|
||||
|
||||
def prewarm_lfc(self):
    """
    Request an LFC prewarm and wait for it to be reported as completed.

    POSTs to `/lfc/prewarm` (raising on an HTTP error status), then hands a
    check of `prewarm_lfc_status()` to `wait_until`.
    """
    kickoff = self.post(f"http://localhost:{self.external_port}/lfc/prewarm")
    kickoff.raise_for_status()

    def prewarmed():
        # NOTE(review): presumably `wait_until` retries this until the
        # assertion stops failing — confirm against fixtures.utils.
        report = self.prewarm_lfc_status()
        status = report["status"]
        err = report.get("error")
        assert status == "completed", f"{status}, error {err}"

    wait_until(prewarmed)
|
||||
|
||||
def offload_lfc(self):
    """
    Request an LFC offload and wait for it to be reported as completed.

    POSTs to `/lfc/offload` (raising on an HTTP error status), then hands a
    check that GETs the same URL to `wait_until`.
    """
    url = f"http://localhost:{self.external_port}/lfc/offload"
    kickoff = self.post(url)
    kickoff.raise_for_status()

    def offloaded():
        # NOTE(review): presumably `wait_until` retries this until the
        # assertion stops failing — confirm against fixtures.utils.
        response = self.get(url)
        response.raise_for_status()
        report = response.json()
        status = report["status"]
        err = report.get("error")
        assert status == "completed", f"{status}, error {err}"

    wait_until(offloaded)
|
||||
|
||||
def database_schema(self, database: str):
|
||||
res = self.get(
|
||||
f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}",
|
||||
|
||||
@@ -21,6 +21,7 @@ if TYPE_CHECKING:
|
||||
Any,
|
||||
)
|
||||
|
||||
from fixtures.endpoint.http import ComputeClaimsScope
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
@@ -535,12 +536,16 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
def endpoint_generate_jwt(self, endpoint_id: str) -> str:
|
||||
def endpoint_generate_jwt(
|
||||
self, endpoint_id: str, scope: ComputeClaimsScope | None = None
|
||||
) -> str:
|
||||
"""
|
||||
Generate a JWT for making requests to the endpoint's external HTTP
|
||||
server.
|
||||
"""
|
||||
args = ["endpoint", "generate-jwt", endpoint_id]
|
||||
if scope:
|
||||
args += ["--scope", str(scope)]
|
||||
|
||||
cmd = self.raw_cli(args)
|
||||
cmd.check_returncode()
|
||||
|
||||
@@ -51,7 +51,7 @@ from fixtures.common_types import (
|
||||
TimelineId,
|
||||
)
|
||||
from fixtures.compute_migrations import NUM_COMPUTE_MIGRATIONS
|
||||
from fixtures.endpoint.http import EndpointHttpClient
|
||||
from fixtures.endpoint.http import ComputeClaimsScope, EndpointHttpClient
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
|
||||
from fixtures.neon_cli import NeonLocalCli, Pagectl
|
||||
@@ -1185,7 +1185,9 @@ class NeonEnv:
|
||||
"broker": {},
|
||||
"safekeepers": [],
|
||||
"pageservers": [],
|
||||
"endpoint_storage": {"port": self.port_distributor.get_port()},
|
||||
"endpoint_storage": {
|
||||
"listen_addr": f"127.0.0.1:{self.port_distributor.get_port()}",
|
||||
},
|
||||
"generate_local_ssl_certs": self.generate_local_ssl_certs,
|
||||
}
|
||||
|
||||
@@ -4218,7 +4220,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
self.config(config_lines)
|
||||
|
||||
self.__jwt = self.env.neon_cli.endpoint_generate_jwt(self.endpoint_id)
|
||||
self.__jwt = self.generate_jwt()
|
||||
|
||||
return self
|
||||
|
||||
@@ -4265,6 +4267,14 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
return self
|
||||
|
||||
def generate_jwt(self, scope: ComputeClaimsScope | None = None) -> str:
    """
    Produce a JWT for talking to this endpoint's external HTTP server.

    Delegates to the neon CLI wrapper, forwarding the optional `scope`.
    The endpoint must already have an id.
    """
    endpoint_id = self.endpoint_id
    assert endpoint_id is not None
    token = self.env.neon_cli.endpoint_generate_jwt(endpoint_id, scope)
    return token
|
||||
|
||||
def endpoint_path(self) -> Path:
|
||||
"""Path to endpoint directory"""
|
||||
assert self.endpoint_id
|
||||
|
||||
@@ -206,7 +206,7 @@ class NeonProject:
|
||||
self.neon_api = neon_api
|
||||
self.pg_bin = pg_bin
|
||||
proj = self.neon_api.create_project(
|
||||
pg_version, f"Automatic random API test {os.getenv('GITHUB_RUN_ID')}"
|
||||
pg_version, f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
|
||||
)
|
||||
self.id: str = proj["project"]["id"]
|
||||
self.name: str = proj["project"]["name"]
|
||||
|
||||
78
test_runner/regress/test_compute_http.py
Normal file
78
test_runner/regress/test_compute_http.py
Normal file
@@ -0,0 +1,78 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from http.client import FORBIDDEN, UNAUTHORIZED
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import jwt
|
||||
import pytest
|
||||
from fixtures.endpoint.http import COMPUTE_AUDIENCE, ComputeClaimsScope, EndpointHttpClient
|
||||
from fixtures.utils import run_only_on_default_postgres
|
||||
from requests import RequestException
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
@run_only_on_default_postgres("The code path being tested is not dependent on Postgres version")
def test_compute_no_scope_claim(neon_simple_env: NeonEnv):
    """
    Test that if the JWT scope is not admin and no compute_id is specified,
    the external HTTP server returns a 403 Forbidden error.
    """
    env = neon_simple_env

    endpoint = env.endpoints.create_start("main")

    # Encode nothing in the token: the claims set is empty.
    token = jwt.encode({}, env.auth_keys.priv, algorithm="EdDSA")

    # Create an HTTP client that authenticates with the claim-less token
    # (deliberately NOT admin-scoped).
    client = EndpointHttpClient(
        external_port=endpoint.external_http_port,
        internal_port=endpoint.internal_http_port,
        jwt=token,
    )

    # The request must be rejected; pytest.raises fails the test if no
    # RequestException is raised.
    with pytest.raises(RequestException) as exc_info:
        client.status()

    response = exc_info.value.response
    assert response is not None
    assert response.status_code == FORBIDDEN
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "audience",
    (COMPUTE_AUDIENCE, "invalid", None),
    ids=["with_audience", "with_invalid_audience", "without_audience"],
)
@run_only_on_default_postgres("The code path being tested is not dependent on Postgres version")
def test_compute_admin_scope_claim(neon_simple_env: NeonEnv, audience: str | None):
    """
    Test that an admin-scoped JWT can access the compute's external HTTP server
    without the compute_id being specified in the claims.

    The request must succeed only when the `aud` claim is the compute
    audience; with an invalid or missing audience it must be rejected with
    401 Unauthorized.
    """
    env = neon_simple_env

    endpoint = env.endpoints.create_start("main")

    data: dict[str, str | list[str]] = {"scope": str(ComputeClaimsScope.ADMIN)}
    if audience:
        data["aud"] = [audience]

    token = jwt.encode(data, env.auth_keys.priv, algorithm="EdDSA")

    # Create an admin-scoped HTTP client
    client = EndpointHttpClient(
        external_port=endpoint.external_http_port,
        internal_port=endpoint.internal_http_port,
        jwt=token,
    )

    if audience == COMPUTE_AUDIENCE:
        # Valid audience: the call has to succeed. Any RequestException
        # (including a 401) propagates and fails the test instead of being
        # mistaken for an expected rejection.
        client.status()
        return

    # Invalid or missing audience: the call has to be rejected with 401.
    with pytest.raises(RequestException) as exc_info:
        client.status()

    response = exc_info.value.response
    assert response is not None
    assert response.status_code == UNAUTHORIZED
|
||||
@@ -4,10 +4,12 @@ import pytest
|
||||
from aiohttp import ClientSession
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.utils import run_only_on_default_postgres
|
||||
from jwcrypto import jwk, jwt
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@run_only_on_default_postgres("test doesn't use postgres")
|
||||
async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Inserts, retrieves, and deletes test file using a JWT token
|
||||
@@ -35,7 +37,6 @@ async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv)
|
||||
key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key"
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
log.info(f"cache key url {key}")
|
||||
log.info(f"token {token}")
|
||||
|
||||
async with ClientSession(headers=headers) as session:
|
||||
async with session.get(key) as res:
|
||||
|
||||
@@ -1,11 +1,24 @@
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from enum import Enum
|
||||
|
||||
import pytest
|
||||
from fixtures.endpoint.http import EndpointHttpClient
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.utils import USE_LFC
|
||||
from prometheus_client.parser import text_string_to_metric_families as prom_parse_impl
|
||||
|
||||
|
||||
class LfcQueryMethod(Enum):
    """
    Selects how a test drives LFC offload/prewarm.

    COMPUTE_CTL goes through the endpoint's HTTP client; POSTGRES calls the
    SQL functions (`get_local_cache_state` / `prewarm_local_cache`) directly.
    """

    COMPUTE_CTL = False
    POSTGRES = True
|
||||
|
||||
|
||||
PREWARM_LABEL = "compute_ctl_lfc_prewarm_requests_total"
|
||||
OFFLOAD_LABEL = "compute_ctl_lfc_offload_requests_total"
|
||||
QUERY_OPTIONS = LfcQueryMethod.POSTGRES, LfcQueryMethod.COMPUTE_CTL
|
||||
|
||||
|
||||
def check_pinned_entries(cur):
|
||||
@@ -19,11 +32,20 @@ def check_pinned_entries(cur):
|
||||
assert n_pinned == 0
|
||||
|
||||
|
||||
def prom_parse(client: EndpointHttpClient) -> dict[str, float]:
    """
    Extract the LFC prewarm/offload request counters from the endpoint metrics.

    Parses the text returned by `client.metrics()` and returns a mapping of
    sample name to value, restricted to `PREWARM_LABEL` and `OFFLOAD_LABEL`.
    """
    wanted = (PREWARM_LABEL, OFFLOAD_LABEL)
    counters: dict[str, float] = {}
    for family in prom_parse_impl(client.metrics()):
        for sample in family.samples:
            if sample.name in wanted:
                # A later sample with the same name overwrites an earlier one.
                counters[sample.name] = sample.value
    return counters
|
||||
|
||||
|
||||
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
|
||||
def test_lfc_prewarm(neon_simple_env: NeonEnv):
|
||||
@pytest.mark.parametrize("query", QUERY_OPTIONS, ids=["postgres", "compute-ctl"])
|
||||
def test_lfc_prewarm(neon_simple_env: NeonEnv, query: LfcQueryMethod):
|
||||
env = neon_simple_env
|
||||
n_records = 1000000
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
config_lines=[
|
||||
@@ -34,30 +56,57 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv):
|
||||
"neon.file_cache_prewarm_limit=1000",
|
||||
],
|
||||
)
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("create extension neon version '1.6'")
|
||||
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
|
||||
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
|
||||
cur.execute("select get_local_cache_state()")
|
||||
lfc_state = cur.fetchall()[0][0]
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
pg_cur = pg_conn.cursor()
|
||||
pg_cur.execute("create extension neon version '1.6'")
|
||||
pg_cur.execute("create database lfc")
|
||||
|
||||
lfc_conn = endpoint.connect(dbname="lfc")
|
||||
lfc_cur = lfc_conn.cursor()
|
||||
log.info(f"Inserting {n_records} rows")
|
||||
lfc_cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
|
||||
lfc_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
|
||||
log.info(f"Inserted {n_records} rows")
|
||||
|
||||
http_client = endpoint.http_client()
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
status = http_client.prewarm_lfc_status()
|
||||
assert status["status"] == "not_prewarmed"
|
||||
assert "error" not in status
|
||||
http_client.offload_lfc()
|
||||
assert http_client.prewarm_lfc_status()["status"] == "not_prewarmed"
|
||||
assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0}
|
||||
else:
|
||||
pg_cur.execute("select get_local_cache_state()")
|
||||
lfc_state = pg_cur.fetchall()[0][0]
|
||||
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version
|
||||
cur.execute("alter extension neon update to '1.6'")
|
||||
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
|
||||
# wait until compute_ctl completes downgrade of extension to default version
|
||||
time.sleep(1)
|
||||
pg_conn = endpoint.connect()
|
||||
pg_cur = pg_conn.cursor()
|
||||
pg_cur.execute("alter extension neon update to '1.6'")
|
||||
|
||||
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
|
||||
lfc_used_pages = cur.fetchall()[0][0]
|
||||
lfc_conn = endpoint.connect(dbname="lfc")
|
||||
lfc_cur = lfc_conn.cursor()
|
||||
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
http_client.prewarm_lfc()
|
||||
else:
|
||||
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
|
||||
|
||||
pg_cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
|
||||
lfc_used_pages = pg_cur.fetchall()[0][0]
|
||||
log.info(f"Used LFC size: {lfc_used_pages}")
|
||||
cur.execute("select * from get_prewarm_info()")
|
||||
prewarm_info = cur.fetchall()[0]
|
||||
pg_cur.execute("select * from get_prewarm_info()")
|
||||
prewarm_info = pg_cur.fetchall()[0]
|
||||
log.info(f"Prewarm info: {prewarm_info}")
|
||||
log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%")
|
||||
total, prewarmed, skipped, _ = prewarm_info
|
||||
progress = (prewarmed + skipped) * 100 // total
|
||||
log.info(f"Prewarm progress: {progress}%")
|
||||
|
||||
assert lfc_used_pages > 10000
|
||||
assert (
|
||||
@@ -66,18 +115,23 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv):
|
||||
and prewarm_info[0] == prewarm_info[1] + prewarm_info[2]
|
||||
)
|
||||
|
||||
cur.execute("select sum(pk) from t")
|
||||
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
|
||||
lfc_cur.execute("select sum(pk) from t")
|
||||
assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
|
||||
|
||||
check_pinned_entries(cur)
|
||||
check_pinned_entries(pg_cur)
|
||||
|
||||
desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped}
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
assert http_client.prewarm_lfc_status() == desired
|
||||
assert prom_parse(http_client) == {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1}
|
||||
|
||||
|
||||
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
|
||||
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
|
||||
@pytest.mark.parametrize("query", QUERY_OPTIONS, ids=["postgres", "compute-ctl"])
|
||||
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMethod):
|
||||
env = neon_simple_env
|
||||
n_records = 10000
|
||||
n_threads = 4
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
config_lines=[
|
||||
@@ -87,40 +141,58 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
|
||||
"neon.file_cache_prewarm_limit=1000000",
|
||||
],
|
||||
)
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("create extension neon version '1.6'")
|
||||
cur.execute(
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
pg_cur = pg_conn.cursor()
|
||||
pg_cur.execute("create extension neon version '1.6'")
|
||||
pg_cur.execute("CREATE DATABASE lfc")
|
||||
|
||||
lfc_conn = endpoint.connect(dbname="lfc")
|
||||
lfc_cur = lfc_conn.cursor()
|
||||
lfc_cur.execute(
|
||||
"create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 1000)) with (fillfactor=10)"
|
||||
)
|
||||
cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
|
||||
cur.execute("select get_local_cache_state()")
|
||||
lfc_state = cur.fetchall()[0][0]
|
||||
log.info(f"Inserting {n_records} rows")
|
||||
lfc_cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
|
||||
log.info(f"Inserted {n_records} rows")
|
||||
|
||||
http_client = endpoint.http_client()
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
http_client.offload_lfc()
|
||||
else:
|
||||
pg_cur.execute("select get_local_cache_state()")
|
||||
lfc_state = pg_cur.fetchall()[0][0]
|
||||
|
||||
running = True
|
||||
n_prewarms = 0
|
||||
|
||||
def workload():
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
lfc_conn = endpoint.connect(dbname="lfc")
|
||||
lfc_cur = lfc_conn.cursor()
|
||||
n_transfers = 0
|
||||
while running:
|
||||
src = random.randint(1, n_records)
|
||||
dst = random.randint(1, n_records)
|
||||
cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
|
||||
cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
|
||||
lfc_cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
|
||||
lfc_cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
|
||||
n_transfers += 1
|
||||
log.info(f"Number of transfers: {n_transfers}")
|
||||
|
||||
def prewarm():
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
n_prewarms = 0
|
||||
pg_conn = endpoint.connect()
|
||||
pg_cur = pg_conn.cursor()
|
||||
while running:
|
||||
cur.execute("alter system set neon.file_cache_size_limit='1MB'")
|
||||
cur.execute("select pg_reload_conf()")
|
||||
cur.execute("alter system set neon.file_cache_size_limit='1GB'")
|
||||
cur.execute("select pg_reload_conf()")
|
||||
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
|
||||
pg_cur.execute("alter system set neon.file_cache_size_limit='1MB'")
|
||||
pg_cur.execute("select pg_reload_conf()")
|
||||
pg_cur.execute("alter system set neon.file_cache_size_limit='1GB'")
|
||||
pg_cur.execute("select pg_reload_conf()")
|
||||
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
http_client.prewarm_lfc()
|
||||
else:
|
||||
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
|
||||
|
||||
nonlocal n_prewarms
|
||||
n_prewarms += 1
|
||||
log.info(f"Number of prewarms: {n_prewarms}")
|
||||
|
||||
@@ -140,8 +212,10 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
|
||||
t.join()
|
||||
prewarm_thread.join()
|
||||
|
||||
cur.execute("select sum(balance) from accounts")
|
||||
total_balance = cur.fetchall()[0][0]
|
||||
lfc_cur.execute("select sum(balance) from accounts")
|
||||
total_balance = lfc_cur.fetchall()[0][0]
|
||||
assert total_balance == 0
|
||||
|
||||
check_pinned_entries(cur)
|
||||
check_pinned_entries(pg_cur)
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: n_prewarms}
|
||||
|
||||
@@ -1334,6 +1334,13 @@ def test_sharding_split_failures(
|
||||
tenant_id, timeline_id, shard_count=initial_shard_count, placement_policy='{"Attached":1}'
|
||||
)
|
||||
|
||||
# Create bystander tenants with various shard counts. They should not be affected by the aborted
|
||||
# splits. Regression test for https://github.com/neondatabase/cloud/issues/28589.
|
||||
bystanders = {} # id → shard_count
|
||||
for bystander_shard_count in [1, 2, 4, 8]:
|
||||
id, _ = env.create_tenant(shard_count=bystander_shard_count)
|
||||
bystanders[id] = bystander_shard_count
|
||||
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
# All split failures log a warning when they enqueue the abort operation
|
||||
@@ -1394,6 +1401,8 @@ def test_sharding_split_failures(
|
||||
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
|
||||
for loc in locations:
|
||||
tenant_shard_id = TenantShardId.parse(loc[0])
|
||||
if tenant_shard_id.tenant_id != tenant_id:
|
||||
continue # skip bystanders
|
||||
log.info(f"Shard {tenant_shard_id} seen on node {ps.id} in mode {loc[1]['mode']}")
|
||||
assert tenant_shard_id.shard_count == initial_shard_count
|
||||
if loc[1]["mode"] == "Secondary":
|
||||
@@ -1414,6 +1423,8 @@ def test_sharding_split_failures(
|
||||
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
|
||||
for loc in locations:
|
||||
tenant_shard_id = TenantShardId.parse(loc[0])
|
||||
if tenant_shard_id.tenant_id != tenant_id:
|
||||
continue # skip bystanders
|
||||
log.info(f"Shard {tenant_shard_id} seen on node {ps.id} in mode {loc[1]['mode']}")
|
||||
assert tenant_shard_id.shard_count == split_shard_count
|
||||
if loc[1]["mode"] == "Secondary":
|
||||
@@ -1496,6 +1507,12 @@ def test_sharding_split_failures(
|
||||
# the scheduler reaches an idle state
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=30)
|
||||
|
||||
# Check that all bystanders are still around.
|
||||
for bystander_id, bystander_shard_count in bystanders.items():
|
||||
response = env.storage_controller.tenant_describe(bystander_id)
|
||||
assert TenantId(response["tenant_id"]) == bystander_id
|
||||
assert len(response["shards"]) == bystander_shard_count
|
||||
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user