Compare commits

..

3 Commits

Author SHA1 Message Date
Anastasia Lubennikova
751e7fbfd4 don't require shared_preload_libraries in spec in local tests -
we don't pass settings this way in local setup
2025-05-05 23:41:24 +01:00
Anastasia Lubennikova
807e00e9d2 Don't require pgaudit library in local tests 2025-05-05 15:10:29 +01:00
Anastasia Lubennikova
664a3e0953 impr(compute): always add pgaudit to shared_preload_libraries
This is necessary to handle audit_log_level downgrade,
because audit extension once enabled requires library to be always present
2025-05-05 15:02:44 +01:00
104 changed files with 766 additions and 2297 deletions

View File

@@ -53,77 +53,6 @@ concurrency:
cancel-in-progress: true
jobs:
cleanup:
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
env:
ORG_ID: org-solitary-dew-09443886
LIMIT: 100
SEARCH: "GITHUB_RUN_ID="
BASE_URL: https://console-stage.neon.build/api/v2
DRY_RUN: "false" # Set to "true" to just test out the workflow
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Cleanup inactive Neon projects left over from prior runs
env:
API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
run: |
set -euo pipefail
NOW=$(date -u +%s)
DAYS_AGO=$((NOW - 5 * 86400))
REQUEST_URL="$BASE_URL/projects?limit=$LIMIT&search=$(printf '%s' "$SEARCH" | jq -sRr @uri)&org_id=$ORG_ID"
echo "Requesting project list from:"
echo "$REQUEST_URL"
response=$(curl -s -X GET "$REQUEST_URL" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}" )
echo "Response:"
echo "$response" | jq .
projects_to_delete=$(echo "$response" | jq --argjson cutoff "$DAYS_AGO" '
.projects[]
| select(.compute_last_active_at != null)
| select((.compute_last_active_at | fromdateiso8601) < $cutoff)
| {id, name, compute_last_active_at}
')
if [ -z "$projects_to_delete" ]; then
echo "No projects eligible for deletion."
exit 0
fi
echo "Projects that will be deleted:"
echo "$projects_to_delete" | jq -r '.id'
if [ "$DRY_RUN" = "false" ]; then
echo "$projects_to_delete" | jq -r '.id' | while read -r project_id; do
echo "Deleting project: $project_id"
curl -s -X DELETE "$BASE_URL/projects/$project_id" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}"
done
else
echo "Dry run enabled — no projects were deleted."
fi
bench:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:

3
Cargo.lock generated
View File

@@ -1284,7 +1284,6 @@ name = "compute_tools"
version = "0.1.0"
dependencies = [
"anyhow",
"async-compression",
"aws-config",
"aws-sdk-kms",
"aws-sdk-s3",
@@ -1303,7 +1302,6 @@ dependencies = [
"futures",
"http 1.1.0",
"indexmap 2.0.1",
"itertools 0.10.5",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
@@ -1422,7 +1420,6 @@ dependencies = [
"clap",
"comfy-table",
"compute_api",
"endpoint_storage",
"futures",
"http-utils",
"humantime",

View File

@@ -243,7 +243,6 @@ azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rus
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
http-utils = { version = "0.1", path = "./libs/http-utils/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver = { path = "./pageserver" }

View File

@@ -1084,12 +1084,23 @@ RUN cargo install --locked --version 0.12.9 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
#
# Layer "rust extensions pgrx14"
#
# Version 14 is now required by a few
#########################################################################################
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx14
ARG PG_VERSION
RUN cargo install --locked --version 0.14.1 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
#
# Layer "rust extensions pgrx14"
#
# Version 14 is now required by a few
# This layer should be used as a base for new pgrx extensions,
# and eventually get merged with `rust-extensions-build`
#
@@ -1322,8 +1333,8 @@ ARG PG_VERSION
# Do not update without approve from proxy team
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
WORKDIR /ext-src
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.1.tar.gz -O pg_session_jwt.tar.gz && \
echo "62fec9e472cb805c53ba24a0765afdb8ea2720cfc03ae7813e61687b36d1b0ad pg_session_jwt.tar.gz" | sha256sum --check && \
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.3.0.tar.gz -O pg_session_jwt.tar.gz && \
echo "19be2dc0b3834d643706ed430af998bb4c2cdf24b3c45e7b102bb3a550e8660c pg_session_jwt.tar.gz" | sha256sum --check && \
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.9", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/version = "0.12.6"/version = "0.12.9"/g' pgrx-tests/Cargo.toml && \
@@ -1351,8 +1362,7 @@ COPY compute/patches/anon_v2.patch .
# This is an experimental extension, never got to real production.
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/2.1.0/postgresql_anonymizer-latest.tar.gz -O pg_anon.tar.gz && \
echo "48e7f5ae2f1ca516df3da86c5c739d48dd780a4e885705704ccaad0faa89d6c0 pg_anon.tar.gz" | sha256sum --check && \
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/latest/postgresql_anonymizer-latest.tar.gz -O pg_anon.tar.gz && \
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt && \
sed -i 's/pgrx = "0.14.1"/pgrx = { version = "=0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \

View File

@@ -10,7 +10,6 @@ default = []
testing = ["fail/failpoints"]
[dependencies]
async-compression.workspace = true
base64.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
@@ -28,7 +27,6 @@ flate2.workspace = true
futures.workspace = true
http.workspace = true
indexmap.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
metrics.workspace = true
nix.workspace = true

View File

@@ -60,16 +60,12 @@ use utils::failpoint_support;
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
fn parse_remote_ext_base_url(arg: &str) -> Result<String> {
const FALLBACK_PG_EXT_GATEWAY_BASE_URL: &str =
"http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local";
Ok(if arg.starts_with("http") {
arg
fn parse_remote_ext_config(arg: &str) -> Result<String> {
if arg.starts_with("http") {
Ok(arg.trim_end_matches('/').to_string())
} else {
FALLBACK_PG_EXT_GATEWAY_BASE_URL
Ok("http://pg-ext-s3-gateway".to_string())
}
.to_owned())
}
#[derive(Parser)]
@@ -78,10 +74,8 @@ struct Cli {
#[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
pub pgbin: String,
/// The base URL for the remote extension storage proxy gateway.
/// Should be in the form of `http(s)://<gateway-hostname>[:<port>]`.
#[arg(short = 'r', long, value_parser = parse_remote_ext_base_url, alias = "remote-ext-config")]
pub remote_ext_base_url: Option<String>,
#[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
pub remote_ext_config: Option<String>,
/// The port to bind the external listening HTTP server to. Clients running
/// outside the compute will talk to the compute through this port. Keep
@@ -170,7 +164,7 @@ fn main() -> Result<()> {
pgversion: get_pg_version_string(&cli.pgbin),
external_http_port: cli.external_http_port,
internal_http_port: cli.internal_http_port,
remote_ext_base_url: cli.remote_ext_base_url.clone(),
ext_remote_storage: cli.remote_ext_config.clone(),
resize_swap_on_bind: cli.resize_swap_on_bind,
set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
#[cfg(target_os = "linux")]
@@ -271,18 +265,4 @@ mod test {
fn verify_cli() {
Cli::command().debug_assert()
}
#[test]
fn parse_pg_ext_gateway_base_url() {
let arg = "http://pg-ext-s3-gateway2";
let result = super::parse_remote_ext_base_url(arg).unwrap();
assert_eq!(result, arg);
let arg = "pg-ext-s3-gateway";
let result = super::parse_remote_ext_base_url(arg).unwrap();
assert_eq!(
result,
"http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local"
);
}
}

View File

@@ -348,7 +348,6 @@ async fn run_dump_restore(
"--no-security-labels".to_string(),
"--no-subscriptions".to_string(),
"--no-tablespaces".to_string(),
"--no-event-triggers".to_string(),
// format
"--format".to_string(),
"directory".to_string(),

View File

@@ -1,26 +1,4 @@
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
LfcPrewarmState,
};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
};
use futures::StreamExt;
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use itertools::Itertools;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use once_cell::sync::Lazy;
use postgres;
use postgres::NoTls;
use postgres::error::SqlState;
use remote_storage::{DownloadError, RemotePath};
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::collections::HashMap;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
@@ -29,6 +7,24 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
};
use futures::StreamExt;
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use once_cell::sync::Lazy;
use postgres;
use postgres::NoTls;
use postgres::error::SqlState;
use remote_storage::{DownloadError, RemotePath};
use tokio::spawn;
use tracing::{Instrument, debug, error, info, instrument, warn};
use utils::id::{TenantId, TimelineId};
@@ -96,7 +92,7 @@ pub struct ComputeNodeParams {
pub internal_http_port: u16,
/// the address of extension storage proxy gateway
pub remote_ext_base_url: Option<String>,
pub ext_remote_storage: Option<String>,
}
/// Compute node info shared across several `compute_ctl` threads.
@@ -154,9 +150,6 @@ pub struct ComputeState {
/// set up the span relationship ourselves.
pub startup_span: Option<tracing::span::Span>,
pub lfc_prewarm_state: LfcPrewarmState,
pub lfc_offload_state: LfcOffloadState,
pub metrics: ComputeMetrics,
}
@@ -170,8 +163,6 @@ impl ComputeState {
pspec: None,
startup_span: None,
metrics: ComputeMetrics::default(),
lfc_prewarm_state: LfcPrewarmState::default(),
lfc_offload_state: LfcOffloadState::default(),
}
}
@@ -207,8 +198,6 @@ pub struct ParsedSpec {
pub pageserver_connstr: String,
pub safekeeper_connstrings: Vec<String>,
pub storage_auth_token: Option<String>,
pub endpoint_storage_addr: Option<SocketAddr>,
pub endpoint_storage_token: Option<String>,
}
impl TryFrom<ComputeSpec> for ParsedSpec {
@@ -262,18 +251,6 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
.or(Err("invalid timeline id"))?
};
let endpoint_storage_addr: Option<SocketAddr> = spec
.endpoint_storage_addr
.clone()
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_addr"))
.unwrap_or_default()
.parse()
.ok();
let endpoint_storage_token = spec
.endpoint_storage_token
.clone()
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_token"));
Ok(ParsedSpec {
spec,
pageserver_connstr,
@@ -281,8 +258,6 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
storage_auth_token,
tenant_id,
timeline_id,
endpoint_storage_addr,
endpoint_storage_token,
})
}
}
@@ -330,39 +305,11 @@ struct StartVmMonitorResult {
impl ComputeNode {
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
let connstr = params.connstr.as_str();
let mut conn_conf = postgres::config::Config::from_str(connstr)
let conn_conf = postgres::config::Config::from_str(connstr)
.context("cannot build postgres config from connstr")?;
let mut tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr)
let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr)
.context("cannot build tokio postgres config from connstr")?;
// Users can set some configuration parameters per database with
// ALTER DATABASE ... SET ...
//
// There are at least these parameters:
//
// - role=some_other_role
// - default_transaction_read_only=on
// - statement_timeout=1, i.e., 1ms, which will cause most of the queries to fail
// - search_path=non_public_schema, this should be actually safe because
// we don't call any functions in user databases, but better to always reset
// it to public.
//
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
// Unset them via connection string options before connecting to the database.
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
//
// TODO(ololobus): we currently pass `-c default_transaction_read_only=off` from control plane
// as well. After rolling out this code, we can remove this parameter from control plane.
// In the meantime, double-passing is fine, the last value is applied.
// See: <https://github.com/neondatabase/cloud/blob/133dd8c4dbbba40edfbad475bf6a45073ca63faf/goapp/controlplane/internal/pkg/compute/provisioner/provisioner_common.go#L70>
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
let options = match conn_conf.get_options() {
Some(options) => format!("{} {}", options, EXTRA_OPTIONS),
None => EXTRA_OPTIONS.to_string(),
};
conn_conf.options(&options);
tokio_conn_conf.options(&options);
let mut new_state = ComputeState::new();
if let Some(spec) = config.spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
@@ -789,9 +736,6 @@ impl ComputeNode {
// Log metrics so that we can search for slow operations in logs
info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished");
if pspec.spec.prewarm_lfc_on_startup {
self.prewarm_lfc();
}
Ok(())
}
@@ -1478,20 +1422,15 @@ impl ComputeNode {
Err(e) => match e.code() {
Some(&SqlState::INVALID_PASSWORD)
| Some(&SqlState::INVALID_AUTHORIZATION_SPECIFICATION) => {
// Connect with `zenith_admin` if `cloud_admin` could not authenticate
// Connect with zenith_admin if cloud_admin could not authenticate
info!(
"cannot connect to Postgres: {}, retrying with 'zenith_admin' username",
"cannot connect to postgres: {}, retrying with `zenith_admin` username",
e
);
let mut zenith_admin_conf = postgres::config::Config::from(conf.clone());
zenith_admin_conf.application_name("compute_ctl:apply_config");
zenith_admin_conf.user("zenith_admin");
// It doesn't matter what were the options before, here we just want
// to connect and create a new superuser role.
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
zenith_admin_conf.options(ZENITH_OPTIONS);
let mut client =
zenith_admin_conf.connect(NoTls)
.context("broken cloud_admin credential: tried connecting with cloud_admin but could not authenticate, and zenith_admin does not work either")?;
@@ -1657,7 +1596,9 @@ impl ComputeNode {
self.pg_reload_conf()?;
if spec.mode == ComputeMode::Primary {
let conf = self.get_tokio_conn_conf(Some("compute_ctl:reconfigure"));
let mut conf =
tokio_postgres::Config::from_str(self.params.connstr.as_str()).unwrap();
conf.application_name("apply_config");
let conf = Arc::new(conf);
let spec = Arc::new(spec.clone());
@@ -1897,9 +1838,9 @@ LIMIT 100",
real_ext_name: String,
ext_path: RemotePath,
) -> Result<u64, DownloadError> {
let remote_ext_base_url =
let ext_remote_storage =
self.params
.remote_ext_base_url
.ext_remote_storage
.as_ref()
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
"Remote extensions storage is not configured",
@@ -1961,7 +1902,7 @@ LIMIT 100",
let download_size = extension_server::download_extension(
&real_ext_name,
&ext_path,
remote_ext_base_url,
ext_remote_storage,
&self.params.pgbin,
)
.await
@@ -1996,40 +1937,23 @@ LIMIT 100",
tokio::spawn(conn);
// TODO: support other types of grants apart from schemas?
// check the role grants first - to gracefully handle read-replicas.
let select = "SELECT privilege_type
FROM pg_namespace
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) acl ON true
JOIN pg_user users ON acl.grantee = users.usesysid
WHERE users.usename = $1
AND nspname = $2";
let rows = db_client
.query(select, &[role_name, schema_name])
.await
.with_context(|| format!("Failed to execute query: {select}"))?;
let already_granted: HashSet<String> = rows.into_iter().map(|row| row.get(0)).collect();
let grants = privileges
.iter()
.filter(|p| !already_granted.contains(p.as_str()))
// should not be quoted as it's part of the command.
// is already sanitized so it's ok
.map(|p| p.as_str())
.join(", ");
if !grants.is_empty() {
let query = format!(
"GRANT {} ON SCHEMA {} TO {}",
privileges
.iter()
// should not be quoted as it's part of the command.
// is already sanitized so it's ok
.map(|p| p.as_str())
.collect::<Vec<&'static str>>()
.join(", "),
// quote the schema and role name as identifiers to sanitize them.
let schema_name = schema_name.pg_quote();
let role_name = role_name.pg_quote();
let query = format!("GRANT {grants} ON SCHEMA {schema_name} TO {role_name}",);
db_client
.simple_query(&query)
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
}
schema_name.pg_quote(),
role_name.pg_quote(),
);
db_client
.simple_query(&query)
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
Ok(())
}
@@ -2087,7 +2011,7 @@ LIMIT 100",
&self,
spec: &ComputeSpec,
) -> Result<RemoteExtensionMetrics> {
if self.params.remote_ext_base_url.is_none() {
if self.params.ext_remote_storage.is_none() {
return Ok(RemoteExtensionMetrics {
num_ext_downloaded: 0,
largest_ext_size: 0,

View File

@@ -1,202 +0,0 @@
use crate::compute::ComputeNode;
use anyhow::{Context, Result, bail};
use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
use compute_api::responses::LfcOffloadState;
use compute_api::responses::LfcPrewarmState;
use http::StatusCode;
use reqwest::Client;
use std::sync::Arc;
use tokio::{io::AsyncReadExt, spawn};
use tracing::{error, info};
#[derive(serde::Serialize, Default)]
pub struct LfcPrewarmStateWithProgress {
#[serde(flatten)]
base: LfcPrewarmState,
total: i32,
prewarmed: i32,
skipped: i32,
}
/// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
struct EndpointStoragePair {
url: String,
token: String,
}
const KEY: &str = "lfc_state";
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
type Error = anyhow::Error;
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
bail!("pspec.endpoint_id missing")
};
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
bail!("pspec.endpoint_storage_addr missing")
};
let tenant_id = pspec.tenant_id;
let timeline_id = pspec.timeline_id;
let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
let Some(ref token) = pspec.endpoint_storage_token else {
bail!("pspec.endpoint_storage_token missing")
};
let token = token.clone();
Ok(EndpointStoragePair { url, token })
}
}
impl ComputeNode {
// If prewarm failed, we want to get overall number of segments as well as done ones.
// However, this function should be reliable even if querying postgres failed.
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
info!("requesting LFC prewarm state from postgres");
let mut state = LfcPrewarmStateWithProgress::default();
{
state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
}
let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
Ok(client) => client,
Err(err) => {
error!(%err, "connecting to postgres");
return state;
}
};
let row = match client
.query_one("select * from get_prewarm_info()", &[])
.await
{
Ok(row) => row,
Err(err) => {
error!(%err, "querying LFC prewarm status");
return state;
}
};
state.total = row.try_get(0).unwrap_or_default();
state.prewarmed = row.try_get(1).unwrap_or_default();
state.skipped = row.try_get(2).unwrap_or_default();
state
}
pub fn lfc_offload_state(&self) -> LfcOffloadState {
self.state.lock().unwrap().lfc_offload_state.clone()
}
/// Returns false if there is a prewarm request ongoing, true otherwise
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
crate::metrics::LFC_PREWARM_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
if let LfcPrewarmState::Prewarming =
std::mem::replace(state, LfcPrewarmState::Prewarming)
{
return false;
}
}
let cloned = self.clone();
spawn(async move {
let Err(err) = cloned.prewarm_impl().await else {
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
return;
};
error!(%err);
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
error: err.to_string(),
};
});
true
}
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
let state = self.state.lock().unwrap();
state.pspec.as_ref().unwrap().try_into()
}
async fn prewarm_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(token);
let res = request.send().await.context("querying endpoint storage")?;
let status = res.status();
if status != StatusCode::OK {
bail!("{status} querying endpoint storage")
}
let mut uncompressed = Vec::new();
let lfc_state = res
.bytes()
.await
.context("getting request body from endpoint storage")?;
ZstdDecoder::new(lfc_state.iter().as_slice())
.read_to_end(&mut uncompressed)
.await
.context("decoding LFC state")?;
let uncompressed_len = uncompressed.len();
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres");
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select prewarm_local_cache($1)", &[&uncompressed])
.await
.context("loading LFC state into postgres")
.map(|_| ())
}
/// Returns false if there is an offload request ongoing, true otherwise
pub fn offload_lfc(self: &Arc<Self>) -> bool {
crate::metrics::LFC_OFFLOAD_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap().lfc_offload_state;
if let LfcOffloadState::Offloading =
std::mem::replace(state, LfcOffloadState::Offloading)
{
return false;
}
}
let cloned = self.clone();
spawn(async move {
let Err(err) = cloned.offload_lfc_impl().await else {
cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
return;
};
error!(%err);
cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: err.to_string(),
};
});
true
}
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
info!(%url, "requesting LFC state from postgres");
let mut compressed = Vec::new();
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select get_local_cache_state()", &[])
.await
.context("querying LFC state")?
.try_get::<usize, &[u8]>(0)
.context("deserializing LFC state")
.map(ZstdEncoder::new)?
.read_to_end(&mut compressed)
.await
.context("compressing LFC state")?;
let compressed_len = compressed.len();
info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
let request = Client::new().put(url).bearer_auth(token).body(compressed);
match request.send().await {
Ok(res) if res.status() == StatusCode::OK => Ok(()),
Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()),
Err(err) => Err(err).context("writing to endpoint storage"),
}
}
}

View File

@@ -168,6 +168,35 @@ pub fn write_postgres_conf(
writeln!(file, "# Managed by compute_ctl: end")?;
}
// Always add pgaudit to shared_preload_libraries.
//
// This is needed to handle the downgrade scenario.
// pgaudit extension creates event triggers that require library to be loaded.
// so, once extension was installed it must always be present in shared_preload_libraries.
let mut extra_shared_preload_libraries = String::new();
let libs = {
// We don't distribute pgaudit in the testing image,
// and don't pass shared_preload_libraries via spec,
// so disable this logic there.
#[cfg(feature = "testing")]
{
String::new()
}
#[cfg(not(feature = "testing"))]
{
spec.cluster
.settings
.find("shared_preload_libraries")
.expect("shared_preload_libraries setting is missing in the spec")
}
};
#[cfg(not(feature = "testing"))]
if !libs.contains("pgaudit") {
extra_shared_preload_libraries.push_str(",pgaudit");
};
// If base audit logging is enabled, configure it.
// In this setup, the audit log will be written to the standard postgresql log.
//
@@ -177,29 +206,22 @@ pub fn write_postgres_conf(
// This way we always override the settings from the spec
// and don't allow the user or the control plane admin to change them.
match spec.audit_log_level {
ComputeAudit::Disabled => {}
ComputeAudit::Disabled => {
// this is the default, but let's be explicit
writeln!(file, "pgaudit.log='none'")?;
}
ComputeAudit::Log | ComputeAudit::Base => {
writeln!(file, "# Managed by compute_ctl base audit settings: start")?;
writeln!(file, "pgaudit.log='ddl,role'")?;
// Disable logging of catalog queries to reduce the noise
writeln!(file, "pgaudit.log_catalog=off")?;
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
let mut extra_shared_preload_libraries = String::new();
if !libs.contains("pgaudit") {
extra_shared_preload_libraries.push_str(",pgaudit");
}
writeln!(
file,
"shared_preload_libraries='{}{}'",
libs, extra_shared_preload_libraries
)?;
} else {
// Typically, this should be unreacheable,
// because we always set at least some shared_preload_libraries in the spec
// but let's handle it explicitly anyway.
writeln!(file, "shared_preload_libraries='neon,pgaudit'")?;
}
writeln!(
file,
"shared_preload_libraries='{}{}'",
libs, extra_shared_preload_libraries
)?;
writeln!(file, "# Managed by compute_ctl base audit settings: end")?;
}
ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => {
@@ -223,36 +245,20 @@ pub fn write_postgres_conf(
// TODO: tune this after performance testing
writeln!(file, "pgaudit.log_rotation_age=5")?;
// Enable audit logs for pg_session_jwt extension
writeln!(file, "pg_session_jwt.audit_log=on")?;
// Add audit shared_preload_libraries, if they are not present.
//
// The caller who sets the flag is responsible for ensuring that the necessary
// shared_preload_libraries are present in the compute image,
// otherwise the compute start will fail.
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
let mut extra_shared_preload_libraries = String::new();
if !libs.contains("pgaudit") {
extra_shared_preload_libraries.push_str(",pgaudit");
}
if !libs.contains("pgauditlogtofile") {
extra_shared_preload_libraries.push_str(",pgauditlogtofile");
}
writeln!(
file,
"shared_preload_libraries='{}{}'",
libs, extra_shared_preload_libraries
)?;
} else {
// Typically, this should be unreacheable,
// because we always set at least some shared_preload_libraries in the spec
// but let's handle it explicitly anyway.
writeln!(
file,
"shared_preload_libraries='neon,pgaudit,pgauditlogtofile'"
)?;
if !libs.contains("pgauditlogtofile") {
extra_shared_preload_libraries.push_str(",pgauditlogtofile");
}
writeln!(
file,
"shared_preload_libraries='{}{}'",
libs, extra_shared_preload_libraries
)?;
writeln!(
file,
"# Managed by compute_ctl compliance audit settings: end"

View File

@@ -158,14 +158,14 @@ fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
pub async fn download_extension(
ext_name: &str,
ext_path: &RemotePath,
remote_ext_base_url: &str,
ext_remote_storage: &str,
pgbin: &str,
) -> Result<u64> {
info!("Download extension {:?} from {:?}", ext_name, ext_path);
// TODO add retry logic
let download_buffer =
match download_extension_tar(remote_ext_base_url, &ext_path.to_string()).await {
match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
Ok(buffer) => buffer,
Err(error_message) => {
return Err(anyhow::anyhow!(
@@ -272,8 +272,8 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
// Do request to extension storage proxy, e.g.,
// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
// using HTTP GET and return the response body as bytes.
async fn download_extension_tar(remote_ext_base_url: &str, ext_path: &str) -> Result<Bytes> {
let uri = format!("{}/{}", remote_ext_base_url, ext_path);
async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
let uri = format!("{}/{}", ext_remote_storage, ext_path);
let filename = Path::new(ext_path)
.file_name()
.unwrap_or_else(|| std::ffi::OsStr::new("unknown"))

View File

@@ -1,10 +1,12 @@
use std::collections::HashSet;
use anyhow::{Result, anyhow};
use axum::{RequestExt, body::Body};
use axum_extra::{
TypedHeader,
headers::{Authorization, authorization::Bearer},
};
use compute_api::requests::{COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope};
use compute_api::requests::ComputeClaims;
use futures::future::BoxFuture;
use http::{Request, Response, StatusCode};
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
@@ -23,14 +25,13 @@ pub(in crate::http) struct Authorize {
impl Authorize {
pub fn new(compute_id: String, jwks: JwkSet) -> Self {
let mut validation = Validation::new(Algorithm::EdDSA);
// Nothing is currently required
validation.required_spec_claims = HashSet::new();
validation.validate_exp = true;
// Unused by the control plane
validation.validate_nbf = false;
// Unused by the control plane
validation.validate_aud = false;
validation.set_audience(&[COMPUTE_AUDIENCE]);
// Nothing is currently required
validation.set_required_spec_claims(&[] as &[&str; 0]);
// Unused by the control plane
validation.validate_nbf = false;
Self {
compute_id,
@@ -63,47 +64,11 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
Err(e) => return Err(JsonResponse::error(StatusCode::UNAUTHORIZED, e)),
};
match data.claims.scope {
// TODO: We should validate audience for every token, but
// instead of this ad-hoc validation, we should turn
// [`Validation::validate_aud`] on. This is merely a stopgap
// while we roll out `aud` deployment. We return a 401
// Unauthorized because when we eventually do use
// [`Validation`], we will hit the above `Err` match arm which
// returns 401 Unauthorized.
Some(ComputeClaimsScope::Admin) => {
let Some(ref audience) = data.claims.audience else {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"missing audience in authorization token claims",
));
};
if !audience.iter().any(|a| a == COMPUTE_AUDIENCE) {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"invalid audience in authorization token claims",
));
}
}
// If the scope is not [`ComputeClaimsScope::Admin`], then we
// must validate the compute_id
_ => {
let Some(ref claimed_compute_id) = data.claims.compute_id else {
return Err(JsonResponse::error(
StatusCode::FORBIDDEN,
"missing compute_id in authorization token claims",
));
};
if *claimed_compute_id != compute_id {
return Err(JsonResponse::error(
StatusCode::FORBIDDEN,
"invalid compute ID in authorization token claims",
));
}
}
if data.claims.compute_id != compute_id {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"invalid compute ID in authorization token claims",
));
}
// Make claims available to any subsequent middleware or request

View File

@@ -22,7 +22,7 @@ pub(in crate::http) async fn download_extension(
State(compute): State<Arc<ComputeNode>>,
) -> Response {
// Don't even try to download extensions if no remote storage is configured
if compute.params.remote_ext_base_url.is_none() {
if compute.params.ext_remote_storage.is_none() {
return JsonResponse::error(
StatusCode::PRECONDITION_FAILED,
"remote storage is not configured",

View File

@@ -1,39 +0,0 @@
use crate::compute_prewarm::LfcPrewarmStateWithProgress;
use crate::http::JsonResponse;
use axum::response::{IntoResponse, Response};
use axum::{Json, http::StatusCode};
use compute_api::responses::LfcOffloadState;
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmStateWithProgress> {
Json(compute.lfc_prewarm_state().await)
}
// Following functions are marked async for axum, as it's more convenient than wrapping these
// in async lambdas at call site
pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadState> {
Json(compute.lfc_offload_state())
}
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
if compute.prewarm_lfc() {
StatusCode::ACCEPTED.into_response()
} else {
JsonResponse::error(
StatusCode::TOO_MANY_REQUESTS,
"Multiple requests for prewarm are not allowed",
)
}
}
pub(in crate::http) async fn offload(compute: Compute) -> Response {
if compute.offload_lfc() {
StatusCode::ACCEPTED.into_response()
} else {
JsonResponse::error(
StatusCode::TOO_MANY_REQUESTS,
"Multiple requests for prewarm offload are not allowed",
)
}
}

View File

@@ -11,7 +11,6 @@ pub(in crate::http) mod extensions;
pub(in crate::http) mod failpoints;
pub(in crate::http) mod grants;
pub(in crate::http) mod insights;
pub(in crate::http) mod lfc;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod status;

View File

@@ -23,7 +23,7 @@ use super::{
middleware::authorize::Authorize,
routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
grants, insights, lfc, metrics, metrics_json, status, terminate,
grants, insights, metrics, metrics_json, status, terminate,
},
};
use crate::compute::ComputeNode;
@@ -85,8 +85,6 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))

View File

@@ -11,7 +11,6 @@ pub mod http;
pub mod logger;
pub mod catalog;
pub mod compute;
pub mod compute_prewarm;
pub mod disk_quota;
pub mod extension_server;
pub mod installed_extensions;

View File

@@ -1,7 +1,7 @@
use metrics::core::{AtomicF64, AtomicU64, Collector, GenericCounter, GenericGauge};
use metrics::proto::MetricFamily;
use metrics::{
IntCounter, IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec,
};
use once_cell::sync::Lazy;
@@ -97,24 +97,6 @@ pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::
.expect("failed to define a metric")
});
/// Needed as neon.file_cache_prewarm_batch == 0 doesn't mean we never tried to prewarm.
/// On the other hand, LFC_PREWARMED_PAGES is excessive as we can GET /lfc/prewarm
pub(crate) static LFC_PREWARM_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_prewarm_requests_total",
"Total number of LFC prewarm requests made by compute_ctl",
)
.expect("failed to define a metric")
});
pub(crate) static LFC_OFFLOAD_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_offload_requests_total",
"Total number of LFC offload requests made by compute_ctl",
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
@@ -124,7 +106,5 @@ pub fn collect() -> Vec<MetricFamily> {
metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
metrics.extend(LFC_PREWARM_REQUESTS.collect());
metrics.extend(LFC_OFFLOAD_REQUESTS.collect());
metrics
}

View File

@@ -30,7 +30,6 @@ mod pg_helpers_tests {
r#"fsync = off
wal_level = logical
hot_standby = on
prewarm_lfc_on_startup = off
neon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501'
wal_log_hints = on
log_connections = on

View File

@@ -41,7 +41,7 @@ storage_broker.workspace = true
http-utils.workspace = true
utils.workspace = true
whoami.workspace = true
endpoint_storage.workspace = true
compute_api.workspace = true
workspace_hack.workspace = true
tracing.workspace = true

View File

@@ -16,11 +16,10 @@ use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::requests::ComputeClaimsScope;
use compute_api::spec::ComputeMode;
use control_plane::broker::StorageBroker;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
@@ -644,10 +643,9 @@ struct EndpointStartCmdArgs {
#[clap(
long,
help = "Configure the remote extensions storage proxy gateway URL to request for extensions.",
alias = "remote-ext-config"
help = "Configure the remote extensions storage proxy gateway to request for extensions."
)]
remote_ext_base_url: Option<String>,
remote_ext_config: Option<String>,
#[clap(
long,
@@ -707,9 +705,6 @@ struct EndpointStopCmdArgs {
struct EndpointGenerateJwtCmdArgs {
#[clap(help = "Postgres endpoint id")]
endpoint_id: String,
#[clap(short = 's', long, help = "Scope to generate the JWT with", value_parser = ComputeClaimsScope::from_str)]
scope: Option<ComputeClaimsScope>,
}
#[derive(clap::Subcommand)]
@@ -1023,7 +1018,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
})
.collect(),
endpoint_storage: EndpointStorageConf {
listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
port: ENDPOINT_STORAGE_DEFAULT_PORT,
},
pg_distrib_dir: None,
neon_distrib_dir: None,
@@ -1415,16 +1410,9 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
EndpointCmd::Start(args) => {
let endpoint_id = &args.endpoint_id;
let pageserver_id = args.endpoint_pageserver_id;
let remote_ext_base_url = &args.remote_ext_base_url;
let remote_ext_config = &args.remote_ext_config;
let default_generation = env
.storage_controller
.timelines_onto_safekeepers
.then_some(1);
let safekeepers_generation = args
.safekeepers_generation
.or(default_generation)
.map(SafekeeperGeneration::new);
let safekeepers_generation = args.safekeepers_generation.map(SafekeeperGeneration::new);
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {
@@ -1496,29 +1484,14 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
None
};
let exp = (std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?
+ Duration::from_secs(86400))
.as_secs();
let claims = endpoint_storage::claims::EndpointStorageClaims {
tenant_id: endpoint.tenant_id,
timeline_id: endpoint.timeline_id,
endpoint_id: endpoint_id.to_string(),
exp,
};
let endpoint_storage_token = env.generate_auth_token(&claims)?;
let endpoint_storage_addr = env.endpoint_storage.listen_addr.to_string();
println!("Starting existing endpoint {endpoint_id}...");
endpoint
.start(
&auth_token,
endpoint_storage_token,
endpoint_storage_addr,
safekeepers_generation,
safekeepers,
pageservers,
remote_ext_base_url.as_ref(),
remote_ext_config.as_ref(),
stripe_size.0 as usize,
args.create_test_user,
args.start_timeout,
@@ -1567,16 +1540,12 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
endpoint.stop(&args.mode, args.destroy)?;
}
EndpointCmd::GenerateJwt(args) => {
let endpoint = {
let endpoint_id = &args.endpoint_id;
cplane
.endpoints
.get(endpoint_id)
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?
};
let jwt = endpoint.generate_jwt(args.scope)?;
let endpoint_id = &args.endpoint_id;
let endpoint = cplane
.endpoints
.get(endpoint_id)
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let jwt = endpoint.generate_jwt()?;
print!("{jwt}");
}

View File

@@ -45,9 +45,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow, bail};
use compute_api::requests::{
COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
};
use compute_api::requests::{ComputeClaims, ConfigurationRequest};
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
};
@@ -632,17 +630,9 @@ impl Endpoint {
}
/// Generate a JWT with the correct claims.
pub fn generate_jwt(&self, scope: Option<ComputeClaimsScope>) -> Result<String> {
pub fn generate_jwt(&self) -> Result<String> {
self.env.generate_auth_token(&ComputeClaims {
audience: match scope {
Some(ComputeClaimsScope::Admin) => Some(vec![COMPUTE_AUDIENCE.to_owned()]),
_ => None,
},
compute_id: match scope {
Some(ComputeClaimsScope::Admin) => None,
_ => Some(self.endpoint_id.clone()),
},
scope,
compute_id: self.endpoint_id.clone(),
})
}
@@ -650,12 +640,10 @@ impl Endpoint {
pub async fn start(
&self,
auth_token: &Option<String>,
endpoint_storage_token: String,
endpoint_storage_addr: String,
safekeepers_generation: Option<SafekeeperGeneration>,
safekeepers: Vec<NodeId>,
pageservers: Vec<(Host, u16)>,
remote_ext_base_url: Option<&String>,
remote_ext_config: Option<&String>,
shard_stripe_size: usize,
create_test_user: bool,
start_timeout: Duration,
@@ -745,9 +733,6 @@ impl Endpoint {
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
audit_log_level: ComputeAudit::Disabled,
logs_export_host: None::<String>,
endpoint_storage_addr: Some(endpoint_storage_addr),
endpoint_storage_token: Some(endpoint_storage_token),
prewarm_lfc_on_startup: false,
};
// this strange code is needed to support respec() in tests
@@ -825,8 +810,8 @@ impl Endpoint {
.stderr(logfile.try_clone()?)
.stdout(logfile);
if let Some(remote_ext_base_url) = remote_ext_base_url {
cmd.args(["--remote-ext-base-url", remote_ext_base_url]);
if let Some(remote_ext_config) = remote_ext_config {
cmd.args(["--remote-ext-config", remote_ext_config]);
}
let child = cmd.spawn()?;
@@ -918,7 +903,7 @@ impl Endpoint {
self.external_http_address.port()
),
)
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
.bearer_auth(self.generate_jwt()?)
.send()
.await?;
@@ -995,7 +980,7 @@ impl Endpoint {
self.external_http_address.port()
))
.header(CONTENT_TYPE.as_str(), "application/json")
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
.bearer_auth(self.generate_jwt()?)
.body(
serde_json::to_string(&ConfigurationRequest {
spec,

View File

@@ -3,19 +3,17 @@ use crate::local_env::LocalEnv;
use anyhow::{Context, Result};
use camino::Utf8PathBuf;
use std::io::Write;
use std::net::SocketAddr;
use std::time::Duration;
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage";
pub const ENDPOINT_STORAGE_DEFAULT_ADDR: SocketAddr =
SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9993);
pub const ENDPOINT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub struct EndpointStorage {
pub bin: Utf8PathBuf,
pub data_dir: Utf8PathBuf,
pub pemfile: Utf8PathBuf,
pub addr: SocketAddr,
pub port: u16,
}
impl EndpointStorage {
@@ -24,7 +22,7 @@ impl EndpointStorage {
bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(),
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
addr: env.endpoint_storage.listen_addr,
port: env.endpoint_storage.port,
}
}
@@ -33,7 +31,7 @@ impl EndpointStorage {
}
fn listen_addr(&self) -> Utf8PathBuf {
format!("{}:{}", self.addr.ip(), self.addr.port()).into()
format!("127.0.0.1:{}", self.port).into()
}
pub fn init(&self) -> Result<()> {

View File

@@ -20,9 +20,7 @@ use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::broker::StorageBroker;
use crate::endpoint_storage::{
ENDPOINT_STORAGE_DEFAULT_ADDR, ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage,
};
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -153,10 +151,10 @@ pub struct NeonLocalInitConf {
pub generate_local_ssl_certs: bool,
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct EndpointStorageConf {
pub listen_addr: SocketAddr,
pub port: u16,
}
/// Broker config for cluster internal communication.
@@ -243,14 +241,6 @@ impl Default for NeonStorageControllerConf {
}
}
impl Default for EndpointStorageConf {
fn default() -> Self {
Self {
listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
}
}
}
impl NeonBroker {
pub fn client_url(&self) -> Url {
let url = if let Some(addr) = self.listen_https_addr {

View File

@@ -10,8 +10,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use hyper0::Uri;
use nix::unistd::Pid;
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest,
SafekeeperSchedulingPolicyRequest, SkSchedulingPolicy, TenantCreateRequest,
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse,
};
use pageserver_api::models::{
@@ -21,7 +20,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::{Method, Response};
use reqwest::Method;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
@@ -571,11 +570,6 @@ impl StorageController {
let peer_jwt_token = encode_from_key_file(&peer_claims, private_key)
.expect("failed to generate jwt token");
args.push(format!("--peer-jwt-token={peer_jwt_token}"));
let claims = Claims::new(None, Scope::SafekeeperData);
let jwt_token =
encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
args.push(format!("--safekeeper-jwt-token={jwt_token}"));
}
if let Some(public_key) = &self.public_key {
@@ -620,10 +614,6 @@ impl StorageController {
self.env.base_data_dir.display()
));
if self.env.safekeepers.iter().any(|sk| sk.auth_enabled) && self.private_key.is_none() {
anyhow::bail!("Safekeeper set up for auth but no private key specified");
}
if self.config.timelines_onto_safekeepers {
args.push("--timelines-onto-safekeepers".to_string());
}
@@ -650,10 +640,6 @@ impl StorageController {
)
.await?;
if self.config.timelines_onto_safekeepers {
self.register_safekeepers().await?;
}
Ok(())
}
@@ -757,23 +743,6 @@ impl StorageController {
where
RQ: Serialize + Sized,
RS: DeserializeOwned + Sized,
{
let response = self.dispatch_inner(method, path, body).await?;
Ok(response
.json()
.await
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
}
/// Simple HTTP request wrapper for calling into storage controller
async fn dispatch_inner<RQ>(
&self,
method: reqwest::Method,
path: String,
body: Option<RQ>,
) -> anyhow::Result<Response>
where
RQ: Serialize + Sized,
{
// In the special case of the `storage_controller start` subcommand, we wish
// to use the API endpoint of the newly started storage controller in order
@@ -816,31 +785,10 @@ impl StorageController {
let response = builder.send().await?;
let response = response.error_from_body().await?;
Ok(response)
}
/// Register the safekeepers in the storage controller
#[instrument(skip(self))]
async fn register_safekeepers(&self) -> anyhow::Result<()> {
for sk in self.env.safekeepers.iter() {
let sk_id = sk.id;
let body = serde_json::json!({
"id": sk_id,
"created_at": "2023-10-25T09:11:25Z",
"updated_at": "2024-08-28T11:32:43Z",
"region_id": "aws-us-east-2",
"host": "127.0.0.1",
"port": sk.pg_port,
"http_port": sk.http_port,
"https_port": sk.https_port,
"version": 5957,
"availability_zone_id": format!("us-east-2b-{sk_id}"),
});
self.upsert_safekeeper(sk_id, body).await?;
self.safekeeper_scheduling_policy(sk_id, SkSchedulingPolicy::Active)
.await?;
}
Ok(())
Ok(response
.json()
.await
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
}
/// Call into the attach_hook API, for use before handing out attachments to pageservers
@@ -868,42 +816,6 @@ impl StorageController {
Ok(response.generation)
}
#[instrument(skip(self))]
pub async fn upsert_safekeeper(
&self,
node_id: NodeId,
request: serde_json::Value,
) -> anyhow::Result<()> {
let resp = self
.dispatch_inner::<serde_json::Value>(
Method::POST,
format!("control/v1/safekeeper/{node_id}"),
Some(request),
)
.await?;
if !resp.status().is_success() {
anyhow::bail!(
"setting scheduling policy unsuccessful for safekeeper {node_id}: {}",
resp.status()
);
}
Ok(())
}
#[instrument(skip(self))]
pub async fn safekeeper_scheduling_policy(
&self,
node_id: NodeId,
scheduling_policy: SkSchedulingPolicy,
) -> anyhow::Result<()> {
self.dispatch::<SafekeeperSchedulingPolicyRequest, ()>(
Method::POST,
format!("control/v1/safekeeper/{node_id}/scheduling_policy"),
Some(SafekeeperSchedulingPolicyRequest { scheduling_policy }),
)
.await
}
#[instrument(skip(self))]
pub async fn inspect(
&self,

View File

@@ -12,7 +12,6 @@ ERROR: invalid JWT encoding
-- Test creating a session with an expired JWT
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjE3NDI1NjQ0MzIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MjQyNDIsInN1YiI6InVzZXIxMjMifQ.A6FwKuaSduHB9O7Gz37g0uoD_U9qVS0JNtT7YABGVgB7HUD1AMFc9DeyhNntWBqncg8k5brv-hrNTuUh5JYMAw');
ERROR: Token used after it has expired
DETAIL: exp=1742564432
-- Test creating a session with a valid JWT
SELECT auth.jwt_session_init('eyJhbGciOiJFZERTQSJ9.eyJleHAiOjQ4OTYxNjQyNTIsImlhdCI6MTc0MjU2NDI1MiwianRpIjo0MzQzNDMsInN1YiI6InVzZXIxMjMifQ.2TXVgjb6JSUq6_adlvp-m_SdOxZSyGS30RS9TLB0xu2N83dMSs2NybwE1NMU8Fb0tcAZR_ET7M2rSxbTrphfCg');
jwt_session_init

View File

@@ -343,7 +343,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
fn token() -> String {
let claims = endpoint_storage::claims::EndpointStorageClaims {
let claims = endpoint_storage::Claims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
@@ -489,8 +489,16 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
}
fn delete_prefix_token(uri: &str) -> String {
use serde::Serialize;
let parts = uri.split("/").collect::<Vec<&str>>();
let claims = endpoint_storage::claims::DeletePrefixClaims {
#[derive(Serialize)]
struct PrefixClaims {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<endpoint_storage::EndpointId>,
exp: u64,
}
let claims = PrefixClaims {
tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
endpoint_id: parts.get(3).map(ToString::to_string),

View File

@@ -1,52 +0,0 @@
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use utils::id::{EndpointId, TenantId, TimelineId};
/// Claims to add, remove, or retrieve endpoint data. Used by compute_ctl
#[derive(Deserialize, Serialize, PartialEq)]
pub struct EndpointStorageClaims {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub endpoint_id: EndpointId,
pub exp: u64,
}
/// Claims to remove tenant, timeline, or endpoint data. Used by control plane
#[derive(Deserialize, Serialize, PartialEq)]
pub struct DeletePrefixClaims {
pub tenant_id: TenantId,
/// None when tenant is deleted (endpoint_id is also None in this case)
pub timeline_id: Option<TimelineId>,
/// None when timeline is deleted
pub endpoint_id: Option<EndpointId>,
pub exp: u64,
}
impl Display for EndpointStorageClaims {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"EndpointClaims(tenant_id={} timeline_id={} endpoint_id={} exp={})",
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
)
}
}
impl Display for DeletePrefixClaims {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"DeletePrefixClaims(tenant_id={} timeline_id={} endpoint_id={}, exp={})",
self.tenant_id,
self.timeline_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string()),
self.endpoint_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string()),
self.exp
)
}
}

View File

@@ -1,5 +1,3 @@
pub mod claims;
use crate::claims::{DeletePrefixClaims, EndpointStorageClaims};
use anyhow::Result;
use axum::extract::{FromRequestParts, Path};
use axum::response::{IntoResponse, Response};
@@ -15,7 +13,7 @@ use std::result::Result as StdResult;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use utils::id::{EndpointId, TenantId, TimelineId};
use utils::id::{TenantId, TimelineId};
// simplified version of utils::auth::JwtAuth
pub struct JwtAuth {
@@ -81,6 +79,26 @@ pub struct Storage {
pub max_upload_file_limit: usize,
}
pub type EndpointId = String; // If needed, reuse small string from proxy/src/types.rc
#[derive(Deserialize, Serialize, PartialEq)]
pub struct Claims {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub endpoint_id: EndpointId,
pub exp: u64,
}
impl Display for Claims {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Claims(tenant_id {} timeline_id {} endpoint_id {} exp {})",
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
)
}
}
#[derive(Deserialize, Serialize)]
struct KeyRequest {
tenant_id: TenantId,
@@ -89,13 +107,6 @@ struct KeyRequest {
path: String,
}
#[derive(Deserialize, Serialize, PartialEq)]
struct PrefixKeyRequest {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<EndpointId>,
}
#[derive(Debug, PartialEq)]
pub struct S3Path {
pub path: RemotePath,
@@ -154,7 +165,7 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
.extract::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|e| bad_request(e, "invalid token"))?;
let claims: EndpointStorageClaims = state
let claims: Claims = state
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "decoding token"))?;
@@ -167,7 +178,7 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
path.endpoint_id.clone()
};
let route = EndpointStorageClaims {
let route = Claims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id,
@@ -182,13 +193,38 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
}
}
#[derive(Deserialize, Serialize, PartialEq)]
pub struct PrefixKeyPath {
pub tenant_id: TenantId,
pub timeline_id: Option<TimelineId>,
pub endpoint_id: Option<EndpointId>,
}
impl Display for PrefixKeyPath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"PrefixKeyPath(tenant_id {} timeline_id {} endpoint_id {})",
self.tenant_id,
self.timeline_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string()),
self.endpoint_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string())
)
}
}
#[derive(Debug, PartialEq)]
pub struct PrefixS3Path {
pub path: RemotePath,
}
impl From<&DeletePrefixClaims> for PrefixS3Path {
fn from(path: &DeletePrefixClaims) -> Self {
impl From<&PrefixKeyPath> for PrefixS3Path {
fn from(path: &PrefixKeyPath) -> Self {
let timeline_id = path
.timeline_id
.as_ref()
@@ -214,27 +250,21 @@ impl FromRequestParts<Arc<Storage>> for PrefixS3Path {
state: &Arc<Storage>,
) -> Result<Self, Self::Rejection> {
let Path(path) = parts
.extract::<Path<PrefixKeyRequest>>()
.extract::<Path<PrefixKeyPath>>()
.await
.map_err(|e| bad_request(e, "invalid route"))?;
let TypedHeader(Authorization(bearer)) = parts
.extract::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|e| bad_request(e, "invalid token"))?;
let claims: DeletePrefixClaims = state
let claims: PrefixKeyPath = state
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "invalid token"))?;
let route = DeletePrefixClaims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id: path.endpoint_id,
exp: claims.exp,
};
if route != claims {
return Err(unauthorized(route, claims));
if path != claims {
return Err(unauthorized(path, claims));
}
Ok((&route).into())
Ok((&path).into())
}
}
@@ -267,7 +297,7 @@ mod tests {
#[test]
fn s3_path() {
let auth = EndpointStorageClaims {
let auth = Claims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
@@ -297,11 +327,10 @@ mod tests {
#[test]
fn prefix_s3_path() {
let mut path = DeletePrefixClaims {
let mut path = PrefixKeyPath {
tenant_id: TENANT_ID,
timeline_id: None,
endpoint_id: None,
exp: 0,
};
let prefix_path = |s: String| RemotePath::from_string(&s).unwrap();
assert_eq!(

View File

@@ -1,58 +1,16 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use std::str::FromStr;
use serde::{Deserialize, Serialize};
use crate::privilege::Privilege;
use crate::responses::ComputeCtlConfig;
use crate::spec::{ComputeSpec, ExtVersion, PgIdent};
/// The value to place in the [`ComputeClaims::audience`] claim.
pub static COMPUTE_AUDIENCE: &str = "compute";
/// Available scopes for a compute's JWT.
#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ComputeClaimsScope {
/// An admin-scoped token allows access to all of `compute_ctl`'s authorized
/// facilities.
Admin,
}
impl FromStr for ComputeClaimsScope {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"admin" => Ok(ComputeClaimsScope::Admin),
_ => Err(anyhow::anyhow!("invalid compute claims scope \"{s}\"")),
}
}
}
/// When making requests to the `compute_ctl` external HTTP server, the client
/// must specify a set of claims in `Authorization` header JWTs such that
/// `compute_ctl` can authorize the request.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename = "snake_case")]
pub struct ComputeClaims {
/// The compute ID that will validate the token. The only case in which this
/// can be [`None`] is if [`Self::scope`] is
/// [`ComputeClaimsScope::Admin`].
pub compute_id: Option<String>,
/// The scope of what the token authorizes.
pub scope: Option<ComputeClaimsScope>,
/// The recipient the token is intended for.
///
/// See [RFC 7519](https://www.rfc-editor.org/rfc/rfc7519#section-4.1.3) for
/// more information.
///
/// TODO: Remove the [`Option`] wrapper when control plane learns to send
/// the claim.
#[serde(rename = "aud")]
pub audience: Option<Vec<String>>,
pub compute_id: String,
}
/// Request of the /configure API

View File

@@ -46,30 +46,6 @@ pub struct ExtensionInstallResponse {
pub version: ExtVersion,
}
#[derive(Serialize, Default, Debug, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcPrewarmState {
#[default]
NotPrewarmed,
Prewarming,
Completed,
Failed {
error: String,
},
}
#[derive(Serialize, Default, Debug, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcOffloadState {
#[default]
NotOffloaded,
Offloading,
Completed,
Failed {
error: String,
},
}
/// Response of the /status API
#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "snake_case")]

View File

@@ -172,15 +172,6 @@ pub struct ComputeSpec {
/// Hostname and the port of the otel collector. Leave empty to disable Postgres logs forwarding.
/// Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:10514
pub logs_export_host: Option<String>,
/// Address of endpoint storage service
pub endpoint_storage_addr: Option<String>,
/// JWT for authorizing requests to endpoint storage service
pub endpoint_storage_token: Option<String>,
/// If true, download LFC state from endpoint_storage and pass it to Postgres on startup
#[serde(default)]
pub prewarm_lfc_on_startup: bool,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.

View File

@@ -84,11 +84,6 @@
"value": "on",
"vartype": "bool"
},
{
"name": "prewarm_lfc_on_startup",
"value": "off",
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501",

View File

@@ -16,7 +16,6 @@ pub struct Collector {
const NMETRICS: usize = 2;
static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
// SAFETY: libc::sysconf is safe, it merely returns a value.
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
if long == -1 {
panic!("sysconf(_SC_CLK_TCK) failed");

View File

@@ -182,7 +182,6 @@ pub struct ConfigToml {
pub tracing: Option<Tracing>,
pub enable_tls_page_service_api: bool,
pub dev_mode: bool,
pub timeline_import_config: TimelineImportConfig,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -301,12 +300,6 @@ impl From<OtelExporterProtocol> for tracing_utils::Protocol {
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct TimelineImportConfig {
pub import_job_concurrency: NonZeroUsize,
pub import_job_soft_size_limit: NonZeroUsize,
}
pub mod statvfs {
pub mod mock {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -666,10 +659,6 @@ impl Default for ConfigToml {
tracing: None,
enable_tls_page_service_api: false,
dev_mode: false,
timeline_import_config: TimelineImportConfig {
import_job_concurrency: NonZeroUsize::new(128).unwrap(),
import_job_soft_size_limit: NonZeroUsize::new(1024 * 1024 * 1024).unwrap(),
},
}
}
}

View File

@@ -295,9 +295,6 @@ pub struct TenantId(Id);
id_newtype!(TenantId);
/// If needed, reuse small string from proxy/src/types.rc
pub type EndpointId = String;
// A pair uniquely identifying Neon instance.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TenantTimelineId {

View File

@@ -17,7 +17,7 @@ impl std::fmt::Display for RateLimitStats {
}
impl RateLimit {
pub const fn new(interval: Duration) -> Self {
pub fn new(interval: Duration) -> Self {
Self {
last: None,
interval,

View File

@@ -230,8 +230,6 @@ pub struct PageServerConf {
/// such as authentication requirements for HTTP and PostgreSQL APIs.
/// This is insecure and should only be used in development environments.
pub dev_mode: bool,
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
}
/// Token for authentication to safekeepers
@@ -406,7 +404,6 @@ impl PageServerConf {
tracing,
enable_tls_page_service_api,
dev_mode,
timeline_import_config,
} = config_toml;
let mut conf = PageServerConf {
@@ -460,7 +457,6 @@ impl PageServerConf {
tracing,
enable_tls_page_service_api,
dev_mode,
timeline_import_config,
// ------------------------------------------------------------
// fields that require additional validation or custom handling

View File

@@ -1038,23 +1038,21 @@ impl PageServerHandler {
tracing::info_span!(
parent: &parent_span,
"handle_get_page_request",
request_id = %req.hdr.reqid,
rel = %req.rel,
blkno = %req.blkno,
req_lsn = %req.hdr.request_lsn,
not_modified_since_lsn = %req.hdr.not_modified_since,
not_modified_since_lsn = %req.hdr.not_modified_since
)
}};
($shard_id:expr) => {{
tracing::info_span!(
parent: &parent_span,
"handle_get_page_request",
request_id = %req.hdr.reqid,
rel = %req.rel,
blkno = %req.blkno,
req_lsn = %req.hdr.request_lsn,
not_modified_since_lsn = %req.hdr.not_modified_since,
shard_id = %$shard_id,
shard_id = %$shard_id
)
}};
}

View File

@@ -40,7 +40,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use super::tenant::{PageReconstructError, Timeline};
use crate::aux_file;
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
use crate::context::{PerfInstrumentFutureExt, RequestContext};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
@@ -275,30 +275,24 @@ impl Timeline {
continue;
}
let nblocks = {
let ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_REL_SIZE",
reltag=%tag,
lsn=%lsn,
)
})
.attached_child();
match self
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
{
Ok(nblocks) => nblocks,
Err(err) => {
result_slots[response_slot_idx].write(Err(err));
slots_filled += 1;
continue;
}
let nblocks = match self
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_REL_SIZE",
reltag=%tag,
lsn=%lsn,
)
})
.await
{
Ok(nblocks) => nblocks,
Err(err) => {
result_slots[response_slot_idx].write(Err(err));
slots_filled += 1;
continue;
}
};
@@ -314,17 +308,6 @@ impl Timeline {
let key = rel_block_to_key(*tag, *blknum);
let ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_BATCH",
batch_size = %page_count,
)
})
.attached_child();
let key_slots = keys_slots.entry(key).or_default();
key_slots.push((response_slot_idx, ctx));
@@ -340,7 +323,14 @@ impl Timeline {
let query = VersionedKeySpaceQuery::scattered(query);
let res = self
.get_vectored(query, io_concurrency, ctx)
.maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
.maybe_perf_instrument(ctx, |current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"GET_BATCH",
batch_size = %page_count,
)
})
.await;
match res {

View File

@@ -94,23 +94,10 @@ impl Header {
pub enum WriteBlobError {
#[error(transparent)]
Flush(FlushTaskError),
#[error("blob too large ({len} bytes)")]
BlobTooLarge { len: usize },
#[error(transparent)]
Other(anyhow::Error),
}
impl WriteBlobError {
pub fn is_cancel(&self) -> bool {
match self {
WriteBlobError::Flush(e) => e.is_cancel(),
WriteBlobError::Other(_) => false,
}
}
pub fn into_anyhow(self) -> anyhow::Error {
match self {
WriteBlobError::Flush(e) => e.into_anyhow(),
WriteBlobError::Other(e) => e,
}
}
WriteBlobRaw(anyhow::Error),
}
impl BlockCursor<'_> {
@@ -340,9 +327,7 @@ where
return (
(
io_buf.slice_len(),
Err(WriteBlobError::Other(anyhow::anyhow!(
"blob too large ({len} bytes)"
))),
Err(WriteBlobError::BlobTooLarge { len }),
),
srcbuf,
);
@@ -406,7 +391,7 @@ where
// Verify the header, to ensure we don't write invalid/corrupt data.
let header = match Header::decode(&raw_with_header)
.context("decoding blob header")
.map_err(WriteBlobError::Other)
.map_err(WriteBlobError::WriteBlobRaw)
{
Ok(header) => header,
Err(err) => return (raw_with_header, Err(err)),
@@ -416,7 +401,7 @@ where
let raw_len = raw_with_header.len();
return (
raw_with_header,
Err(WriteBlobError::Other(anyhow::anyhow!(
Err(WriteBlobError::WriteBlobRaw(anyhow::anyhow!(
"header length mismatch: {header_total_len} != {raw_len}"
))),
);

View File

@@ -2,7 +2,6 @@
pub mod batch_split_writer;
pub mod delta_layer;
pub mod errors;
pub mod filter_iterator;
pub mod image_layer;
pub mod inmemory_layer;

View File

@@ -10,7 +10,6 @@ use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use super::errors::PutError;
use super::layer::S3_UPLOAD_LIMIT;
use super::{
DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
@@ -236,7 +235,7 @@ impl<'a> SplitImageLayerWriter<'a> {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), PutError> {
) -> anyhow::Result<()> {
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
@@ -254,8 +253,7 @@ impl<'a> SplitImageLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await
.map_err(PutError::Other)?;
.await?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.batches.add_unfinished_image_writer(
prev_image_writer,
@@ -348,7 +346,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> Result<(), PutError> {
) -> anyhow::Result<()> {
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
//
@@ -368,8 +366,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await
.map_err(PutError::Other)?,
.await?,
));
}
let (_, inner) = self.inner.as_mut().unwrap();
@@ -389,8 +386,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await
.map_err(PutError::Other)?;
.await?;
let (start_key, prev_delta_writer) =
self.inner.replace((key, next_delta_writer)).unwrap();
self.batches.add_unfinished_delta_writer(
@@ -400,11 +396,11 @@ impl<'a> SplitDeltaLayerWriter<'a> {
);
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
return Err(PutError::Other(anyhow::anyhow!(
anyhow::bail!(
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
key,
inner.estimated_size()
)));
);
}
}
self.last_key_written = key;

View File

@@ -55,7 +55,6 @@ use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use super::errors::PutError;
use super::{
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
ValuesReconstructState,
@@ -478,15 +477,12 @@ impl DeltaLayerWriterInner {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> Result<(), PutError> {
) -> anyhow::Result<()> {
let (_, res) = self
.put_value_bytes(
key,
lsn,
Value::ser(&val)
.map_err(anyhow::Error::new)
.map_err(PutError::Other)?
.slice_len(),
Value::ser(&val)?.slice_len(),
val.will_init(),
ctx,
)
@@ -501,7 +497,7 @@ impl DeltaLayerWriterInner {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), PutError>)
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBuf + Send,
{
@@ -517,24 +513,19 @@ impl DeltaLayerWriterInner {
.blob_writer
.write_blob_maybe_compressed(val, ctx, compression)
.await;
let res = res.map_err(PutError::WriteBlob);
let off = match res {
Ok((off, _)) => off,
Err(e) => return (val, Err(e)),
Err(e) => return (val, Err(anyhow::anyhow!(e))),
};
let blob_ref = BlobRef::new(off, will_init);
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
let res = self
.tree
.append(&delta_key.0, blob_ref.0)
.map_err(anyhow::Error::new)
.map_err(PutError::Other);
let res = self.tree.append(&delta_key.0, blob_ref.0);
self.num_keys += 1;
(val, res)
(val, res.map_err(|e| anyhow::anyhow!(e)))
}
fn size(&self) -> u64 {
@@ -703,7 +694,7 @@ impl DeltaLayerWriter {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> Result<(), PutError> {
) -> anyhow::Result<()> {
self.inner
.as_mut()
.unwrap()
@@ -718,7 +709,7 @@ impl DeltaLayerWriter {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), PutError>)
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBuf + Send,
{
@@ -1450,6 +1441,14 @@ impl DeltaLayerInner {
offset
}
pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
self.iter_with_options(
ctx,
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
)
}
pub fn iter_with_options<'a>(
&'a self,
ctx: &'a RequestContext,
@@ -1635,6 +1634,7 @@ pub(crate) mod test {
use crate::tenant::disk_btree::tests::TestDisk;
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::{TenantShard, Timeline};
/// Construct an index for a fictional delta layer and and then
@@ -2311,7 +2311,8 @@ pub(crate) mod test {
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
// Test if the batch size is correctly determined
let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
let mut iter = delta_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
let mut num_items = 0;
for _ in 0..3 {
iter.next_batch().await.unwrap();
@@ -2328,7 +2329,8 @@ pub(crate) mod test {
iter.key_values_batch.clear();
}
// Test if the result is correct
let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
let mut iter = delta_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
assert_delta_iter_equal(&mut iter, &test_deltas).await;
}
}

View File

@@ -1,24 +0,0 @@
use crate::tenant::blob_io::WriteBlobError;
#[derive(Debug, thiserror::Error)]
pub enum PutError {
#[error(transparent)]
WriteBlob(WriteBlobError),
#[error(transparent)]
Other(anyhow::Error),
}
impl PutError {
pub fn is_cancel(&self) -> bool {
match self {
PutError::WriteBlob(e) => e.is_cancel(),
PutError::Other(_) => false,
}
}
pub fn into_anyhow(self) -> anyhow::Error {
match self {
PutError::WriteBlob(e) => e.into_anyhow(),
PutError::Other(e) => e,
}
}
}

View File

@@ -157,7 +157,7 @@ mod tests {
.await
.unwrap();
let merge_iter = MergeIterator::create_for_testing(
let merge_iter = MergeIterator::create(
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
&[],
&ctx,
@@ -182,7 +182,7 @@ mod tests {
result.extend(test_deltas1[90..100].iter().cloned());
assert_filter_iter_equal(&mut filter_iter, &result).await;
let merge_iter = MergeIterator::create_for_testing(
let merge_iter = MergeIterator::create(
&[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
&[],
&ctx,

View File

@@ -53,7 +53,6 @@ use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use super::errors::PutError;
use super::layer_name::ImageLayerName;
use super::{
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
@@ -685,6 +684,14 @@ impl ImageLayerInner {
}
}
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
self.iter_with_options(
ctx,
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
)
}
pub(crate) fn iter_with_options<'a>(
&'a self,
ctx: &'a RequestContext,
@@ -843,14 +850,8 @@ impl ImageLayerWriterInner {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), PutError> {
if !self.key_range.contains(&key) {
return Err(PutError::Other(anyhow::anyhow!(
"key {:?} not in range {:?}",
key,
self.key_range
)));
}
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let compression = self.conf.image_compression;
let uncompressed_len = img.len() as u64;
self.uncompressed_bytes += uncompressed_len;
@@ -860,7 +861,7 @@ impl ImageLayerWriterInner {
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let (off, compression_info) = res.map_err(PutError::WriteBlob)?;
let (off, compression_info) = res?;
if compression_info.compressed_size.is_some() {
// The image has been considered for compression at least
self.uncompressed_bytes_eligible += uncompressed_len;
@@ -872,10 +873,7 @@ impl ImageLayerWriterInner {
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree
.append(&keybuf, off)
.map_err(anyhow::Error::new)
.map_err(PutError::Other)?;
self.tree.append(&keybuf, off)?;
#[cfg(feature = "testing")]
{
@@ -1095,7 +1093,7 @@ impl ImageLayerWriter {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), PutError> {
) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}
@@ -1242,6 +1240,7 @@ mod test {
use crate::context::RequestContext;
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::{TenantShard, Timeline};
#[tokio::test]
@@ -1508,7 +1507,8 @@ mod test {
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
// Test if the batch size is correctly determined
let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
let mut iter = img_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
let mut num_items = 0;
for _ in 0..3 {
iter.next_batch().await.unwrap();
@@ -1525,7 +1525,8 @@ mod test {
iter.key_values_batch.clear();
}
// Test if the result is correct
let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
let mut iter = img_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
}
}

View File

@@ -23,7 +23,7 @@ use super::{
LayerVisibilityHint, PerfInstrumentFutureExt, PersistentLayerDesc, ValuesReconstructState,
};
use crate::config::PageServerConf;
use crate::context::{RequestContext, RequestContextBuilder};
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
@@ -1076,17 +1076,24 @@ impl LayerInner {
return Err(DownloadError::DownloadRequired);
}
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"DOWNLOAD_LAYER",
layer = %self,
reason = %reason,
)
})
.attached_child();
let ctx = if ctx.has_perf_span() {
let dl_ctx = RequestContextBuilder::from(ctx)
.task_kind(TaskKind::LayerDownload)
.download_behavior(DownloadBehavior::Download)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"DOWNLOAD_LAYER",
layer = %self,
reason = %reason
)
})
.detached_child();
ctx.perf_follows_from(&dl_ctx);
dl_ctx
} else {
ctx.attached_child()
};
async move {
tracing::info!(%reason, "downloading on-demand");
@@ -1094,7 +1101,7 @@ impl LayerInner {
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
let res = self
.download_init_and_wait(timeline, permit, ctx.attached_child())
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await?;
scopeguard::ScopeGuard::into_inner(init_cancelled);
@@ -1702,7 +1709,7 @@ impl DownloadError {
}
}
#[derive(Debug, PartialEq, Copy, Clone)]
#[derive(Debug, PartialEq)]
pub(crate) enum NeedsDownload {
NotFound,
NotFile(std::fs::FileType),

View File

@@ -19,6 +19,14 @@ pub(crate) enum LayerRef<'a> {
}
impl<'a> LayerRef<'a> {
#[allow(dead_code)]
fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
match self {
Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
}
}
fn iter_with_options(
self,
ctx: &'a RequestContext,
@@ -314,28 +322,6 @@ impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
}
impl<'a> MergeIterator<'a> {
#[cfg(test)]
pub(crate) fn create_for_testing(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
) -> Self {
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
}
/// Create a new merge iterator with custom options.
///
/// Adjust `max_read_size` and `max_batch_size` to trade memory usage for performance. The size should scale
/// with the number of layers to compact. If there are a lot of layers, consider reducing the values, so that
/// the buffer does not take too much memory.
///
/// The default options for L0 compactions are:
/// - max_read_size: 1024 * 8192 (8MB)
/// - max_batch_size: 1024
///
/// The default options for gc-compaction are:
/// - max_read_size: 128 * 8192 (1MB)
/// - max_batch_size: 128
pub fn create_with_options(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
@@ -365,6 +351,14 @@ impl<'a> MergeIterator<'a> {
}
}
pub fn create(
deltas: &[&'a DeltaLayerInner],
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
) -> Self {
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
}
pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
while let Some(mut iter) = self.heap.peek_mut() {
if !iter.is_loaded() {
@@ -483,7 +477,7 @@ mod tests {
let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
.await
.unwrap();
let mut merge_iter = MergeIterator::create_for_testing(
let mut merge_iter = MergeIterator::create(
&[
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
@@ -555,7 +549,7 @@ mod tests {
let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
.await
.unwrap();
let mut merge_iter = MergeIterator::create_for_testing(
let mut merge_iter = MergeIterator::create(
&[
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
@@ -676,7 +670,7 @@ mod tests {
// Test with different layer order for MergeIterator::create to ensure the order
// is stable.
let mut merge_iter = MergeIterator::create_for_testing(
let mut merge_iter = MergeIterator::create(
&[
resident_layer_4.get_as_delta(&ctx).await.unwrap(),
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
@@ -688,7 +682,7 @@ mod tests {
);
assert_merge_iter_equal(&mut merge_iter, &expect).await;
let mut merge_iter = MergeIterator::create_for_testing(
let mut merge_iter = MergeIterator::create(
&[
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
resident_layer_4.get_as_delta(&ctx).await.unwrap(),

View File

@@ -340,7 +340,7 @@ pub(crate) fn log_compaction_error(
} else {
match level {
Level::ERROR if degrade_to_warning => warn!("Compaction failed and discarded: {err:#}"),
Level::ERROR => error!("Compaction failed: {err:?}"),
Level::ERROR => error!("Compaction failed: {err:#}"),
Level::INFO => info!("Compaction failed: {err:#}"),
level => unimplemented!("unexpected level {level:?}"),
}

View File

@@ -987,16 +987,6 @@ impl From<PageReconstructError> for CreateImageLayersError {
}
}
impl From<super::storage_layer::errors::PutError> for CreateImageLayersError {
fn from(e: super::storage_layer::errors::PutError) -> Self {
if e.is_cancel() {
CreateImageLayersError::Cancelled
} else {
CreateImageLayersError::Other(e.into_anyhow())
}
}
}
impl From<GetVectoredError> for CreateImageLayersError {
fn from(e: GetVectoredError) -> Self {
match e {
@@ -2127,14 +2117,22 @@ impl Timeline {
debug_assert_current_span_has_tenant_and_timeline_id();
// Regardless of whether we're going to try_freeze_and_flush
// or not, stop ingesting any more data.
// or not, stop ingesting any more data. Walreceiver only provides
// cancellation but no "wait until gone", because it uses the Timeline::gate.
// So, only after the self.gate.close() below will we know for sure that
// no walreceiver tasks are left.
// For `try_freeze_and_flush=true`, this means that we might still be ingesting
// data during the call to `self.freeze_and_flush()` below.
// That's not ideal, but, we don't have the concept of a ChildGuard,
// which is what we'd need to properly model early shutdown of the walreceiver
// task sub-tree before the other Timeline task sub-trees.
let walreceiver = self.walreceiver.lock().unwrap().take();
tracing::debug!(
is_some = walreceiver.is_some(),
"Waiting for WalReceiverManager..."
);
if let Some(walreceiver) = walreceiver {
walreceiver.shutdown().await;
walreceiver.cancel();
}
// ... and inform any waiters for newer LSNs that there won't be any.
self.last_record_lsn.shutdown();
@@ -5925,16 +5923,6 @@ impl From<layer_manager::Shutdown> for CompactionError {
}
}
impl From<super::storage_layer::errors::PutError> for CompactionError {
fn from(e: super::storage_layer::errors::PutError) -> Self {
if e.is_cancel() {
CompactionError::ShuttingDown
} else {
CompactionError::Other(e.into_anyhow())
}
}
}
#[serde_as]
#[derive(serde::Serialize)]
struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);

View File

@@ -1277,8 +1277,6 @@ impl Timeline {
return Ok(CompactionOutcome::YieldForL0);
}
let gc_cutoff = *self.applied_gc_cutoff_lsn.read();
// 2. Repartition and create image layers if necessary
match self
.repartition(
@@ -1289,7 +1287,7 @@ impl Timeline {
)
.await
{
Ok(((dense_partitioning, sparse_partitioning), lsn)) if lsn >= gc_cutoff => {
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
let image_ctx = RequestContextBuilder::from(ctx)
.access_stats_behavior(AccessStatsBehavior::Skip)
@@ -1343,10 +1341,6 @@ impl Timeline {
}
}
Ok(_) => {
info!("skipping repartitioning due to image compaction LSN being below GC cutoff");
}
// Suppress errors when cancelled.
Err(_) if self.cancel.is_cancelled() => {}
Err(err) if err.is_cancel() => {}
@@ -2000,13 +1994,7 @@ impl Timeline {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create_with_options(
&deltas,
&[],
ctx,
1024 * 8192, /* 8 MiB buffer per layer iterator */
1024,
)
MergeIterator::create(&deltas, &[], ctx)
};
// This iterator walks through all keys and is needed to calculate size used by each key
@@ -2210,7 +2198,8 @@ impl Timeline {
.as_mut()
.unwrap()
.put_value(key, lsn, value, ctx)
.await?;
.await
.map_err(CompactionError::Other)?;
} else {
let owner = self.shard_identity.get_shard_number(&key);
@@ -2839,7 +2828,7 @@ impl Timeline {
Ok(())
}
/// Check to bail out of gc compaction early if it would use too much memory.
/// Check if the memory usage is within the limit.
async fn check_memory_usage(
self: &Arc<Self>,
layer_selection: &[Layer],
@@ -2852,8 +2841,7 @@ impl Timeline {
let layer_desc = layer.layer_desc();
if layer_desc.is_delta() {
// Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
// Scale it by target_layer_size_bytes so that tests can pass (some tests, e.g., `test_pageserver_gc_compaction_preempt
// use 3MB layer size and we need to account for that).
// Multiply the layer size so that tests can pass.
estimated_memory_usage_mb +=
3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
num_delta_layers += 1;
@@ -3612,13 +3600,6 @@ impl Timeline {
last_key = Some(key);
}
accumulated_values.push((key, lsn, val));
if accumulated_values.len() >= 65536 {
// Assume all of them are images, that would be 512MB of data in memory for a single key.
return Err(CompactionError::Other(anyhow!(
"too many values for a single key, giving up gc-compaction"
)));
}
} else {
let last_key: &mut Key = last_key.as_mut().unwrap();
stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction

View File

@@ -149,7 +149,14 @@ pub async fn doit(
}
.await?;
flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?;
flow::run(
timeline.clone(),
base_lsn,
control_file,
storage.clone(),
ctx,
)
.await?;
//
// Communicate that shard is done.

View File

@@ -34,9 +34,7 @@ use std::sync::Arc;
use anyhow::{bail, ensure};
use bytes::Bytes;
use futures::stream::FuturesOrdered;
use itertools::Itertools;
use pageserver_api::config::TimelineImportConfig;
use pageserver_api::key::{
CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, Key, TWOPHASEDIR_KEY, rel_block_to_key,
rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
@@ -48,9 +46,8 @@ use pageserver_api::shard::ShardIdentity;
use postgres_ffi::relfile_utils::parse_relfilename;
use postgres_ffi::{BLCKSZ, pg_constants};
use remote_storage::RemotePath;
use tokio::sync::Semaphore;
use tokio_stream::StreamExt;
use tracing::{debug, instrument};
use tokio::task::JoinSet;
use tracing::{Instrument, debug, info_span, instrument};
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
@@ -66,40 +63,38 @@ use crate::tenant::storage_layer::{ImageLayerWriter, Layer};
pub async fn run(
timeline: Arc<Timeline>,
pgdata_lsn: Lsn,
control_file: ControlFile,
storage: RemoteStorageWrapper,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let planner = Planner {
Flow {
timeline,
pgdata_lsn,
control_file,
storage: storage.clone(),
shard: timeline.shard_identity,
tasks: Vec::default(),
};
let import_config = &timeline.conf.timeline_import_config;
let plan = planner.plan(import_config).await?;
plan.execute(timeline, import_config, ctx).await
tasks: Vec::new(),
storage,
}
.run(ctx)
.await
}
struct Planner {
struct Flow {
timeline: Arc<Timeline>,
pgdata_lsn: Lsn,
control_file: ControlFile,
storage: RemoteStorageWrapper,
shard: ShardIdentity,
tasks: Vec<AnyImportTask>,
storage: RemoteStorageWrapper,
}
struct Plan {
jobs: Vec<ChunkProcessingJob>,
}
impl Planner {
/// Creates an import plan
///
/// This function is and must remain pure: given the same input, it will generate the same import plan.
async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result<Plan> {
impl Flow {
/// Perform the ingestion into [`Self::timeline`].
/// Assumes the timeline is empty (= no layers).
pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> {
let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
self.pgdata_lsn = pgdata_lsn;
let datadir = PgDataDir::new(&self.storage).await?;
// Import dbdir (00:00:00 keyspace)
@@ -120,7 +115,7 @@ impl Planner {
}
// Import SLRUs
if self.shard.is_shard_zero() {
if self.timeline.tenant_shard_id.is_shard_zero() {
// pg_xact (01:00 keyspace)
self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
.await?;
@@ -171,16 +166,14 @@ impl Planner {
let mut last_end_key = Key::MIN;
let mut current_chunk = Vec::new();
let mut current_chunk_size: usize = 0;
let mut jobs = Vec::new();
let mut parallel_jobs = Vec::new();
for task in std::mem::take(&mut self.tasks).into_iter() {
if current_chunk_size + task.total_size()
> import_config.import_job_soft_size_limit.into()
{
if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 {
let key_range = last_end_key..task.key_range().start;
jobs.push(ChunkProcessingJob::new(
parallel_jobs.push(ChunkProcessingJob::new(
key_range.clone(),
std::mem::take(&mut current_chunk),
pgdata_lsn,
&self,
));
last_end_key = key_range.end;
current_chunk_size = 0;
@@ -188,13 +181,45 @@ impl Planner {
current_chunk_size += task.total_size();
current_chunk.push(task);
}
jobs.push(ChunkProcessingJob::new(
parallel_jobs.push(ChunkProcessingJob::new(
last_end_key..Key::MAX,
current_chunk,
pgdata_lsn,
&self,
));
Ok(Plan { jobs })
// Start all jobs simultaneosly
let mut work = JoinSet::new();
// TODO: semaphore?
for job in parallel_jobs {
let ctx: RequestContext =
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job")));
}
let mut results = Vec::new();
while let Some(result) = work.join_next().await {
match result {
Ok(res) => {
results.push(res);
}
Err(_joinset_err) => {
results.push(Err(anyhow::anyhow!(
"parallel job panicked or cancelled, check pageserver logs"
)));
}
}
}
if results.iter().all(|r| r.is_ok()) {
Ok(())
} else {
let mut msg = String::new();
for result in results {
if let Err(err) = result {
msg.push_str(&format!("{err:?}\n\n"));
}
}
bail!("Some parallel jobs failed:\n\n{msg}");
}
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
@@ -241,7 +266,7 @@ impl Planner {
let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
self.tasks
.push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
self.shard,
*self.timeline.get_shard_identity(),
start_key..end_key,
&file.path,
self.storage.clone(),
@@ -264,7 +289,7 @@ impl Planner {
}
async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
assert!(self.shard.is_shard_zero());
assert!(self.timeline.tenant_shard_id.is_shard_zero());
let segments = self.storage.listfilesindir(path).await?;
let segments: Vec<(String, u32, usize)> = segments
@@ -319,68 +344,6 @@ impl Planner {
}
}
impl Plan {
async fn execute(
self,
timeline: Arc<Timeline>,
import_config: &TimelineImportConfig,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut work = FuturesOrdered::new();
let semaphore = Arc::new(Semaphore::new(import_config.import_job_concurrency.into()));
let jobs_in_plan = self.jobs.len();
let mut jobs = self.jobs.into_iter().enumerate().peekable();
let mut results = Vec::new();
// Run import jobs concurrently up to the limit specified by the pageserver configuration.
// Note that we process completed futures in the oreder of insertion. This will be the
// building block for resuming imports across pageserver restarts or tenant migrations.
while results.len() < jobs_in_plan {
tokio::select! {
permit = semaphore.clone().acquire_owned(), if jobs.peek().is_some() => {
let permit = permit.expect("never closed");
let (job_idx, job) = jobs.next().expect("we peeked");
let job_timeline = timeline.clone();
let ctx = ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
work.push_back(tokio::task::spawn(async move {
let _permit = permit;
let res = job.run(job_timeline, &ctx).await;
(job_idx, res)
}));
},
maybe_complete_job_idx = work.next() => {
match maybe_complete_job_idx {
Some(Ok((_job_idx, res))) => {
results.push(res);
},
Some(Err(_)) => {
results.push(Err(anyhow::anyhow!(
"parallel job panicked or cancelled, check pageserver logs"
)));
}
None => {}
}
}
}
}
if results.iter().all(|r| r.is_ok()) {
Ok(())
} else {
let mut msg = String::new();
for result in results {
if let Err(err) = result {
msg.push_str(&format!("{err:?}\n\n"));
}
}
bail!("Some parallel jobs failed:\n\n{msg}");
}
}
}
//
// dbdir iteration tools
//
@@ -750,6 +713,7 @@ impl From<ImportSlruBlocksTask> for AnyImportTask {
}
struct ChunkProcessingJob {
timeline: Arc<Timeline>,
range: Range<Key>,
tasks: Vec<AnyImportTask>,
@@ -757,24 +721,25 @@ struct ChunkProcessingJob {
}
impl ChunkProcessingJob {
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, pgdata_lsn: Lsn) -> Self {
assert!(pgdata_lsn.is_valid());
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &Flow) -> Self {
assert!(env.pgdata_lsn.is_valid());
Self {
timeline: env.timeline.clone(),
range,
tasks,
pgdata_lsn,
pgdata_lsn: env.pgdata_lsn,
}
}
async fn run(self, timeline: Arc<Timeline>, ctx: &RequestContext) -> anyhow::Result<()> {
async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> {
let mut writer = ImageLayerWriter::new(
timeline.conf,
timeline.timeline_id,
timeline.tenant_shard_id,
self.timeline.conf,
self.timeline.timeline_id,
self.timeline.tenant_shard_id,
&self.range,
self.pgdata_lsn,
&timeline.gate,
timeline.cancel.clone(),
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;
@@ -786,20 +751,24 @@ impl ChunkProcessingJob {
let resident_layer = if nimages > 0 {
let (desc, path) = writer.finish(ctx).await?;
Layer::finish_creating(timeline.conf, &timeline, desc, &path)?
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?
} else {
// dropping the writer cleans up
return Ok(());
};
// this is sharing the same code as create_image_layers
let mut guard = timeline.layers.write().await;
let mut guard = self.timeline.layers.write().await;
guard
.open_mut()?
.track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
.track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics);
crate::tenant::timeline::drop_wlock(guard);
timeline
// Schedule the layer for upload but don't add barriers such as
// wait for completion or index upload, so we don't inhibit upload parallelism.
// TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?)
// TODO: or regulate parallelism by upload queue depth? Prob should happen at a higher level.
self.timeline
.remote_client
.schedule_layer_file_upload(resident_layer)?;

View File

@@ -63,7 +63,6 @@ pub struct WalReceiver {
/// All task spawned by [`WalReceiver::start`] and its children are sensitive to this token.
/// It's a child token of [`Timeline`] so that timeline shutdown can cancel WalReceiver tasks early for `freeze_and_flush=true`.
cancel: CancellationToken,
task: tokio::task::JoinHandle<()>,
}
impl WalReceiver {
@@ -80,7 +79,7 @@ impl WalReceiver {
let loop_status = Arc::new(std::sync::RwLock::new(None));
let manager_status = Arc::clone(&loop_status);
let cancel = timeline.cancel.child_token();
let task = WALRECEIVER_RUNTIME.spawn({
WALRECEIVER_RUNTIME.spawn({
let cancel = cancel.clone();
async move {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -121,25 +120,14 @@ impl WalReceiver {
Self {
manager_status,
cancel,
task,
}
}
#[instrument(skip_all, level = tracing::Level::DEBUG)]
pub async fn shutdown(self) {
pub fn cancel(&self) {
debug_assert_current_span_has_tenant_and_timeline_id();
debug!("cancelling walreceiver tasks");
self.cancel.cancel();
match self.task.await {
Ok(()) => debug!("Shutdown success"),
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => {
// already logged by panic hook
}
Err(je) => {
error!("shutdown walreceiver task join error: {je}")
}
}
}
pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {

View File

@@ -111,17 +111,13 @@ pub(crate) fn get() -> IoEngine {
use std::os::unix::prelude::FileExt;
use std::sync::atomic::{AtomicU8, Ordering};
#[cfg(target_os = "linux")]
use {std::time::Duration, tracing::info};
use super::owned_buffers_io::io_buf_ext::FullSlice;
use super::owned_buffers_io::slice::SliceMutExt;
use super::{FileGuard, Metadata};
#[cfg(target_os = "linux")]
pub(super) fn epoll_uring_error_to_std(
e: tokio_epoll_uring::Error<std::io::Error>,
) -> std::io::Error {
fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
@@ -153,11 +149,7 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) =
retry_ecanceled_once((file_guard, slice), |(file_guard, slice)| async {
system.read(file_guard, offset, slice).await
})
.await;
let (resources, res) = system.read(file_guard, offset, slice).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
}
@@ -172,10 +164,7 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
system.fsync(file_guard).await
})
.await;
let (resources, res) = system.fsync(file_guard).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
}
@@ -193,10 +182,7 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
system.fdatasync(file_guard).await
})
.await;
let (resources, res) = system.fdatasync(file_guard).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
}
@@ -215,10 +201,7 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
system.statx(file_guard).await
})
.await;
let (resources, res) = system.statx(file_guard).await;
(
resources,
res.map_err(epoll_uring_error_to_std).map(Metadata::from),
@@ -241,7 +224,6 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
// TODO: ftruncate op for tokio-epoll-uring
// Don't forget to use retry_ecanceled_once
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
(file_guard, res)
}
@@ -263,11 +245,8 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let ((file_guard, slice), res) = retry_ecanceled_once(
(file_guard, buf.into_raw_slice()),
async |(file_guard, buf)| system.write(file_guard, offset, buf).await,
)
.await;
let ((file_guard, slice), res) =
system.write(file_guard, offset, buf.into_raw_slice()).await;
(
(file_guard, FullSlice::must_new(slice)),
res.map_err(epoll_uring_error_to_std),
@@ -303,56 +282,6 @@ impl IoEngine {
}
}
/// We observe in tests that stop pageserver with SIGTERM immediately after it was ingesting data,
/// occasionally buffered writers fail (and get retried by BufferedWriter) with ECANCELED.
/// The problem is believed to be a race condition in how io_uring handles punted async work (io-wq) and signals.
/// Investigation ticket: <https://github.com/neondatabase/neon/issues/11446>
///
/// This function retries the operation once if it fails with ECANCELED.
/// ONLY USE FOR IDEMPOTENT [`super::VirtualFile`] operations.
#[cfg(target_os = "linux")]
pub(super) async fn retry_ecanceled_once<F, Fut, T, V>(
resources: T,
f: F,
) -> (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)
where
F: Fn(T) -> Fut,
Fut: std::future::Future<Output = (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)>,
T: Send,
V: Send,
{
let (resources, res) = f(resources).await;
let Err(e) = res else {
return (resources, res);
};
let tokio_epoll_uring::Error::Op(err) = e else {
return (resources, Err(e));
};
if err.raw_os_error() != Some(nix::libc::ECANCELED) {
return (resources, Err(tokio_epoll_uring::Error::Op(err)));
}
{
static RATE_LIMIT: std::sync::Mutex<utils::rate_limit::RateLimit> =
std::sync::Mutex::new(utils::rate_limit::RateLimit::new(Duration::from_secs(1)));
let mut guard = RATE_LIMIT.lock().unwrap();
guard.call2(|rate_limit_stats| {
info!(
%rate_limit_stats, "ECANCELED observed, assuming it is due to a signal being received by the submitting thread, retrying after a delay; this message is rate-limited"
);
});
drop(guard);
}
tokio::time::sleep(Duration::from_millis(100)).await; // something big enough to beat even heavily overcommitted CI runners
let (resources, res) = f(resources).await;
(resources, res)
}
pub(super) fn panic_operation_must_be_idempotent() {
panic!(
"unsupported; io_engine may retry operations internally and thus needs them to be idempotent (retry_ecanceled_once)"
)
}
pub enum FeatureTestResult {
PlatformPreferred(IoEngineKind),
Worse {

View File

@@ -110,23 +110,18 @@ impl OpenOptions {
self
}
/// Don't use, `O_APPEND` is not supported.
pub fn append(&mut self, _append: bool) {
super::io_engine::panic_operation_must_be_idempotent();
}
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
match &self.inner {
Inner::StdFs(x) => x.open(path).map(|file| file.into()),
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
let (_, res) = super::io_engine::retry_ecanceled_once((), |()| async {
let res = system.open(path, x).await;
((), res)
system.open(path, x).await.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})
.await;
res.map_err(super::io_engine::epoll_uring_error_to_std)
}
}
}
@@ -145,9 +140,6 @@ impl OpenOptions {
}
pub fn custom_flags(mut self, flags: i32) -> Self {
if flags & nix::libc::O_APPEND != 0 {
super::io_engine::panic_operation_must_be_idempotent();
}
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.custom_flags(flags);

View File

@@ -247,19 +247,6 @@ pub enum FlushTaskError {
Cancelled,
}
impl FlushTaskError {
pub fn is_cancel(&self) -> bool {
match self {
FlushTaskError::Cancelled => true,
}
}
pub fn into_anyhow(self) -> anyhow::Error {
match self {
FlushTaskError::Cancelled => anyhow::anyhow!(self),
}
}
}
impl<Buf, W> FlushBackgroundTask<Buf, W>
where
Buf: IoBufAligned + Send + Sync,

View File

@@ -425,12 +425,15 @@ compact_prefetch_buffers(void)
* point inside and outside PostgreSQL.
*
* This still does throw errors when it receives malformed responses from PS.
*
* When we're not called from CHECK_FOR_INTERRUPTS (indicated by
* IsHandlingInterrupts) we also report we've ended prefetch receive work,
* just in case state tracking was lost due to an error in the sync getPage
* response code.
*/
void
communicator_prefetch_pump_state(void)
communicator_prefetch_pump_state(bool IsHandlingInterrupts)
{
START_PREFETCH_RECEIVE_WORK();
while (MyPState->ring_receive != MyPState->ring_flush)
{
NeonResponse *response;
@@ -479,7 +482,9 @@ communicator_prefetch_pump_state(void)
}
}
END_PREFETCH_RECEIVE_WORK();
/* We never pump the prefetch state while handling other pages */
if (!IsHandlingInterrupts)
END_PREFETCH_RECEIVE_WORK();
communicator_reconfigure_timeout_if_needed();
}
@@ -667,10 +672,9 @@ prefetch_wait_for(uint64 ring_index)
Assert(MyPState->ring_unused > ring_index);
START_PREFETCH_RECEIVE_WORK();
while (MyPState->ring_receive <= ring_index)
{
START_PREFETCH_RECEIVE_WORK();
entry = GetPrfSlot(MyPState->ring_receive);
Assert(entry->status == PRFS_REQUESTED);
@@ -679,18 +683,17 @@ prefetch_wait_for(uint64 ring_index)
result = false;
break;
}
END_PREFETCH_RECEIVE_WORK();
CHECK_FOR_INTERRUPTS();
}
if (result)
{
/* Check that slot is actually received (srver can be disconnected in prefetch_pump_state called from CHECK_FOR_INTERRUPTS */
PrefetchRequest *slot = GetPrfSlot(ring_index);
result = slot->status == PRFS_RECEIVED;
return slot->status == PRFS_RECEIVED;
}
END_PREFETCH_RECEIVE_WORK();
return result;
return false;
;
}
@@ -717,7 +720,6 @@ prefetch_read(PrefetchRequest *slot)
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_receive);
Assert(readpage_reentrant_guard);
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
@@ -800,7 +802,6 @@ communicator_prefetch_receive(BufferTag tag)
PrfHashEntry *entry;
PrefetchRequest hashkey;
Assert(readpage_reentrant_guard);
hashkey.buftag = tag;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index))
@@ -820,12 +821,8 @@ communicator_prefetch_receive(BufferTag tag)
void
prefetch_on_ps_disconnect(void)
{
bool save_readpage_reentrant_guard = readpage_reentrant_guard;
MyPState->ring_flush = MyPState->ring_unused;
/* Prohibit callig of prefetch_pump_state */
START_PREFETCH_RECEIVE_WORK();
while (MyPState->ring_receive < MyPState->ring_unused)
{
PrefetchRequest *slot;
@@ -854,9 +851,6 @@ prefetch_on_ps_disconnect(void)
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
/* Restore guard */
readpage_reentrant_guard = save_readpage_reentrant_guard;
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
@@ -2515,7 +2509,7 @@ communicator_processinterrupts(void)
if (timeout_signaled)
{
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
communicator_prefetch_pump_state();
communicator_prefetch_pump_state(true);
timeout_signaled = false;
communicator_reconfigure_timeout_if_needed();

View File

@@ -44,7 +44,7 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
void *buffer);
extern void communicator_reconfigure_timeout_if_needed(void);
extern void communicator_prefetch_pump_state(void);
extern void communicator_prefetch_pump_state(bool IsHandlingInterrupts);
#endif

View File

@@ -150,7 +150,7 @@ NeonWALReaderFree(NeonWALReader *state)
* fetched from timeline 'tli'.
*
* Returns NEON_WALREAD_SUCCESS if succeeded, NEON_WALREAD_ERROR if an error
* occurs, in which case 'err' has the description. Error always closes remote
* occurs, in which case 'err' has the desciption. Error always closes remote
* connection, if there was any, so socket subscription should be removed.
*
* NEON_WALREAD_WOULDBLOCK means caller should obtain socket to wait for with

View File

@@ -1179,7 +1179,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
blocknum += iterblocks;
}
communicator_prefetch_pump_state();
communicator_prefetch_pump_state(false);
return false;
}
@@ -1218,7 +1218,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
communicator_prefetch_pump_state();
communicator_prefetch_pump_state(false);
return false;
}
@@ -1262,7 +1262,7 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
*/
neon_log(SmgrTrace, "writeback noop");
communicator_prefetch_pump_state();
communicator_prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1315,7 +1315,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
}
/* Try to read PS results if they are available */
communicator_prefetch_pump_state();
communicator_prefetch_pump_state(false);
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
@@ -1339,7 +1339,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
communicator_prefetch_pump_state();
communicator_prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
@@ -1449,7 +1449,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
nblocks, PG_IOV_MAX);
/* Try to read PS results if they are available */
communicator_prefetch_pump_state();
communicator_prefetch_pump_state(false);
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks);
@@ -1480,7 +1480,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
communicator_prefetch_pump_state();
communicator_prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (forknum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
@@ -1665,7 +1665,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer);
communicator_prefetch_pump_state();
communicator_prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1727,7 +1727,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks);
communicator_prefetch_pump_state();
communicator_prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1902,7 +1902,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop");
communicator_prefetch_pump_state();
communicator_prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1989,14 +1989,8 @@ neon_start_unlogged_build(SMgrRelation reln)
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
#if PG_MAJORVERSION_NUM >= 17
/*
* We have to disable this check for pg14-16 because sorted build of GIST index requires
* to perform unlogged build several times
*/
if (smgrnblocks(reln, MAIN_FORKNUM) != 0)
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
#endif
unlogged_build_rel = reln;
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;

View File

@@ -124,7 +124,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
}
else
{
wp->safekeepers_generation = INVALID_GENERATION;
host = wp->config->safekeepers_list;
}
wp_log(LOG, "safekeepers_generation=%u", wp->safekeepers_generation);
@@ -757,7 +756,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
{
SafekeeperId *sk_id = &wp->mconf.members.m[i];
if (sk_id->node_id == sk->greetResponse.nodeId)
if (wp->mconf.members.m[i].node_id == sk->greetResponse.nodeId)
{
/*
* If mconf or list of safekeepers to connect to changed (the
@@ -782,7 +781,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
{
SafekeeperId *sk_id = &wp->mconf.new_members.m[i];
if (sk_id->node_id == sk->greetResponse.nodeId)
if (wp->mconf.new_members.m[i].node_id == sk->greetResponse.nodeId)
{
if (wp->new_members_safekeepers[i] != NULL && wp->new_members_safekeepers[i] != sk)
{
@@ -1072,6 +1071,7 @@ RecvVoteResponse(Safekeeper *sk)
/* ready for elected message */
sk->state = SS_WAIT_ELECTED;
wp->n_votes++;
/* Are we already elected? */
if (wp->state == WPS_CAMPAIGN)
{

View File

@@ -845,6 +845,9 @@ typedef struct WalProposer
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
/* number of votes collected from safekeepers */
int n_votes;
/* number of successful connections over the lifetime of walproposer */
int n_connected;

View File

@@ -409,22 +409,14 @@ impl JwkCacheEntryLock {
if let Some(exp) = payload.expiration {
if now >= exp + CLOCK_SKEW_LEEWAY {
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired(
exp.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
)));
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired));
}
}
if let Some(nbf) = payload.not_before {
if nbf >= now + CLOCK_SKEW_LEEWAY {
return Err(JwtError::InvalidClaims(
JwtClaimsError::JwtTokenNotYetReadyToUse(
nbf.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
),
JwtClaimsError::JwtTokenNotYetReadyToUse,
));
}
}
@@ -542,10 +534,10 @@ struct JwtPayload<'a> {
#[serde(rename = "aud", default)]
audience: OneOrMany,
/// Expiration - Time after which the JWT expires
#[serde(rename = "exp", deserialize_with = "numeric_date_opt", default)]
#[serde(deserialize_with = "numeric_date_opt", rename = "exp", default)]
expiration: Option<SystemTime>,
/// Not before - Time before which the JWT is not valid
#[serde(rename = "nbf", deserialize_with = "numeric_date_opt", default)]
/// Not before - Time after which the JWT expires
#[serde(deserialize_with = "numeric_date_opt", rename = "nbf", default)]
not_before: Option<SystemTime>,
// the following entries are only extracted for the sake of debug logging.
@@ -617,15 +609,8 @@ impl<'de> Deserialize<'de> for OneOrMany {
}
fn numeric_date_opt<'de, D: Deserializer<'de>>(d: D) -> Result<Option<SystemTime>, D::Error> {
<Option<u64>>::deserialize(d)?
.map(|t| {
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(t))
.ok_or_else(|| {
serde::de::Error::custom(format_args!("timestamp out of bounds: {t}"))
})
})
.transpose()
let d = <Option<u64>>::deserialize(d)?;
Ok(d.map(|n| SystemTime::UNIX_EPOCH + Duration::from_secs(n)))
}
struct JwkRenewalPermit<'a> {
@@ -761,11 +746,11 @@ pub enum JwtClaimsError {
#[error("invalid JWT token audience")]
InvalidJwtTokenAudience,
#[error("JWT token has expired (exp={0})")]
JwtTokenHasExpired(u64),
#[error("JWT token has expired")]
JwtTokenHasExpired,
#[error("JWT token is not yet ready to use (nbf={0})")]
JwtTokenNotYetReadyToUse(u64),
#[error("JWT token is not yet ready to use")]
JwtTokenNotYetReadyToUse,
}
#[allow(dead_code, reason = "Debug use only")]
@@ -1248,14 +1233,14 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
"nbf": now + 60,
"aud": "neon",
}},
error: JwtClaimsError::JwtTokenNotYetReadyToUse(now + 60),
error: JwtClaimsError::JwtTokenNotYetReadyToUse,
},
Test {
body: json! {{
"exp": now - 60,
"aud": ["neon"],
}},
error: JwtClaimsError::JwtTokenHasExpired(now - 60),
error: JwtClaimsError::JwtTokenHasExpired,
},
Test {
body: json! {{

View File

@@ -12,9 +12,9 @@ use tracing::{debug, warn};
use crate::auth::password_hack::parse_endpoint_param;
use crate::context::RequestContext;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, SniGroup, SniKind};
use crate::metrics::{Metrics, SniKind};
use crate::proxy::NeonOptions;
use crate::serverless::{AUTH_BROKER_SNI, SERVERLESS_DRIVER_SNI};
use crate::serverless::SERVERLESS_DRIVER_SNI;
use crate::types::{EndpointId, RoleName};
#[derive(Debug, Error, PartialEq, Eq, Clone)]
@@ -65,7 +65,7 @@ pub(crate) fn endpoint_sni(sni: &str, common_names: &HashSet<String>) -> Option<
if !common_names.contains(common_name) {
return None;
}
if subdomain == SERVERLESS_DRIVER_SNI || subdomain == AUTH_BROKER_SNI {
if subdomain == SERVERLESS_DRIVER_SNI {
return None;
}
Some(EndpointId::from(subdomain))
@@ -128,23 +128,22 @@ impl ComputeUserInfoMaybeEndpoint {
let metrics = Metrics::get();
debug!(%user, "credentials");
let protocol = ctx.protocol();
let kind = if sni.is_some() {
if sni.is_some() {
debug!("Connection with sni");
SniKind::Sni
metrics.proxy.accepted_connections_by_sni.inc(SniKind::Sni);
} else if endpoint.is_some() {
metrics
.proxy
.accepted_connections_by_sni
.inc(SniKind::NoSni);
debug!("Connection without sni");
SniKind::NoSni
} else {
metrics
.proxy
.accepted_connections_by_sni
.inc(SniKind::PasswordHack);
debug!("Connection with password hack");
SniKind::PasswordHack
};
metrics
.proxy
.accepted_connections_by_sni
.inc(SniGroup { protocol, kind });
}
let options = NeonOptions::parse_params(params);

View File

@@ -115,8 +115,8 @@ pub struct ProxyMetrics {
#[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
pub allowed_vpc_endpoint_ids: Histogram<10>,
/// Number of connections, by the method we used to determine the endpoint.
pub accepted_connections_by_sni: CounterVec<SniSet>,
/// Number of connections (per sni).
pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,
/// Number of connection failures (per kind).
pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
@@ -342,20 +342,11 @@ pub enum LatencyExclusions {
ClientCplaneComputeRetry,
}
#[derive(LabelGroup)]
#[label(set = SniSet)]
pub struct SniGroup {
pub protocol: Protocol,
pub kind: SniKind,
}
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum SniKind {
/// Domain name based routing. SNI for libpq/websockets. Host for HTTP
Sni,
/// Metadata based routing. `options` for libpq/websockets. Header for HTTP
NoSni,
/// Metadata based routing, using the password field.
PasswordHack,
}

View File

@@ -41,7 +41,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::Metrics;
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
pub(crate) const EXT_VERSION: &str = "0.3.1";
pub(crate) const EXT_VERSION: &str = "0.3.0";
pub(crate) const EXT_SCHEMA: &str = "auth";
#[derive(Clone)]

View File

@@ -56,7 +56,6 @@ use crate::serverless::backend::PoolingBackend;
use crate::serverless::http_util::{api_error_into_response, json_response};
pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
pub(crate) const AUTH_BROKER_SNI: &str = "apiauth";
pub async fn task_main(
config: &'static ProxyConfig,

View File

@@ -38,7 +38,7 @@ use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig};
use crate::context::RequestContext;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::http::{ReadBodyError, read_body_with_limit};
use crate::metrics::{HttpDirection, Metrics, SniGroup, SniKind};
use crate::metrics::{HttpDirection, Metrics};
use crate::proxy::{NeonOptions, run_until_cancelled};
use crate::serverless::backend::HttpConnError;
use crate::types::{DbName, RoleName};
@@ -227,32 +227,6 @@ fn get_conn_info(
}
}
// check the URL that was used, for metrics
{
let host_endpoint = headers
// get the host header
.get("host")
// extract the domain
.and_then(|h| {
let (host, _port) = h.to_str().ok()?.split_once(':')?;
Some(host)
})
// get the endpoint prefix
.map(|h| h.split_once('.').map_or(h, |(prefix, _)| prefix));
let kind = if host_endpoint == Some(&*endpoint) {
SniKind::Sni
} else {
SniKind::NoSni
};
let protocol = ctx.protocol();
Metrics::get()
.proxy
.accepted_connections_by_sni
.inc(SniGroup { protocol, kind });
}
ctx.set_user_agent(
headers
.get(hyper::header::USER_AGENT)

View File

@@ -121,20 +121,6 @@ impl Client {
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn switch_timeline_membership(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &models::TimelineMembershipSwitchRequest,
) -> Result<models::TimelineMembershipSwitchResponse> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/membership",
self.mgmt_api_endpoint, tenant_id, timeline_id
);
let resp = self.put(&uri, req).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
let resp = self

View File

@@ -243,7 +243,8 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
let resp =
pull_timeline::handle_request(data, conf.sk_auth_token.clone(), ca_certs, global_timelines)
.await?;
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, resp)
}

View File

@@ -7,7 +7,6 @@ use bytes::Bytes;
use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use http_utils::error::ApiError;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
use reqwest::Certificate;
use safekeeper_api::Term;
@@ -31,7 +30,7 @@ use utils::pausable_failpoint;
use crate::control_file::CONTROL_FILE_NAME;
use crate::state::{EvictionState, TimelinePersistentState};
use crate::timeline::{Timeline, TimelineError, WalResidentTimeline};
use crate::timeline::{Timeline, WalResidentTimeline};
use crate::timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline};
use crate::wal_storage::open_wal_file;
use crate::{GlobalTimelines, debug_dump, wal_backup};
@@ -396,7 +395,7 @@ pub async fn handle_request(
sk_auth_token: Option<SecretString>,
ssl_ca_certs: Vec<Certificate>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<PullTimelineResponse, ApiError> {
) -> Result<PullTimelineResponse> {
let existing_tli = global_timelines.get(TenantTimelineId::new(
request.tenant_id,
request.timeline_id,
@@ -412,9 +411,7 @@ pub async fn handle_request(
for ssl_ca_cert in ssl_ca_certs {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.map_err(|e| ApiError::InternalServerError(e.into()))?;
let http_client = http_client.build()?;
let http_hosts = request.http_hosts.clone();
@@ -446,10 +443,10 @@ pub async fn handle_request(
// offline and C comes online. Then we want a pull on C with A and B as hosts to work.
let min_required_successful = (http_hosts.len() - 1).max(1);
if statuses.len() < min_required_successful {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
bail!(
"only got {} successful status responses. required: {min_required_successful}",
statuses.len()
)));
)
}
// Find the most advanced safekeeper
@@ -468,7 +465,7 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
match pull_timeline(
pull_timeline(
status,
safekeeper_host,
sk_auth_token,
@@ -476,21 +473,6 @@ pub async fn handle_request(
global_timelines,
)
.await
{
Ok(resp) => Ok(resp),
Err(e) => {
match e.downcast_ref::<TimelineError>() {
Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
safekeeper_host: None,
}),
Some(TimelineError::CreationInProgress(_)) => {
// We don't return success here because creation might still fail.
Err(ApiError::Conflict("Creation in progress".to_owned()))
}
_ => Err(ApiError::InternalServerError(e)),
}
}
}
}
async fn pull_timeline(

View File

@@ -98,23 +98,6 @@ impl SafekeeperClient {
)
}
#[allow(unused)]
pub(crate) async fn switch_timeline_membership(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &models::TimelineMembershipSwitchRequest,
) -> Result<models::TimelineMembershipSwitchResponse> {
measured_request!(
"switch_timeline_membership",
crate::metrics::Method::Put,
&self.node_id_label,
self.inner
.switch_timeline_membership(tenant_id, timeline_id, req)
.await
)
}
pub(crate) async fn delete_tenant(
&self,
tenant_id: TenantId,

View File

@@ -3886,10 +3886,10 @@ impl Service {
None
} else if safekeepers {
// Note that for imported timelines, we do not create the timeline on the safekeepers
// straight away. Instead, we do it once the import finalized such that we know what
// start LSN to provide for the safekeepers. This is done in
// [`Self::finalize_timeline_import`].
// Note that we do not support creating the timeline on the safekeepers
// for imported timelines. The `start_lsn` of the timeline is not known
// until the import finshes.
// https://github.com/neondatabase/neon/issues/11569
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
@@ -3966,22 +3966,11 @@ impl Service {
let active = self.timeline_active_on_all_shards(&import).await?;
match active {
Some(timeline_info) => {
true => {
tracing::info!("Timeline became active on all shards");
if self.config.timelines_onto_safekeepers {
// Now that we know the start LSN of this timeline, create it on the
// safekeepers.
self.tenant_timeline_create_safekeepers_until_success(
import.tenant_id,
timeline_info,
)
.await?;
}
break;
}
None => {
false => {
tracing::info!("Timeline not active on all shards yet");
tokio::select! {
@@ -4015,6 +4004,9 @@ impl Service {
.range_mut(TenantShardId::tenant_range(import.tenant_id))
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
// https://github.com/neondatabase/neon/issues/11569
tracing::info!(%import_failed, "Timeline import complete");
Ok(())
@@ -4029,16 +4021,10 @@ impl Service {
.await;
}
/// If the timeline is active on all shards, returns the [`TimelineInfo`]
/// collected from shard 0.
///
/// An error is returned if the shard layout has changed during the import.
/// This is guarded against within the storage controller and the pageserver,
/// and, therefore, unexpected.
async fn timeline_active_on_all_shards(
self: &Arc<Self>,
import: &TimelineImport,
) -> anyhow::Result<Option<TimelineInfo>> {
) -> anyhow::Result<bool> {
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
@@ -4062,17 +4048,13 @@ impl Service {
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
} else {
return Ok(None);
return Ok(false);
}
}
targets
};
if targets.is_empty() {
anyhow::bail!("No shards found to finalize import for");
}
let results = self
.tenant_for_shards_api(
targets,
@@ -4088,17 +4070,10 @@ impl Service {
)
.await;
let all_active = results.iter().all(|res| match res {
Ok(results.into_iter().all(|res| match res {
Ok(info) => info.state == TimelineState::Active,
Err(_) => false,
});
if all_active {
// Both unwraps are validated above
Ok(Some(results.into_iter().next().unwrap().unwrap()))
} else {
Ok(None)
}
}))
}
pub(crate) async fn tenant_timeline_archival_config(
@@ -5206,8 +5181,7 @@ impl Service {
}
// We don't expect any new_shard_count shards to exist here, but drop them just in case
tenants
.retain(|id, s| !(id.tenant_id == *tenant_id && s.shard.count == *new_shard_count));
tenants.retain(|_id, s| s.shard.count != *new_shard_count);
detach_locations
};
@@ -8510,7 +8484,7 @@ impl Service {
// By default, live migrations are generous about the wait time for getting
// the secondary location up to speed. When draining, give up earlier in order
// to not stall the operation when a cold secondary is encountered.
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(30);
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
@@ -8843,7 +8817,7 @@ impl Service {
node_id: NodeId,
cancel: CancellationToken,
) -> Result<(), OperationError> {
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(30);
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)

View File

@@ -1,9 +1,4 @@
use std::{
collections::HashMap,
str::FromStr,
sync::{Arc, atomic::AtomicU64},
time::Duration,
};
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use clashmap::{ClashMap, Entry};
use safekeeper_api::models::PullTimelineRequest;
@@ -174,17 +169,10 @@ pub(crate) struct ScheduleRequest {
pub(crate) kind: SafekeeperTimelineOpKind,
}
/// A way to keep ongoing/queued reconcile requests apart
#[derive(Copy, Clone, PartialEq, Eq)]
struct TokenId(u64);
type OngoingTokens = ClashMap<(TenantId, Option<TimelineId>), (CancellationToken, TokenId)>;
/// Handle to per safekeeper reconciler.
struct ReconcilerHandle {
tx: UnboundedSender<(ScheduleRequest, CancellationToken, TokenId)>,
ongoing_tokens: Arc<OngoingTokens>,
token_id_counter: AtomicU64,
tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
cancel: CancellationToken,
}
@@ -197,28 +185,24 @@ impl ReconcilerHandle {
&self,
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
) -> (CancellationToken, TokenId) {
let token_id = self
.token_id_counter
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let token_id = TokenId(token_id);
) -> CancellationToken {
let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
if let Entry::Occupied(entry) = &entry {
let (cancel, _) = entry.get();
let cancel: &CancellationToken = entry.get();
cancel.cancel();
}
entry.insert((self.cancel.child_token(), token_id)).clone()
entry.insert(self.cancel.child_token()).clone()
}
/// Cancel an ongoing reconciliation
fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
if let Some((_, (cancel, _id))) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
if let Some((_, cancel)) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
cancel.cancel();
}
}
fn schedule_reconcile(&self, req: ScheduleRequest) {
let (cancel, token_id) = self.new_token_slot(req.tenant_id, req.timeline_id);
let cancel = self.new_token_slot(req.tenant_id, req.timeline_id);
let hostname = req.safekeeper.skp.host.clone();
if let Err(err) = self.tx.send((req, cancel, token_id)) {
if let Err(err) = self.tx.send((req, cancel)) {
tracing::info!("scheduling request onto {hostname} returned error: {err}");
}
}
@@ -227,14 +211,13 @@ impl ReconcilerHandle {
pub(crate) struct SafekeeperReconciler {
inner: SafekeeperReconcilerInner,
concurrency_limiter: Arc<Semaphore>,
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken, TokenId)>,
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
cancel: CancellationToken,
}
/// Thin wrapper over `Service` to not clutter its inherent functions
#[derive(Clone)]
struct SafekeeperReconcilerInner {
ongoing_tokens: Arc<OngoingTokens>,
service: Arc<Service>,
}
@@ -243,20 +226,15 @@ impl SafekeeperReconciler {
// We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
let (tx, rx) = mpsc::unbounded_channel();
let concurrency = service.config.safekeeper_reconciler_concurrency;
let ongoing_tokens = Arc::new(ClashMap::new());
let mut reconciler = SafekeeperReconciler {
inner: SafekeeperReconcilerInner {
service,
ongoing_tokens: ongoing_tokens.clone(),
},
inner: SafekeeperReconcilerInner { service },
rx,
concurrency_limiter: Arc::new(Semaphore::new(concurrency)),
cancel: cancel.clone(),
};
let handle = ReconcilerHandle {
tx,
ongoing_tokens,
token_id_counter: AtomicU64::new(0),
ongoing_tokens: Arc::new(ClashMap::new()),
cancel,
};
tokio::spawn(async move { reconciler.run().await });
@@ -268,9 +246,7 @@ impl SafekeeperReconciler {
req = self.rx.recv() => req,
_ = self.cancel.cancelled() => break,
};
let Some((req, req_cancel, req_token_id)) = req else {
break;
};
let Some((req, req_cancel)) = req else { break };
let permit_res = tokio::select! {
req = self.concurrency_limiter.clone().acquire_owned() => req,
@@ -289,7 +265,7 @@ impl SafekeeperReconciler {
let timeline_id = req.timeline_id;
let node_id = req.safekeeper.skp.id;
inner
.reconcile_one(req, req_cancel, req_token_id)
.reconcile_one(req, req_cancel)
.instrument(tracing::info_span!(
"reconcile_one",
?kind,
@@ -304,14 +280,8 @@ impl SafekeeperReconciler {
}
impl SafekeeperReconcilerInner {
async fn reconcile_one(
&self,
req: ScheduleRequest,
req_cancel: CancellationToken,
req_token_id: TokenId,
) {
async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
let req_host = req.safekeeper.skp.host.clone();
let success;
match req.kind {
SafekeeperTimelineOpKind::Pull => {
let Some(timeline_id) = req.timeline_id else {
@@ -332,22 +302,19 @@ impl SafekeeperReconcilerInner {
tenant_id: req.tenant_id,
timeline_id,
};
success = self
.reconcile_inner(
&req,
async |client| client.pull_timeline(&pull_req).await,
|resp| {
if let Some(host) = resp.safekeeper_host {
tracing::info!("pulled timeline from {host} onto {req_host}");
} else {
tracing::info!(
"timeline already present on safekeeper on {req_host}"
);
}
},
req_cancel,
)
.await;
self.reconcile_inner(
req,
async |client| client.pull_timeline(&pull_req).await,
|resp| {
if let Some(host) = resp.safekeeper_host {
tracing::info!("pulled timeline from {host} onto {req_host}");
} else {
tracing::info!("timeline already present on safekeeper on {req_host}");
}
},
req_cancel,
)
.await;
}
SafekeeperTimelineOpKind::Exclude => {
// TODO actually exclude instead of delete here
@@ -358,23 +325,22 @@ impl SafekeeperReconcilerInner {
);
return;
};
success = self
.reconcile_inner(
&req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
},
req_cancel,
)
.await;
self.reconcile_inner(
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
},
req_cancel,
)
.await;
}
SafekeeperTimelineOpKind::Delete => {
let tenant_id = req.tenant_id;
if let Some(timeline_id) = req.timeline_id {
success = self
let deleted = self
.reconcile_inner(
&req,
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
@@ -382,13 +348,13 @@ impl SafekeeperReconcilerInner {
req_cancel,
)
.await;
if success {
if deleted {
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
} else {
success = self
let deleted = self
.reconcile_inner(
&req,
req,
async |client| client.delete_tenant(tenant_id).await,
|_resp| {
tracing::info!(%tenant_id, "deleted tenant from {req_host}");
@@ -396,21 +362,12 @@ impl SafekeeperReconcilerInner {
req_cancel,
)
.await;
if success {
if deleted {
self.delete_tenant_timelines_from_db(tenant_id).await;
}
}
}
}
if success {
self.ongoing_tokens.remove_if(
&(req.tenant_id, req.timeline_id),
|_ttid, (_cancel, token_id)| {
// Ensure that this request is indeed the request we just finished and not a new one
req_token_id == *token_id
},
);
}
}
async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
match self
@@ -464,10 +421,10 @@ impl SafekeeperReconcilerInner {
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
}
/// Returns whether the reconciliation happened successfully (or we got cancelled)
/// Returns whether the reconciliation happened successfully
async fn reconcile_inner<T, F, U>(
&self,
req: &ScheduleRequest,
req: ScheduleRequest,
closure: impl Fn(SafekeeperClient) -> F,
log_success: impl FnOnce(T) -> U,
req_cancel: CancellationToken,

View File

@@ -323,42 +323,6 @@ impl Service {
})
}
pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_info: TimelineInfo,
) -> anyhow::Result<()> {
const BACKOFF: Duration = Duration::from_secs(5);
loop {
if self.cancel.is_cancelled() {
anyhow::bail!("Shut down requested while finalizing import");
}
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
.await;
match res {
Ok(_) => {
tracing::info!("Timeline created on safekeepers");
break;
}
Err(err) => {
tracing::error!("Failed to create timeline on safekeepers: {err}");
tokio::select! {
_ = self.cancel.cancelled() => {
anyhow::bail!("Shut down requested while finalizing import");
},
_ = tokio::time::sleep(BACKOFF) => {}
};
}
}
}
Ok(())
}
/// Directly insert the timeline into the database without reconciling it with safekeepers.
///
/// Useful if the timeline already exists on the specified safekeepers,

View File

@@ -165,17 +165,16 @@ pub(crate) async fn branch_cleanup_and_check_errors(
.head_object(&path, &CancellationToken::new())
.await;
if let Err(e) = response {
if response.is_err() {
// Object is not present.
let is_l0 = LayerMap::is_l0(layer.key_range(), layer.is_delta());
let msg = format!(
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {}) with error: {}",
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {})",
layer,
metadata.generation.get_suffix(),
metadata.shard,
is_l0,
e,
);
if is_l0 || ignore_error {

View File

@@ -137,10 +137,11 @@ struct TenantRefAccumulator {
impl TenantRefAccumulator {
fn update(&mut self, ttid: TenantShardTimelineId, index_part: &IndexPart) {
let this_shard_idx = ttid.tenant_shard_id.to_index();
self.shards_seen
(*self
.shards_seen
.entry(ttid.tenant_shard_id.tenant_id)
.or_default()
.insert(this_shard_idx);
.or_default())
.insert(this_shard_idx);
let mut ancestor_refs = Vec::new();
for (layer_name, layer_metadata) in &index_part.layer_metadata {
@@ -766,13 +767,10 @@ pub async fn pageserver_physical_gc(
stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id).await?,
);
Ok(try_stream! {
let mut cnt = 0;
while let Some(ttid_res) = timelines.next().await {
let ttid = ttid_res?;
cnt += 1;
yield (ttid, tenant_manifest_arc.clone());
}
tracing::info!(%tenant_shard_id, "Found {} timelines", cnt);
})
}
});
@@ -792,7 +790,6 @@ pub async fn pageserver_physical_gc(
&accumulator,
tenant_manifest_arc,
)
.instrument(info_span!("gc_timeline", %ttid))
});
let timelines = timelines.try_buffered(CONCURRENCY);
let mut timelines = std::pin::pin!(timelines);

View File

@@ -153,10 +153,7 @@ pub async fn scan_pageserver_metadata(
const CONCURRENCY: usize = 32;
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| {
tracing::info!("Found tenant: {}", t);
stream_tenant_timelines(&remote_client, &target, t)
});
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();

View File

@@ -24,6 +24,7 @@ pub struct SnapshotDownloader {
remote_client: GenericRemoteStorage,
#[allow(dead_code)]
target: RootTarget,
bucket_config: BucketConfig,
tenant_id: TenantId,
output_path: Utf8PathBuf,
concurrency: usize,
@@ -42,6 +43,7 @@ impl SnapshotDownloader {
Ok(Self {
remote_client,
target,
bucket_config,
tenant_id,
output_path,
concurrency,
@@ -216,9 +218,11 @@ impl SnapshotDownloader {
}
pub async fn download(&self) -> anyhow::Result<()> {
let (remote_client, target) =
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
// Generate a stream of TenantShardId
let shards =
stream_tenant_shards(&self.remote_client, &self.target, self.tenant_id).await?;
let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
let shards: Vec<TenantShardId> = shards.try_collect().await?;
// Only read from shards that have the highest count: avoids redundantly downloading
@@ -236,8 +240,7 @@ impl SnapshotDownloader {
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
// Generate a stream of TenantTimelineId
let timelines =
stream_tenant_timelines(&self.remote_client, &self.target, shard).await?;
let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;
// Generate a stream of S3TimelineBlobData
async fn load_timeline_index(
@@ -248,8 +251,8 @@ impl SnapshotDownloader {
let data = list_timeline_blobs(remote_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines = timelines
.map_ok(|ttid| load_timeline_index(&self.remote_client, &self.target, ttid));
let timelines =
timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
while let Some(i) = timelines.next().await {

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import urllib.parse
from enum import StrEnum
from typing import TYPE_CHECKING, final
import requests
@@ -10,23 +9,11 @@ from requests.auth import AuthBase
from typing_extensions import override
from fixtures.log_helper import log
from fixtures.utils import wait_until
if TYPE_CHECKING:
from requests import PreparedRequest
COMPUTE_AUDIENCE = "compute"
"""
The value to place in the `aud` claim.
"""
@final
class ComputeClaimsScope(StrEnum):
ADMIN = "admin"
@final
class BearerAuth(AuthBase):
"""
@@ -63,35 +50,6 @@ class EndpointHttpClient(requests.Session):
res.raise_for_status()
return res.json()
def prewarm_lfc_status(self) -> dict[str, str]:
res = self.get(f"http://localhost:{self.external_port}/lfc/prewarm")
res.raise_for_status()
json: dict[str, str] = res.json()
return json
def prewarm_lfc(self):
self.post(f"http://localhost:{self.external_port}/lfc/prewarm").raise_for_status()
def prewarmed():
json = self.prewarm_lfc_status()
status, err = json["status"], json.get("error")
assert status == "completed", f"{status}, error {err}"
wait_until(prewarmed)
def offload_lfc(self):
url = f"http://localhost:{self.external_port}/lfc/offload"
self.post(url).raise_for_status()
def offloaded():
res = self.get(url)
res.raise_for_status()
json = res.json()
status, err = json["status"], json.get("error")
assert status == "completed", f"{status}, error {err}"
wait_until(offloaded)
def database_schema(self, database: str):
res = self.get(
f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}",

View File

@@ -21,7 +21,6 @@ if TYPE_CHECKING:
Any,
)
from fixtures.endpoint.http import ComputeClaimsScope
from fixtures.pg_version import PgVersion
@@ -536,16 +535,12 @@ class NeonLocalCli(AbstractNeonCli):
res.check_returncode()
return res
def endpoint_generate_jwt(
self, endpoint_id: str, scope: ComputeClaimsScope | None = None
) -> str:
def endpoint_generate_jwt(self, endpoint_id: str) -> str:
"""
Generate a JWT for making requests to the endpoint's external HTTP
server.
"""
args = ["endpoint", "generate-jwt", endpoint_id]
if scope:
args += ["--scope", str(scope)]
cmd = self.raw_cli(args)
cmd.check_returncode()
@@ -557,7 +552,7 @@ class NeonLocalCli(AbstractNeonCli):
endpoint_id: str,
safekeepers_generation: int | None = None,
safekeepers: list[int] | None = None,
remote_ext_base_url: str | None = None,
remote_ext_config: str | None = None,
pageserver_id: int | None = None,
allow_multiple: bool = False,
create_test_user: bool = False,
@@ -572,8 +567,8 @@ class NeonLocalCli(AbstractNeonCli):
extra_env_vars = env or {}
if basebackup_request_tries is not None:
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
if remote_ext_base_url is not None:
args.extend(["--remote-ext-base-url", remote_ext_base_url])
if remote_ext_config is not None:
args.extend(["--remote-ext-config", remote_ext_config])
if safekeepers_generation is not None:
args.extend(["--safekeepers-generation", str(safekeepers_generation)])

View File

@@ -51,7 +51,7 @@ from fixtures.common_types import (
TimelineId,
)
from fixtures.compute_migrations import NUM_COMPUTE_MIGRATIONS
from fixtures.endpoint.http import ComputeClaimsScope, EndpointHttpClient
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.neon_cli import NeonLocalCli, Pagectl
@@ -1185,9 +1185,7 @@ class NeonEnv:
"broker": {},
"safekeepers": [],
"pageservers": [],
"endpoint_storage": {
"listen_addr": f"127.0.0.1:{self.port_distributor.get_port()}",
},
"endpoint_storage": {"port": self.port_distributor.get_port()},
"generate_local_ssl_certs": self.generate_local_ssl_certs,
}
@@ -1299,6 +1297,13 @@ class NeonEnv:
for key, value in override.items():
ps_cfg[key] = value
if self.pageserver_virtual_file_io_mode is not None:
# TODO(christian): https://github.com/neondatabase/neon/issues/11598
if not config.test_may_use_compatibility_snapshot_binaries:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
else:
log.info("ignoring virtual_file_io_mode parametrization for compatibility test")
if self.pageserver_wal_receiver_protocol is not None:
key, value = PageserverWalReceiverProtocol.to_config_key_value(
self.pageserver_wal_receiver_protocol
@@ -1402,6 +1407,30 @@ class NeonEnv:
for f in futs:
f.result()
# Last step: register safekeepers at the storage controller
if (
self.storage_controller_config is not None
and self.storage_controller_config.get("timelines_onto_safekeepers") is True
):
for sk_id, sk in enumerate(self.safekeepers):
# 0 is an invalid safekeeper id
sk_id = sk_id + 1
body = {
"id": sk_id,
"created_at": "2023-10-25T09:11:25Z",
"updated_at": "2024-08-28T11:32:43Z",
"region_id": "aws-us-east-2",
"host": "127.0.0.1",
"port": sk.port.pg,
"http_port": sk.port.http,
"https_port": None,
"version": 5957,
"availability_zone_id": f"us-east-2b-{sk_id}",
}
self.storage_controller.on_safekeeper_deploy(sk_id, body)
self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
self.endpoint_storage.start(timeout_in_seconds=timeout_in_seconds)
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
@@ -3835,7 +3864,7 @@ class NeonAuthBroker:
external_http_port: int,
auth_backend: NeonAuthBroker.ProxyV1,
):
self.domain = "local.neon.build" # resolves to 127.0.0.1
self.domain = "apiauth.local.neon.build" # resolves to 127.0.0.1
self.host = "127.0.0.1"
self.http_port = http_port
self.external_http_port = external_http_port
@@ -3852,7 +3881,7 @@ class NeonAuthBroker:
# generate key of it doesn't exist
crt_path = self.test_output_dir / "proxy.crt"
key_path = self.test_output_dir / "proxy.key"
generate_proxy_tls_certs(f"apiauth.{self.domain}", key_path, crt_path)
generate_proxy_tls_certs("apiauth.local.neon.build", key_path, crt_path)
args = [
str(self.neon_binpath / "proxy"),
@@ -3896,10 +3925,10 @@ class NeonAuthBroker:
log.info(f"Executing http query: {query}")
connstr = f"postgresql://{user}@ep-foo-bar-1234.{self.domain}/postgres"
connstr = f"postgresql://{user}@{self.domain}/postgres"
async with httpx.AsyncClient(verify=str(self.test_output_dir / "proxy.crt")) as client:
response = await client.post(
f"https://apiauth.{self.domain}:{self.external_http_port}/sql",
f"https://{self.domain}:{self.external_http_port}/sql",
json={"query": query, "params": args},
headers={
"Neon-Connection-String": connstr,
@@ -4189,13 +4218,13 @@ class Endpoint(PgProtocol, LogUtils):
self.config(config_lines)
self.__jwt = self.generate_jwt()
self.__jwt = self.env.neon_cli.endpoint_generate_jwt(self.endpoint_id)
return self
def start(
self,
remote_ext_base_url: str | None = None,
remote_ext_config: str | None = None,
pageserver_id: int | None = None,
safekeeper_generation: int | None = None,
safekeepers: list[int] | None = None,
@@ -4221,7 +4250,7 @@ class Endpoint(PgProtocol, LogUtils):
self.endpoint_id,
safekeepers_generation=safekeeper_generation,
safekeepers=self.active_safekeepers,
remote_ext_base_url=remote_ext_base_url,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
create_test_user=create_test_user,
@@ -4236,14 +4265,6 @@ class Endpoint(PgProtocol, LogUtils):
return self
def generate_jwt(self, scope: ComputeClaimsScope | None = None) -> str:
"""
Generate a JWT for making requests to the endpoint's external HTTP
server.
"""
assert self.endpoint_id is not None
return self.env.neon_cli.endpoint_generate_jwt(self.endpoint_id, scope)
def endpoint_path(self) -> Path:
"""Path to endpoint directory"""
assert self.endpoint_id
@@ -4436,7 +4457,7 @@ class Endpoint(PgProtocol, LogUtils):
hot_standby: bool = False,
lsn: Lsn | None = None,
config_lines: list[str] | None = None,
remote_ext_base_url: str | None = None,
remote_ext_config: str | None = None,
pageserver_id: int | None = None,
allow_multiple: bool = False,
basebackup_request_tries: int | None = None,
@@ -4455,7 +4476,7 @@ class Endpoint(PgProtocol, LogUtils):
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
).start(
remote_ext_base_url=remote_ext_base_url,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
basebackup_request_tries=basebackup_request_tries,
@@ -4539,7 +4560,7 @@ class EndpointFactory:
lsn: Lsn | None = None,
hot_standby: bool = False,
config_lines: list[str] | None = None,
remote_ext_base_url: str | None = None,
remote_ext_config: str | None = None,
pageserver_id: int | None = None,
basebackup_request_tries: int | None = None,
) -> Endpoint:
@@ -4559,7 +4580,7 @@ class EndpointFactory:
hot_standby=hot_standby,
config_lines=config_lines,
lsn=lsn,
remote_ext_base_url=remote_ext_base_url,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
basebackup_request_tries=basebackup_request_tries,
)
@@ -4613,10 +4634,7 @@ class EndpointFactory:
return self
def new_replica(
self,
origin: Endpoint,
endpoint_id: str | None = None,
config_lines: list[str] | None = None,
self, origin: Endpoint, endpoint_id: str, config_lines: list[str] | None = None
):
branch_name = origin.branch_name
assert origin in self.endpoints
@@ -4632,10 +4650,7 @@ class EndpointFactory:
)
def new_replica_start(
self,
origin: Endpoint,
endpoint_id: str | None = None,
config_lines: list[str] | None = None,
self, origin: Endpoint, endpoint_id: str, config_lines: list[str] | None = None
):
branch_name = origin.branch_name
assert origin in self.endpoints
@@ -5452,13 +5467,6 @@ def wait_for_last_flush_lsn(
if last_flush_lsn is None:
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
# The last_flush_lsn may not correspond to a record boundary.
# For example, if the compute flushed WAL on a page boundary,
# the remaining part of the record might not be flushed for a long time.
# This would prevent the pageserver from reaching last_flush_lsn promptly.
# To ensure the rest of the record reaches the pageserver quickly,
# we forcibly flush the WAL by using CHECKPOINT.
endpoint.safe_psql("CHECKPOINT")
results = []
for tenant_shard_id, pageserver in shards:

View File

@@ -122,10 +122,6 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
".*Call to node.*management API.*failed.*Timeout.*",
".*Failed to update node .+ after heartbeat round.*error sending request for url.*",
".*background_reconcile: failed to fetch top tenants:.*client error \\(Connect\\).*",
# Many tests will take safekeepers offline
".*Call to safekeeper.*management API.*failed.*receive body.*",
".*Call to safekeeper.*management API.*failed.*ReceiveBody.*",
".*Call to safekeeper.*management API.*failed.*Timeout.*",
# Many tests will start up with a node offline
".*startup_reconcile: Could not scan node.*",
# Tests run in dev mode

View File

@@ -1,5 +1,4 @@
import math # Add this import
import os
import time
import traceback
from pathlib import Path
@@ -88,10 +87,7 @@ def test_cumulative_statistics_persistence(
- insert additional tuples that by itself are not enough to trigger auto-vacuum but in combination with the previous tuples are
- verify that autovacuum is triggered by the combination of tuples inserted before and after endpoint suspension
"""
project = neon_api.create_project(
pg_version,
f"Test cumulative statistics persistence, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}",
)
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
endpoint_id = project["endpoints"][0]["id"]

View File

@@ -62,9 +62,7 @@ def test_ro_replica_lag(
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
project = neon_api.create_project(
pg_version, f"Test readonly replica lag, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
)
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
@@ -197,9 +195,7 @@ def test_replication_start_stop(
pgbench_duration = f"-T{2**num_replicas * configuration_test_time_sec}"
error_occurred = False
project = neon_api.create_project(
pg_version, f"Test replication start stop, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
)
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])

View File

@@ -206,7 +206,7 @@ class NeonProject:
self.neon_api = neon_api
self.pg_bin = pg_bin
proj = self.neon_api.create_project(
pg_version, f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
pg_version, f"Automatic random API test {os.getenv('GITHUB_RUN_ID')}"
)
self.id: str = proj["project"]["id"]
self.name: str = proj["project"]["name"]

View File

@@ -544,69 +544,3 @@ def test_drop_role_with_table_privileges_from_non_neon_superuser(neon_simple_env
)
role = cursor.fetchone()
assert role is None
def test_db_with_custom_settings(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can work with databases that have some custom settings.
For example, role=some_other_role, default_transaction_read_only=on,
search_path=non_public_schema, statement_timeout=1 (1ms).
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
TEST_ROLE = "some_other_role"
TEST_DB = "db_with_custom_settings"
TEST_SCHEMA = "non_public_schema"
endpoint.respec_deep(
**{
"spec": {
"skip_pg_catalog_updates": False,
"cluster": {
"databases": [
{
"name": TEST_DB,
"owner": TEST_ROLE,
}
],
"roles": [
{
"name": TEST_ROLE,
}
],
},
}
}
)
endpoint.reconfigure()
with endpoint.cursor(dbname=TEST_DB) as cursor:
cursor.execute(f"CREATE SCHEMA {TEST_SCHEMA}")
cursor.execute(f"ALTER DATABASE {TEST_DB} SET role = {TEST_ROLE}")
cursor.execute(f"ALTER DATABASE {TEST_DB} SET default_transaction_read_only = on")
cursor.execute(f"ALTER DATABASE {TEST_DB} SET search_path = {TEST_SCHEMA}")
cursor.execute(f"ALTER DATABASE {TEST_DB} SET statement_timeout = 1")
with endpoint.cursor(dbname=TEST_DB) as cursor:
cursor.execute("SELECT current_role")
role = cursor.fetchone()
assert role is not None
assert role[0] == TEST_ROLE
cursor.execute("SHOW default_transaction_read_only")
default_transaction_read_only = cursor.fetchone()
assert default_transaction_read_only is not None
assert default_transaction_read_only[0] == "on"
cursor.execute("SHOW search_path")
search_path = cursor.fetchone()
assert search_path is not None
assert search_path[0] == TEST_SCHEMA
# Do not check statement_timeout, because we force it to 2min
# in `endpoint.cursor()` fixture.
endpoint.reconfigure()

View File

@@ -1,78 +0,0 @@
from __future__ import annotations
from http.client import FORBIDDEN, UNAUTHORIZED
from typing import TYPE_CHECKING
import jwt
import pytest
from fixtures.endpoint.http import COMPUTE_AUDIENCE, ComputeClaimsScope, EndpointHttpClient
from fixtures.utils import run_only_on_default_postgres
from requests import RequestException
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
@run_only_on_default_postgres("The code path being tested is not dependent on Postgres version")
def test_compute_no_scope_claim(neon_simple_env: NeonEnv):
"""
Test that if the JWT scope is not admin and no compute_id is specified,
the external HTTP server returns a 403 Forbidden error.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
# Encode nothing in the token
token = jwt.encode({}, env.auth_keys.priv, algorithm="EdDSA")
# Create an admin-scoped HTTP client
client = EndpointHttpClient(
external_port=endpoint.external_http_port,
internal_port=endpoint.internal_http_port,
jwt=token,
)
try:
client.status()
pytest.fail("Exception should have been raised")
except RequestException as e:
assert e.response is not None
assert e.response.status_code == FORBIDDEN
@pytest.mark.parametrize(
"audience",
(COMPUTE_AUDIENCE, "invalid", None),
ids=["with_audience", "with_invalid_audience", "without_audience"],
)
@run_only_on_default_postgres("The code path being tested is not dependent on Postgres version")
def test_compute_admin_scope_claim(neon_simple_env: NeonEnv, audience: str | None):
"""
Test that an admin-scoped JWT can access the compute's external HTTP server
without the compute_id being specified in the claims.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
data: dict[str, str | list[str]] = {"scope": str(ComputeClaimsScope.ADMIN)}
if audience:
data["aud"] = [audience]
token = jwt.encode(data, env.auth_keys.priv, algorithm="EdDSA")
# Create an admin-scoped HTTP client
client = EndpointHttpClient(
external_port=endpoint.external_http_port,
internal_port=endpoint.internal_http_port,
jwt=token,
)
try:
client.status()
if audience != COMPUTE_AUDIENCE:
pytest.fail("Exception should have been raised")
except RequestException as e:
assert e.response is not None
assert e.response.status_code == UNAUTHORIZED

View File

@@ -221,7 +221,7 @@ def test_remote_extensions(
endpoint.create_remote_extension_spec(spec)
endpoint.start(remote_ext_base_url=extensions_endpoint)
endpoint.start(remote_ext_config=extensions_endpoint)
with endpoint.connect() as conn:
with conn.cursor() as cur:
@@ -249,7 +249,7 @@ def test_remote_extensions(
# Remove the extension files to force a redownload of the extension.
extension.remove(test_output_dir, pg_version)
endpoint.start(remote_ext_base_url=extensions_endpoint)
endpoint.start(remote_ext_config=extensions_endpoint)
# Test that ALTER EXTENSION UPDATE statements also fetch remote extensions.
with endpoint.connect() as conn:

View File

@@ -4,12 +4,10 @@ import pytest
from aiohttp import ClientSession
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import run_only_on_default_postgres
from jwcrypto import jwk, jwt
@pytest.mark.asyncio
@run_only_on_default_postgres("test doesn't use postgres")
async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
"""
Inserts, retrieves, and deletes test file using a JWT token
@@ -37,6 +35,7 @@ async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv)
key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key"
headers = {"Authorization": f"Bearer {token}"}
log.info(f"cache key url {key}")
log.info(f"token {token}")
async with ClientSession(headers=headers) as session:
async with session.get(key) as res:

View File

@@ -1,28 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
#
# Test unlogged build for GIST index
#
def test_gist(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
con = endpoint.connect()
cur = con.cursor()
iterations = 100
for _ in range(iterations):
cur.execute(
"CREATE TABLE pvactst (i INT, a INT[], p POINT) with (autovacuum_enabled = off)"
)
cur.execute(
"INSERT INTO pvactst SELECT i, array[1,2,3], point(i, i+1) FROM generate_series(1,1000) i"
)
cur.execute("CREATE INDEX gist_pvactst ON pvactst USING gist (p)")
cur.execute("VACUUM pvactst")
cur.execute("DROP TABLE pvactst")

View File

@@ -24,7 +24,6 @@ from fixtures.utils import (
skip_in_debug_build,
wait_until,
)
from fixtures.workload import Workload
from mypy_boto3_kms import KMSClient
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
from mypy_boto3_s3 import S3Client
@@ -98,10 +97,6 @@ def test_pgdata_import_smoke(
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
if neon_env_builder.storage_controller_config is None:
neon_env_builder.storage_controller_config = {}
neon_env_builder.storage_controller_config["timelines_onto_safekeepers"] = True
env = neon_env_builder.init_start()
# The test needs LocalFs support, which is only built in testing mode.
@@ -291,28 +286,34 @@ def test_pgdata_import_smoke(
#
# validate that we can write
#
workload = Workload(env, tenant_id, timeline_id, branch_name=import_branch_name)
workload.init()
workload.write_rows(64)
workload.validate()
rw_endpoint = env.endpoints.create_start(
branch_name=import_branch_name,
endpoint_id="rw",
tenant_id=tenant_id,
config_lines=ep_config,
)
rw_endpoint.safe_psql("create table othertable(values text)")
rw_lsn = Lsn(rw_endpoint.safe_psql_scalar("select pg_current_wal_flush_lsn()"))
rw_lsn = Lsn(workload.endpoint().safe_psql_scalar("select pg_current_wal_flush_lsn()"))
# TODO: consider using `class Workload` here
# to do compaction and whatnot?
#
# validate that we can branch (important use case)
#
# ... at the tip
child_timeline_id = env.create_branch(
_ = env.create_branch(
new_branch_name="br-tip",
ancestor_branch_name=import_branch_name,
tenant_id=tenant_id,
ancestor_start_lsn=rw_lsn,
)
child_workload = workload.branch(timeline_id=child_timeline_id, branch_name="br-tip")
child_workload.validate()
validate_vanilla_equivalence(child_workload.endpoint())
br_tip_endpoint = env.endpoints.create_start(
branch_name="br-tip", endpoint_id="br-tip-ro", tenant_id=tenant_id, config_lines=ep_config
)
validate_vanilla_equivalence(br_tip_endpoint)
br_tip_endpoint.safe_psql("select * from othertable")
# ... at the initdb lsn
_ = env.create_branch(
@@ -329,7 +330,7 @@ def test_pgdata_import_smoke(
)
validate_vanilla_equivalence(br_initdb_endpoint)
with pytest.raises(psycopg2.errors.UndefinedTable):
br_initdb_endpoint.safe_psql(f"select * from {workload.table}")
br_initdb_endpoint.safe_psql("select * from othertable")
@run_only_on_default_postgres(reason="PG version is irrelevant here")
@@ -640,55 +641,6 @@ def test_fast_import_binary(
assert res[0][0] == 10
def test_fast_import_event_triggers(
test_output_dir,
vanilla_pg: VanillaPostgres,
port_distributor: PortDistributor,
fast_import: FastImport,
):
vanilla_pg.start()
vanilla_pg.safe_psql("""
CREATE FUNCTION test_event_trigger_for_drops()
RETURNS event_trigger LANGUAGE plpgsql AS $$
DECLARE
obj record;
BEGIN
FOR obj IN SELECT * FROM pg_event_trigger_dropped_objects()
LOOP
RAISE NOTICE '% dropped object: % %.% %',
tg_tag,
obj.object_type,
obj.schema_name,
obj.object_name,
obj.object_identity;
END LOOP;
END
$$;
CREATE EVENT TRIGGER test_event_trigger_for_drops
ON sql_drop
EXECUTE PROCEDURE test_event_trigger_for_drops();
""")
pg_port = port_distributor.get_port()
p = fast_import.run_pgdata(pg_port=pg_port, source_connection_string=vanilla_pg.connstr())
assert p.returncode == 0
vanilla_pg.stop()
pgbin = PgBin(test_output_dir, fast_import.pg_distrib_dir, fast_import.pg_version)
with VanillaPostgres(
fast_import.workdir / "pgdata", pgbin, pg_port, False
) as new_pgdata_vanilla_pg:
new_pgdata_vanilla_pg.start()
# database name and user are hardcoded in fast_import binary, and they are different from normal vanilla postgres
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
res = conn.safe_psql("SELECT count(*) FROM pg_event_trigger;")
log.info(f"Result: {res}")
assert res[0][0] == 0, f"Neon does not support importing event triggers, got: {res[0][0]}"
def test_fast_import_restore_to_connstring(
test_output_dir,
vanilla_pg: VanillaPostgres,

View File

@@ -1,24 +1,11 @@
import random
import threading
import time
from enum import Enum
import pytest
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import USE_LFC
from prometheus_client.parser import text_string_to_metric_families as prom_parse_impl
class LfcQueryMethod(Enum):
COMPUTE_CTL = False
POSTGRES = True
PREWARM_LABEL = "compute_ctl_lfc_prewarm_requests_total"
OFFLOAD_LABEL = "compute_ctl_lfc_offload_requests_total"
QUERY_OPTIONS = LfcQueryMethod.POSTGRES, LfcQueryMethod.COMPUTE_CTL
def check_pinned_entries(cur):
@@ -32,20 +19,11 @@ def check_pinned_entries(cur):
assert n_pinned == 0
def prom_parse(client: EndpointHttpClient) -> dict[str, float]:
return {
sample.name: sample.value
for family in prom_parse_impl(client.metrics())
for sample in family.samples
if sample.name in (PREWARM_LABEL, OFFLOAD_LABEL)
}
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
@pytest.mark.parametrize("query", QUERY_OPTIONS, ids=["postgres", "compute-ctl"])
def test_lfc_prewarm(neon_simple_env: NeonEnv, query: LfcQueryMethod):
def test_lfc_prewarm(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 1000000
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
@@ -56,57 +34,30 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, query: LfcQueryMethod):
"neon.file_cache_prewarm_limit=1000",
],
)
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create extension neon version '1.6'")
pg_cur.execute("create database lfc")
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
log.info(f"Inserting {n_records} rows")
lfc_cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
lfc_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
log.info(f"Inserted {n_records} rows")
http_client = endpoint.http_client()
if query is LfcQueryMethod.COMPUTE_CTL:
status = http_client.prewarm_lfc_status()
assert status["status"] == "not_prewarmed"
assert "error" not in status
http_client.offload_lfc()
assert http_client.prewarm_lfc_status()["status"] == "not_prewarmed"
assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0}
else:
pg_cur.execute("select get_local_cache_state()")
lfc_state = pg_cur.fetchall()[0][0]
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
endpoint.stop()
endpoint.start()
# wait until compute_ctl completes downgrade of extension to default version
time.sleep(1)
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("alter extension neon update to '1.6'")
conn = endpoint.connect()
cur = conn.cursor()
time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version
cur.execute("alter extension neon update to '1.6'")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
if query is LfcQueryMethod.COMPUTE_CTL:
http_client.prewarm_lfc()
else:
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
pg_cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = pg_cur.fetchall()[0][0]
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}")
pg_cur.execute("select * from get_prewarm_info()")
prewarm_info = pg_cur.fetchall()[0]
cur.execute("select * from get_prewarm_info()")
prewarm_info = cur.fetchall()[0]
log.info(f"Prewarm info: {prewarm_info}")
total, prewarmed, skipped, _ = prewarm_info
progress = (prewarmed + skipped) * 100 // total
log.info(f"Prewarm progress: {progress}%")
log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%")
assert lfc_used_pages > 10000
assert (
@@ -115,23 +66,18 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, query: LfcQueryMethod):
and prewarm_info[0] == prewarm_info[1] + prewarm_info[2]
)
lfc_cur.execute("select sum(pk) from t")
assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
cur.execute("select sum(pk) from t")
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
check_pinned_entries(pg_cur)
desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped}
if query is LfcQueryMethod.COMPUTE_CTL:
assert http_client.prewarm_lfc_status() == desired
assert prom_parse(http_client) == {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1}
check_pinned_entries(cur)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
@pytest.mark.parametrize("query", QUERY_OPTIONS, ids=["postgres", "compute-ctl"])
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMethod):
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 10000
n_threads = 4
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
@@ -141,58 +87,40 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet
"neon.file_cache_prewarm_limit=1000000",
],
)
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create extension neon version '1.6'")
pg_cur.execute("CREATE DATABASE lfc")
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
lfc_cur.execute(
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute(
"create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 1000)) with (fillfactor=10)"
)
log.info(f"Inserting {n_records} rows")
lfc_cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
log.info(f"Inserted {n_records} rows")
http_client = endpoint.http_client()
if query is LfcQueryMethod.COMPUTE_CTL:
http_client.offload_lfc()
else:
pg_cur.execute("select get_local_cache_state()")
lfc_state = pg_cur.fetchall()[0][0]
cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
running = True
n_prewarms = 0
def workload():
lfc_conn = endpoint.connect(dbname="lfc")
lfc_cur = lfc_conn.cursor()
conn = endpoint.connect()
cur = conn.cursor()
n_transfers = 0
while running:
src = random.randint(1, n_records)
dst = random.randint(1, n_records)
lfc_cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
lfc_cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
n_transfers += 1
log.info(f"Number of transfers: {n_transfers}")
def prewarm():
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
conn = endpoint.connect()
cur = conn.cursor()
n_prewarms = 0
while running:
pg_cur.execute("alter system set neon.file_cache_size_limit='1MB'")
pg_cur.execute("select pg_reload_conf()")
pg_cur.execute("alter system set neon.file_cache_size_limit='1GB'")
pg_cur.execute("select pg_reload_conf()")
if query is LfcQueryMethod.COMPUTE_CTL:
http_client.prewarm_lfc()
else:
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
nonlocal n_prewarms
cur.execute("alter system set neon.file_cache_size_limit='1MB'")
cur.execute("select pg_reload_conf()")
cur.execute("alter system set neon.file_cache_size_limit='1GB'")
cur.execute("select pg_reload_conf()")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
n_prewarms += 1
log.info(f"Number of prewarms: {n_prewarms}")
@@ -212,10 +140,8 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet
t.join()
prewarm_thread.join()
lfc_cur.execute("select sum(balance) from accounts")
total_balance = lfc_cur.fetchall()[0][0]
cur.execute("select sum(balance) from accounts")
total_balance = cur.fetchall()[0][0]
assert total_balance == 0
check_pinned_entries(pg_cur)
if query is LfcQueryMethod.COMPUTE_CTL:
assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: n_prewarms}
check_pinned_entries(cur)

View File

@@ -248,20 +248,8 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
last_generation = max(
[s[1] for s in last_state.values() if s[1] is not None], default=None
)
is_last_generation = last_generation == generation
# It's also only valid to connect if there are no conflicting attachments (i.e. no
# Pageserver in AttachedSingle with other attachments in same generation).
num_attached_single = len(
[s for s in last_state.values() if s[0] == "AttachedSingle" and s[1] == last_generation]
)
num_attached = len(
[s for s in last_state.values() if s[0].startswith("Attached") and s[1] == last_generation]
)
valid_attachment = (num_attached_single == 1 and num_attached == 1) \
or num_attached_single == 0
if mode.startswith("Attached") and is_last_generation and valid_attachment:
if mode.startswith("Attached") and generation == last_generation:
# This is a basic test: we are validating that he endpoint works properly _between_
# configuration changes. A stronger test would be to validate that clients see
# no errors while we are making the changes.

View File

@@ -39,10 +39,3 @@ def test_role_grants(neon_simple_env: NeonEnv):
res = cur.fetchall()
assert res == [(1,)], "select should not succeed"
# confirm that replicas can also ensure the grants are correctly set.
replica = env.endpoints.new_replica_start(endpoint)
replica_client = replica.http_client()
replica_client.set_role_grants(
"test_role_grants", "test_role", "test_schema", ["CREATE", "USAGE"]
)

View File

@@ -1334,13 +1334,6 @@ def test_sharding_split_failures(
tenant_id, timeline_id, shard_count=initial_shard_count, placement_policy='{"Attached":1}'
)
# Create bystander tenants with various shard counts. They should not be affected by the aborted
# splits. Regression test for https://github.com/neondatabase/cloud/issues/28589.
bystanders = {} # id → shard_count
for bystander_shard_count in [1, 2, 4, 8]:
id, _ = env.create_tenant(shard_count=bystander_shard_count)
bystanders[id] = bystander_shard_count
env.storage_controller.allowed_errors.extend(
[
# All split failures log a warning when then enqueue the abort operation
@@ -1401,8 +1394,6 @@ def test_sharding_split_failures(
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
if tenant_shard_id.tenant_id != tenant_id:
continue # skip bystanders
log.info(f"Shard {tenant_shard_id} seen on node {ps.id} in mode {loc[1]['mode']}")
assert tenant_shard_id.shard_count == initial_shard_count
if loc[1]["mode"] == "Secondary":
@@ -1423,8 +1414,6 @@ def test_sharding_split_failures(
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
if tenant_shard_id.tenant_id != tenant_id:
continue # skip bystanders
log.info(f"Shard {tenant_shard_id} seen on node {ps.id} in mode {loc[1]['mode']}")
assert tenant_shard_id.shard_count == split_shard_count
if loc[1]["mode"] == "Secondary":
@@ -1507,12 +1496,6 @@ def test_sharding_split_failures(
# the scheduler reaches an idle state
env.storage_controller.reconcile_until_idle(timeout_secs=30)
# Check that all bystanders are still around.
for bystander_id, bystander_shard_count in bystanders.items():
response = env.storage_controller.tenant_describe(bystander_id)
assert TenantId(response["tenant_id"]) == bystander_id
assert len(response["shards"]) == bystander_shard_count
env.storage_controller.consistency_check()

View File

@@ -1822,7 +1822,7 @@ def test_timeline_detach_with_aux_files_with_detach_v1(
endpoint2.safe_psql(
"SELECT pg_create_logical_replication_slot('test_slot_restore', 'pgoutput')"
)
lsn3 = wait_for_last_flush_lsn(env, endpoint2, env.initial_tenant, branch_timeline_id)
lsn3 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn3).keys()) == set(
["pg_replslot/test_slot_restore/state"]
@@ -1839,7 +1839,7 @@ def test_timeline_detach_with_aux_files_with_detach_v1(
assert all_reparented == set([])
# We need to ensure all safekeeper data are ingested before checking aux files: the API does not wait for LSN.
wait_for_last_flush_lsn(env, endpoint2, env.initial_tenant, branch_timeline_id)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn2).keys()) == set(
["pg_replslot/test_slot_parent_1/state", "pg_replslot/test_slot_parent_2/state"]
), "main branch unaffected"

Some files were not shown because too many files have changed in this diff Show More