Compare commits

...

29 Commits

Author SHA1 Message Date
Konstantin Knizhnik
d05cefaae1 Fix mistyping 2025-06-12 08:07:17 +03:00
Konstantin Knizhnik
fad69227d6 Fix bug in neon_redo_read_buffer_filter 2025-06-11 20:20:54 +03:00
Konstantin Knizhnik
4a397638bf Prohibit partial redo of wal records: if record affects several pages then either all of them are reconstructed, either all skipped 2025-06-11 18:19:04 +03:00
Konstantin Knizhnik
b34648c136 Remove XLogWaitForReplayOf at replica to avoid deadlock 2025-06-11 17:56:25 +03:00
Konstantin Knizhnik
24038033bf Remove default from DROP FUNCTION (#12202)
## Problem

DROP FUNCTION doesn't allow to specify default for parameters.

## Summary of changes

Remove DEFAULT clause from pgxn/neon/neon--1.6--1.5.sql

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-06-11 13:16:58 +00:00
Mikhail
1b935b1958 endpoint_storage: add ?from_endpoint= to /lfc/prewarm (#12195)
Related: https://github.com/neondatabase/cloud/issues/24225
Add optional from_endpoint parameter to allow prewarming from other
endpoint
2025-06-10 19:25:32 +00:00
a-masterov
3f16ca2c18 Respect limits for projects for the Random Operations test (#12184)
## Problem
The project limits were not respected, resulting in errors.
## Summary of changes
Now limits are checked before running an action, and if the action is
not possible to run, another random action will be run.

---------

Co-authored-by: Peter Bendel <peterbendel@neon.tech>
2025-06-10 15:59:51 +00:00
Conrad Ludgate
67b94c5992 [proxy] per endpoint configuration for rate limits (#12148)
https://github.com/neondatabase/cloud/issues/28333

Adds a new `rate_limit` response type to EndpointAccessControl, uses it
for rate limiting, and adds a generic invalidation for the cache.
2025-06-10 14:26:08 +00:00
Folke Behrens
e38193c530 proxy: Move connect_to_compute back to proxy (#12181)
It's mostly responsible for waking, retrying, and caching. A new, thin
wrapper around compute_once will be PGLB's entry point
2025-06-10 11:23:03 +00:00
Konstantin Knizhnik
21949137ed Return last ring index instead of min_ring_index in prefetch_register_bufferv (#12039)
## Problem

See https://github.com/neondatabase/neon/issues/12018

Now `prefetch_register_bufferv` calculates min_ring_index of all vector
requests.
But because of pump prefetch state or connection failure, previous slots
can be already proceeded and reused.

## Summary of changes

Instead of returning minimal index, this function should return last
slot index.
Actually result of this function is used only in two places. A first
place just fort checking (and this check is redundant because the same
check is done in `prefetch_register_bufferv` itself.
And in the second place where index of filled slot is actually used,
there is just one request.
Sp fortunately this bug can cause only assert failure in debug build.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-06-10 10:09:46 +00:00
Trung Dinh
02f94edb60 Remove global static TENANTS (#12169)
## Problem
There is this TODO in code:
https://github.com/neondatabase/neon/blob/main/pageserver/src/tenant/mgr.rs#L300-L302
This is an old TODO by @jcsp.

## Summary of changes
This PR addresses the TODO. Specifically, it removes a global static
`TENANTS`. Instead the `TenantManager` now directly manages the tenant
map. Enhancing abstraction.

Essentially, this PR moves all module-level methods to inside the
implementation of `TenantManager`.
2025-06-10 09:26:40 +00:00
Conrad Ludgate
58327ef74d [proxy] fix sql-over-http password setting (#12177)
## Problem

Looks like our sql-over-http tests get to rely on "trust"
authentication, so the path that made sure the authkeys data was set was
never being hit.

## Summary of changes

Slight refactor to WakeComputeBackends, as well as making sure auth keys
are propagated. Fix tests to ensure passwords are tested.
2025-06-10 08:46:29 +00:00
Dmitrii Kovalkov
73be6bb736 fix(compute): use proper safekeeper in VotesCollectedMset (#12175)
## Problem
`VotesCollectedMset` uses the wrong safekeeper to update truncateLsn.
This led to some failed assert later in the code during running
safekeeper migration tests.
- Relates to https://github.com/neondatabase/neon/issues/11823

## Summary of changes
Use proper safekeeper to update truncateLsn in VotesCollectedMset
2025-06-10 07:16:42 +00:00
Alex Chi Z.
40d7583906 feat(pageserver): use hostname as feature flag resolver property (#12141)
## Problem

part of https://github.com/neondatabase/neon/issues/11813

## Summary of changes

Collect pageserver hostname property so that we can use it in the
PostHog UI. Not sure if this is the best way to do that -- open to
suggestions.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-10 07:10:41 +00:00
Alex Chi Z.
7a68699abb feat(pageserver): support azure time-travel recovery (in an okay way) (#12140)
## Problem

part of https://github.com/neondatabase/neon/issues/7546

Add Azure time travel recovery support. The tricky thing is how Azure
handles deletes in its blob version API. For the following sequence:

```
upload file_1 = a
upload file_1 = b
delete file_1
upload file_1 = c
```

The "delete file_1" won't be stored as a version (as AWS did).
Therefore, we can never rollback to a state where file_1 is temporarily
invisible. If we roll back to the time before file_1 gets created for
the first time, it will be removed correctly.

However, this is fine for pageservers, because (1) having extra files in
the tenant storage is usually fine (2) for things like
timelines/X/index_part-Y.json, it will only be deleted once, so it can
always be recovered to a correct state. Therefore, I don't expect any
issues when this functionality is used on pageserver recovery.

TODO: unit tests for time-travel recovery.

## Summary of changes

Add Azure blob storage time-travel recovery support.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-10 05:32:58 +00:00
Konstantin Knizhnik
f42d44342d Increase statement timeout for test_pageserver_restarts_under_workload test (#12139)
\## Problem

See
https://github.com/neondatabase/neon/issues/12119#issuecomment-2942586090

Page server restarts with interval 1 seconds increases time of vacuum
especially off prefetch is enabled and so cause test failure because of
statement timeout expiration.

## Summary of changes

Increase statement timeout to 360 seconds.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Alexander Lakhin <alexander.lakhin@neon.tech>
2025-06-10 05:32:03 +00:00
Konstantin Knizhnik
d759fcb8bd Increase wait LFC prewarm timeout (#12174)
## Problem

See https://github.com/neondatabase/neon/issues/12171

## Summary of changes

Increase LFC prewarm wait timeout to 1 minute

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-06-09 18:01:30 +00:00
Alex Chi Z.
76f95f06d8 feat(pageserver): add global timeline count metrics (#12159)
## Problem

We are getting tenants with a lot of branches and num of timelines is a
good indicator of pageserver loads. I added this metrics to help us
better plan pageserver capacities.

## Summary of changes

Add `pageserver_timeline_states_count` with two labels: active +
offloaded.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-09 09:57:36 +00:00
Mikhail
7efd4554ab endpoint_storage: allow bypassing s3 write check on startup (#12165)
Related: https://github.com/neondatabase/cloud/issues/27195
2025-06-06 18:08:02 +00:00
Erik Grinaker
3c7235669a pageserver: don't delete parent shard files until split is committed (#12146)
## Problem

If a shard split fails and must roll back, the tenant may hit a cold
start as the parent shard's files have already been removed from local
disk.

External contribution with minor adjustments, see
https://neondb.slack.com/archives/C08TE3203RQ/p1748246398269309.

## Summary of changes

Keep the parent shard's files on local disk until the split has been
committed, such that they are available if the spilt is rolled back. If
all else fails, the files will be removed on the next Pageserver
restart.

This should also be fine in a mixed version:

* New storcon, old Pageserver: the Pageserver will delete the files
during the split, storcon will log an error when the cleanup detach
fails.

* Old storcon, new Pageserver: the Pageserver will leave the parent's
files around until the next Pageserver restart.

The change looks good to me, but shard splits are delicate so I'd like
some extra eyes on this.
2025-06-06 15:55:14 +00:00
Conrad Ludgate
6dd84041a1 refactor and simplify the invalidation notification structure (#12154)
The current cache invalidation messages are far too specific. They
should be more generic since it only ends up triggering a
`GetEndpointAccessControl` message anyway.

Mappings:
* `/allowed_ips_updated`, `/block_public_or_vpc_access_updated`, and
`/allowed_vpc_endpoints_updated_for_projects` ->
`/project_settings_update`.
* `/allowed_vpc_endpoints_updated_for_org` ->
`/account_settings_update`.
* `/password_updated` -> `/role_setting_update`.

I've also introduced `/endpoint_settings_update`.

All message types support singular or multiple entries, which allows us
to simplify things both on our side and on cplane side.

I'm opening a PR to cplane to apply the above mappings, but for now
using the old phrases to allow both to roll out independently.

This change is inspired by my need to add yet another cached entry to
`GetEndpointAccessControl` for
https://github.com/neondatabase/cloud/issues/28333
2025-06-06 12:49:29 +00:00
Arpad Müller
df7e301a54 safekeeper: special error if a timeline has been deleted (#12155)
We might delete timelines on safekeepers before we are deleting them on
pageservers. This should be an exceptional situation, but can occur. As
the first step to improve behaviour here, emit a special error that is
less scary/obscure than "was not found in global map".

It is for example emitted when the pageserver tries to run
`IDENTIFY_SYSTEM` on a timeline that has been deleted on the safekeeper.

Found when analyzing the failure of
`test_scrubber_physical_gc_timeline_deletion` when enabling
`--timelines-onto-safekeepers` on the pytests.

Due to safekeeper restarts, there is no hard guarantee that we will keep
issuing this error, so we need to think of something better if we start
encountering this in staging/prod. But I would say that the introduction
of `--timelines-onto-safekeepers` in the pytests and into staging won't
change much about this: we are already deleting timelines from there. In
`test_scrubber_physical_gc_timeline_deletion`, we'd just be leaking the
timeline before on the safekeepers.

Part of #11712
2025-06-06 11:54:07 +00:00
Mikhail
470c7d5e0e endpoint_storage: default listen port, allow inline config (#12152)
Related: https://github.com/neondatabase/cloud/issues/27195
2025-06-06 11:48:01 +00:00
Conrad Ludgate
4d99b6ff4d [proxy] separate compute connect from compute authentication (#12145)
## Problem

PGLB/Neonkeeper needs to separate the concerns of connecting to compute,
and authenticating to compute.

Additionally, the code within `connect_to_compute` is rather messy,
spending effort on recovering the authentication info after
wake_compute.

## Summary of changes

Split `ConnCfg` into `ConnectInfo` and `AuthInfo`. `wake_compute` only
returns `ConnectInfo` and `AuthInfo` is determined separately from the
`handshake`/`authenticate` process.

Additionally, `ConnectInfo::connect_raw` is in-charge or establishing
the TLS connection, and the `postgres_client::Config::connect_raw` is
configured to use `NoTls` which will force it to skip the TLS
negotiation. This should just work.
2025-06-06 10:29:55 +00:00
Alexander Sarantcev
590301df08 storcon: Introduce deletion tombstones to support flaky node scenario (#12096)
## Problem

Removed nodes can re-add themselves on restart if not properly
tombstoned. We need a mechanism (e.g. soft-delete flag) to prevent this,
especially in cases where the node is unreachable.

More details there: #12036

## Summary of changes

- Introduced `NodeLifecycle` enum to represent node lifecycle states.
- Added a string representation of `NodeLifecycle` to the `nodes` table.
- Implemented node removal using a tombstone mechanism.
- Introduced `/debug/v1/tombstone*` handlers to manage the tombstone
state.
2025-06-06 10:16:55 +00:00
Erik Grinaker
c511786548 pageserver: move spawn_grpc to GrpcPageServiceHandler::spawn (#12147)
Mechanical move, no logic changes.
2025-06-06 10:01:58 +00:00
Alex Chi Z.
fe31baf985 feat(build): add aws cli into the docker image (#12161)
## Problem

Makes it easier to debug AWS permission issues (i.e., storage scrubber)

## Summary of changes

Install awscliv2 into the docker image.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-06 09:38:58 +00:00
Alex Chi Z.
b23e75ebfe test(pageserver): ensure offload cleans up metrics (#12127)
Add a test to ensure timeline metrics are fully cleaned up after
offloading.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-06 06:50:54 +00:00
Arpad Müller
24d7c37e6e neon_local timeline import: create timelines on safekeepers (#12138)
neon_local's timeline import subcommand creates timelines manually, but
doesn't create them on the safekeepers. If a test then tries to open an
endpoint to read from the timeline, it will error in the new world with
`--timelines-onto-safekeepers`.

Therefore, if that flag is enabled, create the timelines on the
safekeepers.

Note that this import functionality is different from the fast import
feature (https://github.com/neondatabase/neon/issues/10188, #11801).

Part of #11670
As well as part of #11712
2025-06-05 18:53:14 +00:00
89 changed files with 2364 additions and 1245 deletions

18
Cargo.lock generated
View File

@@ -753,6 +753,7 @@ dependencies = [
"axum",
"axum-core",
"bytes",
"form_urlencoded",
"futures-util",
"headers",
"http 1.1.0",
@@ -761,6 +762,8 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"serde_html_form",
"serde_path_to_error",
"tower 0.5.2",
"tower-layer",
"tower-service",
@@ -1445,6 +1448,7 @@ dependencies = [
"regex",
"reqwest",
"safekeeper_api",
"safekeeper_client",
"scopeguard",
"serde",
"serde_json",
@@ -2054,6 +2058,7 @@ dependencies = [
"axum-extra",
"camino",
"camino-tempfile",
"clap",
"futures",
"http-body-util",
"itertools 0.10.5",
@@ -6420,6 +6425,19 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "serde_html_form"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4"
dependencies = [
"form_urlencoded",
"indexmap 2.9.0",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "serde_json"
version = "1.0.125"

View File

@@ -71,7 +71,7 @@ aws-credential-types = "1.2.0"
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
aws-types = "1.3"
axum = { version = "0.8.1", features = ["ws"] }
axum-extra = { version = "0.10.0", features = ["typed-header"] }
axum-extra = { version = "0.10.0", features = ["typed-header", "query"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.71"

View File

@@ -110,6 +110,19 @@ RUN set -e \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
openssl \
unzip \
curl \
&& ARCH=$(uname -m) \
&& if [ "$ARCH" = "x86_64" ]; then \
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"; \
elif [ "$ARCH" = "aarch64" ]; then \
curl "https://awscli.amazonaws.com/awscli-exe-linux-aarch64.zip" -o "awscliv2.zip"; \
else \
echo "Unsupported architecture: $ARCH" && exit 1; \
fi \
&& unzip awscliv2.zip \
&& ./aws/install \
&& rm -rf aws awscliv2.zip \
&& rm -f /etc/apt/apt.conf.d/80-retries \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \

View File

@@ -785,7 +785,7 @@ impl ComputeNode {
self.spawn_extension_stats_task();
if pspec.spec.autoprewarm {
self.prewarm_lfc();
self.prewarm_lfc(None);
}
Ok(())
}

View File

@@ -25,11 +25,16 @@ struct EndpointStoragePair {
}
const KEY: &str = "lfc_state";
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
type Error = anyhow::Error;
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
bail!("pspec.endpoint_id missing")
impl EndpointStoragePair {
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
/// If not None, takes precedence over pspec.spec.endpoint_id
fn from_spec_and_endpoint(
pspec: &crate::compute::ParsedSpec,
endpoint_id: Option<String>,
) -> Result<Self> {
let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
let Some(ref endpoint_id) = endpoint_id else {
bail!("pspec.endpoint_id missing, other endpoint_id not provided")
};
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
bail!("pspec.endpoint_storage_addr missing")
@@ -84,7 +89,7 @@ impl ComputeNode {
}
/// Returns false if there is a prewarm request ongoing, true otherwise
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
crate::metrics::LFC_PREWARM_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
@@ -97,7 +102,7 @@ impl ComputeNode {
let cloned = self.clone();
spawn(async move {
let Err(err) = cloned.prewarm_impl().await else {
let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
return;
};
@@ -109,13 +114,14 @@ impl ComputeNode {
true
}
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
/// from_endpoint: None for endpoint managed by this compute_ctl
fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
let state = self.state.lock().unwrap();
state.pspec.as_ref().unwrap().try_into()
EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
}
async fn prewarm_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(token);
@@ -173,7 +179,7 @@ impl ComputeNode {
}
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from postgres");
let mut compressed = Vec::new();

View File

@@ -2,6 +2,7 @@ use crate::compute_prewarm::LfcPrewarmStateWithProgress;
use crate::http::JsonResponse;
use axum::response::{IntoResponse, Response};
use axum::{Json, http::StatusCode};
use axum_extra::extract::OptionalQuery;
use compute_api::responses::LfcOffloadState;
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
@@ -16,8 +17,16 @@ pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadS
Json(compute.lfc_offload_state())
}
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
if compute.prewarm_lfc() {
#[derive(serde::Deserialize)]
pub struct PrewarmQuery {
pub from_endpoint: String,
}
pub(in crate::http) async fn prewarm(
compute: Compute,
OptionalQuery(query): OptionalQuery<PrewarmQuery>,
) -> Response {
if compute.prewarm_lfc(query.map(|q| q.from_endpoint)) {
StatusCode::ACCEPTED.into_response()
} else {
JsonResponse::error(

View File

@@ -36,6 +36,7 @@ pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_backend.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
postgres_connection.workspace = true
storage_broker.workspace = true
http-utils.workspace = true

View File

@@ -45,7 +45,7 @@ use pageserver_api::models::{
use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;
use safekeeper_api::membership::SafekeeperGeneration;
use safekeeper_api::membership::{SafekeeperGeneration, SafekeeperId};
use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
@@ -1255,6 +1255,45 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
pageserver
.timeline_import(tenant_id, timeline_id, base, pg_wal, args.pg_version)
.await?;
if env.storage_controller.timelines_onto_safekeepers {
println!("Creating timeline on safekeeper ...");
let timeline_info = pageserver
.timeline_info(
TenantShardId::unsharded(tenant_id),
timeline_id,
pageserver_client::mgmt_api::ForceAwaitLogicalSize::No,
)
.await?;
let default_sk = SafekeeperNode::from_env(env, env.safekeepers.first().unwrap());
let default_host = default_sk
.conf
.listen_addr
.clone()
.unwrap_or_else(|| "localhost".to_string());
let mconf = safekeeper_api::membership::Configuration {
generation: SafekeeperGeneration::new(1),
members: safekeeper_api::membership::MemberSet {
m: vec![SafekeeperId {
host: default_host,
id: default_sk.conf.id,
pg_port: default_sk.conf.pg_port,
}],
},
new_members: None,
};
let pg_version = args.pg_version * 10000;
let req = safekeeper_api::models::TimelineCreateRequest {
tenant_id,
timeline_id,
mconf,
pg_version,
system_id: None,
wal_seg_size: None,
start_lsn: timeline_info.last_record_lsn,
commit_lsn: None,
};
default_sk.create_timeline(&req).await?;
}
env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
println!("Done");
}

View File

@@ -635,4 +635,16 @@ impl PageServerNode {
Ok(())
}
pub async fn timeline_info(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
force_await_logical_size: mgmt_api::ForceAwaitLogicalSize,
) -> anyhow::Result<TimelineInfo> {
let timeline_info = self
.http_client
.timeline_info(tenant_shard_id, timeline_id, force_await_logical_size)
.await?;
Ok(timeline_info)
}
}

View File

@@ -6,7 +6,6 @@
//! .neon/safekeepers/<safekeeper id>
//! ```
use std::error::Error as _;
use std::future::Future;
use std::io::Write;
use std::path::PathBuf;
use std::time::Duration;
@@ -14,9 +13,9 @@ use std::{io, result};
use anyhow::Context;
use camino::Utf8PathBuf;
use http_utils::error::HttpErrorBody;
use postgres_connection::PgConnectionConfig;
use reqwest::{IntoUrl, Method};
use safekeeper_api::models::TimelineCreateRequest;
use safekeeper_client::mgmt_api;
use thiserror::Error;
use utils::auth::{Claims, Scope};
use utils::id::NodeId;
@@ -35,25 +34,14 @@ pub enum SafekeeperHttpError {
type Result<T> = result::Result<T, SafekeeperHttpError>;
pub(crate) trait ResponseErrorMessageExt: Sized {
fn error_from_body(self) -> impl Future<Output = Result<Self>> + Send;
}
impl ResponseErrorMessageExt for reqwest::Response {
async fn error_from_body(self) -> Result<Self> {
let status = self.status();
if !(status.is_client_error() || status.is_server_error()) {
return Ok(self);
}
// reqwest does not export its error construction utility functions, so let's craft the message ourselves
let url = self.url().to_owned();
Err(SafekeeperHttpError::Response(
match self.json::<HttpErrorBody>().await {
Ok(err_body) => format!("Error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
},
))
fn err_from_client_err(err: mgmt_api::Error) -> SafekeeperHttpError {
use mgmt_api::Error::*;
match err {
ApiError(_, str) => SafekeeperHttpError::Response(str),
Cancelled => SafekeeperHttpError::Response("Cancelled".to_owned()),
ReceiveBody(err) => SafekeeperHttpError::Transport(err),
ReceiveErrorBody(err) => SafekeeperHttpError::Response(err),
Timeout(str) => SafekeeperHttpError::Response(format!("timeout: {str}")),
}
}
@@ -70,9 +58,8 @@ pub struct SafekeeperNode {
pub pg_connection_config: PgConnectionConfig,
pub env: LocalEnv,
pub http_client: reqwest::Client,
pub http_client: mgmt_api::Client,
pub listen_addr: String,
pub http_base_url: String,
}
impl SafekeeperNode {
@@ -82,13 +69,14 @@ impl SafekeeperNode {
} else {
"127.0.0.1".to_string()
};
let jwt = None;
let http_base_url = format!("http://{}:{}", listen_addr, conf.http_port);
SafekeeperNode {
id: conf.id,
conf: conf.clone(),
pg_connection_config: Self::safekeeper_connection_config(&listen_addr, conf.pg_port),
env: env.clone(),
http_client: env.create_http_client(),
http_base_url: format!("http://{}:{}/v1", listen_addr, conf.http_port),
http_client: mgmt_api::Client::new(env.create_http_client(), http_base_url, jwt),
listen_addr,
}
}
@@ -278,20 +266,19 @@ impl SafekeeperNode {
)
}
fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> reqwest::RequestBuilder {
// TODO: authentication
//if self.env.auth_type == AuthType::NeonJWT {
// builder = builder.bearer_auth(&self.env.safekeeper_auth_token)
//}
self.http_client.request(method, url)
pub async fn check_status(&self) -> Result<()> {
self.http_client
.status()
.await
.map_err(err_from_client_err)?;
Ok(())
}
pub async fn check_status(&self) -> Result<()> {
self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status"))
.send()
.await?
.error_from_body()
.await?;
pub async fn create_timeline(&self, req: &TimelineCreateRequest) -> Result<()> {
self.http_client
.create_timeline(req)
.await
.map_err(err_from_client_err)?;
Ok(())
}
}

View File

@@ -61,10 +61,16 @@ enum Command {
#[arg(long)]
scheduling: Option<NodeSchedulingPolicy>,
},
// Set a node status as deleted.
NodeDelete {
#[arg(long)]
node_id: NodeId,
},
/// Delete a tombstone of node from the storage controller.
NodeDeleteTombstone {
#[arg(long)]
node_id: NodeId,
},
/// Modify a tenant's policies in the storage controller
TenantPolicy {
#[arg(long)]
@@ -82,6 +88,8 @@ enum Command {
},
/// List nodes known to the storage controller
Nodes {},
/// List soft deleted nodes known to the storage controller
NodeTombstones {},
/// List tenants known to the storage controller
Tenants {
/// If this field is set, it will list the tenants on a specific node
@@ -900,6 +908,39 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
.await?;
}
Command::NodeDeleteTombstone { node_id } => {
storcon_client
.dispatch::<(), ()>(
Method::DELETE,
format!("debug/v1/tombstone/{node_id}"),
None,
)
.await?;
}
Command::NodeTombstones {} => {
let mut resp = storcon_client
.dispatch::<(), Vec<NodeDescribeResponse>>(
Method::GET,
"debug/v1/tombstone".to_string(),
None,
)
.await?;
resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr));
let mut table = comfy_table::Table::new();
table.set_header(["Id", "Hostname", "AZ", "Scheduling", "Availability"]);
for node in resp {
table.add_row([
format!("{}", node.id),
node.listen_http_addr,
node.availability_zone_id,
format!("{:?}", node.scheduling),
format!("{:?}", node.availability),
]);
}
println!("{table}");
}
Command::TenantSetTimeBasedEviction {
tenant_id,
period,

View File

@@ -8,6 +8,7 @@ anyhow.workspace = true
axum-extra.workspace = true
axum.workspace = true
camino.workspace = true
clap.workspace = true
futures.workspace = true
jsonwebtoken.workspace = true
prometheus.workspace = true

View File

@@ -4,6 +4,8 @@
//! for large computes.
mod app;
use anyhow::Context;
use clap::Parser;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tracing::info;
use utils::logging;
@@ -12,9 +14,26 @@ const fn max_upload_file_limit() -> usize {
100 * 1024 * 1024
}
const fn listen() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 51243)
}
#[derive(Parser)]
struct Args {
#[arg(exclusive = true)]
config_file: Option<String>,
#[arg(long, default_value = "false", requires = "config")]
/// to allow testing k8s helm chart where we don't have s3 credentials
no_s3_check_on_startup: bool,
#[arg(long, value_name = "FILE")]
/// inline config mode for k8s helm chart
config: Option<String>,
}
#[derive(serde::Deserialize)]
#[serde(tag = "type")]
struct Config {
#[serde(default = "listen")]
listen: std::net::SocketAddr,
pemfile: camino::Utf8PathBuf,
#[serde(flatten)]
@@ -31,13 +50,18 @@ async fn main() -> anyhow::Result<()> {
logging::Output::Stdout,
)?;
let config: String = std::env::args().skip(1).take(1).collect();
if config.is_empty() {
anyhow::bail!("Usage: endpoint_storage config.json")
}
info!("Reading config from {config}");
let config = std::fs::read_to_string(config.clone())?;
let config: Config = serde_json::from_str(&config).context("parsing config")?;
let args = Args::parse();
let config: Config = if let Some(config_path) = args.config_file {
info!("Reading config from {config_path}");
let config = std::fs::read_to_string(config_path)?;
serde_json::from_str(&config).context("parsing config")?
} else if let Some(config) = args.config {
info!("Reading inline config");
serde_json::from_str(&config).context("parsing config")?
} else {
anyhow::bail!("Supply either config file path or --config=inline-config");
};
info!("Reading pemfile from {}", config.pemfile.clone());
let pemfile = std::fs::read(config.pemfile.clone())?;
info!("Loading public key from {}", config.pemfile.clone());
@@ -48,7 +72,9 @@ async fn main() -> anyhow::Result<()> {
let storage = remote_storage::GenericRemoteStorage::from_config(&config.storage_config).await?;
let cancel = tokio_util::sync::CancellationToken::new();
app::check_storage_permissions(&storage, cancel.clone()).await?;
if !args.no_s3_check_on_startup {
app::check_storage_permissions(&storage, cancel.clone()).await?;
}
let proxy = std::sync::Arc::new(endpoint_storage::Storage {
auth,

View File

@@ -344,6 +344,35 @@ impl Default for ShardSchedulingPolicy {
}
}
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum NodeLifecycle {
Active,
Deleted,
}
impl FromStr for NodeLifecycle {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"active" => Ok(Self::Active),
"deleted" => Ok(Self::Deleted),
_ => Err(anyhow::anyhow!("Unknown node lifecycle '{s}'")),
}
}
}
impl From<NodeLifecycle> for String {
fn from(value: NodeLifecycle) -> String {
use NodeLifecycle::*;
match value {
Active => "active",
Deleted => "deleted",
}
.to_string()
}
}
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum NodeSchedulingPolicy {
Active,

View File

@@ -10,7 +10,7 @@ use crate::{Error, cancel_query_raw, connect_socket};
pub(crate) async fn cancel_query<T>(
config: Option<SocketConfig>,
ssl_mode: SslMode,
mut tls: T,
tls: T,
process_id: i32,
secret_key: i32,
) -> Result<(), Error>

View File

@@ -17,7 +17,6 @@ use crate::{Client, Connection, Error};
/// TLS configuration.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum SslMode {
/// Do not use TLS.
Disable,
@@ -231,7 +230,7 @@ impl Config {
/// Requires the `runtime` Cargo feature (enabled by default).
pub async fn connect<T>(
&self,
tls: T,
tls: &T,
) -> Result<(Client, Connection<TcpStream, T::Stream>), Error>
where
T: MakeTlsConnect<TcpStream>,

View File

@@ -13,7 +13,7 @@ use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Client, Config, Connection, Error, RawConnection};
pub async fn connect<T>(
mut tls: T,
tls: &T,
config: &Config,
) -> Result<(Client, Connection<TcpStream, T::Stream>), Error>
where

View File

@@ -47,7 +47,7 @@ pub trait MakeTlsConnect<S> {
/// Creates a new `TlsConnect`or.
///
/// The domain name is provided for certificate verification and SNI.
fn make_tls_connect(&mut self, domain: &str) -> Result<Self::TlsConnect, Self::Error>;
fn make_tls_connect(&self, domain: &str) -> Result<Self::TlsConnect, Self::Error>;
}
/// An asynchronous function wrapping a stream in a TLS session.
@@ -85,7 +85,7 @@ impl<S> MakeTlsConnect<S> for NoTls {
type TlsConnect = NoTls;
type Error = NoTlsError;
fn make_tls_connect(&mut self, _: &str) -> Result<NoTls, NoTlsError> {
fn make_tls_connect(&self, _: &str) -> Result<NoTls, NoTlsError> {
Ok(NoTls)
}
}

View File

@@ -10,7 +10,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::{env, io};
use anyhow::{Context, Result};
use anyhow::{Context, Result, anyhow};
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
use azure_storage::StorageCredentials;
@@ -37,6 +37,7 @@ use crate::metrics::{AttemptOutcome, RequestKind, start_measuring_requests};
use crate::{
ConcurrencyLimiter, Download, DownloadError, DownloadKind, DownloadOpts, Listing, ListingMode,
ListingObject, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
Version, VersionKind,
};
pub struct AzureBlobStorage {
@@ -405,6 +406,39 @@ impl AzureBlobStorage {
pub fn container_name(&self) -> &str {
&self.container_name
}
async fn list_versions_with_permit(
&self,
_permit: &tokio::sync::SemaphorePermit<'_>,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
let customize_builder = |mut builder: ListBlobsBuilder| {
builder = builder.include_versions(true);
// We do not return this info back to `VersionListing` yet.
builder = builder.include_deleted(true);
builder
};
let kind = RequestKind::ListVersions;
let mut stream = std::pin::pin!(self.list_streaming_for_fn(
prefix,
mode,
max_keys,
cancel,
kind,
customize_builder
));
let mut combined: crate::VersionListing =
stream.next().await.expect("At least one item required")?;
while let Some(list) = stream.next().await {
let list = list?;
combined.versions.extend(list.versions.into_iter());
}
Ok(combined)
}
}
trait ListingCollector {
@@ -488,27 +522,10 @@ impl RemoteStorage for AzureBlobStorage {
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> std::result::Result<crate::VersionListing, DownloadError> {
let customize_builder = |mut builder: ListBlobsBuilder| {
builder = builder.include_versions(true);
builder
};
let kind = RequestKind::ListVersions;
let mut stream = std::pin::pin!(self.list_streaming_for_fn(
prefix,
mode,
max_keys,
cancel,
kind,
customize_builder
));
let mut combined: crate::VersionListing =
stream.next().await.expect("At least one item required")?;
while let Some(list) = stream.next().await {
let list = list?;
combined.versions.extend(list.versions.into_iter());
}
Ok(combined)
let permit = self.permit(kind, cancel).await?;
self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
.await
}
async fn head_object(
@@ -803,14 +820,158 @@ impl RemoteStorage for AzureBlobStorage {
async fn time_travel_recover(
&self,
_prefix: Option<&RemotePath>,
_timestamp: SystemTime,
_done_if_after: SystemTime,
_cancel: &CancellationToken,
prefix: Option<&RemotePath>,
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
) -> Result<(), TimeTravelError> {
// TODO use Azure point in time recovery feature for this
// https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
Err(TimeTravelError::Unimplemented)
let msg = "PLEASE NOTE: Azure Blob storage time-travel recovery may not work as expected "
.to_string()
+ "for some specific files. If a file gets deleted but then overwritten and we want to recover "
+ "to the time during the file was not present, this functionality will recover the file. Only "
+ "use the functionality for services that can tolerate this. For example, recovering a state of the "
+ "pageserver tenants.";
tracing::error!("{}", msg);
let kind = RequestKind::TimeTravel;
let permit = self.permit(kind, cancel).await?;
let mode = ListingMode::NoDelimiter;
let version_listing = self
.list_versions_with_permit(&permit, prefix, mode, None, cancel)
.await
.map_err(|err| match err {
DownloadError::Other(e) => TimeTravelError::Other(e),
DownloadError::Cancelled => TimeTravelError::Cancelled,
other => TimeTravelError::Other(other.into()),
})?;
let versions_and_deletes = version_listing.versions;
tracing::info!(
"Built list for time travel with {} versions and deletions",
versions_and_deletes.len()
);
// Work on the list of references instead of the objects directly,
// otherwise we get lifetime errors in the sort_by_key call below.
let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
let mut vds_for_key = HashMap::<_, Vec<_>>::new();
for vd in &versions_and_deletes {
let Version { key, .. } = &vd;
let version_id = vd.version_id().map(|v| v.0.as_str());
if version_id == Some("null") {
return Err(TimeTravelError::Other(anyhow!(
"Received ListVersions response for key={key} with version_id='null', \
indicating either disabled versioning, or legacy objects with null version id values"
)));
}
tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
vds_for_key.entry(key).or_default().push(vd);
}
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
for (key, versions) in vds_for_key {
let last_vd = versions.last().unwrap();
let key = self.relative_path_to_name(key);
if last_vd.last_modified > done_if_after {
tracing::debug!("Key {key} has version later than done_if_after, skipping");
continue;
}
// the version we want to restore to.
let version_to_restore_to =
match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
Ok(v) => v,
Err(e) => e,
};
if version_to_restore_to == versions.len() {
tracing::debug!("Key {key} has no changes since timestamp, skipping");
continue;
}
let mut do_delete = false;
if version_to_restore_to == 0 {
// All versions more recent, so the key didn't exist at the specified time point.
tracing::debug!(
"All {} versions more recent for {key}, deleting",
versions.len()
);
do_delete = true;
} else {
match &versions[version_to_restore_to - 1] {
Version {
kind: VersionKind::Version(version_id),
..
} => {
let source_url = format!(
"{}/{}?versionid={}",
self.client
.url()
.map_err(|e| TimeTravelError::Other(anyhow!("{e}")))?,
key,
version_id.0
);
tracing::debug!(
"Promoting old version {} for {key} at {}...",
version_id.0,
source_url
);
backoff::retry(
|| async {
let blob_client = self.client.blob_client(key.clone());
let op = blob_client.copy(Url::from_str(&source_url).unwrap());
tokio::select! {
res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
_ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
}
},
is_permanent,
warn_threshold,
max_retries,
"copying object version for time_travel_recover",
cancel,
)
.await
.ok_or_else(|| TimeTravelError::Cancelled)
.and_then(|x| x)?;
tracing::info!(?version_id, %key, "Copied old version in Azure blob storage");
}
Version {
kind: VersionKind::DeletionMarker,
..
} => {
do_delete = true;
}
}
};
if do_delete {
if matches!(last_vd.kind, VersionKind::DeletionMarker) {
// Key has since been deleted (but there was some history), no need to do anything
tracing::debug!("Key {key} already deleted, skipping.");
} else {
tracing::debug!("Deleting {key}...");
self.delete(&RemotePath::from_string(&key).unwrap(), cancel)
.await
.map_err(|e| {
// delete_oid0 will use TimeoutOrCancel
if TimeoutOrCancel::caused_by_cancel(&e) {
TimeTravelError::Cancelled
} else {
TimeTravelError::Other(e)
}
})?;
}
}
}
Ok(())
}
}

View File

@@ -1022,6 +1022,7 @@ impl RemoteStorage for S3Bucket {
let Version { key, .. } = &vd;
let version_id = vd.version_id().map(|v| v.0.as_str());
if version_id == Some("null") {
// TODO: check the behavior of using the SDK on a non-versioned container
return Err(TimeTravelError::Other(anyhow!(
"Received ListVersions response for key={key} with version_id='null', \
indicating either disabled versioning, or legacy objects with null version id values"

View File

@@ -13,7 +13,7 @@ use utils::pageserver_feedback::PageserverFeedback;
use crate::membership::Configuration;
use crate::{ServerInfo, Term};
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct SafekeeperStatus {
pub id: NodeId,
}

View File

@@ -23,6 +23,7 @@ use pageserver::deletion_queue::DeletionQueue;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::feature_resolver::FeatureResolver;
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::page_service::GrpcPageServiceHandler;
use pageserver::task_mgr::{
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
};
@@ -572,7 +573,8 @@ fn start_pageserver(
tokio::sync::mpsc::unbounded_channel();
let deletion_queue_client = deletion_queue.new_client();
let background_purges = mgr::BackgroundPurges::default();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
let tenant_manager = mgr::init(
conf,
background_purges.clone(),
TenantSharedResources {
@@ -583,10 +585,10 @@ fn start_pageserver(
basebackup_prepare_sender,
feature_resolver,
},
order,
shutdown_pageserver.clone(),
))?;
);
let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(tenant_manager.clone(), order))?;
let basebackup_cache = BasebackupCache::spawn(
BACKGROUND_RUNTIME.handle(),
@@ -814,7 +816,7 @@ fn start_pageserver(
// necessary?
let mut page_service_grpc = None;
if let Some(grpc_listener) = grpc_listener {
page_service_grpc = Some(page_service::spawn_grpc(
page_service_grpc = Some(GrpcPageServiceHandler::spawn(
tenant_manager.clone(),
grpc_auth,
otel_guard.as_ref().map(|g| g.dispatch.clone()),

View File

@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use pageserver_api::config::NodeMetadata;
use posthog_client_lite::{
CaptureEvent, FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
PostHogFlagFilterPropertyValue,
@@ -86,7 +87,35 @@ impl FeatureResolver {
}
}
}
// TODO: add pageserver URL.
// TODO: move this to a background task so that we don't block startup in case of slow disk
let metadata_path = conf.metadata_path();
match std::fs::read_to_string(&metadata_path) {
Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
Ok(metadata) => {
properties.insert(
"hostname".to_string(),
PostHogFlagFilterPropertyValue::String(metadata.http_host),
);
if let Some(cplane_region) = metadata.other.get("region_id") {
if let Some(cplane_region) = cplane_region.as_str() {
// This region contains the cell number
properties.insert(
"neon_region".to_string(),
PostHogFlagFilterPropertyValue::String(
cplane_region.to_string(),
),
);
}
}
}
Err(e) => {
tracing::warn!("Failed to parse metadata.json: {}", e);
}
},
Err(e) => {
tracing::warn!("Failed to read metadata.json: {}", e);
}
}
Arc::new(properties)
};
let fake_tenants = {

View File

@@ -1053,6 +1053,15 @@ pub(crate) static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("Failed to register pageserver_tenant_states_count metric")
});
pub(crate) static TIMELINE_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_timeline_states_count",
"Count of timelines per state",
&["state"]
)
.expect("Failed to register pageserver_timeline_states_count metric")
});
/// A set of broken tenants.
///
/// These are expected to be so rare that a set is fine. Set as in a new timeseries per each broken
@@ -3325,6 +3334,8 @@ impl TimelineMetrics {
&timeline_id,
);
TIMELINE_STATE_METRIC.with_label_values(&["active"]).inc();
TimelineMetrics {
tenant_id,
shard_id,
@@ -3479,6 +3490,8 @@ impl TimelineMetrics {
return;
}
TIMELINE_STATE_METRIC.with_label_values(&["active"]).dec();
let tenant_id = &self.tenant_id;
let timeline_id = &self.timeline_id;
let shard_id = &self.shard_id;

View File

@@ -169,99 +169,6 @@ pub fn spawn(
Listener { cancel, task }
}
/// Spawns a gRPC server for the page service.
///
/// TODO: move this onto GrpcPageServiceHandler::spawn().
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
/// need to reimplement the TCP+TLS accept loop ourselves.
pub fn spawn_grpc(
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
listener: std::net::TcpListener,
) -> anyhow::Result<CancellableTask> {
let cancel = CancellationToken::new();
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
.perf_span_dispatch(perf_trace_dispatch)
.detached_child();
let gate = Gate::default();
// Set up the TCP socket. We take a preconfigured TcpListener to bind the
// port early during startup.
let incoming = {
let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
listener.set_nonblocking(true)?;
tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(listener)?)
.with_nodelay(Some(GRPC_TCP_NODELAY))
.with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
};
// Set up the gRPC server.
//
// TODO: consider tuning window sizes.
let mut server = tonic::transport::Server::builder()
.http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
.http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
// Main page service stack. Uses a mix of Tonic interceptors and Tower layers:
//
// * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service.
//
// * Layers: allow async code, can run code after the service response. However, only has access
// to the raw HTTP request/response, not the gRPC types.
let page_service_handler = GrpcPageServiceHandler {
tenant_manager,
ctx,
gate_guard: gate.enter().expect("gate was just created"),
get_vectored_concurrent_io,
};
let observability_layer = ObservabilityLayer;
let mut tenant_interceptor = TenantMetadataInterceptor;
let mut auth_interceptor = TenantAuthInterceptor::new(auth);
let page_service = tower::ServiceBuilder::new()
// Create tracing span and record request start time.
.layer(observability_layer)
// Intercept gRPC requests.
.layer(tonic::service::InterceptorLayer::new(move |mut req| {
// Extract tenant metadata.
req = tenant_interceptor.call(req)?;
// Authenticate tenant JWT token.
req = auth_interceptor.call(req)?;
Ok(req)
}))
.service(proto::PageServiceServer::new(page_service_handler));
let server = server.add_service(page_service);
// Reflection service for use with e.g. grpcurl.
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
.build_v1()?;
let server = server.add_service(reflection_service);
// Spawn server task.
let task_cancel = cancel.clone();
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"grpc listener",
async move {
let result = server
.serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
.await;
if result.is_ok() {
// TODO: revisit shutdown logic once page service is implemented.
gate.close().await;
}
result
},
));
Ok(CancellableTask { task, cancel })
}
impl Listener {
pub async fn stop_accepting(self) -> Connections {
self.cancel.cancel();
@@ -3366,6 +3273,101 @@ pub struct GrpcPageServiceHandler {
}
impl GrpcPageServiceHandler {
/// Spawns a gRPC server for the page service.
///
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
/// need to reimplement the TCP+TLS accept loop ourselves.
pub fn spawn(
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
listener: std::net::TcpListener,
) -> anyhow::Result<CancellableTask> {
let cancel = CancellationToken::new();
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
.perf_span_dispatch(perf_trace_dispatch)
.detached_child();
let gate = Gate::default();
// Set up the TCP socket. We take a preconfigured TcpListener to bind the
// port early during startup.
let incoming = {
let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
listener.set_nonblocking(true)?;
tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(
listener,
)?)
.with_nodelay(Some(GRPC_TCP_NODELAY))
.with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
};
// Set up the gRPC server.
//
// TODO: consider tuning window sizes.
let mut server = tonic::transport::Server::builder()
.http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
.http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
// Main page service stack. Uses a mix of Tonic interceptors and Tower layers:
//
// * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service.
//
// * Layers: allow async code, can run code after the service response. However, only has access
// to the raw HTTP request/response, not the gRPC types.
let page_service_handler = GrpcPageServiceHandler {
tenant_manager,
ctx,
gate_guard: gate.enter().expect("gate was just created"),
get_vectored_concurrent_io,
};
let observability_layer = ObservabilityLayer;
let mut tenant_interceptor = TenantMetadataInterceptor;
let mut auth_interceptor = TenantAuthInterceptor::new(auth);
let page_service = tower::ServiceBuilder::new()
// Create tracing span and record request start time.
.layer(observability_layer)
// Intercept gRPC requests.
.layer(tonic::service::InterceptorLayer::new(move |mut req| {
// Extract tenant metadata.
req = tenant_interceptor.call(req)?;
// Authenticate tenant JWT token.
req = auth_interceptor.call(req)?;
Ok(req)
}))
// Run the page service.
.service(proto::PageServiceServer::new(page_service_handler));
let server = server.add_service(page_service);
// Reflection service for use with e.g. grpcurl.
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
.build_v1()?;
let server = server.add_service(reflection_service);
// Spawn server task.
let task_cancel = cancel.clone();
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"grpc listener",
async move {
let result = server
.serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
.await;
if result.is_ok() {
// TODO: revisit shutdown logic once page service is implemented.
gate.close().await;
}
result
},
));
Ok(CancellableTask { task, cancel })
}
/// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of
/// relations and their sizes, as well as SLRU segments and similar data.
#[allow(clippy::result_large_err)]

View File

@@ -89,7 +89,8 @@ use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::{
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
INITDB_RUN_TIME, INITDB_SEMAPHORE_ACQUISITION_TIME, TENANT, TENANT_OFFLOADED_TIMELINES,
TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC, remove_tenant_metrics,
TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC, TIMELINE_STATE_METRIC,
remove_tenant_metrics,
};
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationMode;
@@ -544,6 +545,28 @@ pub struct OffloadedTimeline {
/// Part of the `OffloadedTimeline` object's lifecycle: this needs to be set before we drop it
pub deleted_from_ancestor: AtomicBool,
_metrics_guard: OffloadedTimelineMetricsGuard,
}
/// Increases the offloaded timeline count metric when created, and decreases when dropped.
struct OffloadedTimelineMetricsGuard;
impl OffloadedTimelineMetricsGuard {
fn new() -> Self {
TIMELINE_STATE_METRIC
.with_label_values(&["offloaded"])
.inc();
Self
}
}
impl Drop for OffloadedTimelineMetricsGuard {
fn drop(&mut self) {
TIMELINE_STATE_METRIC
.with_label_values(&["offloaded"])
.dec();
}
}
impl OffloadedTimeline {
@@ -576,6 +599,8 @@ impl OffloadedTimeline {
delete_progress: timeline.delete_progress.clone(),
deleted_from_ancestor: AtomicBool::new(false),
_metrics_guard: OffloadedTimelineMetricsGuard::new(),
})
}
fn from_manifest(tenant_shard_id: TenantShardId, manifest: &OffloadedTimelineManifest) -> Self {
@@ -595,6 +620,7 @@ impl OffloadedTimeline {
archived_at,
delete_progress: TimelineDeleteProgress::default(),
deleted_from_ancestor: AtomicBool::new(false),
_metrics_guard: OffloadedTimelineMetricsGuard::new(),
}
}
fn manifest(&self) -> OffloadedTimelineManifest {

File diff suppressed because it is too large Load Diff

View File

@@ -1055,8 +1055,8 @@ pub(crate) enum WaitLsnWaiter<'a> {
/// Argument to [`Timeline::shutdown`].
#[derive(Debug, Clone, Copy)]
pub(crate) enum ShutdownMode {
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk. This method can
/// take multiple seconds for a busy timeline.
///
/// While we are flushing, we continue to accept read I/O for LSNs ingested before
/// the call to [`Timeline::shutdown`].

View File

@@ -1092,13 +1092,15 @@ communicator_prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
MyPState->ring_last <= ring_index);
}
/* internal version. Returns the ring index */
/* Internal version. Returns the ring index of the last block (result of this function is used only
* when nblocks==1)
*/
static uint64
prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
BlockNumber nblocks, const bits8 *mask,
bool is_prefetch)
{
uint64 min_ring_index;
uint64 last_ring_index;
PrefetchRequest hashkey;
#ifdef USE_ASSERT_CHECKING
bool any_hits = false;
@@ -1122,13 +1124,12 @@ Retry:
MyPState->ring_unused - MyPState->ring_receive;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
last_ring_index = UINT64_MAX;
min_ring_index = UINT64_MAX;
for (int i = 0; i < nblocks; i++)
{
PrefetchRequest *slot = NULL;
PrfHashEntry *entry = NULL;
uint64 ring_index;
neon_request_lsns *lsns;
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
@@ -1152,12 +1153,12 @@ Retry:
if (entry != NULL)
{
slot = entry->slot;
ring_index = slot->my_ring_index;
Assert(slot == GetPrfSlot(ring_index));
last_ring_index = slot->my_ring_index;
Assert(slot == GetPrfSlot(last_ring_index));
Assert(slot->status != PRFS_UNUSED);
Assert(MyPState->ring_last <= ring_index &&
ring_index < MyPState->ring_unused);
Assert(MyPState->ring_last <= last_ring_index &&
last_ring_index < MyPState->ring_unused);
Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag));
/*
@@ -1169,9 +1170,9 @@ Retry:
if (!neon_prefetch_response_usable(lsns, slot))
{
/* Wait for the old request to finish and discard it */
if (!prefetch_wait_for(ring_index))
if (!prefetch_wait_for(last_ring_index))
goto Retry;
prefetch_set_unused(ring_index);
prefetch_set_unused(last_ring_index);
entry = NULL;
slot = NULL;
pgBufferUsage.prefetch.expired += 1;
@@ -1188,13 +1189,12 @@ Retry:
*/
if (slot->status == PRFS_TAG_REMAINS)
{
prefetch_set_unused(ring_index);
prefetch_set_unused(last_ring_index);
entry = NULL;
slot = NULL;
}
else
{
min_ring_index = Min(min_ring_index, ring_index);
/* The buffered request is good enough, return that index */
if (is_prefetch)
pgBufferUsage.prefetch.duplicates++;
@@ -1283,12 +1283,12 @@ Retry:
* The next buffer pointed to by `ring_unused` is now definitely empty, so
* we can insert the new request to it.
*/
ring_index = MyPState->ring_unused;
last_ring_index = MyPState->ring_unused;
Assert(MyPState->ring_last <= ring_index &&
ring_index <= MyPState->ring_unused);
Assert(MyPState->ring_last <= last_ring_index &&
last_ring_index <= MyPState->ring_unused);
slot = GetPrfSlotNoCheck(ring_index);
slot = GetPrfSlotNoCheck(last_ring_index);
Assert(slot->status == PRFS_UNUSED);
@@ -1298,11 +1298,9 @@ Retry:
*/
slot->buftag = hashkey.buftag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
slot->my_ring_index = last_ring_index;
slot->flags = 0;
min_ring_index = Min(min_ring_index, ring_index);
if (is_prefetch)
MyNeonCounters->getpage_prefetch_requests_total++;
else
@@ -1315,11 +1313,12 @@ Retry:
MyPState->ring_unused - MyPState->ring_receive;
Assert(any_hits);
Assert(last_ring_index != UINT64_MAX);
Assert(GetPrfSlot(min_ring_index)->status == PRFS_REQUESTED ||
GetPrfSlot(min_ring_index)->status == PRFS_RECEIVED);
Assert(MyPState->ring_last <= min_ring_index &&
min_ring_index < MyPState->ring_unused);
Assert(GetPrfSlot(last_ring_index)->status == PRFS_REQUESTED ||
GetPrfSlot(last_ring_index)->status == PRFS_RECEIVED);
Assert(MyPState->ring_last <= last_ring_index &&
last_ring_index < MyPState->ring_unused);
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
@@ -1335,7 +1334,7 @@ Retry:
MyPState->ring_flush = MyPState->ring_unused;
}
return min_ring_index;
return last_ring_index;
}
static bool
@@ -2086,9 +2085,6 @@ communicator_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber ba
start_ts = GetCurrentTimestamp();
if (RecoveryInProgress() && MyBackendType != B_STARTUP)
XLogWaitForReplayOf(reqlsns->request_lsn);
/*
* Try to find prefetched page in the list of received pages.
*/

View File

@@ -2,6 +2,6 @@ DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer);

View File

@@ -2382,6 +2382,8 @@ get_fsm_physical_block(BlockNumber heapblk)
static bool
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
{
static XLogRecPtr last_recptr = InvalidXLogRecPtr;
static bool last_no_redo_needed;
XLogRecPtr end_recptr = record->EndRecPtr;
NRelFileInfo rinfo;
ForkNumber forknum;
@@ -2390,69 +2392,88 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
uint32 hash;
LWLock *partitionLock;
int buf_id;
bool no_redo_needed;
bool no_redo_needed = true;
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
return true;
#if PG_VERSION_NUM < 150000
if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno))
neon_log(PANIC, "failed to locate backup block with ID %d", block_id);
#else
XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
#endif
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forknum;
tag.blockNum = blkno;
hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash);
/*
* Lock the partition of shared_buffers so that it can't be updated
* concurrently.
*/
LWLockAcquire(partitionLock, LW_SHARED);
/*
* Out of an abundance of caution, we always run redo on shared catalogs,
* regardless of whether the block is stored in shared buffers. See also
* this function's top comment.
*/
if (!OidIsValid(NInfoGetDbOid(rinfo)))
if (last_recptr != end_recptr)
{
no_redo_needed = false;
#if PG_VERSION_NUM < 150000
int max_block_id = record->max_block_id;
#else
int max_block_id = XLogRecMaxBlockId(record);
#endif
for (int block_id = 0; block_id <= max_block_id && no_redo_needed; block_id++)
{
if (XLogRecHasBlockRef(record, block_id))
{
#if PG_VERSION_NUM < 150000
if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno))
neon_log(PANIC, "failed to locate backup block with ID %d", block_id);
#else
XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
#endif
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forknum;
tag.blockNum = blkno;
hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash);
/*
* Lock the partition of shared_buffers so that it can't be updated
* concurrently.
*/
LWLockAcquire(partitionLock, LW_SHARED);
/*
* Out of an abundance of caution, we always run redo on shared catalogs,
* regardless of whether the block is stored in shared buffers. See also
* this function's top comment.
*/
if (!OidIsValid(NInfoGetDbOid(rinfo)))
{
no_redo_needed = false;
}
else
{
/* Try to find the relevant buffer */
buf_id = BufTableLookup(&tag, hash);
no_redo_needed = buf_id < 0;
}
/*
* we don't have the buffer in memory, update lwLsn past this record, also
* evict page from file cache
*/
if (no_redo_needed)
{
neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno);
/*
* Redo changes if page exists in LFC.
* We should perform this check after assigning LwLSN to prevent
* prefetching of some older version of the page by some other backend.
*/
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
}
LWLockRelease(partitionLock);
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
if (forknum == MAIN_FORKNUM)
{
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
}
}
}
last_recptr = end_recptr;
last_no_redo_needed = no_redo_needed;
}
else
{
/* Try to find the relevant buffer */
buf_id = BufTableLookup(&tag, hash);
no_redo_needed = buf_id < 0;
}
/*
* we don't have the buffer in memory, update lwLsn past this record, also
* evict page from file cache
*/
if (no_redo_needed)
{
neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno);
/*
* Redo changes if page exists in LFC.
* We should perform this check after assigning LwLSN to prevent
* prefetching of some older version of the page by some other backend.
*/
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
}
LWLockRelease(partitionLock);
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
if (forknum == MAIN_FORKNUM)
{
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
no_redo_needed = last_no_redo_needed;
}
return no_redo_needed;
}

View File

@@ -1135,7 +1135,7 @@ VotesCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInf
wp->propTermStartLsn = sk->voteResponse.flushLsn;
wp->donor = sk;
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
wp->truncateLsn = Max(sk->voteResponse.truncateLsn, wp->truncateLsn);
if (n_votes > 0)
appendStringInfoString(s, ", ");

View File

@@ -18,11 +18,6 @@ pub(super) async fn authenticate(
secret: AuthSecret,
) -> auth::Result<ComputeCredentials> {
let scram_keys = match secret {
#[cfg(any(test, feature = "testing"))]
AuthSecret::Md5(_) => {
debug!("auth endpoint chooses MD5");
return Err(auth::AuthError::MalformedPassword("MD5 not supported"));
}
AuthSecret::Scram(secret) => {
debug!("auth endpoint chooses SCRAM");

View File

@@ -6,18 +6,17 @@ use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span};
use super::ComputeCredentialKeys;
use crate::auth::IpPattern;
use crate::auth::backend::ComputeUserInfo;
use crate::cache::Cached;
use crate::compute::AuthInfo;
use crate::config::AuthenticationConfig;
use crate::context::RequestContext;
use crate::control_plane::client::cplane_proxy_v1;
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
use crate::error::{ReportableError, UserFacingError};
use crate::pglb::connect_compute::ComputeConnectBackend;
use crate::pqproto::BeMessage;
use crate::proxy::NeonOptions;
use crate::proxy::wake_compute::WakeComputeBackend;
use crate::stream::PqStream;
use crate::types::RoleName;
use crate::{auth, compute, waiters};
@@ -98,15 +97,11 @@ impl ConsoleRedirectBackend {
ctx: &RequestContext,
auth_config: &'static AuthenticationConfig,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<(
ConsoleRedirectNodeInfo,
ComputeUserInfo,
Option<Vec<IpPattern>>,
)> {
) -> auth::Result<(ConsoleRedirectNodeInfo, AuthInfo, ComputeUserInfo)> {
authenticate(ctx, auth_config, &self.console_uri, client)
.await
.map(|(node_info, user_info, ip_allowlist)| {
(ConsoleRedirectNodeInfo(node_info), user_info, ip_allowlist)
.map(|(node_info, auth_info, user_info)| {
(ConsoleRedirectNodeInfo(node_info), auth_info, user_info)
})
}
}
@@ -114,17 +109,13 @@ impl ConsoleRedirectBackend {
pub struct ConsoleRedirectNodeInfo(pub(super) NodeInfo);
#[async_trait]
impl ComputeConnectBackend for ConsoleRedirectNodeInfo {
impl WakeComputeBackend for ConsoleRedirectNodeInfo {
async fn wake_compute(
&self,
_ctx: &RequestContext,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
Ok(Cached::new_uncached(self.0.clone()))
}
fn get_keys(&self) -> &ComputeCredentialKeys {
&ComputeCredentialKeys::None
}
}
async fn authenticate(
@@ -132,7 +123,7 @@ async fn authenticate(
auth_config: &'static AuthenticationConfig,
link_uri: &reqwest::Url,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<(NodeInfo, ComputeUserInfo, Option<Vec<IpPattern>>)> {
) -> auth::Result<(NodeInfo, AuthInfo, ComputeUserInfo)> {
ctx.set_auth_method(crate::context::AuthMethod::ConsoleRedirect);
// registering waiter can fail if we get unlucky with rng.
@@ -192,10 +183,24 @@ async fn authenticate(
client.write_message(BeMessage::NoticeResponse("Connecting to database."));
// This config should be self-contained, because we won't
// take username or dbname from client's startup message.
let mut config = compute::ConnCfg::new(db_info.host.to_string(), db_info.port);
config.dbname(&db_info.dbname).user(&db_info.user);
// Backwards compatibility. pg_sni_proxy uses "--" in domain names
// while direct connections do not. Once we migrate to pg_sni_proxy
// everywhere, we can remove this.
let ssl_mode = if db_info.host.contains("--") {
// we need TLS connection with SNI info to properly route it
SslMode::Require
} else {
SslMode::Disable
};
let conn_info = compute::ConnectInfo {
host: db_info.host.into(),
port: db_info.port,
ssl_mode,
host_addr: None,
};
let auth_info =
AuthInfo::for_console_redirect(&db_info.dbname, &db_info.user, db_info.password.as_deref());
let user: RoleName = db_info.user.into();
let user_info = ComputeUserInfo {
@@ -209,26 +214,12 @@ async fn authenticate(
ctx.set_project(db_info.aux.clone());
info!("woken up a compute node");
// Backwards compatibility. pg_sni_proxy uses "--" in domain names
// while direct connections do not. Once we migrate to pg_sni_proxy
// everywhere, we can remove this.
if db_info.host.contains("--") {
// we need TLS connection with SNI info to properly route it
config.ssl_mode(SslMode::Require);
} else {
config.ssl_mode(SslMode::Disable);
}
if let Some(password) = db_info.password {
config.password(password.as_ref());
}
Ok((
NodeInfo {
config,
conn_info,
aux: db_info.aux,
},
auth_info,
user_info,
db_info.allowed_ips,
))
}

View File

@@ -1,11 +1,12 @@
use std::net::SocketAddr;
use arc_swap::ArcSwapOption;
use postgres_client::config::SslMode;
use tokio::sync::Semaphore;
use super::jwt::{AuthRule, FetchAuthRules};
use crate::auth::backend::jwt::FetchAuthRulesError;
use crate::compute::ConnCfg;
use crate::compute::ConnectInfo;
use crate::compute_ctl::ComputeCtlApi;
use crate::context::RequestContext;
use crate::control_plane::NodeInfo;
@@ -29,7 +30,12 @@ impl LocalBackend {
api: http::Endpoint::new(compute_ctl, http::new_client()),
},
node_info: NodeInfo {
config: ConnCfg::new(postgres_addr.ip().to_string(), postgres_addr.port()),
conn_info: ConnectInfo {
host_addr: Some(postgres_addr.ip()),
host: postgres_addr.ip().to_string().into(),
port: postgres_addr.port(),
ssl_mode: SslMode::Disable,
},
// TODO(conrad): make this better reflect compute info rather than endpoint info.
aux: MetricsAuxInfo {
endpoint_id: EndpointIdTag::get_interner().get_or_intern("local"),

View File

@@ -14,20 +14,21 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, info};
use crate::auth::{self, AuthError, ComputeUserInfoMaybeEndpoint, validate_password_and_exchange};
use crate::auth::{self, ComputeUserInfoMaybeEndpoint, validate_password_and_exchange};
use crate::cache::Cached;
use crate::config::AuthenticationConfig;
use crate::context::RequestContext;
use crate::control_plane::client::ControlPlaneClient;
use crate::control_plane::errors::GetAuthInfoError;
use crate::control_plane::messages::EndpointRateLimitConfig;
use crate::control_plane::{
self, AccessBlockerFlags, AuthSecret, CachedNodeInfo, ControlPlaneApi, EndpointAccessControl,
RoleAccessControl,
};
use crate::intern::EndpointIdInt;
use crate::pglb::connect_compute::ComputeConnectBackend;
use crate::pqproto::BeMessage;
use crate::proxy::NeonOptions;
use crate::proxy::wake_compute::WakeComputeBackend;
use crate::rate_limiter::EndpointRateLimiter;
use crate::stream::Stream;
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
@@ -168,8 +169,6 @@ impl ComputeUserInfo {
#[cfg_attr(test, derive(Debug))]
pub(crate) enum ComputeCredentialKeys {
#[cfg(any(test, feature = "testing"))]
Password(Vec<u8>),
AuthKeys(AuthKeys),
JwtPayload(Vec<u8>),
None,
@@ -232,11 +231,8 @@ async fn auth_quirks(
config.is_vpc_acccess_proxy,
)?;
let endpoint = EndpointIdInt::from(&info.endpoint);
let rate_limit_config = None;
if !endpoint_rate_limiter.check(endpoint, rate_limit_config, 1) {
return Err(AuthError::too_many_connections());
}
access_controls.connection_attempt_rate_limit(ctx, &info.endpoint, &endpoint_rate_limiter)?;
let role_access = api
.get_role_access_control(ctx, &info.endpoint, &info.user)
.await?;
@@ -403,29 +399,23 @@ impl Backend<'_, ComputeUserInfo> {
allowed_ips: Arc::new(vec![]),
allowed_vpce: Arc::new(vec![]),
flags: AccessBlockerFlags::default(),
rate_limits: EndpointRateLimitConfig::default(),
}),
}
}
}
#[async_trait::async_trait]
impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {
impl WakeComputeBackend for Backend<'_, ComputeUserInfo> {
async fn wake_compute(
&self,
ctx: &RequestContext,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
match self {
Self::ControlPlane(api, creds) => api.wake_compute(ctx, &creds.info).await,
Self::ControlPlane(api, info) => api.wake_compute(ctx, info).await,
Self::Local(local) => Ok(Cached::new_uncached(local.node_info.clone())),
}
}
fn get_keys(&self) -> &ComputeCredentialKeys {
match self {
Self::ControlPlane(_, creds) => &creds.keys,
Self::Local(_) => &ComputeCredentialKeys::None,
}
}
}
#[cfg(test)]
@@ -448,6 +438,7 @@ mod tests {
use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern};
use crate::config::AuthenticationConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::EndpointRateLimitConfig;
use crate::control_plane::{
self, AccessBlockerFlags, CachedNodeInfo, EndpointAccessControl, RoleAccessControl,
};
@@ -486,6 +477,7 @@ mod tests {
allowed_ips: Arc::new(self.ips.clone()),
allowed_vpce: Arc::new(self.vpc_endpoint_ids.clone()),
flags: self.access_blocker_flags,
rate_limits: EndpointRateLimitConfig::default(),
})
}

View File

@@ -169,13 +169,6 @@ pub(crate) async fn validate_password_and_exchange(
secret: AuthSecret,
) -> super::Result<sasl::Outcome<ComputeCredentialKeys>> {
match secret {
#[cfg(any(test, feature = "testing"))]
AuthSecret::Md5(_) => {
// test only
Ok(sasl::Outcome::Success(ComputeCredentialKeys::Password(
password.to_owned(),
)))
}
// perform scram authentication as both client and server to validate the keys
AuthSecret::Scram(scram_secret) => {
let outcome = crate::scram::exchange(pool, endpoint, &scram_secret, password).await?;

View File

@@ -28,10 +28,9 @@ use crate::context::RequestContext;
use crate::metrics::{Metrics, ThreadPoolMetrics};
use crate::pqproto::FeStartupPacket;
use crate::protocol2::ConnectionInfo;
use crate::proxy::{
ErrorSource, TlsRequired, copy_bidirectional_client_compute, run_until_cancelled,
};
use crate::proxy::{ErrorSource, TlsRequired, copy_bidirectional_client_compute};
use crate::stream::{PqStream, Stream};
use crate::util::run_until_cancelled;
project_git_version!(GIT_VERSION);

View File

@@ -18,6 +18,7 @@ use crate::types::{EndpointId, RoleName};
#[async_trait]
pub(crate) trait ProjectInfoCache {
fn invalidate_endpoint_access(&self, endpoint_id: EndpointIdInt);
fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt);
fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt);
fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt);
@@ -100,6 +101,13 @@ pub struct ProjectInfoCacheImpl {
#[async_trait]
impl ProjectInfoCache for ProjectInfoCacheImpl {
fn invalidate_endpoint_access(&self, endpoint_id: EndpointIdInt) {
info!("invalidating endpoint access for `{endpoint_id}`");
if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
endpoint_info.invalidate_endpoint();
}
}
fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt) {
info!("invalidating endpoint access for project `{project_id}`");
let endpoints = self
@@ -356,6 +364,7 @@ mod tests {
use std::sync::Arc;
use super::*;
use crate::control_plane::messages::EndpointRateLimitConfig;
use crate::control_plane::{AccessBlockerFlags, AuthSecret};
use crate::scram::ServerSecret;
use crate::types::ProjectId;
@@ -391,6 +400,7 @@ mod tests {
allowed_ips: allowed_ips.clone(),
allowed_vpce: Arc::new(vec![]),
flags: AccessBlockerFlags::default(),
rate_limits: EndpointRateLimitConfig::default(),
},
RoleAccessControl {
secret: secret1.clone(),
@@ -406,6 +416,7 @@ mod tests {
allowed_ips: allowed_ips.clone(),
allowed_vpce: Arc::new(vec![]),
flags: AccessBlockerFlags::default(),
rate_limits: EndpointRateLimitConfig::default(),
},
RoleAccessControl {
secret: secret2.clone(),
@@ -431,6 +442,7 @@ mod tests {
allowed_ips: allowed_ips.clone(),
allowed_vpce: Arc::new(vec![]),
flags: AccessBlockerFlags::default(),
rate_limits: EndpointRateLimitConfig::default(),
},
RoleAccessControl {
secret: secret3.clone(),

View File

@@ -24,7 +24,6 @@ use crate::pqproto::CancelKeyData;
use crate::rate_limiter::LeakyBucketRateLimiter;
use crate::redis::keys::KeyPrefix;
use crate::redis::kv_ops::RedisKVClient;
use crate::tls::postgres_rustls::MakeRustlsConnect;
type IpSubnetKey = IpNet;
@@ -497,10 +496,8 @@ impl CancelClosure {
) -> Result<(), CancelError> {
let socket = TcpStream::connect(self.socket_addr).await?;
let mut mk_tls =
crate::tls::postgres_rustls::MakeRustlsConnect::new(compute_config.tls.clone());
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
&mut mk_tls,
let tls = <_ as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
compute_config,
&self.hostname,
)
.map_err(|e| CancelError::IO(std::io::Error::other(e.to_string())))?;

View File

@@ -1,21 +1,24 @@
mod tls;
use std::fmt::Debug;
use std::io;
use std::net::SocketAddr;
use std::time::Duration;
use std::net::{IpAddr, SocketAddr};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use postgres_client::config::{AuthKeys, SslMode};
use postgres_client::maybe_tls_stream::MaybeTlsStream;
use postgres_client::tls::MakeTlsConnect;
use postgres_client::{CancelToken, RawConnection};
use postgres_client::{CancelToken, NoTls, RawConnection};
use postgres_protocol::message::backend::NoticeResponseBody;
use rustls::pki_types::InvalidDnsNameError;
use thiserror::Error;
use tokio::net::{TcpStream, lookup_host};
use tracing::{debug, error, info, warn};
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
use crate::auth::parse_endpoint_param;
use crate::cancellation::CancelClosure;
use crate::compute::tls::TlsError;
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::control_plane::client::ApiLockError;
@@ -25,7 +28,6 @@ use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, NumDbConnectionsGuard};
use crate::pqproto::StartupMessageParams;
use crate::proxy::neon_option;
use crate::tls::postgres_rustls::MakeRustlsConnect;
use crate::types::Host;
pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
@@ -38,10 +40,7 @@ pub(crate) enum ConnectionError {
Postgres(#[from] postgres_client::Error),
#[error("{COULD_NOT_CONNECT}: {0}")]
CouldNotConnect(#[from] io::Error),
#[error("{COULD_NOT_CONNECT}: {0}")]
TlsError(#[from] InvalidDnsNameError),
TlsError(#[from] TlsError),
#[error("{COULD_NOT_CONNECT}: {0}")]
WakeComputeError(#[from] WakeComputeError),
@@ -73,7 +72,7 @@ impl UserFacingError for ConnectionError {
ConnectionError::TooManyConnectionAttempts(_) => {
"Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
}
_ => COULD_NOT_CONNECT.to_owned(),
ConnectionError::TlsError(_) => COULD_NOT_CONNECT.to_owned(),
}
}
}
@@ -85,7 +84,6 @@ impl ReportableError for ConnectionError {
crate::error::ErrorKind::Postgres
}
ConnectionError::Postgres(_) => crate::error::ErrorKind::Compute,
ConnectionError::CouldNotConnect(_) => crate::error::ErrorKind::Compute,
ConnectionError::TlsError(_) => crate::error::ErrorKind::Compute,
ConnectionError::WakeComputeError(e) => e.get_error_kind(),
ConnectionError::TooManyConnectionAttempts(e) => e.get_error_kind(),
@@ -96,34 +94,85 @@ impl ReportableError for ConnectionError {
/// A pair of `ClientKey` & `ServerKey` for `SCRAM-SHA-256`.
pub(crate) type ScramKeys = postgres_client::config::ScramKeys<32>;
/// A config for establishing a connection to compute node.
/// Eventually, `postgres_client` will be replaced with something better.
/// Newtype allows us to implement methods on top of it.
#[derive(Clone)]
pub(crate) struct ConnCfg(Box<postgres_client::Config>);
pub enum Auth {
/// Only used during console-redirect.
Password(Vec<u8>),
/// Used by sql-over-http, ws, tcp.
Scram(Box<ScramKeys>),
}
/// A config for authenticating to the compute node.
pub(crate) struct AuthInfo {
/// None for local-proxy, as we use trust-based localhost auth.
/// Some for sql-over-http, ws, tcp, and in most cases for console-redirect.
/// Might be None for console-redirect, but that's only a consequence of testing environments ATM.
auth: Option<Auth>,
server_params: StartupMessageParams,
/// Console redirect sets user and database, we shouldn't re-use those from the params.
skip_db_user: bool,
}
/// Contains only the data needed to establish a secure connection to compute.
#[derive(Clone)]
pub struct ConnectInfo {
pub host_addr: Option<IpAddr>,
pub host: Host,
pub port: u16,
pub ssl_mode: SslMode,
}
/// Creation and initialization routines.
impl ConnCfg {
pub(crate) fn new(host: String, port: u16) -> Self {
Self(Box::new(postgres_client::Config::new(host, port)))
}
/// Reuse password or auth keys from the other config.
pub(crate) fn reuse_password(&mut self, other: Self) {
if let Some(password) = other.get_password() {
self.password(password);
}
if let Some(keys) = other.get_auth_keys() {
self.auth_keys(keys);
impl AuthInfo {
pub(crate) fn for_console_redirect(db: &str, user: &str, pw: Option<&str>) -> Self {
let mut server_params = StartupMessageParams::default();
server_params.insert("database", db);
server_params.insert("user", user);
Self {
auth: pw.map(|pw| Auth::Password(pw.as_bytes().to_owned())),
server_params,
skip_db_user: true,
}
}
pub(crate) fn get_host(&self) -> Host {
match self.0.get_host() {
postgres_client::config::Host::Tcp(s) => s.into(),
pub(crate) fn with_auth_keys(keys: ComputeCredentialKeys) -> Self {
Self {
auth: match keys {
ComputeCredentialKeys::AuthKeys(AuthKeys::ScramSha256(auth_keys)) => {
Some(Auth::Scram(Box::new(auth_keys)))
}
ComputeCredentialKeys::JwtPayload(_) | ComputeCredentialKeys::None => None,
},
server_params: StartupMessageParams::default(),
skip_db_user: false,
}
}
}
impl ConnectInfo {
pub fn to_postgres_client_config(&self) -> postgres_client::Config {
let mut config = postgres_client::Config::new(self.host.to_string(), self.port);
config.ssl_mode(self.ssl_mode);
if let Some(host_addr) = self.host_addr {
config.set_host_addr(host_addr);
}
config
}
}
impl AuthInfo {
fn enrich(&self, mut config: postgres_client::Config) -> postgres_client::Config {
match &self.auth {
Some(Auth::Scram(keys)) => config.auth_keys(AuthKeys::ScramSha256(**keys)),
Some(Auth::Password(pw)) => config.password(pw),
None => &mut config,
};
for (k, v) in self.server_params.iter() {
config.set_param(k, v);
}
config
}
/// Apply startup message params to the connection config.
pub(crate) fn set_startup_params(
@@ -132,27 +181,26 @@ impl ConnCfg {
arbitrary_params: bool,
) {
if !arbitrary_params {
self.set_param("client_encoding", "UTF8");
self.server_params.insert("client_encoding", "UTF8");
}
for (k, v) in params.iter() {
match k {
// Only set `user` if it's not present in the config.
// Console redirect auth flow takes username from the console's response.
"user" if self.user_is_set() => {}
"database" if self.db_is_set() => {}
"user" | "database" if self.skip_db_user => {}
"options" => {
if let Some(options) = filtered_options(v) {
self.set_param(k, &options);
self.server_params.insert(k, &options);
}
}
"user" | "database" | "application_name" | "replication" => {
self.set_param(k, v);
self.server_params.insert(k, v);
}
// if we allow arbitrary params, then we forward them through.
// this is a flag for a period of backwards compatibility
k if arbitrary_params => {
self.set_param(k, v);
self.server_params.insert(k, v);
}
_ => {}
}
@@ -160,25 +208,13 @@ impl ConnCfg {
}
}
impl std::ops::Deref for ConnCfg {
type Target = postgres_client::Config;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// For now, let's make it easier to setup the config.
impl std::ops::DerefMut for ConnCfg {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl ConnCfg {
/// Establish a raw TCP connection to the compute node.
async fn connect_raw(&self, timeout: Duration) -> io::Result<(SocketAddr, TcpStream, &str)> {
use postgres_client::config::Host;
impl ConnectInfo {
/// Establish a raw TCP+TLS connection to the compute node.
async fn connect_raw(
&self,
config: &ComputeConfig,
) -> Result<(SocketAddr, MaybeTlsStream<TcpStream, RustlsStream>), TlsError> {
let timeout = config.timeout;
// wrap TcpStream::connect with timeout
let connect_with_timeout = |addrs| {
@@ -208,34 +244,32 @@ impl ConnCfg {
// We can't reuse connection establishing logic from `postgres_client` here,
// because it has no means for extracting the underlying socket which we
// require for our business.
let port = self.0.get_port();
let host = self.0.get_host();
let port = self.port;
let host = &*self.host;
let host = match host {
Host::Tcp(host) => host.as_str(),
};
let addrs = match self.0.get_host_addr() {
let addrs = match self.host_addr {
Some(addr) => vec![SocketAddr::new(addr, port)],
None => lookup_host((host, port)).await?.collect(),
};
match connect_once(&*addrs).await {
Ok((sockaddr, stream)) => Ok((sockaddr, stream, host)),
Ok((sockaddr, stream)) => Ok((
sockaddr,
tls::connect_tls(stream, self.ssl_mode, config, host).await?,
)),
Err(err) => {
warn!("couldn't connect to compute node at {host}:{port}: {err}");
Err(err)
Err(TlsError::Connection(err))
}
}
}
}
type RustlsStream = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::Stream;
type RustlsStream = <ComputeConfig as MakeTlsConnect<tokio::net::TcpStream>>::Stream;
pub(crate) struct PostgresConnection {
/// Socket connected to a compute node.
pub(crate) stream:
postgres_client::maybe_tls_stream::MaybeTlsStream<tokio::net::TcpStream, RustlsStream>,
pub(crate) stream: MaybeTlsStream<tokio::net::TcpStream, RustlsStream>,
/// PostgreSQL connection parameters.
pub(crate) params: std::collections::HashMap<String, String>,
/// Query cancellation token.
@@ -248,28 +282,23 @@ pub(crate) struct PostgresConnection {
_guage: NumDbConnectionsGuard<'static>,
}
impl ConnCfg {
impl ConnectInfo {
/// Connect to a corresponding compute node.
pub(crate) async fn connect(
&self,
ctx: &RequestContext,
aux: MetricsAuxInfo,
auth: &AuthInfo,
config: &ComputeConfig,
user_info: ComputeUserInfo,
) -> Result<PostgresConnection, ConnectionError> {
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (socket_addr, stream, host) = self.connect_raw(config.timeout).await?;
drop(pause);
let mut tmp_config = auth.enrich(self.to_postgres_client_config());
// we setup SSL early in `ConnectInfo::connect_raw`.
tmp_config.ssl_mode(SslMode::Disable);
let mut mk_tls = crate::tls::postgres_rustls::MakeRustlsConnect::new(config.tls.clone());
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
&mut mk_tls,
host,
)?;
// connect_raw() will not use TLS if sslmode is "disable"
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let connection = self.0.connect_raw(stream, tls).await?;
let (socket_addr, stream) = self.connect_raw(config).await?;
let connection = tmp_config.connect_raw(stream, NoTls).await?;
drop(pause);
let RawConnection {
@@ -282,13 +311,14 @@ impl ConnCfg {
tracing::Span::current().record("pid", tracing::field::display(process_id));
tracing::Span::current().record("compute_id", tracing::field::display(&aux.compute_id));
let stream = stream.into_inner();
let MaybeTlsStream::Raw(stream) = stream.into_inner();
// TODO: lots of useful info but maybe we can move it elsewhere (eg traces?)
info!(
cold_start_info = ctx.cold_start_info().as_str(),
"connected to compute node at {host} ({socket_addr}) sslmode={:?}, latency={}, query_id={}",
self.0.get_ssl_mode(),
"connected to compute node at {} ({socket_addr}) sslmode={:?}, latency={}, query_id={}",
self.host,
self.ssl_mode,
ctx.get_proxy_latency(),
ctx.get_testodrome_id().unwrap_or_default(),
);
@@ -299,11 +329,11 @@ impl ConnCfg {
socket_addr,
CancelToken {
socket_config: None,
ssl_mode: self.0.get_ssl_mode(),
ssl_mode: self.ssl_mode,
process_id,
secret_key,
},
host.to_string(),
self.host.to_string(),
user_info,
);

63
proxy/src/compute/tls.rs Normal file
View File

@@ -0,0 +1,63 @@
use futures::FutureExt;
use postgres_client::config::SslMode;
use postgres_client::maybe_tls_stream::MaybeTlsStream;
use postgres_client::tls::{MakeTlsConnect, TlsConnect};
use rustls::pki_types::InvalidDnsNameError;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::pqproto::request_tls;
use crate::proxy::retry::CouldRetry;
#[derive(Debug, Error)]
pub enum TlsError {
#[error(transparent)]
Dns(#[from] InvalidDnsNameError),
#[error(transparent)]
Connection(#[from] std::io::Error),
#[error("TLS required but not provided")]
Required,
}
impl CouldRetry for TlsError {
fn could_retry(&self) -> bool {
match self {
TlsError::Dns(_) => false,
TlsError::Connection(err) => err.could_retry(),
// perhaps compute didn't realise it supports TLS?
TlsError::Required => true,
}
}
}
pub async fn connect_tls<S, T>(
mut stream: S,
mode: SslMode,
tls: &T,
host: &str,
) -> Result<MaybeTlsStream<S, T::Stream>, TlsError>
where
S: AsyncRead + AsyncWrite + Unpin + Send,
T: MakeTlsConnect<
S,
Error = InvalidDnsNameError,
TlsConnect: TlsConnect<S, Error = std::io::Error, Future: Send>,
>,
{
match mode {
SslMode::Disable => return Ok(MaybeTlsStream::Raw(stream)),
SslMode::Prefer | SslMode::Require => {}
}
if !request_tls(&mut stream).await? {
if SslMode::Require == mode {
return Err(TlsError::Required);
}
return Ok(MaybeTlsStream::Raw(stream));
}
Ok(MaybeTlsStream::Tls(
tls.make_tls_connect(host)?.connect(stream).boxed().await?,
))
}

View File

@@ -11,13 +11,12 @@ use crate::config::{ProxyConfig, ProxyProtocolV2};
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::metrics::{Metrics, NumClientConnectionsGuard};
use crate::pglb::connect_compute::{TcpMechanism, connect_to_compute};
use crate::pglb::handshake::{HandshakeData, handshake};
use crate::pglb::passthrough::ProxyPassthrough;
use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
use crate::proxy::{
ClientRequestError, ErrorSource, prepare_client_connection, run_until_cancelled,
};
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
use crate::proxy::{ClientRequestError, ErrorSource, prepare_client_connection};
use crate::util::run_until_cancelled;
pub async fn task_main(
config: &'static ProxyConfig,
@@ -210,20 +209,20 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
ctx.set_db_options(params.clone());
let (node_info, user_info, _ip_allowlist) = match backend
let (node_info, mut auth_info, user_info) = match backend
.authenticate(ctx, &config.authentication_config, &mut stream)
.await
{
Ok(auth_result) => auth_result,
Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
};
auth_info.set_startup_params(&params, true);
let node = connect_to_compute(
ctx,
&TcpMechanism {
user_info,
params_compat: true,
params: &params,
auth: auth_info,
locks: &config.connect_compute_locks,
},
&node_info,

View File

@@ -146,6 +146,7 @@ impl NeonControlPlaneClient {
public_access_blocked: block_public_connections,
vpc_access_blocked: block_vpc_connections,
},
rate_limits: body.rate_limits,
})
}
.inspect_err(|e| tracing::debug!(error = ?e))
@@ -261,24 +262,18 @@ impl NeonControlPlaneClient {
Some(_) => SslMode::Require,
None => SslMode::Disable,
};
let host_name = match body.server_name {
Some(host) => host,
None => host.to_owned(),
let host = match body.server_name {
Some(host) => host.into(),
None => host.into(),
};
// Don't set anything but host and port! This config will be cached.
// We'll set username and such later using the startup message.
// TODO: add more type safety (in progress).
let mut config = compute::ConnCfg::new(host_name, port);
if let Some(addr) = host_addr {
config.set_host_addr(addr);
}
config.ssl_mode(ssl_mode);
let node = NodeInfo {
config,
conn_info: compute::ConnectInfo {
host_addr,
host,
port,
ssl_mode,
},
aux: body.aux,
};
@@ -318,6 +313,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
allowed_ips: Arc::new(auth_info.allowed_ips),
allowed_vpce: Arc::new(auth_info.allowed_vpc_endpoint_ids),
flags: auth_info.access_blocker_flags,
rate_limits: auth_info.rate_limits,
};
let role_control = RoleAccessControl {
secret: auth_info.secret,
@@ -363,6 +359,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
allowed_ips: Arc::new(auth_info.allowed_ips),
allowed_vpce: Arc::new(auth_info.allowed_vpc_endpoint_ids),
flags: auth_info.access_blocker_flags,
rate_limits: auth_info.rate_limits,
};
let role_control = RoleAccessControl {
secret: auth_info.secret,

View File

@@ -6,6 +6,7 @@ use std::str::FromStr;
use std::sync::Arc;
use futures::TryFutureExt;
use postgres_client::config::SslMode;
use thiserror::Error;
use tokio_postgres::Client;
use tracing::{Instrument, error, info, info_span, warn};
@@ -14,19 +15,20 @@ use crate::auth::IpPattern;
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::jwt::AuthRule;
use crate::cache::Cached;
use crate::compute::ConnectInfo;
use crate::context::RequestContext;
use crate::control_plane::errors::{
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
};
use crate::control_plane::messages::MetricsAuxInfo;
use crate::control_plane::messages::{EndpointRateLimitConfig, MetricsAuxInfo};
use crate::control_plane::{
AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo,
RoleAccessControl,
};
use crate::intern::RoleNameInt;
use crate::scram;
use crate::types::{BranchId, EndpointId, ProjectId, RoleName};
use crate::url::ApiUrl;
use crate::{compute, scram};
#[derive(Debug, Error)]
enum MockApiError {
@@ -87,8 +89,7 @@ impl MockControlPlane {
.await?
{
info!("got a secret: {entry}"); // safe since it's not a prod scenario
let secret = scram::ServerSecret::parse(&entry).map(AuthSecret::Scram);
secret.or_else(|| parse_md5(&entry).map(AuthSecret::Md5))
scram::ServerSecret::parse(&entry).map(AuthSecret::Scram)
} else {
warn!("user '{role}' does not exist");
None
@@ -129,6 +130,7 @@ impl MockControlPlane {
project_id: None,
account_id: None,
access_blocker_flags: AccessBlockerFlags::default(),
rate_limits: EndpointRateLimitConfig::default(),
})
}
@@ -170,25 +172,23 @@ impl MockControlPlane {
async fn do_wake_compute(&self) -> Result<NodeInfo, WakeComputeError> {
let port = self.endpoint.port().unwrap_or(5432);
let mut config = match self.endpoint.host_str() {
None => {
let mut config = compute::ConnCfg::new("localhost".to_string(), port);
config.set_host_addr(IpAddr::V4(Ipv4Addr::LOCALHOST));
config
}
Some(host) => {
let mut config = compute::ConnCfg::new(host.to_string(), port);
if let Ok(addr) = IpAddr::from_str(host) {
config.set_host_addr(addr);
}
config
}
let conn_info = match self.endpoint.host_str() {
None => ConnectInfo {
host_addr: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
host: "localhost".into(),
port,
ssl_mode: SslMode::Disable,
},
Some(host) => ConnectInfo {
host_addr: IpAddr::from_str(host).ok(),
host: host.into(),
port,
ssl_mode: SslMode::Disable,
},
};
config.ssl_mode(postgres_client::config::SslMode::Disable);
let node = NodeInfo {
config,
conn_info,
aux: MetricsAuxInfo {
endpoint_id: (&EndpointId::from("endpoint")).into(),
project_id: (&ProjectId::from("project")).into(),
@@ -234,6 +234,7 @@ impl super::ControlPlaneApi for MockControlPlane {
allowed_ips: Arc::new(info.allowed_ips),
allowed_vpce: Arc::new(info.allowed_vpc_endpoint_ids),
flags: info.access_blocker_flags,
rate_limits: info.rate_limits,
})
}
@@ -266,12 +267,3 @@ impl super::ControlPlaneApi for MockControlPlane {
self.do_wake_compute().map_ok(Cached::new_uncached).await
}
}
fn parse_md5(input: &str) -> Option<[u8; 16]> {
let text = input.strip_prefix("md5")?;
let mut bytes = [0u8; 16];
hex::decode_to_slice(text, &mut bytes).ok()?;
Some(bytes)
}

View File

@@ -10,6 +10,7 @@ use clashmap::ClashMap;
use tokio::time::Instant;
use tracing::{debug, info};
use super::{EndpointAccessControl, RoleAccessControl};
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError};
use crate::cache::endpoints::EndpointsCache;
@@ -22,8 +23,6 @@ use crate::metrics::ApiLockMetrics;
use crate::rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token};
use crate::types::EndpointId;
use super::{EndpointAccessControl, RoleAccessControl};
#[non_exhaustive]
#[derive(Clone)]
pub enum ControlPlaneClient {

View File

@@ -227,12 +227,35 @@ pub(crate) struct UserFacingMessage {
#[derive(Deserialize)]
pub(crate) struct GetEndpointAccessControl {
pub(crate) role_secret: Box<str>,
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<String>>,
pub(crate) project_id: Option<ProjectIdInt>,
pub(crate) account_id: Option<AccountIdInt>,
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<String>>,
pub(crate) block_public_connections: Option<bool>,
pub(crate) block_vpc_connections: Option<bool>,
#[serde(default)]
pub(crate) rate_limits: EndpointRateLimitConfig,
}
#[derive(Copy, Clone, Deserialize, Default)]
pub struct EndpointRateLimitConfig {
pub connection_attempts: ConnectionAttemptsLimit,
}
#[derive(Copy, Clone, Deserialize, Default)]
pub struct ConnectionAttemptsLimit {
pub tcp: Option<LeakyBucketSetting>,
pub ws: Option<LeakyBucketSetting>,
pub http: Option<LeakyBucketSetting>,
}
#[derive(Copy, Clone, Deserialize)]
pub struct LeakyBucketSetting {
pub rps: f64,
pub burst: f64,
}
/// Response which holds compute node's `host:port` pair.

View File

@@ -11,15 +11,18 @@ pub(crate) mod errors;
use std::sync::Arc;
use messages::EndpointRateLimitConfig;
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::jwt::AuthRule;
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
use crate::auth::{AuthError, IpPattern, check_peer_addr_is_in_list};
use crate::cache::{Cached, TimedLru};
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
use crate::intern::{AccountIdInt, ProjectIdInt};
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt};
use crate::protocol2::ConnectionInfoExtra;
use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig};
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
use crate::{compute, scram};
@@ -39,10 +42,6 @@ pub mod mgmt;
/// Auth secret which is managed by the cloud.
#[derive(Clone, Eq, PartialEq, Debug)]
pub(crate) enum AuthSecret {
#[cfg(any(test, feature = "testing"))]
/// Md5 hash of user's password.
Md5([u8; 16]),
/// [SCRAM](crate::scram) authentication info.
Scram(scram::ServerSecret),
}
@@ -60,16 +59,14 @@ pub(crate) struct AuthInfo {
pub(crate) account_id: Option<AccountIdInt>,
/// Are public connections or VPC connections blocked?
pub(crate) access_blocker_flags: AccessBlockerFlags,
/// The rate limits for this endpoint.
pub(crate) rate_limits: EndpointRateLimitConfig,
}
/// Info for establishing a connection to a compute node.
/// This is what we get after auth succeeded, but not before!
#[derive(Clone)]
pub(crate) struct NodeInfo {
/// Compute node connection params.
/// It's sad that we have to clone this, but this will improve
/// once we migrate to a bespoke connection logic.
pub(crate) config: compute::ConnCfg,
pub(crate) conn_info: compute::ConnectInfo,
/// Labels for proxy's metrics.
pub(crate) aux: MetricsAuxInfo,
@@ -79,26 +76,14 @@ impl NodeInfo {
pub(crate) async fn connect(
&self,
ctx: &RequestContext,
auth: &compute::AuthInfo,
config: &ComputeConfig,
user_info: ComputeUserInfo,
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
self.config
.connect(ctx, self.aux.clone(), config, user_info)
self.conn_info
.connect(ctx, self.aux.clone(), auth, config, user_info)
.await
}
pub(crate) fn reuse_settings(&mut self, other: Self) {
self.config.reuse_password(other.config);
}
pub(crate) fn set_keys(&mut self, keys: &ComputeCredentialKeys) {
match keys {
#[cfg(any(test, feature = "testing"))]
ComputeCredentialKeys::Password(password) => self.config.password(password),
ComputeCredentialKeys::AuthKeys(auth_keys) => self.config.auth_keys(*auth_keys),
ComputeCredentialKeys::JwtPayload(_) | ComputeCredentialKeys::None => &mut self.config,
};
}
}
#[derive(Copy, Clone, Default)]
@@ -121,6 +106,8 @@ pub struct EndpointAccessControl {
pub allowed_ips: Arc<Vec<IpPattern>>,
pub allowed_vpce: Arc<Vec<String>>,
pub flags: AccessBlockerFlags,
pub rate_limits: EndpointRateLimitConfig,
}
impl EndpointAccessControl {
@@ -159,6 +146,36 @@ impl EndpointAccessControl {
Ok(())
}
pub fn connection_attempt_rate_limit(
&self,
ctx: &RequestContext,
endpoint: &EndpointId,
rate_limiter: &EndpointRateLimiter,
) -> Result<(), AuthError> {
let endpoint = EndpointIdInt::from(endpoint);
let limits = &self.rate_limits.connection_attempts;
let config = match ctx.protocol() {
crate::metrics::Protocol::Http => limits.http,
crate::metrics::Protocol::Ws => limits.ws,
crate::metrics::Protocol::Tcp => limits.tcp,
crate::metrics::Protocol::SniRouter => return Ok(()),
};
let config = config.and_then(|config| {
if config.rps <= 0.0 || config.burst <= 0.0 {
return None;
}
Some(LeakyBucketConfig::new(config.rps, config.burst))
});
if !rate_limiter.check(endpoint, config, 1) {
return Err(AuthError::too_many_connections());
}
Ok(())
}
}
/// This will allocate per each call, but the http requests alone

View File

@@ -106,4 +106,5 @@ mod tls;
mod types;
mod url;
mod usage_metrics;
mod util;
mod waiters;

View File

@@ -610,11 +610,11 @@ pub enum RedisEventsCount {
BranchCreated,
ProjectCreated,
CancelSession,
PasswordUpdate,
AllowedIpsUpdate,
AllowedVpcEndpointIdsUpdateForProjects,
AllowedVpcEndpointIdsUpdateForAllProjectsInOrg,
BlockPublicOrVpcAccessUpdate,
InvalidateRole,
InvalidateEndpoint,
InvalidateProject,
InvalidateProjects,
InvalidateOrg,
}
pub struct ThreadPoolWorkers(usize);

View File

@@ -1,4 +1,3 @@
pub mod connect_compute;
pub mod copy_bidirectional;
pub mod handshake;
pub mod inprocess;

View File

@@ -8,7 +8,7 @@ use std::io::{self, Cursor};
use bytes::{Buf, BufMut};
use itertools::Itertools;
use rand::distributions::{Distribution, Standard};
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use zerocopy::{FromBytes, Immutable, IntoBytes, big_endian};
pub type ErrorCode = [u8; 5];
@@ -53,6 +53,28 @@ impl fmt::Debug for ProtocolVersion {
}
}
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L118>
const MAX_STARTUP_PACKET_LENGTH: usize = 10000;
const RESERVED_INVALID_MAJOR_VERSION: u16 = 1234;
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L132>
const CANCEL_REQUEST_CODE: ProtocolVersion = ProtocolVersion::new(1234, 5678);
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L166>
const NEGOTIATE_SSL_CODE: ProtocolVersion = ProtocolVersion::new(1234, 5679);
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L167>
const NEGOTIATE_GSS_CODE: ProtocolVersion = ProtocolVersion::new(1234, 5680);
/// This first reads the startup message header, is 8 bytes.
/// The first 4 bytes is a big-endian message length, and the next 4 bytes is a version number.
///
/// The length value is inclusive of the header. For example,
/// an empty message will always have length 8.
#[derive(Clone, Copy, FromBytes, IntoBytes, Immutable)]
#[repr(C)]
struct StartupHeader {
len: big_endian::U32,
version: ProtocolVersion,
}
/// read the type from the stream using zerocopy.
///
/// not cancel safe.
@@ -66,32 +88,38 @@ macro_rules! read {
}};
}
/// Returns true if TLS is supported.
///
/// This is not cancel safe.
pub async fn request_tls<S>(stream: &mut S) -> io::Result<bool>
where
S: AsyncRead + AsyncWrite + Unpin,
{
let payload = StartupHeader {
len: 8.into(),
version: NEGOTIATE_SSL_CODE,
};
stream.write_all(payload.as_bytes()).await?;
stream.flush().await?;
// we expect back either `S` or `N` as a single byte.
let mut res = *b"0";
stream.read_exact(&mut res).await?;
debug_assert!(
res == *b"S" || res == *b"N",
"unexpected SSL negotiation response: {}",
char::from(res[0]),
);
// S for SSL.
Ok(res == *b"S")
}
pub async fn read_startup<S>(stream: &mut S) -> io::Result<FeStartupPacket>
where
S: AsyncRead + Unpin,
{
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L118>
const MAX_STARTUP_PACKET_LENGTH: usize = 10000;
const RESERVED_INVALID_MAJOR_VERSION: u16 = 1234;
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L132>
const CANCEL_REQUEST_CODE: ProtocolVersion = ProtocolVersion::new(1234, 5678);
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L166>
const NEGOTIATE_SSL_CODE: ProtocolVersion = ProtocolVersion::new(1234, 5679);
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L167>
const NEGOTIATE_GSS_CODE: ProtocolVersion = ProtocolVersion::new(1234, 5680);
/// This first reads the startup message header, is 8 bytes.
/// The first 4 bytes is a big-endian message length, and the next 4 bytes is a version number.
///
/// The length value is inclusive of the header. For example,
/// an empty message will always have length 8.
#[derive(Clone, Copy, FromBytes, IntoBytes, Immutable)]
#[repr(C)]
struct StartupHeader {
len: big_endian::U32,
version: ProtocolVersion,
}
let header = read!(stream => StartupHeader);
// <https://github.com/postgres/postgres/blob/04bcf9e19a4261fe9c7df37c777592c2e10c32a7/src/backend/tcop/backend_startup.c#L378-L382>
@@ -564,9 +592,8 @@ mod tests {
use tokio::io::{AsyncWriteExt, duplex};
use zerocopy::IntoBytes;
use crate::pqproto::{FeStartupPacket, read_message, read_startup};
use super::ProtocolVersion;
use crate::pqproto::{FeStartupPacket, read_message, read_startup};
#[tokio::test]
async fn reject_large_startup() {

View File

@@ -2,26 +2,25 @@ use async_trait::async_trait;
use tokio::time;
use tracing::{debug, info, warn};
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
use crate::compute::{self, COULD_NOT_CONNECT, PostgresConnection};
use crate::auth::backend::ComputeUserInfo;
use crate::compute::{self, AuthInfo, COULD_NOT_CONNECT, PostgresConnection};
use crate::config::{ComputeConfig, RetryConfig};
use crate::context::RequestContext;
use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
use crate::control_plane::{self, NodeInfo};
use crate::error::ReportableError;
use crate::metrics::{
ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType,
};
use crate::pqproto::StartupMessageParams;
use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute, retry_after, should_retry};
use crate::proxy::wake_compute::wake_compute;
use crate::proxy::wake_compute::{WakeComputeBackend, wake_compute};
use crate::types::Host;
/// If we couldn't connect, a cached connection info might be to blame
/// (e.g. the compute node's address might've changed at the wrong time).
/// Invalidate the cache entry (if any) to prevent subsequent errors.
#[tracing::instrument(name = "invalidate_cache", skip_all)]
#[tracing::instrument(skip_all)]
pub(crate) fn invalidate_cache(node_info: control_plane::CachedNodeInfo) -> NodeInfo {
let is_cached = node_info.cached();
if is_cached {
@@ -48,34 +47,17 @@ pub(crate) trait ConnectMechanism {
node_info: &control_plane::CachedNodeInfo,
config: &ComputeConfig,
) -> Result<Self::Connection, Self::ConnectError>;
fn update_connect_config(&self, conf: &mut compute::ConnCfg);
}
#[async_trait]
pub(crate) trait ComputeConnectBackend {
async fn wake_compute(
&self,
ctx: &RequestContext,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError>;
fn get_keys(&self) -> &ComputeCredentialKeys;
}
pub(crate) struct TcpMechanism<'a> {
pub(crate) params_compat: bool,
/// KV-dictionary with PostgreSQL connection params.
pub(crate) params: &'a StartupMessageParams,
pub(crate) struct TcpMechanism {
pub(crate) auth: AuthInfo,
/// connect_to_compute concurrency lock
pub(crate) locks: &'static ApiLocks<Host>,
pub(crate) user_info: ComputeUserInfo,
}
#[async_trait]
impl ConnectMechanism for TcpMechanism<'_> {
impl ConnectMechanism for TcpMechanism {
type Connection = PostgresConnection;
type ConnectError = compute::ConnectionError;
type Error = compute::ConnectionError;
@@ -90,19 +72,18 @@ impl ConnectMechanism for TcpMechanism<'_> {
node_info: &control_plane::CachedNodeInfo,
config: &ComputeConfig,
) -> Result<PostgresConnection, Self::Error> {
let host = node_info.config.get_host();
let permit = self.locks.get_permit(&host).await?;
permit.release_result(node_info.connect(ctx, config, self.user_info.clone()).await)
}
fn update_connect_config(&self, config: &mut compute::ConnCfg) {
config.set_startup_params(self.params, self.params_compat);
let permit = self.locks.get_permit(&node_info.conn_info.host).await?;
permit.release_result(
node_info
.connect(ctx, &self.auth, config, self.user_info.clone())
.await,
)
}
}
/// Try to connect to the compute node, retrying if necessary.
#[tracing::instrument(skip_all)]
pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: WakeComputeBackend>(
ctx: &RequestContext,
mechanism: &M,
user_info: &B,
@@ -114,12 +95,9 @@ where
M::Error: From<WakeComputeError>,
{
let mut num_retries = 0;
let mut node_info =
let node_info =
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
node_info.set_keys(user_info.get_keys());
mechanism.update_connect_config(&mut node_info.config);
// try once
let err = match mechanism.connect_once(ctx, &node_info, compute).await {
Ok(res) => {
@@ -155,14 +133,9 @@ where
} else {
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
debug!("compute node's state has likely changed; requesting a wake-up");
let old_node_info = invalidate_cache(node_info);
invalidate_cache(node_info);
// TODO: increment num_retries?
let mut node_info =
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
node_info.reuse_settings(old_node_info);
mechanism.update_connect_config(&mut node_info.config);
node_info
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?
};
// now that we have a new node, try connect to it repeatedly.

View File

@@ -1,8 +1,10 @@
#[cfg(test)]
mod tests;
pub(crate) mod connect_compute;
pub(crate) mod retry;
pub(crate) mod wake_compute;
use std::sync::Arc;
use futures::FutureExt;
@@ -21,15 +23,16 @@ use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
use crate::context::RequestContext;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, NumClientConnectionsGuard};
use crate::pglb::connect_compute::{TcpMechanism, connect_to_compute};
pub use crate::pglb::copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute};
use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake};
use crate::pglb::passthrough::ProxyPassthrough;
use crate::pqproto::{BeMessage, CancelKeyData, StartupMessageParams};
use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
use crate::rate_limiter::EndpointRateLimiter;
use crate::stream::{PqStream, Stream};
use crate::types::EndpointCacheKey;
use crate::util::run_until_cancelled;
use crate::{auth, compute};
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
@@ -46,21 +49,6 @@ impl ReportableError for TlsRequired {
impl UserFacingError for TlsRequired {}
pub async fn run_until_cancelled<F: std::future::Future>(
f: F,
cancellation_token: &CancellationToken,
) -> Option<F::Output> {
match futures::future::select(
std::pin::pin!(f),
std::pin::pin!(cancellation_token.cancelled()),
)
.await
{
futures::future::Either::Left((f, _)) => Some(f),
futures::future::Either::Right(((), _)) => None,
}
}
pub async fn task_main(
config: &'static ProxyConfig,
auth_backend: &'static auth::Backend<'static, ()>,
@@ -358,24 +346,22 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
}
};
let compute_user_info = match &user_info {
auth::Backend::ControlPlane(_, info) => &info.info,
let (cplane, creds) = match user_info {
auth::Backend::ControlPlane(cplane, creds) => (cplane, creds),
auth::Backend::Local(_) => unreachable!("local proxy does not run tcp proxy service"),
};
let params_compat = compute_user_info
.options
.get(NeonOptions::PARAMS_COMPAT)
.is_some();
let params_compat = creds.info.options.get(NeonOptions::PARAMS_COMPAT).is_some();
let mut auth_info = compute::AuthInfo::with_auth_keys(creds.keys);
auth_info.set_startup_params(&params, params_compat);
let res = connect_to_compute(
ctx,
&TcpMechanism {
user_info: compute_user_info.clone(),
params_compat,
params: &params,
user_info: creds.info.clone(),
auth: auth_info,
locks: &config.connect_compute_locks,
},
&user_info,
&auth::Backend::ControlPlane(cplane, creds.info),
config.wake_compute_retry_config,
&config.connect_to_compute,
)

View File

@@ -100,9 +100,9 @@ impl CouldRetry for compute::ConnectionError {
fn could_retry(&self) -> bool {
match self {
compute::ConnectionError::Postgres(err) => err.could_retry(),
compute::ConnectionError::CouldNotConnect(err) => err.could_retry(),
compute::ConnectionError::TlsError(err) => err.could_retry(),
compute::ConnectionError::WakeComputeError(err) => err.could_retry(),
_ => false,
compute::ConnectionError::TooManyConnectionAttempts(_) => false,
}
}
}

View File

@@ -19,17 +19,14 @@ use tracing_test::traced_test;
use super::retry::CouldRetry;
use super::*;
use crate::auth::backend::{
ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, MaybeOwned,
};
use crate::auth::backend::{ComputeUserInfo, MaybeOwned};
use crate::config::{ComputeConfig, RetryConfig};
use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient};
use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status};
use crate::control_plane::{self, CachedNodeInfo, NodeInfo, NodeInfoCache};
use crate::error::ErrorKind;
use crate::pglb::connect_compute::ConnectMechanism;
use crate::proxy::connect_compute::ConnectMechanism;
use crate::tls::client_config::compute_client_config_with_certs;
use crate::tls::postgres_rustls::MakeRustlsConnect;
use crate::tls::server_config::CertResolver;
use crate::types::{BranchId, EndpointId, ProjectId};
use crate::{sasl, scram};
@@ -72,13 +69,14 @@ struct ClientConfig<'a> {
hostname: &'a str,
}
type TlsConnect<S> = <MakeRustlsConnect as MakeTlsConnect<S>>::TlsConnect;
type TlsConnect<S> = <ComputeConfig as MakeTlsConnect<S>>::TlsConnect;
impl ClientConfig<'_> {
fn make_tls_connect(self) -> anyhow::Result<TlsConnect<DuplexStream>> {
let mut mk = MakeRustlsConnect::new(self.config);
let tls = MakeTlsConnect::<DuplexStream>::make_tls_connect(&mut mk, self.hostname)?;
Ok(tls)
Ok(crate::tls::postgres_rustls::make_tls_connect(
&self.config,
self.hostname,
)?)
}
}
@@ -497,8 +495,6 @@ impl ConnectMechanism for TestConnectMechanism {
x => panic!("expecting action {x:?}, connect is called instead"),
}
}
fn update_connect_config(&self, _conf: &mut compute::ConnCfg) {}
}
impl TestControlPlaneClient for TestConnectMechanism {
@@ -557,7 +553,12 @@ impl TestControlPlaneClient for TestConnectMechanism {
fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
let node = NodeInfo {
config: compute::ConnCfg::new("test".to_owned(), 5432),
conn_info: compute::ConnectInfo {
host: "test".into(),
port: 5432,
ssl_mode: SslMode::Disable,
host_addr: None,
},
aux: MetricsAuxInfo {
endpoint_id: (&EndpointId::from("endpoint")).into(),
project_id: (&ProjectId::from("project")).into(),
@@ -572,16 +573,13 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
fn helper_create_connect_info(
mechanism: &TestConnectMechanism,
) -> auth::Backend<'static, ComputeCredentials> {
) -> auth::Backend<'static, ComputeUserInfo> {
auth::Backend::ControlPlane(
MaybeOwned::Owned(ControlPlaneClient::Test(Box::new(mechanism.clone()))),
ComputeCredentials {
info: ComputeUserInfo {
endpoint: "endpoint".into(),
user: "user".into(),
options: NeonOptions::parse_options_raw(""),
},
keys: ComputeCredentialKeys::Password("password".into()),
ComputeUserInfo {
endpoint: "endpoint".into(),
user: "user".into(),
options: NeonOptions::parse_options_raw(""),
},
)
}

View File

@@ -1,3 +1,4 @@
use async_trait::async_trait;
use tracing::{error, info};
use crate::config::RetryConfig;
@@ -8,7 +9,6 @@ use crate::error::ReportableError;
use crate::metrics::{
ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType,
};
use crate::pglb::connect_compute::ComputeConnectBackend;
use crate::proxy::retry::{retry_after, should_retry};
// Use macro to retain original callsite.
@@ -23,7 +23,12 @@ macro_rules! log_wake_compute_error {
};
}
pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
#[async_trait]
pub(crate) trait WakeComputeBackend {
async fn wake_compute(&self, ctx: &RequestContext) -> Result<CachedNodeInfo, WakeComputeError>;
}
pub(crate) async fn wake_compute<B: WakeComputeBackend>(
num_retries: &mut u32,
ctx: &RequestContext,
api: &B,

View File

@@ -69,9 +69,8 @@ pub struct LeakyBucketConfig {
pub max: f64,
}
#[cfg(test)]
impl LeakyBucketConfig {
pub(crate) fn new(rps: f64, max: f64) -> Self {
pub fn new(rps: f64, max: f64) -> Self {
assert!(rps > 0.0, "rps must be positive");
assert!(max > 0.0, "max must be positive");
Self { rps, max }

View File

@@ -12,11 +12,10 @@ use rand::{Rng, SeedableRng};
use tokio::time::{Duration, Instant};
use tracing::info;
use super::LeakyBucketConfig;
use crate::ext::LockExt;
use crate::intern::EndpointIdInt;
use super::LeakyBucketConfig;
pub struct GlobalRateLimiter {
data: Vec<RateBucket>,
info: Vec<RateBucketInfo>,

View File

@@ -3,12 +3,12 @@ use std::sync::Arc;
use futures::StreamExt;
use redis::aio::PubSub;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use tokio_util::sync::CancellationToken;
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::cache::project_info::ProjectInfoCache;
use crate::intern::{AccountIdInt, ProjectIdInt, RoleNameInt};
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
const CPLANE_CHANNEL_NAME: &str = "neondb-proxy-ws-updates";
@@ -27,42 +27,37 @@ struct NotificationHeader<'a> {
topic: &'a str,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[serde(tag = "topic", content = "data")]
pub(crate) enum Notification {
enum Notification {
#[serde(
rename = "/allowed_ips_updated",
rename = "/account_settings_update",
alias = "/allowed_vpc_endpoints_updated_for_org",
deserialize_with = "deserialize_json_string"
)]
AllowedIpsUpdate {
allowed_ips_update: AllowedIpsUpdate,
},
AccountSettingsUpdate(InvalidateAccount),
#[serde(
rename = "/block_public_or_vpc_access_updated",
rename = "/endpoint_settings_update",
deserialize_with = "deserialize_json_string"
)]
BlockPublicOrVpcAccessUpdated {
block_public_or_vpc_access_updated: BlockPublicOrVpcAccessUpdated,
},
EndpointSettingsUpdate(InvalidateEndpoint),
#[serde(
rename = "/allowed_vpc_endpoints_updated_for_org",
rename = "/project_settings_update",
alias = "/allowed_ips_updated",
alias = "/block_public_or_vpc_access_updated",
alias = "/allowed_vpc_endpoints_updated_for_projects",
deserialize_with = "deserialize_json_string"
)]
AllowedVpcEndpointsUpdatedForOrg {
allowed_vpc_endpoints_updated_for_org: AllowedVpcEndpointsUpdatedForOrg,
},
ProjectSettingsUpdate(InvalidateProject),
#[serde(
rename = "/allowed_vpc_endpoints_updated_for_projects",
rename = "/role_setting_update",
alias = "/password_updated",
deserialize_with = "deserialize_json_string"
)]
AllowedVpcEndpointsUpdatedForProjects {
allowed_vpc_endpoints_updated_for_projects: AllowedVpcEndpointsUpdatedForProjects,
},
#[serde(
rename = "/password_updated",
deserialize_with = "deserialize_json_string"
)]
PasswordUpdate { password_update: PasswordUpdate },
RoleSettingUpdate(InvalidateRole),
#[serde(
other,
@@ -72,28 +67,56 @@ pub(crate) enum Notification {
UnknownTopic,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct AllowedIpsUpdate {
project_id: ProjectIdInt,
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
enum InvalidateEndpoint {
EndpointId(EndpointIdInt),
EndpointIds(Vec<EndpointIdInt>),
}
impl std::ops::Deref for InvalidateEndpoint {
type Target = [EndpointIdInt];
fn deref(&self) -> &Self::Target {
match self {
Self::EndpointId(id) => std::slice::from_ref(id),
Self::EndpointIds(ids) => ids,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct BlockPublicOrVpcAccessUpdated {
project_id: ProjectIdInt,
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
enum InvalidateProject {
ProjectId(ProjectIdInt),
ProjectIds(Vec<ProjectIdInt>),
}
impl std::ops::Deref for InvalidateProject {
type Target = [ProjectIdInt];
fn deref(&self) -> &Self::Target {
match self {
Self::ProjectId(id) => std::slice::from_ref(id),
Self::ProjectIds(ids) => ids,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct AllowedVpcEndpointsUpdatedForOrg {
account_id: AccountIdInt,
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
enum InvalidateAccount {
AccountId(AccountIdInt),
AccountIds(Vec<AccountIdInt>),
}
impl std::ops::Deref for InvalidateAccount {
type Target = [AccountIdInt];
fn deref(&self) -> &Self::Target {
match self {
Self::AccountId(id) => std::slice::from_ref(id),
Self::AccountIds(ids) => ids,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct AllowedVpcEndpointsUpdatedForProjects {
project_ids: Vec<ProjectIdInt>,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct PasswordUpdate {
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct InvalidateRole {
project_id: ProjectIdInt,
role_name: RoleNameInt,
}
@@ -177,41 +200,29 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
tracing::debug!(?msg, "received a message");
match msg {
Notification::AllowedIpsUpdate { .. }
| Notification::PasswordUpdate { .. }
| Notification::BlockPublicOrVpcAccessUpdated { .. }
| Notification::AllowedVpcEndpointsUpdatedForOrg { .. }
| Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => {
Notification::RoleSettingUpdate { .. }
| Notification::EndpointSettingsUpdate { .. }
| Notification::ProjectSettingsUpdate { .. }
| Notification::AccountSettingsUpdate { .. } => {
invalidate_cache(self.cache.clone(), msg.clone());
if matches!(msg, Notification::AllowedIpsUpdate { .. }) {
Metrics::get()
.proxy
.redis_events_count
.inc(RedisEventsCount::AllowedIpsUpdate);
} else if matches!(msg, Notification::PasswordUpdate { .. }) {
Metrics::get()
.proxy
.redis_events_count
.inc(RedisEventsCount::PasswordUpdate);
} else if matches!(
msg,
Notification::AllowedVpcEndpointsUpdatedForProjects { .. }
) {
Metrics::get()
.proxy
.redis_events_count
.inc(RedisEventsCount::AllowedVpcEndpointIdsUpdateForProjects);
} else if matches!(msg, Notification::AllowedVpcEndpointsUpdatedForOrg { .. }) {
Metrics::get()
.proxy
.redis_events_count
.inc(RedisEventsCount::AllowedVpcEndpointIdsUpdateForAllProjectsInOrg);
} else if matches!(msg, Notification::BlockPublicOrVpcAccessUpdated { .. }) {
Metrics::get()
.proxy
.redis_events_count
.inc(RedisEventsCount::BlockPublicOrVpcAccessUpdate);
let m = &Metrics::get().proxy.redis_events_count;
match msg {
Notification::RoleSettingUpdate { .. } => {
m.inc(RedisEventsCount::InvalidateRole);
}
Notification::EndpointSettingsUpdate { .. } => {
m.inc(RedisEventsCount::InvalidateEndpoint);
}
Notification::ProjectSettingsUpdate { .. } => {
m.inc(RedisEventsCount::InvalidateProject);
}
Notification::AccountSettingsUpdate { .. } => {
m.inc(RedisEventsCount::InvalidateOrg);
}
Notification::UnknownTopic => {}
}
// TODO: add additional metrics for the other event types.
// It might happen that the invalid entry is on the way to be cached.
@@ -233,30 +244,23 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
match msg {
Notification::AllowedIpsUpdate {
allowed_ips_update: AllowedIpsUpdate { project_id },
}
| Notification::BlockPublicOrVpcAccessUpdated {
block_public_or_vpc_access_updated: BlockPublicOrVpcAccessUpdated { project_id },
} => cache.invalidate_endpoint_access_for_project(project_id),
Notification::AllowedVpcEndpointsUpdatedForOrg {
allowed_vpc_endpoints_updated_for_org: AllowedVpcEndpointsUpdatedForOrg { account_id },
} => cache.invalidate_endpoint_access_for_org(account_id),
Notification::AllowedVpcEndpointsUpdatedForProjects {
allowed_vpc_endpoints_updated_for_projects:
AllowedVpcEndpointsUpdatedForProjects { project_ids },
} => {
for project in project_ids {
cache.invalidate_endpoint_access_for_project(project);
}
}
Notification::PasswordUpdate {
password_update:
PasswordUpdate {
project_id,
role_name,
},
} => cache.invalidate_role_secret_for_project(project_id, role_name),
Notification::EndpointSettingsUpdate(ids) => ids
.iter()
.for_each(|&id| cache.invalidate_endpoint_access(id)),
Notification::AccountSettingsUpdate(ids) => ids
.iter()
.for_each(|&id| cache.invalidate_endpoint_access_for_org(id)),
Notification::ProjectSettingsUpdate(ids) => ids
.iter()
.for_each(|&id| cache.invalidate_endpoint_access_for_project(id)),
Notification::RoleSettingUpdate(InvalidateRole {
project_id,
role_name,
}) => cache.invalidate_role_secret_for_project(project_id, role_name),
Notification::UnknownTopic => unreachable!(),
}
}
@@ -353,11 +357,32 @@ mod tests {
let result: Notification = serde_json::from_str(&text)?;
assert_eq!(
result,
Notification::AllowedIpsUpdate {
allowed_ips_update: AllowedIpsUpdate {
project_id: (&project_id).into()
}
}
Notification::ProjectSettingsUpdate(InvalidateProject::ProjectId((&project_id).into()))
);
Ok(())
}
#[test]
fn parse_multiple_projects() -> anyhow::Result<()> {
let project_id1: ProjectId = "new_project1".into();
let project_id2: ProjectId = "new_project2".into();
let data = format!("{{\"project_ids\": [\"{project_id1}\",\"{project_id2}\"]}}");
let text = json!({
"type": "message",
"topic": "/allowed_vpc_endpoints_updated_for_projects",
"data": data,
"extre_fields": "something"
})
.to_string();
let result: Notification = serde_json::from_str(&text)?;
assert_eq!(
result,
Notification::ProjectSettingsUpdate(InvalidateProject::ProjectIds(vec![
(&project_id1).into(),
(&project_id2).into()
]))
);
Ok(())
@@ -379,12 +404,10 @@ mod tests {
let result: Notification = serde_json::from_str(&text)?;
assert_eq!(
result,
Notification::PasswordUpdate {
password_update: PasswordUpdate {
project_id: (&project_id).into(),
role_name: (&role_name).into(),
}
}
Notification::RoleSettingUpdate(InvalidateRole {
project_id: (&project_id).into(),
role_name: (&role_name).into(),
})
);
Ok(())

View File

@@ -21,9 +21,8 @@ use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
use super::http_conn_pool::{self, HttpConnPool, Send, poll_http2_client};
use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool};
use crate::auth::backend::local::StaticAuthRules;
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
use crate::auth::backend::{ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo};
use crate::auth::{self, AuthError};
use crate::compute;
use crate::compute_ctl::{
ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
};
@@ -35,7 +34,7 @@ use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
use crate::control_plane::locks::ApiLocks;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::intern::EndpointIdInt;
use crate::pglb::connect_compute::ConnectMechanism;
use crate::proxy::connect_compute::ConnectMechanism;
use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
use crate::rate_limiter::EndpointRateLimiter;
use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
@@ -69,17 +68,20 @@ impl PoolingBackend {
self.config.authentication_config.is_vpc_acccess_proxy,
)?;
let ep = EndpointIdInt::from(&user_info.endpoint);
let rate_limit_config = None;
if !self.endpoint_rate_limiter.check(ep, rate_limit_config, 1) {
return Err(AuthError::too_many_connections());
}
access_control.connection_attempt_rate_limit(
ctx,
&user_info.endpoint,
&self.endpoint_rate_limiter,
)?;
let role_access = backend.get_role_secret(ctx).await?;
let Some(secret) = role_access.secret else {
// If we don't have an authentication secret, for the http flow we can just return an error.
info!("authentication info not found");
return Err(AuthError::password_failed(&*user_info.user));
};
let ep = EndpointIdInt::from(&user_info.endpoint);
let auth_outcome = crate::auth::validate_password_and_exchange(
&self.config.authentication_config.thread_pool,
ep,
@@ -181,14 +183,15 @@ impl PoolingBackend {
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
let backend = self.auth_backend.as_ref().map(|()| keys);
crate::pglb::connect_compute::connect_to_compute(
let backend = self.auth_backend.as_ref().map(|()| keys.info);
crate::proxy::connect_compute::connect_to_compute(
ctx,
&TokioMechanism {
conn_id,
conn_info,
pool: self.pool.clone(),
locks: &self.config.connect_compute_locks,
keys: keys.keys,
},
&backend,
self.config.wake_compute_retry_config,
@@ -215,18 +218,15 @@ impl PoolingBackend {
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
info: ComputeUserInfo {
user: conn_info.user_info.user.clone(),
endpoint: EndpointId::from(format!(
"{}{LOCAL_PROXY_SUFFIX}",
conn_info.user_info.endpoint.normalize()
)),
options: conn_info.user_info.options.clone(),
},
keys: crate::auth::backend::ComputeCredentialKeys::None,
let backend = self.auth_backend.as_ref().map(|()| ComputeUserInfo {
user: conn_info.user_info.user.clone(),
endpoint: EndpointId::from(format!(
"{}{LOCAL_PROXY_SUFFIX}",
conn_info.user_info.endpoint.normalize()
)),
options: conn_info.user_info.options.clone(),
});
crate::pglb::connect_compute::connect_to_compute(
crate::proxy::connect_compute::connect_to_compute(
ctx,
&HyperMechanism {
conn_id,
@@ -305,12 +305,13 @@ impl PoolingBackend {
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
let mut node_info = local_backend.node_info.clone();
let (key, jwk) = create_random_jwk();
let config = node_info
.config
let mut config = local_backend
.node_info
.conn_info
.to_postgres_client_config();
config
.user(&conn_info.user_info.user)
.dbname(&conn_info.dbname)
.set_param(
@@ -322,7 +323,7 @@ impl PoolingBackend {
);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (client, connection) = config.connect(postgres_client::NoTls).await?;
let (client, connection) = config.connect(&postgres_client::NoTls).await?;
drop(pause);
let pid = client.get_process_id();
@@ -336,7 +337,7 @@ impl PoolingBackend {
connection,
key,
conn_id,
node_info.aux.clone(),
local_backend.node_info.aux.clone(),
);
{
@@ -495,6 +496,7 @@ struct TokioMechanism {
pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,
keys: ComputeCredentialKeys,
/// connect_to_compute concurrency lock
locks: &'static ApiLocks<Host>,
@@ -512,19 +514,20 @@ impl ConnectMechanism for TokioMechanism {
node_info: &CachedNodeInfo,
compute_config: &ComputeConfig,
) -> Result<Self::Connection, Self::ConnectError> {
let host = node_info.config.get_host();
let permit = self.locks.get_permit(&host).await?;
let permit = self.locks.get_permit(&node_info.conn_info.host).await?;
let mut config = (*node_info.config).clone();
let mut config = node_info.conn_info.to_postgres_client_config();
let config = config
.user(&self.conn_info.user_info.user)
.dbname(&self.conn_info.dbname)
.connect_timeout(compute_config.timeout);
let mk_tls =
crate::tls::postgres_rustls::MakeRustlsConnect::new(compute_config.tls.clone());
if let ComputeCredentialKeys::AuthKeys(auth_keys) = self.keys {
config.auth_keys(auth_keys);
}
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let res = config.connect(mk_tls).await;
let res = config.connect(compute_config).await;
drop(pause);
let (client, connection) = permit.release_result(res)?;
@@ -548,8 +551,6 @@ impl ConnectMechanism for TokioMechanism {
node_info.aux.clone(),
))
}
fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
}
struct HyperMechanism {
@@ -573,20 +574,20 @@ impl ConnectMechanism for HyperMechanism {
node_info: &CachedNodeInfo,
config: &ComputeConfig,
) -> Result<Self::Connection, Self::ConnectError> {
let host_addr = node_info.config.get_host_addr();
let host = node_info.config.get_host();
let permit = self.locks.get_permit(&host).await?;
let host_addr = node_info.conn_info.host_addr;
let host = &node_info.conn_info.host;
let permit = self.locks.get_permit(host).await?;
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let tls = if node_info.config.get_ssl_mode() == SslMode::Disable {
let tls = if node_info.conn_info.ssl_mode == SslMode::Disable {
None
} else {
Some(&config.tls)
};
let port = node_info.config.get_port();
let res = connect_http2(host_addr, &host, port, config.timeout, tls).await;
let port = node_info.conn_info.port;
let res = connect_http2(host_addr, host, port, config.timeout, tls).await;
drop(pause);
let (client, connection) = permit.release_result(res)?;
@@ -609,8 +610,6 @@ impl ConnectMechanism for HyperMechanism {
node_info.aux.clone(),
))
}
fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
}
async fn connect_http2(

View File

@@ -23,12 +23,12 @@ use super::conn_pool_lib::{
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, EndpointConnPool,
GlobalConnPool,
};
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::MetricsAuxInfo;
use crate::metrics::Metrics;
use crate::tls::postgres_rustls::MakeRustlsConnect;
type TlsStream = <MakeRustlsConnect as MakeTlsConnect<TcpStream>>::Stream;
type TlsStream = <ComputeConfig as MakeTlsConnect<TcpStream>>::Stream;
#[derive(Debug, Clone)]
pub(crate) struct ConnInfoWithAuth {

View File

@@ -50,10 +50,10 @@ use crate::context::RequestContext;
use crate::ext::TaskExt;
use crate::metrics::Metrics;
use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
use crate::proxy::run_until_cancelled;
use crate::rate_limiter::EndpointRateLimiter;
use crate::serverless::backend::PoolingBackend;
use crate::serverless::http_util::{api_error_into_response, json_response};
use crate::util::run_until_cancelled;
pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
pub(crate) const AUTH_BROKER_SNI: &str = "apiauth";

View File

@@ -41,10 +41,11 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::http::{ReadBodyError, read_body_with_limit};
use crate::metrics::{HttpDirection, Metrics, SniGroup, SniKind};
use crate::pqproto::StartupMessageParams;
use crate::proxy::{NeonOptions, run_until_cancelled};
use crate::proxy::NeonOptions;
use crate::serverless::backend::HttpConnError;
use crate::types::{DbName, RoleName};
use crate::usage_metrics::{MetricCounter, MetricCounterRecorder};
use crate::util::run_until_cancelled;
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]

View File

@@ -2,10 +2,11 @@ use std::convert::TryFrom;
use std::sync::Arc;
use postgres_client::tls::MakeTlsConnect;
use rustls::ClientConfig;
use rustls::pki_types::ServerName;
use rustls::pki_types::{InvalidDnsNameError, ServerName};
use tokio::io::{AsyncRead, AsyncWrite};
use crate::config::ComputeConfig;
mod private {
use std::future::Future;
use std::io;
@@ -123,36 +124,27 @@ mod private {
}
}
/// A `MakeTlsConnect` implementation using `rustls`.
///
/// That way you can connect to PostgreSQL using `rustls` as the TLS stack.
#[derive(Clone)]
pub struct MakeRustlsConnect {
pub config: Arc<ClientConfig>,
}
impl MakeRustlsConnect {
/// Creates a new `MakeRustlsConnect` from the provided `ClientConfig`.
#[must_use]
pub fn new(config: Arc<ClientConfig>) -> Self {
Self { config }
}
}
impl<S> MakeTlsConnect<S> for MakeRustlsConnect
impl<S> MakeTlsConnect<S> for ComputeConfig
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Stream = private::RustlsStream<S>;
type TlsConnect = private::RustlsConnect;
type Error = rustls::pki_types::InvalidDnsNameError;
type Error = InvalidDnsNameError;
fn make_tls_connect(&mut self, hostname: &str) -> Result<Self::TlsConnect, Self::Error> {
ServerName::try_from(hostname).map(|dns_name| {
private::RustlsConnect(private::RustlsConnectData {
hostname: dns_name.to_owned(),
connector: Arc::clone(&self.config).into(),
})
})
fn make_tls_connect(&self, hostname: &str) -> Result<Self::TlsConnect, Self::Error> {
make_tls_connect(&self.tls, hostname)
}
}
pub fn make_tls_connect(
tls: &Arc<rustls::ClientConfig>,
hostname: &str,
) -> Result<private::RustlsConnect, InvalidDnsNameError> {
ServerName::try_from(hostname).map(|dns_name| {
private::RustlsConnect(private::RustlsConnectData {
hostname: dns_name.to_owned(),
connector: tls.clone().into(),
})
})
}

14
proxy/src/util.rs Normal file
View File

@@ -0,0 +1,14 @@
use std::pin::pin;
use futures::future::{Either, select};
use tokio_util::sync::CancellationToken;
pub async fn run_until_cancelled<F: Future>(
f: F,
cancellation_token: &CancellationToken,
) -> Option<F::Output> {
match select(pin!(f), pin!(cancellation_token.cancelled())).await {
Either::Left((f, _)) => Some(f),
Either::Right(((), _)) => None,
}
}

View File

@@ -8,8 +8,8 @@ use std::error::Error as _;
use http_utils::error::HttpErrorBody;
use reqwest::{IntoUrl, Method, StatusCode};
use safekeeper_api::models::{
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest,
TimelineStatus,
self, PullTimelineRequest, PullTimelineResponse, SafekeeperStatus, SafekeeperUtilization,
TimelineCreateRequest, TimelineStatus,
};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
@@ -183,6 +183,12 @@ impl Client {
self.get(&uri).await
}
pub async fn status(&self) -> Result<SafekeeperStatus> {
let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn utilization(&self) -> Result<SafekeeperUtilization> {
let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
let resp = self.get(&uri).await?;

View File

@@ -395,6 +395,8 @@ pub enum TimelineError {
Cancelled(TenantTimelineId),
#[error("Timeline {0} was not found in global map")]
NotFound(TenantTimelineId),
#[error("Timeline {0} has been deleted")]
Deleted(TenantTimelineId),
#[error("Timeline {0} creation is in progress")]
CreationInProgress(TenantTimelineId),
#[error("Timeline {0} exists on disk, but wasn't loaded on startup")]

View File

@@ -78,7 +78,13 @@ impl GlobalTimelinesState {
Some(GlobalMapTimeline::CreationInProgress) => {
Err(TimelineError::CreationInProgress(*ttid))
}
None => Err(TimelineError::NotFound(*ttid)),
None => {
if self.has_tombstone(ttid) {
Err(TimelineError::Deleted(*ttid))
} else {
Err(TimelineError::NotFound(*ttid))
}
}
}
}

View File

@@ -0,0 +1 @@
ALTER TABLE nodes DROP COLUMN lifecycle;

View File

@@ -0,0 +1 @@
ALTER TABLE nodes ADD COLUMN lifecycle VARCHAR NOT NULL DEFAULT 'active';

View File

@@ -907,6 +907,42 @@ async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiErr
json_response(StatusCode::OK, state.service.node_delete(node_id).await?)
}
async fn handle_tombstone_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let state = get_state(&req);
let mut nodes = state.service.tombstone_list().await?;
nodes.sort_by_key(|n| n.get_id());
let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
json_response(StatusCode::OK, api_nodes)
}
async fn handle_tombstone_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
json_response(
StatusCode::OK,
state.service.tombstone_delete(node_id).await?,
)
}
async fn handle_node_configure(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -2062,6 +2098,20 @@ pub fn make_router(
.post("/debug/v1/node/:node_id/drop", |r| {
named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
})
.delete("/debug/v1/tombstone/:node_id", |r| {
named_request_span(
r,
handle_tombstone_delete,
RequestName("debug_v1_tombstone_delete"),
)
})
.get("/debug/v1/tombstone", |r| {
named_request_span(
r,
handle_tombstone_list,
RequestName("debug_v1_tombstone_list"),
)
})
.post("/debug/v1/tenant/:tenant_id/import", |r| {
named_request_span(
r,

View File

@@ -2,7 +2,7 @@ use std::str::FromStr;
use std::time::Duration;
use pageserver_api::controller_api::{
AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeLifecycle, NodeRegisterRequest,
NodeSchedulingPolicy, TenantLocateResponseShard,
};
use pageserver_api::shard::TenantShardId;
@@ -29,6 +29,7 @@ pub(crate) struct Node {
availability: NodeAvailability,
scheduling: NodeSchedulingPolicy,
lifecycle: NodeLifecycle,
listen_http_addr: String,
listen_http_port: u16,
@@ -228,6 +229,7 @@ impl Node {
listen_pg_addr,
listen_pg_port,
scheduling: NodeSchedulingPolicy::Active,
lifecycle: NodeLifecycle::Active,
availability: NodeAvailability::Offline,
availability_zone_id,
use_https,
@@ -239,6 +241,7 @@ impl Node {
NodePersistence {
node_id: self.id.0 as i64,
scheduling_policy: self.scheduling.into(),
lifecycle: self.lifecycle.into(),
listen_http_addr: self.listen_http_addr.clone(),
listen_http_port: self.listen_http_port as i32,
listen_https_port: self.listen_https_port.map(|x| x as i32),
@@ -263,6 +266,7 @@ impl Node {
availability: NodeAvailability::Offline,
scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
.expect("Bad scheduling policy in DB"),
lifecycle: NodeLifecycle::from_str(&np.lifecycle).expect("Bad lifecycle in DB"),
listen_http_addr: np.listen_http_addr,
listen_http_port: np.listen_http_port as u16,
listen_https_port: np.listen_https_port.map(|x| x as u16),

View File

@@ -19,7 +19,7 @@ use futures::FutureExt;
use futures::future::BoxFuture;
use itertools::Itertools;
use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
AvailabilityZone, MetadataHealthRecord, NodeLifecycle, NodeSchedulingPolicy, PlacementPolicy,
SafekeeperDescribeResponse, ShardSchedulingPolicy, SkSchedulingPolicy,
};
use pageserver_api::models::{ShardImportStatus, TenantConfig};
@@ -102,6 +102,7 @@ pub(crate) enum DatabaseOperation {
UpdateNode,
DeleteNode,
ListNodes,
ListTombstones,
BeginShardSplit,
CompleteShardSplit,
AbortShardSplit,
@@ -357,6 +358,8 @@ impl Persistence {
}
/// When a node is first registered, persist it before using it for anything
/// If the provided node_id already exists, it will be error.
/// The common case is when a node marked for deletion wants to register.
pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
let np = &node.to_persistent();
self.with_measured_conn(DatabaseOperation::InsertNode, move |conn| {
@@ -373,19 +376,41 @@ impl Persistence {
/// At startup, populate the list of nodes which our shards may be placed on
pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
let nodes: Vec<NodePersistence> = self
use crate::schema::nodes::dsl::*;
let result: Vec<NodePersistence> = self
.with_measured_conn(DatabaseOperation::ListNodes, move |conn| {
Box::pin(async move {
Ok(crate::schema::nodes::table
.filter(lifecycle.ne(String::from(NodeLifecycle::Deleted)))
.load::<NodePersistence>(conn)
.await?)
})
})
.await?;
tracing::info!("list_nodes: loaded {} nodes", nodes.len());
tracing::info!("list_nodes: loaded {} nodes", result.len());
Ok(nodes)
Ok(result)
}
pub(crate) async fn list_tombstones(&self) -> DatabaseResult<Vec<NodePersistence>> {
use crate::schema::nodes::dsl::*;
let result: Vec<NodePersistence> = self
.with_measured_conn(DatabaseOperation::ListTombstones, move |conn| {
Box::pin(async move {
Ok(crate::schema::nodes::table
.filter(lifecycle.eq(String::from(NodeLifecycle::Deleted)))
.load::<NodePersistence>(conn)
.await?)
})
})
.await?;
tracing::info!("list_tombstones: loaded {} nodes", result.len());
Ok(result)
}
pub(crate) async fn update_node<V>(
@@ -404,6 +429,7 @@ impl Persistence {
Box::pin(async move {
let updated = diesel::update(nodes)
.filter(node_id.eq(input_node_id.0 as i64))
.filter(lifecycle.ne(String::from(NodeLifecycle::Deleted)))
.set(values)
.execute(conn)
.await?;
@@ -447,6 +473,57 @@ impl Persistence {
.await
}
/// Tombstone is a special state where the node is not deleted from the database,
/// but it is not available for usage.
/// The main reason for it is to prevent the flaky node to register.
pub(crate) async fn set_tombstone(&self, del_node_id: NodeId) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
self.update_node(
del_node_id,
lifecycle.eq(String::from(NodeLifecycle::Deleted)),
)
.await
}
pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
self.with_measured_conn(DatabaseOperation::DeleteNode, move |conn| {
Box::pin(async move {
// You can hard delete a node only if it has a tombstone.
// So we need to check if the node has lifecycle set to deleted.
let node_to_delete = nodes
.filter(node_id.eq(del_node_id.0 as i64))
.first::<NodePersistence>(conn)
.await
.optional()?;
if let Some(np) = node_to_delete {
let lc = NodeLifecycle::from_str(&np.lifecycle).map_err(|e| {
DatabaseError::Logical(format!(
"Node {} has invalid lifecycle: {}",
del_node_id, e
))
})?;
if lc != NodeLifecycle::Deleted {
return Err(DatabaseError::Logical(format!(
"Node {} was not soft deleted before, cannot hard delete it",
del_node_id
)));
}
diesel::delete(nodes)
.filter(node_id.eq(del_node_id.0 as i64))
.execute(conn)
.await?;
}
Ok(())
})
})
.await
}
/// At startup, load the high level state for shards, such as their config + policy. This will
/// be enriched at runtime with state discovered on pageservers.
///
@@ -543,21 +620,6 @@ impl Persistence {
.await
}
pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
self.with_measured_conn(DatabaseOperation::DeleteNode, move |conn| {
Box::pin(async move {
diesel::delete(nodes)
.filter(node_id.eq(del_node_id.0 as i64))
.execute(conn)
.await?;
Ok(())
})
})
.await
}
/// When a tenant invokes the /re-attach API, this function is responsible for doing an efficient
/// batched increment of the generations of all tenants whose generation_pageserver is equal to
/// the node that called /re-attach.
@@ -571,6 +633,20 @@ impl Persistence {
let updated = self
.with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
Box::pin(async move {
// Check if the node is not marked as deleted
let deleted_node: i64 = nodes
.filter(node_id.eq(input_node_id.0 as i64))
.filter(lifecycle.eq(String::from(NodeLifecycle::Deleted)))
.count()
.get_result(conn)
.await?;
if deleted_node > 0 {
return Err(DatabaseError::Logical(format!(
"Node {} is marked as deleted, re-attach is not allowed",
input_node_id
)));
}
let rows_updated = diesel::update(tenant_shards)
.filter(generation_pageserver.eq(input_node_id.0 as i64))
.set(generation.eq(generation + 1))
@@ -2048,6 +2124,7 @@ pub(crate) struct NodePersistence {
pub(crate) listen_pg_port: i32,
pub(crate) availability_zone_id: String,
pub(crate) listen_https_port: Option<i32>,
pub(crate) lifecycle: String,
}
/// Tenant metadata health status that are stored durably.

View File

@@ -33,6 +33,7 @@ diesel::table! {
listen_pg_port -> Int4,
availability_zone_id -> Varchar,
listen_https_port -> Nullable<Int4>,
lifecycle -> Varchar,
}
}

View File

@@ -166,6 +166,7 @@ enum NodeOperations {
Register,
Configure,
Delete,
DeleteTombstone,
}
/// The leadership status for the storage controller process.
@@ -1107,7 +1108,8 @@ impl Service {
observed
}
/// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
/// Used during [`Self::startup_reconcile`] and shard splits: detach a list of unknown-to-us
/// tenants from pageservers.
///
/// This is safe to run in the background, because if we don't have this TenantShardId in our map of
/// tenants, then it is probably something incompletely deleted before: we will not fight with any
@@ -6210,7 +6212,11 @@ impl Service {
}
}
pausable_failpoint!("shard-split-pre-complete");
fail::fail_point!("shard-split-pre-complete", |_| Err(ApiError::Conflict(
"failpoint".to_string()
)));
pausable_failpoint!("shard-split-pre-complete-pause");
// TODO: if the pageserver restarted concurrently with our split API call,
// the actual generation of the child shard might differ from the generation
@@ -6232,6 +6238,15 @@ impl Service {
let (response, child_locations, waiters) =
self.tenant_shard_split_commit_inmem(tenant_id, new_shard_count, new_stripe_size);
// Notify all page servers to detach and clean up the old shards because they will no longer
// be needed. This is best-effort: if it fails, it will be cleaned up on a subsequent
// Pageserver re-attach/startup.
let shards_to_cleanup = targets
.iter()
.map(|target| (target.parent_id, target.node.get_id()))
.collect();
self.cleanup_locations(shards_to_cleanup).await;
// Send compute notifications for all the new shards
let mut failed_notifications = Vec::new();
for (child_id, child_ps, stripe_size) in child_locations {
@@ -6909,7 +6924,7 @@ impl Service {
/// detaching or deleting it on pageservers. We do not try and re-schedule any
/// tenants that were on this node.
pub(crate) async fn node_drop(&self, node_id: NodeId) -> Result<(), ApiError> {
self.persistence.delete_node(node_id).await?;
self.persistence.set_tombstone(node_id).await?;
let mut locked = self.inner.write().unwrap();
@@ -7033,9 +7048,10 @@ impl Service {
// That is safe because in Service::spawn we only use generation_pageserver if it refers to a node
// that exists.
// 2. Actually delete the node from the database and from in-memory state
// 2. Actually delete the node from in-memory state and set tombstone to the database
// for preventing the node to register again.
tracing::info!("Deleting node from database");
self.persistence.delete_node(node_id).await?;
self.persistence.set_tombstone(node_id).await?;
Ok(())
}
@@ -7054,6 +7070,35 @@ impl Service {
Ok(nodes)
}
pub(crate) async fn tombstone_list(&self) -> Result<Vec<Node>, ApiError> {
self.persistence
.list_tombstones()
.await?
.into_iter()
.map(|np| Node::from_persistent(np, false))
.collect::<Result<Vec<_>, _>>()
.map_err(ApiError::InternalServerError)
}
pub(crate) async fn tombstone_delete(&self, node_id: NodeId) -> Result<(), ApiError> {
let _node_lock = trace_exclusive_lock(
&self.node_op_locks,
node_id,
NodeOperations::DeleteTombstone,
)
.await;
if matches!(self.get_node(node_id).await, Err(ApiError::NotFound(_))) {
self.persistence.delete_node(node_id).await?;
Ok(())
} else {
Err(ApiError::Conflict(format!(
"Node {} is in use, consider using tombstone API first",
node_id
)))
}
}
pub(crate) async fn get_node(&self, node_id: NodeId) -> Result<Node, ApiError> {
self.inner
.read()
@@ -7224,7 +7269,25 @@ impl Service {
};
match registration_status {
RegistrationStatus::New => self.persistence.insert_node(&new_node).await?,
RegistrationStatus::New => {
self.persistence.insert_node(&new_node).await.map_err(|e| {
if matches!(
e,
crate::persistence::DatabaseError::Query(
diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)
)
) {
// The node can be deleted by tombstone API, and not show up in the list of nodes.
// If you see this error, check tombstones first.
ApiError::Conflict(format!("Node {} is already exists", new_node.get_id()))
} else {
ApiError::from(e)
}
})?;
}
RegistrationStatus::NeedUpdate => {
self.persistence
.update_node_on_registration(

View File

@@ -69,15 +69,17 @@ class EndpointHttpClient(requests.Session):
json: dict[str, str] = res.json()
return json
def prewarm_lfc(self):
self.post(f"http://localhost:{self.external_port}/lfc/prewarm").raise_for_status()
def prewarm_lfc(self, from_endpoint_id: str | None = None):
url: str = f"http://localhost:{self.external_port}/lfc/prewarm"
params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
self.post(url, params=params).raise_for_status()
def prewarmed():
json = self.prewarm_lfc_status()
status, err = json["status"], json.get("error")
assert status == "completed", f"{status}, error {err}"
wait_until(prewarmed)
wait_until(prewarmed, timeout=60)
def offload_lfc(self):
url = f"http://localhost:{self.external_port}/lfc/offload"

View File

@@ -129,6 +129,18 @@ class NeonAPI:
return cast("dict[str, Any]", resp.json())
def get_project_limits(self, project_id: str) -> dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/limits",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
)
return cast("dict[str, Any]", resp.json())
def delete_project(
self,
project_id: str,

View File

@@ -2054,6 +2054,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
headers=self.headers(TokenScope.ADMIN),
)
def tombstone_delete(self, node_id):
log.info(f"tombstone_delete({node_id})")
self.request(
"DELETE",
f"{self.api}/debug/v1/tombstone/{node_id}",
headers=self.headers(TokenScope.ADMIN),
)
def node_drain(self, node_id):
log.info(f"node_drain({node_id})")
self.request(
@@ -2110,6 +2118,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
)
return response.json()
def tombstone_list(self):
response = self.request(
"GET",
f"{self.api}/debug/v1/tombstone",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
def tenant_shard_dump(self):
"""
Debug listing API: dumps the internal map of tenant shards
@@ -4030,6 +4046,16 @@ def static_proxy(
"CREATE TABLE neon_control_plane.endpoints (endpoint_id VARCHAR(255) PRIMARY KEY, allowed_ips VARCHAR(255))"
)
vanilla_pg.stop()
vanilla_pg.edit_hba(
[
"local all all trust",
"host all all 127.0.0.1/32 scram-sha-256",
"host all all ::1/128 scram-sha-256",
]
)
vanilla_pg.start()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
http_port = port_distributor.get_port()

View File

@@ -45,6 +45,8 @@ class NeonEndpoint:
if self.branch.connect_env:
self.connect_env = self.branch.connect_env.copy()
self.connect_env["PGHOST"] = self.host
if self.type == "read_only":
self.project.read_only_endpoints_total += 1
def delete(self):
self.project.delete_endpoint(self.id)
@@ -228,8 +230,13 @@ class NeonProject:
self.benchmarks: dict[str, subprocess.Popen[Any]] = {}
self.restore_num: int = 0
self.restart_pgbench_on_console_errors: bool = False
self.limits: dict[str, Any] = self.get_limits()["limits"]
self.read_only_endpoints_total: int = 0
def delete(self):
def get_limits(self) -> dict[str, Any]:
return self.neon_api.get_project_limits(self.id)
def delete(self) -> None:
self.neon_api.delete_project(self.id)
def create_branch(self, parent_id: str | None = None) -> NeonBranch | None:
@@ -282,6 +289,7 @@ class NeonProject:
self.neon_api.delete_endpoint(self.id, endpoint_id)
self.endpoints[endpoint_id].branch.endpoints.pop(endpoint_id)
self.endpoints.pop(endpoint_id)
self.read_only_endpoints_total -= 1
self.wait()
def start_benchmark(self, target: str, clients: int = 10) -> subprocess.Popen[Any]:
@@ -369,49 +377,64 @@ def setup_class(
print(f"::warning::Retried on 524 error {neon_api.retries524} times")
if neon_api.retries4xx > 0:
print(f"::warning::Retried on 4xx error {neon_api.retries4xx} times")
log.info("Removing the project")
log.info("Removing the project %s", project.id)
project.delete()
def do_action(project: NeonProject, action: str) -> None:
def do_action(project: NeonProject, action: str) -> bool:
"""
Runs the action
"""
log.info("Action: %s", action)
if action == "new_branch":
log.info("Trying to create a new branch")
if 0 <= project.limits["max_branches"] <= len(project.branches):
log.info(
"Maximum branch limit exceeded (%s of %s)",
len(project.branches),
project.limits["max_branches"],
)
return False
parent = project.branches[
random.choice(list(set(project.branches.keys()) - project.reset_branches))
]
log.info("Parent: %s", parent)
child = parent.create_child_branch()
if child is None:
return
return False
log.info("Created branch %s", child)
child.start_benchmark()
elif action == "delete_branch":
if project.leaf_branches:
target = random.choice(list(project.leaf_branches.values()))
target: NeonBranch = random.choice(list(project.leaf_branches.values()))
log.info("Trying to delete branch %s", target)
target.delete()
else:
log.info("Leaf branches not found, skipping")
return False
elif action == "new_ro_endpoint":
if 0 <= project.limits["max_read_only_endpoints"] <= project.read_only_endpoints_total:
log.info(
"Maximum read only endpoint limit exceeded (%s of %s)",
project.read_only_endpoints_total,
project.limits["max_read_only_endpoints"],
)
return False
ep = random.choice(
[br for br in project.branches.values() if br.id not in project.reset_branches]
).create_ro_endpoint()
log.info("Created the RO endpoint with id %s branch: %s", ep.id, ep.branch.id)
ep.start_benchmark()
elif action == "delete_ro_endpoint":
if project.read_only_endpoints_total == 0:
log.info("no read_only endpoints present, skipping")
return False
ro_endpoints: list[NeonEndpoint] = [
endpoint for endpoint in project.endpoints.values() if endpoint.type == "read_only"
]
if ro_endpoints:
target_ep: NeonEndpoint = random.choice(ro_endpoints)
target_ep.delete()
log.info("endpoint %s deleted", target_ep.id)
else:
log.info("no read_only endpoints present, skipping")
target_ep: NeonEndpoint = random.choice(ro_endpoints)
target_ep.delete()
log.info("endpoint %s deleted", target_ep.id)
elif action == "restore_random_time":
if project.leaf_branches:
br: NeonBranch = random.choice(list(project.leaf_branches.values()))
@@ -419,8 +442,10 @@ def do_action(project: NeonProject, action: str) -> None:
br.restore_random_time()
else:
log.info("No leaf branches found")
return False
else:
raise ValueError(f"The action {action} is unknown")
return True
@pytest.mark.timeout(7200)
@@ -457,8 +482,9 @@ def test_api_random(
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
for _ in range(num_operations):
log.info("Starting action #%s", _ + 1)
do_action(
while not do_action(
project, random.choices([a[0] for a in ACTIONS], weights=[w[1] for w in ACTIONS])[0]
)
):
log.info("Retrying...")
project.check_all_benchmarks()
assert True

View File

@@ -87,6 +87,9 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
# Set up pageserver for import
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
}
env = neon_env_builder.init_start()
env.pageserver.tenant_create(tenant)

View File

@@ -188,7 +188,8 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet
pg_cur.execute("select pg_reload_conf()")
if query is LfcQueryMethod.COMPUTE_CTL:
http_client.prewarm_lfc()
# Same thing as prewarm_lfc(), testing other method
http_client.prewarm_lfc(endpoint.endpoint_id)
else:
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))

View File

@@ -16,7 +16,7 @@ if TYPE_CHECKING:
# Test restarting page server, while safekeeper and compute node keep
# running.
def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgBin):
def test_pageserver_restarts_under_workload(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
env.create_branch("test_pageserver_restarts")
endpoint = env.endpoints.create_start("test_pageserver_restarts")
@@ -28,7 +28,11 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", f"-T{n_restarts}", connstr])
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
thread = threading.Thread(
target=run_pgbench,
args=(endpoint.connstr(options="-cstatement_timeout=360s"),),
daemon=True,
)
thread.start()
for _ in range(n_restarts):

View File

@@ -19,11 +19,15 @@ TABLE_NAME = "neon_control_plane.endpoints"
async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: VanillaPostgres):
# Shouldn't be able to connect to this project
vanilla_pg.safe_psql(
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('private-project', '8.8.8.8')"
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('private-project', '8.8.8.8')",
user="proxy",
password="password",
)
# Should be able to connect to this project
vanilla_pg.safe_psql(
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('generic-project', '::1,127.0.0.1')"
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('generic-project', '::1,127.0.0.1')",
user="proxy",
password="password",
)
def check_cannot_connect(**kwargs):
@@ -60,7 +64,9 @@ async def test_proxy_http_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
# Shouldn't be able to connect to this project
vanilla_pg.safe_psql(
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('proxy', '8.8.8.8')"
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('proxy', '8.8.8.8')",
user="proxy",
password="password",
)
def query(status: int, query: str, *args):
@@ -75,6 +81,8 @@ async def test_proxy_http_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
query(400, "select 1;") # ip address is not allowed
# Should be able to connect to this project
vanilla_pg.safe_psql(
f"UPDATE {TABLE_NAME} SET allowed_ips = '8.8.8.8,127.0.0.1' WHERE endpoint_id = 'proxy'"
f"UPDATE {TABLE_NAME} SET allowed_ips = '8.8.8.8,127.0.0.1' WHERE endpoint_id = 'proxy'",
user="proxy",
password="password",
)
query(200, "select 1;") # should work now

View File

@@ -30,6 +30,7 @@ def test_safekeeper_delete_timeline(neon_env_builder: NeonEnvBuilder, auth_enabl
env.pageserver.allowed_errors.extend(
[
".*Timeline .* was not found in global map.*",
".*Timeline .* has been deleted.*",
".*Timeline .* was cancelled and cannot be used anymore.*",
]
)
@@ -198,6 +199,7 @@ def test_safekeeper_delete_timeline_under_load(neon_env_builder: NeonEnvBuilder)
env.pageserver.allowed_errors.extend(
[
".*Timeline.*was cancelled.*",
".*Timeline.*has been deleted.*",
".*Timeline.*was not found.*",
]
)

View File

@@ -1836,3 +1836,90 @@ def test_sharding_gc(
shard_gc_cutoff_lsn = Lsn(shard_index["metadata_bytes"]["latest_gc_cutoff_lsn"])
log.info(f"Shard {shard_number} cutoff LSN: {shard_gc_cutoff_lsn}")
assert shard_gc_cutoff_lsn == shard_0_gc_cutoff_lsn
def test_split_ps_delete_old_shard_after_commit(neon_env_builder: NeonEnvBuilder):
"""
Check that PageServer only deletes old shards after the split is committed such that it doesn't
have to download a lot of files during abort.
"""
DBNAME = "regression"
init_shard_count = 4
neon_env_builder.num_pageservers = init_shard_count
stripe_size = 32
env = neon_env_builder.init_start(
initial_tenant_shard_count=init_shard_count, initial_tenant_shard_stripe_size=stripe_size
)
env.storage_controller.allowed_errors.extend(
[
# All split failures log a warning when they enqueue the abort operation
".*Enqueuing background abort.*",
# Tolerate any error logs that mention a failpoint
".*failpoint.*",
]
)
endpoint = env.endpoints.create("main")
endpoint.respec(skip_pg_catalog_updates=False)
endpoint.start()
# Write some initial data.
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
endpoint.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);")
for _ in range(1000):
endpoint.safe_psql(
"INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False
)
# Record how many bytes we've downloaded before the split.
def collect_downloaded_bytes() -> list[float | None]:
downloaded_bytes = []
for page_server in env.pageservers:
metric = page_server.http_client().get_metric_value(
"pageserver_remote_ondemand_downloaded_bytes_total"
)
downloaded_bytes.append(metric)
return downloaded_bytes
downloaded_bytes_before = collect_downloaded_bytes()
# Attempt to split the tenant, but fail the split before it completes.
env.storage_controller.configure_failpoints(("shard-split-pre-complete", "return(1)"))
with pytest.raises(StorageControllerApiException):
env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=16)
# Wait until split is aborted.
def check_split_is_aborted():
tenants = env.storage_controller.tenant_list()
assert len(tenants) == 1
shards = tenants[0]["shards"]
assert len(shards) == 4
for shard in shards:
assert not shard["is_splitting"]
assert not shard["is_reconciling"]
# Make sure all new shards have been deleted.
valid_shards = 0
for ps in env.pageservers:
for tenant_dir in os.listdir(ps.workdir / "tenants"):
try:
tenant_shard_id = TenantShardId.parse(tenant_dir)
valid_shards += 1
assert tenant_shard_id.shard_count == 4
except ValueError:
log.info(f"{tenant_dir} is not valid tenant shard id")
assert valid_shards >= 4
wait_until(check_split_is_aborted)
endpoint.safe_psql("SELECT count(*) from usertable;", log_query=False)
# Make sure we didn't download anything following the aborted split.
downloaded_bytes_after = collect_downloaded_bytes()
assert downloaded_bytes_before == downloaded_bytes_after
endpoint.stop_and_destroy()

View File

@@ -2956,7 +2956,7 @@ def test_storage_controller_leadership_transfer_during_split(
env.storage_controller.allowed_errors.extend(
[".*Unexpected child shard count.*", ".*Enqueuing background abort.*"]
)
pause_failpoint = "shard-split-pre-complete"
pause_failpoint = "shard-split-pre-complete-pause"
env.storage_controller.configure_failpoints((pause_failpoint, "pause"))
split_fut = executor.submit(
@@ -3003,7 +3003,7 @@ def test_storage_controller_leadership_transfer_during_split(
env.storage_controller.request(
"PUT",
f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
json=[{"name": "shard-split-pre-complete", "actions": "off"}],
json=[{"name": pause_failpoint, "actions": "off"}],
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
@@ -3093,6 +3093,58 @@ def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvB
wait_until(reconfigure_node_again)
def test_ps_unavailable_after_delete(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 3
env = neon_env_builder.init_start()
def assert_nodes_count(n: int):
nodes = env.storage_controller.node_list()
assert len(nodes) == n
# Nodes count must remain the same before deletion
assert_nodes_count(3)
ps = env.pageservers[0]
env.storage_controller.node_delete(ps.id)
# After deletion, the node count must be reduced
assert_nodes_count(2)
# Running pageserver CLI init in a separate thread
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
log.info("Restarting tombstoned pageserver...")
ps.stop()
ps_start_fut = executor.submit(lambda: ps.start(await_active=False))
# After deleted pageserver restart, the node count must remain the same
assert_nodes_count(2)
tombstones = env.storage_controller.tombstone_list()
assert len(tombstones) == 1 and tombstones[0]["id"] == ps.id
env.storage_controller.tombstone_delete(ps.id)
tombstones = env.storage_controller.tombstone_list()
assert len(tombstones) == 0
# Wait for the pageserver start operation to complete.
# If it fails with an exception, we try restarting the pageserver since the failure
# may be due to the storage controller refusing to register the node.
# However, if we get a TimeoutError that means the pageserver is completely hung,
# which is an unexpected failure mode that we'll let propagate up.
try:
ps_start_fut.result(timeout=20)
except TimeoutError:
raise
except Exception:
log.info("Restarting deleted pageserver...")
ps.restart()
# Finally, the node can be registered again after tombstone is deleted
wait_until(lambda: assert_nodes_count(3))
def test_storage_controller_timeline_crud_race(neon_env_builder: NeonEnvBuilder):
"""
The storage controller is meant to handle the case where a timeline CRUD operation races

View File

@@ -12,7 +12,7 @@ from typing import TYPE_CHECKING
import pytest
import requests
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.common_types import Lsn, TenantId, TimelineArchivalState, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import (
PAGESERVER_GLOBAL_METRICS,
@@ -299,6 +299,65 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
assert post_detach_samples == set()
def test_pageserver_metrics_removed_after_offload(neon_env_builder: NeonEnvBuilder):
"""Tests that when a timeline is offloaded, the tenant specific metrics are not left behind"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_1, _ = env.create_tenant()
timeline_1 = env.create_timeline("test_metrics_removed_after_offload_1", tenant_id=tenant_1)
timeline_2 = env.create_timeline("test_metrics_removed_after_offload_2", tenant_id=tenant_1)
endpoint_tenant1 = env.endpoints.create_start(
"test_metrics_removed_after_offload_1", tenant_id=tenant_1
)
endpoint_tenant2 = env.endpoints.create_start(
"test_metrics_removed_after_offload_2", tenant_id=tenant_1
)
for endpoint in [endpoint_tenant1, endpoint_tenant2]:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (5000050000,)
endpoint.stop()
def get_ps_metric_samples_for_timeline(
tenant_id: TenantId, timeline_id: TimelineId
) -> list[Sample]:
ps_metrics = env.pageserver.http_client().get_metrics()
samples = []
for metric_name in ps_metrics.metrics:
for sample in ps_metrics.query_all(
name=metric_name,
filter={"tenant_id": str(tenant_id), "timeline_id": str(timeline_id)},
):
samples.append(sample)
return samples
for timeline in [timeline_1, timeline_2]:
pre_offload_samples = set(
[x.name for x in get_ps_metric_samples_for_timeline(tenant_1, timeline)]
)
assert len(pre_offload_samples) > 0, f"expected at least one sample for {timeline}"
env.pageserver.http_client().timeline_archival_config(
tenant_1,
timeline,
state=TimelineArchivalState.ARCHIVED,
)
env.pageserver.http_client().timeline_offload(tenant_1, timeline)
post_offload_samples = set(
[x.name for x in get_ps_metric_samples_for_timeline(tenant_1, timeline)]
)
assert post_offload_samples == set()
def test_pageserver_with_empty_tenants(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()

View File

@@ -433,6 +433,7 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder):
env.pageserver.allowed_errors.extend(
[
".*Timeline .* was not found in global map.*",
".*Timeline .* has been deleted.*",
".*Timeline .* was cancelled and cannot be used anymore.*",
]
)
@@ -1934,6 +1935,7 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
env.pageserver.allowed_errors.extend(
[
".*Timeline .* was not found in global map.*",
".*Timeline .* has been deleted.*",
".*Timeline .* was cancelled and cannot be used anymore.*",
]
)