Compare commits

...

15 Commits

Author SHA1 Message Date
Cihan Demirci
7b0f9e4baa login to dev ECR via OIDC
Dev account counterpart of https://github.com/neondatabase/neon/pull/9146
2024-09-26 22:06:24 +01:00
Conrad Ludgate
ec07a1ecc9 proxy: make local-proxy config by signal with PID, refine JWKS apis with role caching (#9164) 2024-09-26 19:01:48 +01:00
Arseny Sher
c4cdfe66ac Fix flakiness of test_timeline_copy.
Timeline might be not initialized when timeline_start_lsn is
queried. Spotted by CI.
2024-09-26 19:01:45 +03:00
Alex Chi Z.
42e19e952f fix(pageserver): categorize client error in basebackup metrics (#9110)
We separated client error from basebackup error log lines in
https://github.com/neondatabase/neon/pull/7523, but we didn't do
anything for the metrics. In this patch, we fixed it.

ref https://github.com/neondatabase/neon/issues/8970

## Summary of changes

We use the same criteria as in `log_query_error` producing an info line
(instead of error) for the metrics. We added a `client_error` category
for the basebackup query time metrics.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-09-26 11:38:19 -04:00
John Spray
3d255d601b pageserver: rename control plane client & chunk validation requests (#8997)
## Problem

- In https://github.com/neondatabase/neon/pull/8784, the validate
controller API is modified to check generations directly in the
database. It batches tenants into separate queries to avoid generating a
huge statement, but
- While updating this, I realized that "control_plane_client" is a kind
of confusing name for the client code now that it primarily talks to the
storage controller (the case of talking to the control plane will go
away in a few months).

## Summary of changes

- Big rename to "ControllerUpcallClient" -- this reflects the storage
controller's api naming, where the paths used by the pageserver are in
`/upcall/`
- When sending validate requests, break them up into chunks so that we
avoid possible edge cases of generating any HTTP requests that require
database I/O across many thousands of tenants.

This PR mixes a functional change with a refactor, but the commits are
cleanly separated -- only the last commit is a functional change.

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-09-26 16:06:34 +01:00
Arthur Petukhovsky
80e974d05b fix(compute_ctl): race condition in configurator (#9162)
There was a tricky race condition in compute_ctl, that sometimes makes
configurator skip updates. It makes a deadlock because:
- control-plane cannot configure compute, because it's in
ConfigurationPending state
- compute_ctl doesn't do any reconfiguration because
`configurator_main_loop` missed notification for it

Full sequence that reproduces the issue:
1. `start_compute` finishes works and changes status
`self.set_status(ComputeStatus::Running);`
2. configurator received update about `Running` state and dropped the
mutex lock in the iteration
3. `/configure` request was triggered at the same time as step 1, and
got the mutex lock
4. same `/configure` request set the spec and updated the state to
`ConfigurationPending`, also sent a notification
5. next iteration in configurator got the mutex lock, but missed the
notification

There are more details in this slack thread:
https://neondb.slack.com/archives/C03438W3FLZ/p1727281028478689?thread_ts=1727261220.483799&cid=C03438W3FLZ

---------

Co-authored-by: Alexey Kondratov <kondratov.aleksey@gmail.com>
2024-09-26 15:42:17 +01:00
Alexander Bayandin
7fdf1ab5b6 CI: run compatibility tests on Postgres 17 (#9145)
## Problem

The latest storage release has generated artifacts for Postgres 17,
so we can enable compatibility tests this version

## Summary of changes
- Unskip `test_backward_compatibility` / `test_forward_compatibility` on
Postgres 17
2024-09-26 15:17:01 +01:00
Arpad Müller
7bae78186b Forbid creation of child timelines of archived timeline (#9122)
We don't want to allow any new child timelines of archived timelines. If
you want any new child timelines, you should first un-archive the
timeline.
 
Part of #8088
2024-09-26 02:05:25 +02:00
Heikki Linnakangas
7e560dd00e chore: Silence clippy warning with nightly (#9157)
The warning:

    warning: first doc comment paragraph is too long
      --> pageserver/src/tenant/checks.rs:7:1
       |
7 | / /// Checks whether a layer map is valid (i.e., is a valid result
of the current compaction algorithm if no...
8 | | /// The function checks if we can split the LSN range of a delta
layer only at the LSNs of the delta layer...
    9  | | ///
    10 | | /// ```plain
       | |_
       |
= help: for further information visit
https://rust-lang.github.io/rust-clippy/master/index.html#too_long_first_doc_paragraph
= note: `#[warn(clippy::too_long_first_doc_paragraph)]` on by default
    help: add an empty line
       |
7 ~ /// Checks whether a layer map is valid (i.e., is a valid result of
the current compaction algorithm if nothing goes wrong).
    8  + ///
       |

Fix by applying the suggestion.
2024-09-25 21:29:16 +00:00
Tristan Partin
684e924211 Fix compute_logical_snapshot_files for v14
The function, pg_ls_logicalsnapdir(), was added in version 15.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-09-25 16:25:17 -05:00
Tristan Partin
8ace9ea25f Format long single DATA line in pgxn/Makefile
This should be a little more readable.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-09-25 16:25:17 -05:00
Alex Chi Z.
6a4f49b08b fix(pageserver): passthrough partition cancel error (#9154)
close https://github.com/neondatabase/neon/issues/9142

## Summary of changes

passthrough CollectKeyspaceError::Cancelled to
CompactionError::ShuttingDown

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-09-25 21:35:33 +01:00
Alexander Bayandin
c6e89445e2 CI(promote-images): fix prod ECR auth (#9146)
A cherry-pick from the previous release (#9131)

## Problem
Login to prod ECR doesn't work anymore:
```
Retrieving registries data through *** SDK...
*** ECR detected with eu-central-1 region
Error: The security token included in the request is invalid.
```

## Summary of changes
- Fix login to prod ECR by using `aws-actions/configure-aws-credentials`
2024-09-25 18:22:39 +01:00
Vlad Lazar
04f32b9526 tests: remove patching up of az id column (#8968)
This was required since the compat tests used a snapshot generated from
a version of neon local which didn't contain the availability_zone_id
column.
2024-09-25 17:22:32 +01:00
Heikki Linnakangas
6f2333f52b CI: Leave out unnecessary build files from binary artifact (#9135)
The pg_install/build directory contains .o files and such intermediate
results from the build, which are not needed in the final tarball.
Except for src/test/regress/regress.so and a few other .so files in that
directory; keep those.

This reduces the size of the neon-Linux-X64-release-artifact.tar.zst
artifact from about 1.5 GB to 700 MB.

(I attempted this a long time ago already, by moving the build/
directory out of pg_install altogether, see PR #2127. But I never got
around to finish that work.)

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2024-09-25 19:07:20 +03:00
36 changed files with 442 additions and 253 deletions

View File

@@ -20,3 +20,4 @@ config-variables:
- REMOTE_STORAGE_AZURE_REGION
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
- DEV_AWS_OIDC_ROLE_ARN
- DEV_GHA_OIDC_ROLE

View File

@@ -257,7 +257,15 @@ jobs:
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'
- name: Install postgres binaries
run: cp -a pg_install /tmp/neon/pg_install
run: |
# Use tar to copy files matching the pattern, preserving the paths in the destionation
tar c \
pg_install/v* \
pg_install/build/*/src/test/regress/*.so \
pg_install/build/*/src/test/regress/pg_regress \
pg_install/build/*/src/test/isolation/isolationtester \
pg_install/build/*/src/test/isolation/pg_isolation_regress \
| tar x -C /tmp/neon
- name: Upload Neon artifact
uses: ./.github/actions/upload

View File

@@ -862,6 +862,9 @@ jobs:
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
runs-on: ubuntu-22.04
permissions:
id-token: write # for `aws-actions/configure-aws-credentials`
env:
VERSIONS: v14 v15 v16 v17
@@ -871,12 +874,17 @@ jobs:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- name: Configure AWS-dev credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
mask-aws-account-id: true
role-to-assume: ${{ vars.DEV_GHA_OIDC_ROLE }}
- name: Login to dev ECR
uses: docker/login-action@v3
with:
registry: 369495373322.dkr.ecr.eu-central-1.amazonaws.com
username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
password: ${{ secrets.AWS_SECRET_KEY_DEV }}
- name: Copy vm-compute-node images to ECR
run: |
@@ -906,13 +914,19 @@ jobs:
docker buildx imagetools create -t neondatabase/neon-test-extensions-v16:latest \
neondatabase/neon-test-extensions-v16:${{ needs.tag.outputs.build-tag }}
- name: Configure AWS-prod credentials
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
mask-aws-account-id: true
role-to-assume: ${{ secrets.PROD_GHA_OIDC_ROLE }}
- name: Login to prod ECR
uses: docker/login-action@v3
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
with:
registry: 093970136003.dkr.ecr.eu-central-1.amazonaws.com
username: ${{ secrets.PROD_GHA_RUNNER_LIMITED_AWS_ACCESS_KEY_ID }}
password: ${{ secrets.PROD_GHA_RUNNER_LIMITED_AWS_SECRET_ACCESS_KEY }}
- name: Copy all images to prod ECR
if: github.ref_name == 'release'|| github.ref_name == 'release-proxy'
@@ -1181,10 +1195,9 @@ jobs:
files_to_promote+=("s3://${BUCKET}/${s3_key}")
# TODO Add v17
for pg_version in v14 v15 v16; do
for pg_version in v14 v15 v16 v17; do
# We run less tests for debug builds, so we don't need to promote them
if [ "${build_type}" == "debug" ] && { [ "${arch}" == "ARM64" ] || [ "${pg_version}" != "v16" ] ; }; then
if [ "${build_type}" == "debug" ] && { [ "${arch}" == "ARM64" ] || [ "${pg_version}" != "v17" ] ; }; then
continue
fi

1
Cargo.lock generated
View File

@@ -4296,6 +4296,7 @@ dependencies = [
"camino-tempfile",
"chrono",
"clap",
"compute_api",
"consumption_metrics",
"dashmap",
"ecdsa 0.16.9",

View File

@@ -195,7 +195,7 @@ metrics:
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp. These
-- temporary snapshot files are renamed to the actual snapshot files after they are
-- completely built. We only WAL-log the completely built snapshot files.
(SELECT COUNT(*) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
(SELECT COUNT(*) FROM pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
# In all the below metrics, we cast LSNs to floats because Prometheus only supports floats.
# It's probably fine because float64 can store integers from -2^53 to +2^53 exactly.
@@ -244,4 +244,3 @@ metrics:
SELECT slot_name,
CASE WHEN wal_status = 'lost' THEN 1 ELSE 0 END AS wal_is_lost
FROM pg_replication_slots;

View File

@@ -11,9 +11,17 @@ use crate::compute::ComputeNode;
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {
let state = compute.state.lock().unwrap();
let mut state = compute.state_changed.wait(state).unwrap();
let mut state = compute.state.lock().unwrap();
// We have to re-check the status after re-acquiring the lock because it could be that
// the status has changed while we were waiting for the lock, and we might not need to
// wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
// we are waiting for a condition variable that will never be signaled.
if state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
}
// Re-check the status after waking up
if state.status == ComputeStatus::ConfigurationPending {
info!("got configuration request");
state.status = ComputeStatus::Configuration;

View File

@@ -268,6 +268,22 @@ pub struct GenericOption {
/// declare a `trait` on it.
pub type GenericOptions = Option<Vec<GenericOption>>;
/// Configured the local-proxy application with the relevant JWKS and roles it should
/// use for authorizing connect requests using JWT.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LocalProxySpec {
pub jwks: Vec<JwksSettings>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct JwksSettings {
pub id: String,
pub role_names: Vec<String>,
pub jwks_url: String,
pub provider_name: String,
pub jwt_audience: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -984,6 +984,7 @@ pub fn short_error(e: &QueryError) -> String {
}
fn log_query_error(query: &str, e: &QueryError) {
// If you want to change the log level of a specific error, also re-categorize it in `BasebackupQueryTimeOngoingRecording`.
match e {
QueryError::Disconnected(ConnectionError::Io(io_error)) => {
if is_expected_io_error(io_error) {

View File

@@ -15,7 +15,7 @@ use clap::{Arg, ArgAction, Command};
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::config::PageserverIdentity;
use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::controller_upcall_client::ControllerUpcallClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::{COMPUTE_REQUEST_RUNTIME, WALRECEIVER_RUNTIME};
@@ -396,7 +396,7 @@ fn start_pageserver(
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(),
ControlPlaneClient::new(conf, &shutdown_pageserver),
ControllerUpcallClient::new(conf, &shutdown_pageserver),
conf,
);
if let Some(deletion_workers) = deletion_workers {

View File

@@ -17,9 +17,12 @@ use utils::{backoff, failpoint_support, generation::Generation, id::NodeId};
use crate::{config::PageServerConf, virtual_file::on_fatal_io_error};
use pageserver_api::config::NodeMetadata;
/// The Pageserver's client for using the control plane API: this is a small subset
/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
pub struct ControlPlaneClient {
/// The Pageserver's client for using the storage controller upcall API: this is a small API
/// for dealing with generations (see docs/rfcs/025-generation-numbers.md).
///
/// The server presenting this API may either be the storage controller or some other
/// service (such as the Neon control plane) providing a store of generation numbers.
pub struct ControllerUpcallClient {
http_client: reqwest::Client,
base_url: Url,
node_id: NodeId,
@@ -45,7 +48,7 @@ pub trait ControlPlaneGenerationsApi {
) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
}
impl ControlPlaneClient {
impl ControllerUpcallClient {
/// A None return value indicates that the input `conf` object does not have control
/// plane API enabled.
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
@@ -114,7 +117,7 @@ impl ControlPlaneClient {
}
}
impl ControlPlaneGenerationsApi for ControlPlaneClient {
impl ControlPlaneGenerationsApi for ControllerUpcallClient {
/// Block until we get a successful response, or error out if we are shut down
async fn re_attach(
&self,
@@ -216,29 +219,38 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
.join("validate")
.expect("Failed to build validate path");
let request = ValidateRequest {
tenants: tenants
.into_iter()
.map(|(id, gen)| ValidateRequestTenant {
id,
gen: gen
.into()
.expect("Generation should always be valid for a Tenant doing deletions"),
})
.collect(),
};
// When sending validate requests, break them up into chunks so that we
// avoid possible edge cases of generating any HTTP requests that
// require database I/O across many thousands of tenants.
let mut result: HashMap<TenantShardId, bool> = HashMap::with_capacity(tenants.len());
for tenant_chunk in (tenants).chunks(128) {
let request = ValidateRequest {
tenants: tenant_chunk
.iter()
.map(|(id, generation)| ValidateRequestTenant {
id: *id,
gen: (*generation).into().expect(
"Generation should always be valid for a Tenant doing deletions",
),
})
.collect(),
};
failpoint_support::sleep_millis_async!("control-plane-client-validate-sleep", &self.cancel);
if self.cancel.is_cancelled() {
return Err(RetryForeverError::ShuttingDown);
failpoint_support::sleep_millis_async!(
"control-plane-client-validate-sleep",
&self.cancel
);
if self.cancel.is_cancelled() {
return Err(RetryForeverError::ShuttingDown);
}
let response: ValidateResponse =
self.retry_http_forever(&re_attach_path, request).await?;
for rt in response.tenants {
result.insert(rt.id, rt.valid);
}
}
let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
Ok(response
.tenants
.into_iter()
.map(|rt| (rt.id, rt.valid))
.collect())
Ok(result.into_iter().collect())
}
}

View File

@@ -6,7 +6,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::controller_upcall_client::ControlPlaneGenerationsApi;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::remote_timeline_path;
@@ -622,7 +622,7 @@ impl DeletionQueue {
/// If remote_storage is None, then the returned workers will also be None.
pub fn new<C>(
remote_storage: GenericRemoteStorage,
control_plane_client: Option<C>,
controller_upcall_client: Option<C>,
conf: &'static PageServerConf,
) -> (Self, Option<DeletionQueueWorkers<C>>)
where
@@ -662,7 +662,7 @@ impl DeletionQueue {
conf,
backend_rx,
executor_tx,
control_plane_client,
controller_upcall_client,
lsn_table.clone(),
cancel.clone(),
),
@@ -704,7 +704,7 @@ mod test {
use tokio::task::JoinHandle;
use crate::{
control_plane_client::RetryForeverError,
controller_upcall_client::RetryForeverError,
repository::Key,
tenant::{harness::TenantHarness, storage_layer::DeltaLayerName},
};

View File

@@ -25,8 +25,8 @@ use tracing::info;
use tracing::warn;
use crate::config::PageServerConf;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::control_plane_client::RetryForeverError;
use crate::controller_upcall_client::ControlPlaneGenerationsApi;
use crate::controller_upcall_client::RetryForeverError;
use crate::metrics;
use crate::virtual_file::MaybeFatalIo;
@@ -61,7 +61,7 @@ where
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
// Client for calling into control plane API for validation of deletes
control_plane_client: Option<C>,
controller_upcall_client: Option<C>,
// DeletionLists which are waiting generation validation. Not safe to
// execute until [`validate`] has processed them.
@@ -94,7 +94,7 @@ where
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
control_plane_client: Option<C>,
controller_upcall_client: Option<C>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
cancel: CancellationToken,
) -> Self {
@@ -102,7 +102,7 @@ where
conf,
rx,
tx,
control_plane_client,
controller_upcall_client,
lsn_table,
pending_lists: Vec::new(),
validated_lists: Vec::new(),
@@ -145,8 +145,8 @@ where
return Ok(());
}
let tenants_valid = if let Some(control_plane_client) = &self.control_plane_client {
match control_plane_client
let tenants_valid = if let Some(controller_upcall_client) = &self.controller_upcall_client {
match controller_upcall_client
.validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
.await
{

View File

@@ -589,6 +589,10 @@ async fn timeline_create_handler(
StatusCode::SERVICE_UNAVAILABLE,
HttpErrorBody::from_msg(e.to_string()),
),
Err(e @ tenant::CreateTimelineError::AncestorArchived) => json_response(
StatusCode::NOT_ACCEPTABLE,
HttpErrorBody::from_msg(e.to_string()),
),
Err(tenant::CreateTimelineError::ShuttingDown) => json_response(
StatusCode::SERVICE_UNAVAILABLE,
HttpErrorBody::from_msg("tenant shutting down".to_string()),

View File

@@ -6,7 +6,7 @@ pub mod basebackup;
pub mod config;
pub mod consumption_metrics;
pub mod context;
pub mod control_plane_client;
pub mod controller_upcall_client;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;

View File

@@ -8,6 +8,8 @@ use metrics::{
};
use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
use postgres_backend::{is_expected_io_error, QueryError};
use pq_proto::framed::ConnectionError;
use strum::{EnumCount, VariantNames};
use strum_macros::{IntoStaticStr, VariantNames};
use tracing::warn;
@@ -1508,6 +1510,7 @@ static COMPUTE_STARTUP_BUCKETS: Lazy<[f64; 28]> = Lazy::new(|| {
pub(crate) struct BasebackupQueryTime {
ok: Histogram,
error: Histogram,
client_error: Histogram,
}
pub(crate) static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|| {
@@ -1521,6 +1524,7 @@ pub(crate) static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|
BasebackupQueryTime {
ok: vec.get_metric_with_label_values(&["ok"]).unwrap(),
error: vec.get_metric_with_label_values(&["error"]).unwrap(),
client_error: vec.get_metric_with_label_values(&["client_error"]).unwrap(),
}
});
@@ -1557,7 +1561,7 @@ impl BasebackupQueryTime {
}
impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
pub(crate) fn observe<T, E>(self, res: &Result<T, E>) {
pub(crate) fn observe<T>(self, res: &Result<T, QueryError>) {
let elapsed = self.start.elapsed();
let ex_throttled = self
.ctx
@@ -1576,10 +1580,15 @@ impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
elapsed
}
};
let metric = if res.is_ok() {
&self.parent.ok
} else {
&self.parent.error
// If you want to change categorize of a specific error, also change it in `log_query_error`.
let metric = match res {
Ok(_) => &self.parent.ok,
Err(QueryError::Disconnected(ConnectionError::Io(io_error)))
if is_expected_io_error(io_error) =>
{
&self.parent.client_error
}
Err(_) => &self.parent.error,
};
metric.observe(ex_throttled.as_secs_f64());
}

View File

@@ -563,6 +563,8 @@ pub enum CreateTimelineError {
AncestorLsn(anyhow::Error),
#[error("ancestor timeline is not active")]
AncestorNotActive,
#[error("ancestor timeline is archived")]
AncestorArchived,
#[error("tenant shutting down")]
ShuttingDown,
#[error(transparent)]
@@ -1698,6 +1700,11 @@ impl Tenant {
return Err(CreateTimelineError::AncestorNotActive);
}
if ancestor_timeline.is_archived() == Some(true) {
info!("tried to branch archived timeline");
return Err(CreateTimelineError::AncestorArchived);
}
if let Some(lsn) = ancestor_start_lsn.as_mut() {
*lsn = lsn.align();

View File

@@ -5,6 +5,7 @@ use itertools::Itertools;
use super::storage_layer::LayerName;
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
///
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
///
/// ```plain

View File

@@ -30,8 +30,8 @@ use utils::{backoff, completion, crashsafe};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::control_plane_client::{
ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
use crate::controller_upcall_client::{
ControlPlaneGenerationsApi, ControllerUpcallClient, RetryForeverError,
};
use crate::deletion_queue::DeletionQueueClient;
use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
@@ -122,7 +122,7 @@ pub(crate) enum ShardSelector {
Known(ShardIndex),
}
/// A convenience for use with the re_attach ControlPlaneClient function: rather
/// A convenience for use with the re_attach ControllerUpcallClient function: rather
/// than the serializable struct, we build this enum that encapsulates
/// the invariant that attached tenants always have generations.
///
@@ -341,7 +341,7 @@ async fn init_load_generations(
"Emergency mode! Tenants will be attached unsafely using their last known generation"
);
emergency_generations(tenant_confs)
} else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
} else if let Some(client) = ControllerUpcallClient::new(conf, cancel) {
info!("Calling control plane API to re-attach tenants");
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach(conf).await {

View File

@@ -3601,7 +3601,7 @@ impl Timeline {
ctx,
)
.await
.map_err(|e| FlushLayerError::from_anyhow(self, e))?;
.map_err(|e| FlushLayerError::from_anyhow(self, e.into()))?;
if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
@@ -3840,16 +3840,20 @@ impl Timeline {
partition_size: u64,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> anyhow::Result<((KeyPartitioning, SparseKeyPartitioning), Lsn)> {
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> {
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
// and hence before the compaction task starts.
anyhow::bail!("repartition() called concurrently, this should not happen");
return Err(CompactionError::Other(anyhow!(
"repartition() called concurrently, this should not happen"
)));
};
let ((dense_partition, sparse_partition), partition_lsn) = &*partitioning_guard;
if lsn < *partition_lsn {
anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
return Err(CompactionError::Other(anyhow!(
"repartition() called with LSN going backwards, this should not happen"
)));
}
let distance = lsn.0 - partition_lsn.0;
@@ -4451,6 +4455,12 @@ pub(crate) enum CompactionError {
Other(anyhow::Error),
}
impl CompactionError {
pub fn is_cancelled(&self) -> bool {
matches!(self, CompactionError::ShuttingDown)
}
}
impl From<CollectKeySpaceError> for CompactionError {
fn from(err: CollectKeySpaceError) -> Self {
match err {

View File

@@ -390,7 +390,7 @@ impl Timeline {
// error but continue.
//
// Suppress error when it's due to cancellation
if !self.cancel.is_cancelled() {
if !self.cancel.is_cancelled() && !err.is_cancelled() {
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
}
(1, false)

View File

@@ -25,7 +25,18 @@ SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql neon--1.4--1.5.sql neon--1.5--1.4.sql
DATA = \
neon--1.0.sql \
neon--1.0--1.1.sql \
neon--1.1--1.2.sql \
neon--1.2--1.3.sql \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \
neon--1.3--1.2.sql \
neon--1.2--1.1.sql \
neon--1.1--1.0.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"
EXTRA_CLEAN = \

View File

@@ -24,6 +24,7 @@ bytes = { workspace = true, features = ["serde"] }
camino.workspace = true
chrono.workspace = true
clap.workspace = true
compute_api.workspace = true
consumption_metrics.workspace = true
dashmap.workspace = true
env_logger.workspace = true

View File

@@ -12,7 +12,10 @@ use serde::{Deserialize, Deserializer};
use signature::Verifier;
use tokio::time::Instant;
use crate::{context::RequestMonitoring, http::parse_json_body_with_limit, EndpointId, RoleName};
use crate::{
context::RequestMonitoring, http::parse_json_body_with_limit, intern::RoleNameInt, EndpointId,
RoleName,
};
// TODO(conrad): make these configurable.
const CLOCK_SKEW_LEEWAY: Duration = Duration::from_secs(30);
@@ -27,7 +30,6 @@ pub(crate) trait FetchAuthRules: Clone + Send + Sync + 'static {
&self,
ctx: &RequestMonitoring,
endpoint: EndpointId,
role_name: RoleName,
) -> impl Future<Output = anyhow::Result<Vec<AuthRule>>> + Send;
}
@@ -35,10 +37,11 @@ pub(crate) struct AuthRule {
pub(crate) id: String,
pub(crate) jwks_url: url::Url,
pub(crate) audience: Option<String>,
pub(crate) role_names: Vec<RoleNameInt>,
}
#[derive(Default)]
pub(crate) struct JwkCache {
pub struct JwkCache {
client: reqwest::Client,
map: DashMap<(EndpointId, RoleName), Arc<JwkCacheEntryLock>>,
@@ -54,18 +57,28 @@ pub(crate) struct JwkCacheEntry {
}
impl JwkCacheEntry {
fn find_jwk_and_audience(&self, key_id: &str) -> Option<(&jose_jwk::Jwk, Option<&str>)> {
self.key_sets.values().find_map(|key_set| {
key_set
.find_key(key_id)
.map(|jwk| (jwk, key_set.audience.as_deref()))
})
fn find_jwk_and_audience(
&self,
key_id: &str,
role_name: &RoleName,
) -> Option<(&jose_jwk::Jwk, Option<&str>)> {
self.key_sets
.values()
// make sure our requested role has access to the key set
.filter(|key_set| key_set.role_names.iter().any(|role| **role == **role_name))
// try and find the requested key-id in the key set
.find_map(|key_set| {
key_set
.find_key(key_id)
.map(|jwk| (jwk, key_set.audience.as_deref()))
})
}
}
struct KeySet {
jwks: jose_jwk::JwkSet,
audience: Option<String>,
role_names: Vec<RoleNameInt>,
}
impl KeySet {
@@ -106,7 +119,6 @@ impl JwkCacheEntryLock {
ctx: &RequestMonitoring,
client: &reqwest::Client,
endpoint: EndpointId,
role_name: RoleName,
auth_rules: &F,
) -> anyhow::Result<Arc<JwkCacheEntry>> {
// double check that no one beat us to updating the cache.
@@ -119,11 +131,10 @@ impl JwkCacheEntryLock {
}
}
let rules = auth_rules
.fetch_auth_rules(ctx, endpoint, role_name)
.await?;
let rules = auth_rules.fetch_auth_rules(ctx, endpoint).await?;
let mut key_sets =
ahash::HashMap::with_capacity_and_hasher(rules.len(), ahash::RandomState::new());
// TODO(conrad): run concurrently
// TODO(conrad): strip the JWKs urls (should be checked by cplane as well - cloud#16284)
for rule in rules {
@@ -151,6 +162,7 @@ impl JwkCacheEntryLock {
KeySet {
jwks,
audience: rule.audience,
role_names: rule.role_names,
},
);
}
@@ -173,7 +185,6 @@ impl JwkCacheEntryLock {
ctx: &RequestMonitoring,
client: &reqwest::Client,
endpoint: EndpointId,
role_name: RoleName,
fetch: &F,
) -> Result<Arc<JwkCacheEntry>, anyhow::Error> {
let now = Instant::now();
@@ -183,9 +194,7 @@ impl JwkCacheEntryLock {
let Some(cached) = guard else {
let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let permit = self.acquire_permit().await;
return self
.renew_jwks(permit, ctx, client, endpoint, role_name, fetch)
.await;
return self.renew_jwks(permit, ctx, client, endpoint, fetch).await;
};
let last_update = now.duration_since(cached.last_retrieved);
@@ -196,9 +205,7 @@ impl JwkCacheEntryLock {
let permit = self.acquire_permit().await;
// it's been too long since we checked the keys. wait for them to update.
return self
.renew_jwks(permit, ctx, client, endpoint, role_name, fetch)
.await;
return self.renew_jwks(permit, ctx, client, endpoint, fetch).await;
}
// every 5 minutes we should spawn a job to eagerly update the token.
@@ -212,7 +219,7 @@ impl JwkCacheEntryLock {
let ctx = ctx.clone();
tokio::spawn(async move {
if let Err(e) = entry
.renew_jwks(permit, &ctx, &client, endpoint, role_name, &fetch)
.renew_jwks(permit, &ctx, &client, endpoint, &fetch)
.await
{
tracing::warn!(error=?e, "could not fetch JWKs in background job");
@@ -232,7 +239,7 @@ impl JwkCacheEntryLock {
jwt: &str,
client: &reqwest::Client,
endpoint: EndpointId,
role_name: RoleName,
role_name: &RoleName,
fetch: &F,
) -> Result<(), anyhow::Error> {
// JWT compact form is defined to be
@@ -254,30 +261,26 @@ impl JwkCacheEntryLock {
let sig = base64::decode_config(signature, base64::URL_SAFE_NO_PAD)
.context("Provided authentication token is not a valid JWT encoding")?;
ensure!(header.typ == "JWT");
ensure!(
header.typ == "JWT",
"Provided authentication token is not a valid JWT encoding"
);
let kid = header.key_id.context("missing key id")?;
let mut guard = self
.get_or_update_jwk_cache(ctx, client, endpoint.clone(), role_name.clone(), fetch)
.get_or_update_jwk_cache(ctx, client, endpoint.clone(), fetch)
.await?;
// get the key from the JWKs if possible. If not, wait for the keys to update.
let (jwk, expected_audience) = loop {
match guard.find_jwk_and_audience(kid) {
match guard.find_jwk_and_audience(kid, role_name) {
Some(jwk) => break jwk,
None if guard.last_retrieved.elapsed() > MIN_RENEW => {
let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let permit = self.acquire_permit().await;
guard = self
.renew_jwks(
permit,
ctx,
client,
endpoint.clone(),
role_name.clone(),
fetch,
)
.renew_jwks(permit, ctx, client, endpoint.clone(), fetch)
.await?;
}
_ => {
@@ -320,11 +323,14 @@ impl JwkCacheEntryLock {
let now = SystemTime::now();
if let Some(exp) = payload.expiration {
ensure!(now < exp + CLOCK_SKEW_LEEWAY);
ensure!(now < exp + CLOCK_SKEW_LEEWAY, "JWT token has expired");
}
if let Some(nbf) = payload.not_before {
ensure!(nbf < now + CLOCK_SKEW_LEEWAY);
ensure!(
nbf < now + CLOCK_SKEW_LEEWAY,
"JWT token is not yet ready to use"
);
}
Ok(())
@@ -336,7 +342,7 @@ impl JwkCache {
&self,
ctx: &RequestMonitoring,
endpoint: EndpointId,
role_name: RoleName,
role_name: &RoleName,
fetch: &F,
jwt: &str,
) -> Result<(), anyhow::Error> {
@@ -572,7 +578,7 @@ mod tests {
format!("{header}.{body}")
}
fn new_ec_jwt(kid: String, key: p256::SecretKey) -> String {
fn new_ec_jwt(kid: String, key: &p256::SecretKey) -> String {
use p256::ecdsa::{Signature, SigningKey};
let payload = build_jwt_payload(kid, jose_jwa::Signing::Es256);
@@ -660,11 +666,6 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
let (ec1, jwk3) = new_ec_jwk("3".into());
let (ec2, jwk4) = new_ec_jwk("4".into());
let jwt1 = new_rsa_jwt("1".into(), rs1);
let jwt2 = new_rsa_jwt("2".into(), rs2);
let jwt3 = new_ec_jwt("3".into(), ec1);
let jwt4 = new_ec_jwt("4".into(), ec2);
let foo_jwks = jose_jwk::JwkSet {
keys: vec![jwk1, jwk3],
};
@@ -706,47 +707,98 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
let client = reqwest::Client::new();
#[derive(Clone)]
struct Fetch(SocketAddr);
struct Fetch(SocketAddr, Vec<RoleNameInt>);
impl FetchAuthRules for Fetch {
async fn fetch_auth_rules(
&self,
_ctx: &RequestMonitoring,
_endpoint: EndpointId,
_role_name: RoleName,
) -> anyhow::Result<Vec<AuthRule>> {
Ok(vec![
AuthRule {
id: "foo".to_owned(),
jwks_url: format!("http://{}/foo", self.0).parse().unwrap(),
audience: None,
role_names: self.1.clone(),
},
AuthRule {
id: "bar".to_owned(),
jwks_url: format!("http://{}/bar", self.0).parse().unwrap(),
audience: None,
role_names: self.1.clone(),
},
])
}
}
let role_name = RoleName::from("user");
let role_name1 = RoleName::from("anonymous");
let role_name2 = RoleName::from("authenticated");
let fetch = Fetch(
addr,
vec![
RoleNameInt::from(&role_name1),
RoleNameInt::from(&role_name2),
],
);
let endpoint = EndpointId::from("ep");
let jwk_cache = Arc::new(JwkCacheEntryLock::default());
for token in [jwt1, jwt2, jwt3, jwt4] {
jwk_cache
.check_jwt(
&RequestMonitoring::test(),
&token,
&client,
endpoint.clone(),
role_name.clone(),
&Fetch(addr),
)
.await
.unwrap();
let jwt1 = new_rsa_jwt("1".into(), rs1);
let jwt2 = new_rsa_jwt("2".into(), rs2);
let jwt3 = new_ec_jwt("3".into(), &ec1);
let jwt4 = new_ec_jwt("4".into(), &ec2);
// had the wrong kid, therefore will have the wrong ecdsa signature
let bad_jwt = new_ec_jwt("3".into(), &ec2);
// this role_name is not accepted
let bad_role_name = RoleName::from("cloud_admin");
let err = jwk_cache
.check_jwt(
&RequestMonitoring::test(),
&bad_jwt,
&client,
endpoint.clone(),
&role_name1,
&fetch,
)
.await
.unwrap_err();
assert!(err.to_string().contains("signature error"));
let err = jwk_cache
.check_jwt(
&RequestMonitoring::test(),
&jwt1,
&client,
endpoint.clone(),
&bad_role_name,
&fetch,
)
.await
.unwrap_err();
assert!(err.to_string().contains("jwk not found"));
let tokens = [jwt1, jwt2, jwt3, jwt4];
let role_names = [role_name1, role_name2];
for role in &role_names {
for token in &tokens {
jwk_cache
.check_jwt(
&RequestMonitoring::test(),
token,
&client,
endpoint.clone(),
role,
&fetch,
)
.await
.unwrap();
}
}
}
}

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, net::SocketAddr};
use std::net::SocketAddr;
use anyhow::Context;
use arc_swap::ArcSwapOption;
@@ -10,8 +10,8 @@ use crate::{
NodeInfo,
},
context::RequestMonitoring,
intern::{BranchIdInt, BranchIdTag, EndpointIdTag, InternId, ProjectIdInt, ProjectIdTag},
EndpointId, RoleName,
intern::{BranchIdTag, EndpointIdTag, InternId, ProjectIdTag},
EndpointId,
};
use super::jwt::{AuthRule, FetchAuthRules, JwkCache};
@@ -48,26 +48,17 @@ impl LocalBackend {
#[derive(Clone, Copy)]
pub(crate) struct StaticAuthRules;
pub static JWKS_ROLE_MAP: ArcSwapOption<JwksRoleSettings> = ArcSwapOption::const_empty();
#[derive(Debug, Clone)]
pub struct JwksRoleSettings {
pub roles: HashMap<RoleName, EndpointJwksResponse>,
pub project_id: ProjectIdInt,
pub branch_id: BranchIdInt,
}
pub static JWKS_ROLE_MAP: ArcSwapOption<EndpointJwksResponse> = ArcSwapOption::const_empty();
impl FetchAuthRules for StaticAuthRules {
async fn fetch_auth_rules(
&self,
_ctx: &RequestMonitoring,
_endpoint: EndpointId,
role_name: RoleName,
) -> anyhow::Result<Vec<AuthRule>> {
let mappings = JWKS_ROLE_MAP.load();
let role_mappings = mappings
.as_deref()
.and_then(|m| m.roles.get(&role_name))
.context("JWKs settings for this role were not configured")?;
let mut rules = vec![];
for setting in &role_mappings.jwks {
@@ -75,6 +66,7 @@ impl FetchAuthRules for StaticAuthRules {
id: setting.id.clone(),
jwks_url: setting.jwks_url.clone(),
audience: setting.jwt_audience.clone(),
role_names: setting.role_names.clone(),
});
}

View File

@@ -1,34 +1,35 @@
use std::{
net::SocketAddr,
path::{Path, PathBuf},
pin::pin,
sync::Arc,
time::Duration,
};
use std::{net::SocketAddr, pin::pin, str::FromStr, sync::Arc, time::Duration};
use anyhow::{bail, ensure};
use anyhow::{bail, ensure, Context};
use camino::{Utf8Path, Utf8PathBuf};
use compute_api::spec::LocalProxySpec;
use dashmap::DashMap;
use futures::{future::Either, FutureExt};
use futures::future::Either;
use proxy::{
auth::backend::local::{JwksRoleSettings, LocalBackend, JWKS_ROLE_MAP},
auth::backend::local::{LocalBackend, JWKS_ROLE_MAP},
cancellation::CancellationHandlerMain,
config::{self, AuthenticationConfig, HttpConfig, ProxyConfig, RetryConfig},
console::{locks::ApiLocks, messages::JwksRoleMapping},
console::{
locks::ApiLocks,
messages::{EndpointJwksResponse, JwksSettings},
},
http::health_server::AppMetrics,
intern::RoleNameInt,
metrics::{Metrics, ThreadPoolMetrics},
rate_limiter::{BucketRateLimiter, EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo},
scram::threadpool::ThreadPool,
serverless::{self, cancel_set::CancelSet, GlobalConnPoolOptions},
RoleName,
};
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
use clap::Parser;
use tokio::{net::TcpListener, task::JoinSet};
use tokio::{net::TcpListener, sync::Notify, task::JoinSet};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use utils::{project_build_tag, project_git_version, sentry_init::init_sentry};
use utils::{pid_file, project_build_tag, project_git_version, sentry_init::init_sentry};
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
@@ -72,9 +73,12 @@ struct LocalProxyCliArgs {
/// Address of the postgres server
#[clap(long, default_value = "127.0.0.1:5432")]
compute: SocketAddr,
/// File address of the local proxy config file
/// Path of the local proxy config file
#[clap(long, default_value = "./localproxy.json")]
config_path: PathBuf,
config_path: Utf8PathBuf,
/// Path of the local proxy PID file
#[clap(long, default_value = "./localproxy.pid")]
pid_path: Utf8PathBuf,
}
#[derive(clap::Args, Clone, Copy, Debug)]
@@ -126,6 +130,24 @@ async fn main() -> anyhow::Result<()> {
let args = LocalProxyCliArgs::parse();
let config = build_config(&args)?;
// before we bind to any ports, write the process ID to a file
// so that compute-ctl can find our process later
// in order to trigger the appropriate SIGHUP on config change.
//
// This also claims a "lock" that makes sure only one instance
// of local-proxy runs at a time.
let _process_guard = loop {
match pid_file::claim_for_current_process(&args.pid_path) {
Ok(guard) => break guard,
Err(e) => {
// compute-ctl might have tried to read the pid-file to let us
// know about some config change. We should try again.
error!(path=?args.pid_path, "could not claim PID file guard: {e:?}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
};
let metrics_listener = TcpListener::bind(args.metrics).await?.into_std()?;
let http_listener = TcpListener::bind(args.http).await?;
let shutdown = CancellationToken::new();
@@ -139,12 +161,30 @@ async fn main() -> anyhow::Result<()> {
16,
));
refresh_config(args.config_path.clone()).await;
// write the process ID to a file so that compute-ctl can find our process later
// in order to trigger the appropriate SIGHUP on config change.
let pid = std::process::id();
info!("process running in PID {pid}");
std::fs::write(args.pid_path, format!("{pid}\n")).context("writing PID to file")?;
let mut maintenance_tasks = JoinSet::new();
maintenance_tasks.spawn(proxy::handle_signals(shutdown.clone(), move || {
refresh_config(args.config_path.clone()).map(Ok)
let refresh_config_notify = Arc::new(Notify::new());
maintenance_tasks.spawn(proxy::handle_signals(shutdown.clone(), {
let refresh_config_notify = Arc::clone(&refresh_config_notify);
move || {
refresh_config_notify.notify_one();
}
}));
// trigger the first config load **after** setting up the signal hook
// to avoid the race condition where:
// 1. No config file registered when local-proxy starts up
// 2. The config file is written but the signal hook is not yet received
// 3. local-proxy completes startup but has no config loaded, despite there being a registerd config.
refresh_config_notify.notify_one();
tokio::spawn(refresh_config_loop(args.config_path, refresh_config_notify));
maintenance_tasks.spawn(proxy::http::health_server::task_main(
metrics_listener,
AppMetrics {
@@ -245,81 +285,84 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
})))
}
async fn refresh_config(path: PathBuf) {
match refresh_config_inner(&path).await {
Ok(()) => {}
Err(e) => {
error!(error=?e, ?path, "could not read config file");
async fn refresh_config_loop(path: Utf8PathBuf, rx: Arc<Notify>) {
loop {
rx.notified().await;
match refresh_config_inner(&path).await {
Ok(()) => {}
Err(e) => {
error!(error=?e, ?path, "could not read config file");
}
}
}
}
async fn refresh_config_inner(path: &Path) -> anyhow::Result<()> {
async fn refresh_config_inner(path: &Utf8Path) -> anyhow::Result<()> {
let bytes = tokio::fs::read(&path).await?;
let mut data: JwksRoleMapping = serde_json::from_slice(&bytes)?;
let data: LocalProxySpec = serde_json::from_slice(&bytes)?;
let mut settings = None;
let mut jwks_set = vec![];
for mapping in data.roles.values_mut() {
for jwks in &mut mapping.jwks {
ensure!(
jwks.jwks_url.has_authority()
&& (jwks.jwks_url.scheme() == "http" || jwks.jwks_url.scheme() == "https"),
"Invalid JWKS url. Must be HTTP",
);
for jwks in data.jwks {
let mut jwks_url = url::Url::from_str(&jwks.jwks_url).context("parsing JWKS url")?;
ensure!(
jwks.jwks_url
.host()
.is_some_and(|h| h != url::Host::Domain("")),
"Invalid JWKS url. No domain listed",
);
ensure!(
jwks_url.has_authority()
&& (jwks_url.scheme() == "http" || jwks_url.scheme() == "https"),
"Invalid JWKS url. Must be HTTP",
);
// clear username, password and ports
jwks.jwks_url.set_username("").expect(
ensure!(
jwks_url.host().is_some_and(|h| h != url::Host::Domain("")),
"Invalid JWKS url. No domain listed",
);
// clear username, password and ports
jwks_url
.set_username("")
.expect("url can be a base and has a valid host and is not a file. should not error");
jwks_url
.set_password(None)
.expect("url can be a base and has a valid host and is not a file. should not error");
// local testing is hard if we need to have a specific restricted port
if cfg!(not(feature = "testing")) {
jwks_url.set_port(None).expect(
"url can be a base and has a valid host and is not a file. should not error",
);
jwks.jwks_url.set_password(None).expect(
"url can be a base and has a valid host and is not a file. should not error",
);
// local testing is hard if we need to have a specific restricted port
if cfg!(not(feature = "testing")) {
jwks.jwks_url.set_port(None).expect(
"url can be a base and has a valid host and is not a file. should not error",
);
}
// clear query params
jwks.jwks_url.set_fragment(None);
jwks.jwks_url.query_pairs_mut().clear().finish();
if jwks.jwks_url.scheme() != "https" {
// local testing is hard if we need to set up https support.
if cfg!(not(feature = "testing")) {
jwks.jwks_url
.set_scheme("https")
.expect("should not error to set the scheme to https if it was http");
} else {
warn!(scheme = jwks.jwks_url.scheme(), "JWKS url is not HTTPS");
}
}
let (pr, br) = settings.get_or_insert((jwks.project_id, jwks.branch_id));
ensure!(
*pr == jwks.project_id,
"inconsistent project IDs configured"
);
ensure!(*br == jwks.branch_id, "inconsistent branch IDs configured");
}
// clear query params
jwks_url.set_fragment(None);
jwks_url.query_pairs_mut().clear().finish();
if jwks_url.scheme() != "https" {
// local testing is hard if we need to set up https support.
if cfg!(not(feature = "testing")) {
jwks_url
.set_scheme("https")
.expect("should not error to set the scheme to https if it was http");
} else {
warn!(scheme = jwks_url.scheme(), "JWKS url is not HTTPS");
}
}
jwks_set.push(JwksSettings {
id: jwks.id,
jwks_url,
provider_name: jwks.provider_name,
jwt_audience: jwks.jwt_audience,
role_names: jwks
.role_names
.into_iter()
.map(RoleName::from)
.map(|s| RoleNameInt::from(&s))
.collect(),
})
}
if let Some((project_id, branch_id)) = settings {
JWKS_ROLE_MAP.store(Some(Arc::new(JwksRoleSettings {
roles: data.roles,
project_id,
branch_id,
})));
}
info!("successfully loaded new config");
JWKS_ROLE_MAP.store(Some(Arc::new(EndpointJwksResponse { jwks: jwks_set })));
Ok(())
}

View File

@@ -133,9 +133,7 @@ async fn main() -> anyhow::Result<()> {
proxy_listener,
cancellation_token.clone(),
));
let signals_task = tokio::spawn(proxy::handle_signals(cancellation_token, || async {
Ok(())
}));
let signals_task = tokio::spawn(proxy::handle_signals(cancellation_token, || {}));
// the signal task cant ever succeed.
// the main task can error, or can succeed on cancellation.

View File

@@ -461,10 +461,7 @@ async fn main() -> anyhow::Result<()> {
// maintenance tasks. these never return unless there's an error
let mut maintenance_tasks = JoinSet::new();
maintenance_tasks.spawn(proxy::handle_signals(
cancellation_token.clone(),
|| async { Ok(()) },
));
maintenance_tasks.spawn(proxy::handle_signals(cancellation_token.clone(), || {}));
maintenance_tasks.spawn(http::health_server::task_main(
http_listener,
AppMetrics {

View File

@@ -1,13 +1,11 @@
use measured::FixedCardinalityLabel;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::{self, Display};
use crate::auth::IpPattern;
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
use crate::proxy::retry::CouldRetry;
use crate::RoleName;
/// Generic error response with human-readable description.
/// Note that we can't always present it to user as is.
@@ -348,11 +346,6 @@ impl ColdStartInfo {
}
}
#[derive(Debug, Deserialize, Clone)]
pub struct JwksRoleMapping {
pub roles: HashMap<RoleName, EndpointJwksResponse>,
}
#[derive(Debug, Deserialize, Clone)]
pub struct EndpointJwksResponse {
pub jwks: Vec<JwksSettings>,
@@ -361,11 +354,10 @@ pub struct EndpointJwksResponse {
#[derive(Debug, Deserialize, Clone)]
pub struct JwksSettings {
pub id: String,
pub project_id: ProjectIdInt,
pub branch_id: BranchIdInt,
pub jwks_url: url::Url,
pub provider_name: String,
pub jwt_audience: Option<String>,
pub role_names: Vec<RoleNameInt>,
}
#[cfg(test)]

View File

@@ -130,14 +130,14 @@ impl<Id: InternId> Default for StringInterner<Id> {
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) struct RoleNameTag;
pub struct RoleNameTag;
impl InternId for RoleNameTag {
fn get_interner() -> &'static StringInterner<Self> {
static ROLE_NAMES: OnceLock<StringInterner<RoleNameTag>> = OnceLock::new();
ROLE_NAMES.get_or_init(Default::default)
}
}
pub(crate) type RoleNameInt = InternedString<RoleNameTag>;
pub type RoleNameInt = InternedString<RoleNameTag>;
impl From<&RoleName> for RoleNameInt {
fn from(value: &RoleName) -> Self {
RoleNameTag::get_interner().get_or_intern(value)

View File

@@ -82,7 +82,7 @@
impl_trait_overcaptures,
)]
use std::{convert::Infallible, future::Future};
use std::convert::Infallible;
use anyhow::{bail, Context};
use intern::{EndpointIdInt, EndpointIdTag, InternId};
@@ -117,13 +117,12 @@ pub mod usage_metrics;
pub mod waiters;
/// Handle unix signals appropriately.
pub async fn handle_signals<F, Fut>(
pub async fn handle_signals<F>(
token: CancellationToken,
mut refresh_config: F,
) -> anyhow::Result<Infallible>
where
F: FnMut() -> Fut,
Fut: Future<Output = anyhow::Result<()>>,
F: FnMut(),
{
use tokio::signal::unix::{signal, SignalKind};
@@ -136,7 +135,7 @@ where
// Hangup is commonly used for config reload.
_ = hangup.recv() => {
warn!("received SIGHUP");
refresh_config().await?;
refresh_config();
}
// Shut down the whole application.
_ = interrupt.recv() => {

View File

@@ -43,6 +43,13 @@ impl ThreadPool {
pub fn new(n_workers: u8) -> Arc<Self> {
// rayon would be nice here, but yielding in rayon does not work well afaict.
if n_workers == 0 {
return Arc::new(Self {
runtime: None,
metrics: Arc::new(ThreadPoolMetrics::new(n_workers as usize)),
});
}
Arc::new_cyclic(|pool| {
let pool = pool.clone();
let worker_id = AtomicUsize::new(0);

View File

@@ -119,7 +119,7 @@ impl PoolingBackend {
.check_jwt(
ctx,
user_info.endpoint.clone(),
user_info.user.clone(),
&user_info.user,
&StaticAuthRules,
jwt,
)

View File

@@ -642,9 +642,6 @@ class NeonEnvBuilder:
patch_script = ""
for ps in self.env.pageservers:
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';"
# This is a temporary to get the backward compat test happy
# since the compat snapshot was generated with an older version of neon local
patch_script += f"UPDATE nodes SET availability_zone_id='{ps.az_id}' WHERE node_id = '{ps.id}' AND availability_zone_id IS NULL;"
patch_script_path.write_text(patch_script)
# Update the config with info about tenants and timelines

View File

@@ -8,6 +8,7 @@ import requests
from fixtures.common_types import Lsn, TenantId, TenantTimelineId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.utils import wait_until
# Walreceiver as returned by sk's timeline status endpoint.
@@ -161,6 +162,16 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
walreceivers=walreceivers,
)
# Get timeline_start_lsn, waiting until it's nonzero. It is a way to ensure
# that the timeline is fully initialized at the safekeeper.
def get_non_zero_timeline_start_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
def timeline_start_lsn_non_zero() -> Lsn:
s = self.timeline_status(tenant_id, timeline_id).timeline_start_lsn
assert s > Lsn(0)
return s
return wait_until(30, 1, timeline_start_lsn_non_zero)
def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
return self.timeline_status(tenant_id, timeline_id).commit_lsn

View File

@@ -21,7 +21,7 @@ from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
)
from fixtures.pg_version import PgVersion, skip_on_postgres
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind, S3Storage, s3_storage
from fixtures.workload import Workload
@@ -156,9 +156,6 @@ ingest_lag_log_line = ".*ingesting record with timestamp lagging more than wait_
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
@skip_on_postgres(
PgVersion.V17, "There are no snapshots yet"
) # TODO: revert this once we have snapshots
def test_backward_compatibility(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
@@ -206,9 +203,6 @@ def test_backward_compatibility(
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
@skip_on_postgres(
PgVersion.V17, "There are no snapshots yet"
) # TODO: revert this once we have snapshots
def test_forward_compatibility(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,

View File

@@ -2084,8 +2084,13 @@ def test_timeline_copy(neon_env_builder: NeonEnvBuilder, insert_rows: int):
endpoint.safe_psql("create table t(key int, value text)")
timeline_status = env.safekeepers[0].http_client().timeline_status(tenant_id, timeline_id)
timeline_start_lsn = timeline_status.timeline_start_lsn
# Note: currently timelines on sks are created by compute and commit of
# transaction above is finished when 2/3 sks received it, so there is a
# small chance that timeline on this sk is not created/initialized yet,
# hence the usage of waiting function to prevent flakiness.
timeline_start_lsn = (
env.safekeepers[0].http_client().get_non_zero_timeline_start_lsn(tenant_id, timeline_id)
)
log.info(f"Timeline start LSN: {timeline_start_lsn}")
current_percent = 0.0