Compare commits

..

12 Commits

Author SHA1 Message Date
Konstantin Knizhnik
23f410a481 Create rel_dir when database is created from WAL 2024-02-09 23:05:32 +02:00
Konstantin Knizhnik
529a79d263 Increment generation when LFC is disabled by assigning 0 to neon.file_cache_size_limit (#6692)
## Problem

test_lfc_resize sometimes failed with an assertion failure when acquiring the lock
in a write operation:

```
	if (lfc_ctl->generation == generation)
	{
		Assert(LFC_ENABLED());
```

## Summary of changes

Increment generation when 0 is assigned to neon.file_cache_size_limit
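
A minimal sketch of the idea, written in Rust purely for illustration (the real
change is in the C extension, as in the snippet above); the names here are
assumptions, not the extension's actual structures:

```
// Illustrative sketch only: disabling the cache starts a new generation, so a
// writer that captured the old generation backs off instead of hitting
// Assert(LFC_ENABLED()).
struct FileCacheControl {
    enabled: bool,
    generation: u64,
    size_limit_pages: usize,
}

impl FileCacheControl {
    fn set_size_limit(&mut self, new_limit_pages: usize) {
        if new_limit_pages == 0 {
            self.enabled = false;
            self.generation += 1; // the fix: bump the generation on disable
        } else {
            self.enabled = true;
            self.size_limit_pages = new_limit_pages;
        }
    }

    fn try_write(&self, writer_generation: u64) -> bool {
        if self.generation != writer_generation {
            return false; // cache was resized or disabled since the writer started
        }
        assert!(self.enabled, "a matching generation implies the cache is enabled");
        true
    }
}
```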


Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-02-09 08:14:41 +02:00
Joonas Koivunen
c09993396e fix: secondary tenant relative order eviction (#6491)
Calculate the `relative_last_activity` using the total of evicted and
resident layers, similar to what we originally planned.
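
For intuition, a rough sketch of the effect with illustrative names (the real
computation is in `collect_eviction_candidates`, shown in the diff further down):

```
// Illustrative only: index 0 is the most recently accessed resident layer.
// Ranking against the *total* layer count (resident + already evicted) instead of
// the resident count alone keeps a rarely-updated secondary from being drained to
// zero by repeated disk-usage evictions.
fn relative_last_activity(total_layers: usize, index: usize) -> f32 {
    if total_layers == 0 {
        return 1.0;
    }
    1.0 - (index as f32 / total_layers as f32)
}

fn main() {
    // 4 resident layers out of 16 total: the oldest resident layer still scores
    // 0.8125 here, versus 0.25 if it were ranked against resident layers only.
    for i in 0..4 {
        println!("layer {i}: {:.4}", relative_last_activity(16, i));
    }
}
```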

Cc: #5331
2024-02-09 00:37:57 +02:00
Joonas Koivunen
9a31311990 fix(heavier_once_cell): assertion failure can be hit (#6652)
@problame noticed that the `tokio::sync::AcquireError` branch assertion
can be hit, as demonstrated in the first commit. We haven't seen this yet in
production, but I'd prefer not to see it there. In production, `take_and_deinit`
is being used, but this race must be quite timing-sensitive.
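
A simplified sketch of the shape of the fix (the actual `heavier_once_cell` diff is
further down this page); the types and structure here are trimmed-down assumptions:

```
// When acquiring the init semaphore fails because it was closed, re-check whether a
// concurrent take_and_deinit swapped in a fresh semaphore and retry, instead of
// asserting that the value must already be initialized.
use std::sync::Arc;
use tokio::sync::{RwLock, Semaphore};

struct Inner<T> {
    value: Option<T>,
    init_semaphore: Arc<Semaphore>,
}

async fn get_or_init<T: Clone>(inner: &RwLock<Inner<T>>, init: impl Fn() -> T) -> T {
    loop {
        let sem = {
            let guard = inner.read().await;
            if let Some(v) = &guard.value {
                return v.clone();
            }
            guard.init_semaphore.clone()
        };
        match sem.acquire().await {
            Ok(_permit) => {
                // We are the initializer; the permit is released on drop so queued
                // waiters wake up, re-check, and find the value set.
                let mut guard = inner.write().await;
                if let Some(v) = &guard.value {
                    return v.clone();
                }
                let v = init();
                guard.value = Some(v.clone());
                return v;
            }
            Err(_closed) => {
                let guard = inner.read().await;
                if !Arc::ptr_eq(&sem, &guard.init_semaphore) {
                    // take_and_deinit happened in between; retry with the new semaphore.
                    continue;
                }
                return guard.value.clone().expect("closed semaphore implies initialized");
            }
        }
    }
}
```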
2024-02-08 22:40:14 +02:00
Arpad Müller
c0e0fc8151 Update Rust to 1.76.0 (#6683)
[Release notes](https://github.com/rust-lang/rust/releases/tag/1.76.0).
2024-02-08 19:57:02 +01:00
John Spray
e8d2843df6 storage controller: improved handling of node availability on restart (#6658)
- Automatically set a node's availability to Active if it is responsive
in startup_reconcile
- Impose a 5s timeout of HTTP request to list location conf, so that an
unresponsive node can't hang it for minutes
- Do several retries if the request fails with a retryable error, to be
tolerant of concurrent pageserver & storage controller restarts
- Add a readiness hook for use with k8s so that we can tell when the
startup reconciliation is done and the service is fully ready to do work.
- Add /metrics to the list of un-authenticated endpoints (this is
unrelated, but we're touching the line in this PR already, and it fixes
auth error spam in deployed containers.)
- A test for the above.

Closes: #6670
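
A hedged sketch of the first three bullets (the real code is in `startup_reconcile`,
visible in the diff below); the constants and helper names are assumptions:

```
use std::time::Duration;

// A 5-second per-request timeout means one unresponsive node cannot hold up the
// startup scan for minutes.
fn startup_http_client() -> reqwest::Client {
    reqwest::ClientBuilder::new()
        .timeout(Duration::from_secs(5))
        .build()
        .expect("Failed to construct HTTP client")
}

// Retry connection-level failures and 408/503/504 responses, so that a pageserver
// restarting concurrently with the storage controller is not treated as dead;
// anything else is considered fatal for this node.
fn is_retryable(status: Option<reqwest::StatusCode>) -> bool {
    use reqwest::StatusCode;
    match status {
        None => true, // no HTTP response at all: connect error, timeout, etc.
        Some(s) => {
            s == StatusCode::REQUEST_TIMEOUT
                || s == StatusCode::SERVICE_UNAVAILABLE
                || s == StatusCode::GATEWAY_TIMEOUT
        }
    }
}
```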
2024-02-08 18:00:53 +00:00
John Spray
af91a28936 pageserver: shard splitting (#6379)
## Problem

One doesn't know at tenant creation time how large the tenant will grow.
We need to be able to dynamically adjust the shard count at runtime.
This is implemented as "splitting" of shards into smaller child shards,
which cover a subset of the keyspace that the parent covered.

Refer to RFC: https://github.com/neondatabase/neon/pull/6358

Part of epic: #6278

## Summary of changes

This PR implements the happy path (it does not cleanly recover from a crash
mid-split, although it won't lose any data), without any optimizations
(e.g. child shards re-download their own copies of layers that the
parent shard already had on local disk).

- Add `/v1/tenant/:tenant_shard_id/shard_split` API to pageserver: this
copies the shard's index to the child shards' paths, instantiates child
`Tenant` objects, and tears down the parent `Tenant` object.
- Add `splitting` column to `tenant_shards` table. This is written into
an existing migration because we haven't deployed yet, so don't need to
cleanly upgrade.
- Add `/control/v1/tenant/:tenant_id/shard_split` API to
attachment_service.
- Add `test_sharding_split_smoke` test. This covers the happy path:
future PRs will add tests that exercise failure cases.
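
For intuition about which children a parent produces, a minimal sketch mirroring
`TenantShardId::split` (which appears in the diff further down): with the
round-robin key-hash mapping, parent shard `n` of count `C` owns exactly the child
shard numbers congruent to `n` modulo `C`.

```
// Illustration of the parent -> children mapping used by shard splitting.
fn child_shard_numbers(parent_number: u8, old_count: u8, new_count: u8) -> Vec<u8> {
    let effective_old_count = old_count.max(1); // an unsharded tenant behaves like count 1
    (0..new_count)
        .filter(|n| n % effective_old_count == parent_number)
        .collect()
}

fn main() {
    assert_eq!(child_shard_numbers(0, 0, 4), vec![0, 1, 2, 3]); // unsharded split into 4
    assert_eq!(child_shard_numbers(1, 2, 8), vec![1, 3, 5, 7]); // shard 1 of 2 split into 8
}
```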
2024-02-08 15:35:13 +00:00
Konstantin Knizhnik
43eae17f0d Drop unused replication slots (#6655)
## Problem

See #6626

If there is an inactive replication slot, then Postgres will not be able to
shrink the WAL and delete unused snapshots.
If some other active subscription is present, then the snapshots created every
15 seconds will overflow AUX_DIR.

Setting `max_slot_wal_keep_size` doesn't solve the problem, because even a
small WAL segment will be enough to overflow AUX_DIR if there is no
other activity on the system.

## Summary of changes

If there are active subscriptions and some logical replication slots have not
been used during the `neon.logical_replication_max_time_lag` interval, then the
unused slots are dropped.
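
A rough sketch of the policy; the types and field names below are illustrative
assumptions, not the actual implementation:

```
use std::time::{Duration, Instant};

struct ReplicationSlot {
    name: String,
    active: bool,
    last_advanced: Instant,
}

// Slots that are inactive and have not advanced within the
// neon.logical_replication_max_time_lag interval are candidates for dropping.
fn slots_to_drop<'a>(
    slots: &'a [ReplicationSlot],
    max_time_lag: Duration,
    now: Instant,
) -> Vec<&'a str> {
    slots
        .iter()
        .filter(|s| !s.active && now.duration_since(s.last_advanced) > max_time_lag)
        .map(|s| s.name.as_str())
        .collect()
}
```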


Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-02-08 17:31:15 +02:00
Anna Khanova
6c34d4cd14 Proxy: set timeout on establishing connection (#6679)
## Problem

There is no timeout on the handshake.

## Summary of changes

Set a timeout on establishing the connection.
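
A minimal sketch of the shape of the change (the duration and types here are
assumptions, not the values used in the PR):

```
use std::time::Duration;
use tokio::time::timeout;

// Bound the handshake so a client that never completes it cannot pin a proxy
// task forever.
async fn handshake_with_timeout<F, T>(handshake: F) -> anyhow::Result<T>
where
    F: std::future::Future<Output = anyhow::Result<T>>,
{
    match timeout(Duration::from_secs(15), handshake).await {
        Ok(result) => result, // handshake completed (successfully or not) in time
        Err(_elapsed) => anyhow::bail!("client handshake timed out"),
    }
}
```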
2024-02-08 13:52:04 +00:00
Anna Khanova
c63e3e7e84 Proxy: improve http-pool (#6577)
## Problem

The password check logic for sql-over-http is a bit non-intuitive.

## Summary of changes

1. Perform scram auth using the same logic as for websocket cleartext
password.
2. Split establish connection logic and connection pool.
3. Parallelize param parsing logic with authentication + wake compute.
4. Limit the total number of clients.
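
Item 3 in sketch form (the function names are placeholders): parameter parsing does
not depend on authentication or compute wake-up, so the two can run concurrently.

```
async fn parse_params(body: &str) -> anyhow::Result<Vec<String>> {
    Ok(body.split(',').map(str::to_owned).collect())
}

async fn authenticate_and_wake_compute() -> anyhow::Result<String> {
    Ok("connection-info".to_owned())
}

async fn handle_sql_over_http(body: &str) -> anyhow::Result<()> {
    // Run both steps concurrently instead of one after the other.
    let (params, conn_info) =
        tokio::try_join!(parse_params(body), authenticate_and_wake_compute())?;
    println!("got {} params for {}", params.len(), conn_info);
    Ok(())
}
```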
2024-02-08 12:57:05 +01:00
Christian Schwarz
c52495774d tokio-epoll-uring: expose its metrics in pageserver's /metrics (#6672)
context: https://github.com/neondatabase/neon/issues/6667
2024-02-07 23:58:54 +00:00
Andreas Scherbaum
9a017778a9 Update copyright notice, set it to current year (#6671)
## Problem

Copyright notice is outdated

## Summary of changes

Replace the initial year `2022` with `2022 - 2024`, after a brief
discussion with Stas about the format.

Co-authored-by: Andreas Scherbaum <andreas@neon.tech>
2024-02-08 00:48:31 +01:00
58 changed files with 2499 additions and 675 deletions

Cargo.lock (generated)

@@ -4079,6 +4079,7 @@ dependencies = [
"clap",
"consumption_metrics",
"dashmap",
"env_logger",
"futures",
"git-version",
"hashbrown 0.13.2",
@@ -5739,7 +5740,7 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0e1af4ccddf2f01805cfc9eaefa97ee13c04b52d"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#d6a1c93442fb6b3a5bec490204961134e54925dc"
dependencies = [
"futures",
"nix 0.26.4",
@@ -6264,7 +6265,7 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0e1af4ccddf2f01805cfc9eaefa97ee13c04b52d"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#d6a1c93442fb6b3a5bec490204961134e54925dc"
dependencies = [
"io-uring",
"libc",


@@ -100,6 +100,11 @@ RUN mkdir -p /data/.neon/ && chown -R neon:neon /data/.neon/ \
-c "listen_pg_addr='0.0.0.0:6400'" \
-c "listen_http_addr='0.0.0.0:9898'"
# When running a binary that links with libpq, default to using our most recent postgres version. Binaries
# that want a particular postgres version will select it explicitly: this is just a default.
ENV LD_LIBRARY_PATH /usr/local/v16/lib
VOLUME ["/data"]
USER neon
EXPOSE 6400


@@ -135,7 +135,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.75.0
ENV RUSTC_VERSION=1.76.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \

NOTICE

@@ -1,5 +1,5 @@
Neon
Copyright 2022 Neon Inc.
Copyright 2022 - 2024 Neon Inc.
The PostgreSQL submodules in vendor/ are licensed under the PostgreSQL license.
See vendor/postgres-vX/COPYRIGHT for details.


@@ -1235,19 +1235,10 @@ LIMIT 100",
info!("Downloading to shared preload libraries: {:?}", &libs_vec);
let build_tag_str = if spec
.features
.contains(&ComputeFeature::RemoteExtensionsUseLatest)
{
"latest"
} else {
&self.build_tag
};
let mut download_tasks = Vec::new();
for library in &libs_vec {
let (ext_name, ext_path) =
remote_extensions.get_ext(library, true, build_tag_str, &self.pgversion)?;
remote_extensions.get_ext(library, true, &self.build_tag, &self.pgversion)?;
download_tasks.push(self.download_extension(ext_name, ext_path));
}
let results = join_all(download_tasks).await;


@@ -8,7 +8,6 @@ use std::thread;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
use compute_api::spec::ComputeFeature;
use anyhow::Result;
use hyper::service::{make_service_fn, service_fn};
@@ -172,16 +171,12 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
};
let build_tag_str = if spec
.features
.contains(&ComputeFeature::RemoteExtensionsUseLatest)
{
"latest"
} else {
&compute.build_tag
};
remote_extensions.get_ext(&filename, is_library, build_tag_str, &compute.pgversion)
remote_extensions.get_ext(
&filename,
is_library,
&compute.build_tag,
&compute.pgversion,
)
};
match ext {


@@ -264,9 +264,10 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
// case we miss some events for some reason. Not strictly necessary, but
// better safe than sorry.
let (tx, rx) = std::sync::mpsc::channel();
let (mut watcher, rx): (Box<dyn Watcher>, _) = match notify::recommended_watcher(move |res| {
let watcher_res = notify::recommended_watcher(move |res| {
let _ = tx.send(res);
}) {
});
let (mut watcher, rx): (Box<dyn Watcher>, _) = match watcher_res {
Ok(watcher) => (Box::new(watcher), rx),
Err(e) => {
match e.kind {


@@ -7,6 +7,7 @@ CREATE TABLE tenant_shards (
generation INTEGER NOT NULL,
generation_pageserver BIGINT NOT NULL,
placement_policy VARCHAR NOT NULL,
splitting SMALLINT NOT NULL,
-- config is JSON encoded, opaque to the database.
config TEXT NOT NULL
);


@@ -3,7 +3,8 @@ use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
use hyper::{Body, Request, Response};
use hyper::{StatusCode, Uri};
use pageserver_api::models::{
TenantCreateRequest, TenantLocationConfigRequest, TimelineCreateRequest,
TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
@@ -41,7 +42,7 @@ pub struct HttpState {
impl HttpState {
pub fn new(service: Arc<crate::service::Service>, auth: Option<Arc<SwappableJwtAuth>>) -> Self {
let allowlist_routes = ["/status"]
let allowlist_routes = ["/status", "/ready", "/metrics"]
.iter()
.map(|v| v.parse().unwrap())
.collect::<Vec<_>>();
@@ -292,6 +293,19 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
json_response(StatusCode::OK, state.service.node_configure(config_req)?)
}
async fn handle_tenant_shard_split(
service: Arc<Service>,
mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
json_response(
StatusCode::OK,
service.tenant_shard_split(tenant_id, split_req).await?,
)
}
async fn handle_tenant_shard_migrate(
service: Arc<Service>,
mut req: Request<Body>,
@@ -311,6 +325,17 @@ async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError>
json_response(StatusCode::OK, ())
}
/// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
/// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let state = get_state(&req);
if state.service.startup_complete.is_ready() {
json_response(StatusCode::OK, ())
} else {
json_response(StatusCode::SERVICE_UNAVAILABLE, ())
}
}
impl From<ReconcileError> for ApiError {
fn from(value: ReconcileError) -> Self {
ApiError::Conflict(format!("Reconciliation error: {}", value))
@@ -366,6 +391,7 @@ pub fn make_router(
.data(Arc::new(HttpState::new(service, auth)))
// Non-prefixed generic endpoints (status, metrics)
.get("/status", |r| request_span(r, handle_status))
.get("/ready", |r| request_span(r, handle_ready))
// Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
.post("/upcall/v1/re-attach", |r| {
request_span(r, handle_re_attach)
@@ -391,6 +417,9 @@ pub fn make_router(
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
tenant_service_handler(r, handle_tenant_shard_migrate)
})
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
tenant_service_handler(r, handle_tenant_shard_split)
})
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.


@@ -1,7 +1,9 @@
pub(crate) mod split_state;
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
use self::split_state::SplitState;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
@@ -363,19 +365,101 @@ impl Persistence {
Ok(())
}
// TODO: when we start shard splitting, we must durably mark the tenant so that
// on restart, we know that we must go through recovery (list shards that exist
// and pick up where we left off and/or revert to parent shards).
// When we start shard splitting, we must durably mark the tenant so that
// on restart, we know that we must go through recovery.
//
// We create the child shards here, so that they will be available for increment_generation calls
// if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
#[allow(dead_code)]
pub(crate) async fn begin_shard_split(&self, _tenant_id: TenantId) -> anyhow::Result<()> {
todo!();
pub(crate) async fn begin_shard_split(
&self,
old_shard_count: ShardCount,
split_tenant_id: TenantId,
parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
conn.transaction(|conn| -> DatabaseResult<()> {
// Mark parent shards as splitting
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(old_shard_count.0 as i32))
.set((splitting.eq(1),))
.execute(conn)?;
if ShardCount(updated.try_into().map_err(|_| DatabaseError::Logical(format!("Overflow existing shard count {} while splitting", updated)))?) != old_shard_count {
// Perhaps a deletion or another split raced with this attempt to split, mutating
// the parent shards that we intend to split. In this case the split request should fail.
return Err(DatabaseError::Logical(
format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {old_shard_count:?})")
));
}
// FIXME: spurious clone to sidestep closure move rules
let parent_to_children = parent_to_children.clone();
// Insert child shards
for (parent_shard_id, children) in parent_to_children {
let mut parent = crate::schema::tenant_shards::table
.filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(parent_shard_id.shard_count.0 as i32))
.load::<TenantShardPersistence>(conn)?;
let parent = if parent.len() != 1 {
return Err(DatabaseError::Logical(format!(
"Parent shard {parent_shard_id} not found"
)));
} else {
parent.pop().unwrap()
};
for mut shard in children {
// Carry the parent's generation into the child
shard.generation = parent.generation;
debug_assert!(shard.splitting == SplitState::Splitting);
diesel::insert_into(tenant_shards)
.values(shard)
.execute(conn)?;
}
}
Ok(())
})?;
Ok(())
})
.await
}
// TODO: when we finish shard splitting, we must atomically clean up the old shards
// When we finish shard splitting, we must atomically clean up the old shards
// and insert the new shards, and clear the splitting marker.
#[allow(dead_code)]
pub(crate) async fn complete_shard_split(&self, _tenant_id: TenantId) -> anyhow::Result<()> {
todo!();
pub(crate) async fn complete_shard_split(
&self,
split_tenant_id: TenantId,
old_shard_count: ShardCount,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
conn.transaction(|conn| -> QueryResult<()> {
// Drop parent shards
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(old_shard_count.0 as i32))
.execute(conn)?;
// Clear sharding flag
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.set((splitting.eq(0),))
.execute(conn)?;
debug_assert!(updated > 0);
Ok(())
})?;
Ok(())
})
.await
}
}
@@ -403,6 +487,8 @@ pub(crate) struct TenantShardPersistence {
#[serde(default)]
pub(crate) placement_policy: String,
#[serde(default)]
pub(crate) splitting: SplitState,
#[serde(default)]
pub(crate) config: String,
}


@@ -0,0 +1,46 @@
use diesel::pg::{Pg, PgValue};
use diesel::{
deserialize::FromSql, deserialize::FromSqlRow, expression::AsExpression, serialize::ToSql,
sql_types::Int2,
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, FromSqlRow, AsExpression)]
#[diesel(sql_type = SplitStateSQLRepr)]
#[derive(Deserialize, Serialize)]
pub enum SplitState {
Idle = 0,
Splitting = 1,
}
impl Default for SplitState {
fn default() -> Self {
Self::Idle
}
}
type SplitStateSQLRepr = Int2;
impl ToSql<SplitStateSQLRepr, Pg> for SplitState {
fn to_sql<'a>(
&'a self,
out: &'a mut diesel::serialize::Output<Pg>,
) -> diesel::serialize::Result {
let raw_value: i16 = *self as i16;
let mut new_out = out.reborrow();
ToSql::<SplitStateSQLRepr, Pg>::to_sql(&raw_value, &mut new_out)
}
}
impl FromSql<SplitStateSQLRepr, Pg> for SplitState {
fn from_sql(pg_value: PgValue) -> diesel::deserialize::Result<Self> {
match FromSql::<SplitStateSQLRepr, Pg>::from_sql(pg_value).map(|v| match v {
0 => Some(Self::Idle),
1 => Some(Self::Splitting),
_ => None,
})? {
Some(v) => Ok(v),
None => Err(format!("Invalid SplitState value, was: {:?}", pg_value.as_bytes()).into()),
}
}
}


@@ -20,6 +20,7 @@ diesel::table! {
generation -> Int4,
generation_pageserver -> Int8,
placement_policy -> Varchar,
splitting -> Int2,
config -> Text,
}
}


@@ -1,5 +1,6 @@
use std::{
collections::{BTreeMap, HashMap},
cmp::Ordering,
collections::{BTreeMap, HashMap, HashSet},
str::FromStr,
sync::Arc,
time::{Duration, Instant},
@@ -23,13 +24,14 @@ use pageserver_api::{
models::{
LocationConfig, LocationConfigMode, ShardParameters, TenantConfig, TenantCreateRequest,
TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation,
TimelineCreateRequest, TimelineInfo,
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
},
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
};
use pageserver_client::mgmt_api;
use tokio_util::sync::CancellationToken;
use utils::{
backoff,
completion::Barrier,
generation::Generation,
http::error::ApiError,
@@ -40,7 +42,11 @@ use utils::{
use crate::{
compute_hook::{self, ComputeHook},
node::Node,
persistence::{DatabaseError, NodePersistence, Persistence, TenantShardPersistence},
persistence::{
split_state::SplitState, DatabaseError, NodePersistence, Persistence,
TenantShardPersistence,
},
reconciler::attached_location_conf,
scheduler::Scheduler,
tenant_state::{
IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
@@ -145,31 +151,71 @@ impl Service {
// indeterminate, same as in [`ObservedStateLocation`])
let mut observed = HashMap::new();
let nodes = {
let locked = self.inner.read().unwrap();
locked.nodes.clone()
};
let mut nodes_online = HashSet::new();
// TODO: give Service a cancellation token for clean shutdown
let cancel = CancellationToken::new();
// TODO: issue these requests concurrently
for node in nodes.values() {
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
{
let nodes = {
let locked = self.inner.read().unwrap();
locked.nodes.clone()
};
for node in nodes.values() {
let http_client = reqwest::ClientBuilder::new()
.timeout(Duration::from_secs(5))
.build()
.expect("Failed to construct HTTP client");
let client = mgmt_api::Client::from_client(
http_client,
node.base_url(),
self.config.jwt_token.as_deref(),
);
tracing::info!("Scanning shards on node {}...", node.id);
match client.list_location_config().await {
Err(e) => {
tracing::warn!("Could not contact pageserver {} ({e})", node.id);
// TODO: be more tolerant, apply a generous 5-10 second timeout with retries, in case
// pageserver is being restarted at the same time as we are
fn is_fatal(e: &mgmt_api::Error) -> bool {
use mgmt_api::Error::*;
match e {
ReceiveBody(_) | ReceiveErrorBody(_) => false,
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
}
}
Ok(listing) => {
tracing::info!(
"Received {} shard statuses from pageserver {}, setting it to Active",
listing.tenant_shards.len(),
node.id
);
for (tenant_shard_id, conf_opt) in listing.tenant_shards {
observed.insert(tenant_shard_id, (node.id, conf_opt));
let list_response = backoff::retry(
|| client.list_location_config(),
is_fatal,
1,
5,
"Location config listing",
&cancel,
)
.await;
let Some(list_response) = list_response else {
tracing::info!("Shutdown during startup_reconcile");
return;
};
tracing::info!("Scanning shards on node {}...", node.id);
match list_response {
Err(e) => {
tracing::warn!("Could not contact pageserver {} ({e})", node.id);
// TODO: be more tolerant, do some retries, in case
// pageserver is being restarted at the same time as we are
}
Ok(listing) => {
tracing::info!(
"Received {} shard statuses from pageserver {}, setting it to Active",
listing.tenant_shards.len(),
node.id
);
nodes_online.insert(node.id);
for (tenant_shard_id, conf_opt) in listing.tenant_shards {
observed.insert(tenant_shard_id, (node.id, conf_opt));
}
}
}
}
@@ -180,8 +226,19 @@ impl Service {
let mut compute_notifications = Vec::new();
// Populate intent and observed states for all tenants, based on reported state on pageservers
let shard_count = {
let (shard_count, nodes) = {
let mut locked = self.inner.write().unwrap();
// Mark nodes online if they responded to us: nodes are offline by default after a restart.
let mut nodes = (*locked.nodes).clone();
for (node_id, node) in nodes.iter_mut() {
if nodes_online.contains(node_id) {
node.availability = NodeAvailability::Active;
}
}
locked.nodes = Arc::new(nodes);
let nodes = locked.nodes.clone();
for (tenant_shard_id, (node_id, observed_loc)) in observed {
let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
cleanup.push((tenant_shard_id, node_id));
@@ -213,7 +270,7 @@ impl Service {
}
}
locked.tenants.len()
(locked.tenants.len(), nodes)
};
// TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
@@ -274,9 +331,8 @@ impl Service {
let stream = futures::stream::iter(compute_notifications.into_iter())
.map(|(tenant_shard_id, node_id)| {
let compute_hook = compute_hook.clone();
let cancel = cancel.clone();
async move {
// TODO: give Service a cancellation token for clean shutdown
let cancel = CancellationToken::new();
if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
tracing::error!(
tenant_shard_id=%tenant_shard_id,
@@ -382,7 +438,7 @@ impl Service {
))),
config,
persistence,
startup_complete,
startup_complete: startup_complete.clone(),
});
let result_task_this = this.clone();
@@ -476,6 +532,7 @@ impl Service {
generation_pageserver: i64::MAX,
placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
splitting: SplitState::default(),
};
match self.persistence.insert_tenant_shards(vec![tsp]).await {
@@ -718,6 +775,7 @@ impl Service {
generation_pageserver: i64::MAX,
placement_policy: serde_json::to_string(&placement_policy).unwrap(),
config: serde_json::to_string(&create_req.config).unwrap(),
splitting: SplitState::default(),
})
.collect();
self.persistence
@@ -977,6 +1035,10 @@ impl Service {
}
};
// TODO: if we timeout/fail on reconcile, we should still succeed this request,
// because otherwise a broken compute hook causes a feedback loop where
// location_config returns 500 and gets retried forever.
if let Some(create_req) = maybe_create {
let create_resp = self.tenant_create(create_req).await?;
result.shards = create_resp
@@ -1100,6 +1162,7 @@ impl Service {
self.ensure_attached_wait(tenant_id).await?;
// TODO: refuse to do this if shard splitting is in progress
// (https://github.com/neondatabase/neon/issues/6676)
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
@@ -1180,6 +1243,7 @@ impl Service {
self.ensure_attached_wait(tenant_id).await?;
// TODO: refuse to do this if shard splitting is in progress
// (https://github.com/neondatabase/neon/issues/6676)
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
@@ -1352,6 +1416,326 @@ impl Service {
})
}
pub(crate) async fn tenant_shard_split(
&self,
tenant_id: TenantId,
split_req: TenantShardSplitRequest,
) -> Result<TenantShardSplitResponse, ApiError> {
let mut policy = None;
let mut shard_ident = None;
// TODO: put a cancellation token on Service for clean shutdown
let cancel = CancellationToken::new();
// A parent shard which will be split
struct SplitTarget {
parent_id: TenantShardId,
node: Node,
child_ids: Vec<TenantShardId>,
}
// Validate input, and calculate which shards we will create
let (old_shard_count, targets, compute_hook) = {
let locked = self.inner.read().unwrap();
let pageservers = locked.nodes.clone();
let mut targets = Vec::new();
// In case this is a retry, count how many already-split shards we found
let mut children_found = Vec::new();
let mut old_shard_count = None;
for (tenant_shard_id, shard) in
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
{
match shard.shard.count.0.cmp(&split_req.new_shard_count) {
Ordering::Equal => {
// Already split this
children_found.push(*tenant_shard_id);
continue;
}
Ordering::Greater => {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Requested count {} but already have shards at count {}",
split_req.new_shard_count,
shard.shard.count.0
)));
}
Ordering::Less => {
// Fall through: this shard has lower count than requested,
// is a candidate for splitting.
}
}
match old_shard_count {
None => old_shard_count = Some(shard.shard.count),
Some(old_shard_count) => {
if old_shard_count != shard.shard.count {
// We may hit this case if a caller asked for two splits to
// different sizes, before the first one is complete.
// e.g. 1->2, 2->4, where the 4 call comes while we have a mixture
// of shard_count=1 and shard_count=2 shards in the map.
return Err(ApiError::Conflict(
"Cannot split, currently mid-split".to_string(),
));
}
}
}
if policy.is_none() {
policy = Some(shard.policy.clone());
}
if shard_ident.is_none() {
shard_ident = Some(shard.shard);
}
if tenant_shard_id.shard_count == ShardCount(split_req.new_shard_count) {
tracing::info!(
"Tenant shard {} already has shard count {}",
tenant_shard_id,
split_req.new_shard_count
);
continue;
}
let node_id =
shard
.intent
.attached
.ok_or(ApiError::BadRequest(anyhow::anyhow!(
"Cannot split a tenant that is not attached"
)))?;
let node = pageservers
.get(&node_id)
.expect("Pageservers may not be deleted while referenced");
// TODO: if any reconciliation is currently in progress for this shard, wait for it.
targets.push(SplitTarget {
parent_id: *tenant_shard_id,
node: node.clone(),
child_ids: tenant_shard_id.split(ShardCount(split_req.new_shard_count)),
});
}
if targets.is_empty() {
if children_found.len() == split_req.new_shard_count as usize {
return Ok(TenantShardSplitResponse {
new_shards: children_found,
});
} else {
// No shards found to split, and no existing children found: the
// tenant doesn't exist at all.
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
));
}
}
(old_shard_count, targets, locked.compute_hook.clone())
};
// unwrap safety: we would have returned above if we didn't find at least one shard to split
let old_shard_count = old_shard_count.unwrap();
let shard_ident = shard_ident.unwrap();
let policy = policy.unwrap();
// FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
// request could occur here, deleting or mutating the tenant. begin_shard_split checks that the
// parent shards exist as expected, but it would be neater to do the above pre-checks within the
// same database transaction rather than pre-check in-memory and then maybe-fail the database write.
// (https://github.com/neondatabase/neon/issues/6676)
// Before creating any new child shards in memory or on the pageservers, persist them: this
// enables us to ensure that we will always be able to clean up if something goes wrong. This also
// acts as the protection against two concurrent attempts to split: one of them will get a database
// error trying to insert the child shards.
let mut child_tsps = Vec::new();
for target in &targets {
let mut this_child_tsps = Vec::new();
for child in &target.child_ids {
let mut child_shard = shard_ident;
child_shard.number = child.shard_number;
child_shard.count = child.shard_count;
this_child_tsps.push(TenantShardPersistence {
tenant_id: child.tenant_id.to_string(),
shard_number: child.shard_number.0 as i32,
shard_count: child.shard_count.0 as i32,
shard_stripe_size: shard_ident.stripe_size.0 as i32,
// Note: this generation is a placeholder, [`Persistence::begin_shard_split`] will
// populate the correct generation as part of its transaction, to protect us
// against racing with changes in the state of the parent.
generation: 0,
generation_pageserver: target.node.id.0 as i64,
placement_policy: serde_json::to_string(&policy).unwrap(),
// TODO: get the config out of the map
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
splitting: SplitState::Splitting,
});
}
child_tsps.push((target.parent_id, this_child_tsps));
}
if let Err(e) = self
.persistence
.begin_shard_split(old_shard_count, tenant_id, child_tsps)
.await
{
match e {
DatabaseError::Query(diesel::result::Error::DatabaseError(
DatabaseErrorKind::UniqueViolation,
_,
)) => {
// Inserting a child shard violated a unique constraint: we raced with another call to
// this function
tracing::warn!("Conflicting attempt to split {tenant_id}: {e}");
return Err(ApiError::Conflict("Tenant is already splitting".into()));
}
_ => return Err(ApiError::InternalServerError(e.into())),
}
}
// FIXME: we have now committed the shard split state to the database, so any subsequent
// failure needs to roll it back. We will later wrap this function in logic to roll back
// the split if it fails.
// (https://github.com/neondatabase/neon/issues/6676)
// TODO: issue split calls concurrently (this only matters once we're splitting
// N>1 shards into M shards -- initially we're usually splitting 1 shard into N).
for target in &targets {
let SplitTarget {
parent_id,
node,
child_ids,
} = target;
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
let response = client
.tenant_shard_split(
*parent_id,
TenantShardSplitRequest {
new_shard_count: split_req.new_shard_count,
},
)
.await
.map_err(|e| ApiError::Conflict(format!("Failed to split {}: {}", parent_id, e)))?;
tracing::info!(
"Split {} into {}",
parent_id,
response
.new_shards
.iter()
.map(|s| format!("{:?}", s))
.collect::<Vec<_>>()
.join(",")
);
if &response.new_shards != child_ids {
// This should never happen: the pageserver should agree with us on how shard splits work.
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Splitting shard {} resulted in unexpected IDs: {:?} (expected {:?})",
parent_id,
response.new_shards,
child_ids
)));
}
}
// TODO: if the pageserver restarted concurrently with our split API call,
// the actual generation of the child shard might differ from the generation
// we expect it to have. In order for our in-database generation to end up
// correct, we should carry the child generation back in the response and apply it here
// in complete_shard_split (and apply the correct generation in memory)
// (or, we can carry generation in the request and reject the request if
// it doesn't match, but that requires more retry logic on this side)
self.persistence
.complete_shard_split(tenant_id, old_shard_count)
.await?;
// Replace all the shards we just split with their children
let mut response = TenantShardSplitResponse {
new_shards: Vec::new(),
};
let mut child_locations = Vec::new();
{
let mut locked = self.inner.write().unwrap();
for target in targets {
let SplitTarget {
parent_id,
node: _node,
child_ids,
} = target;
let (pageserver, generation, config) = {
let old_state = locked
.tenants
.remove(&parent_id)
.expect("It was present, we just split it");
(
old_state.intent.attached.unwrap(),
old_state.generation,
old_state.config.clone(),
)
};
locked.tenants.remove(&parent_id);
for child in child_ids {
let mut child_shard = shard_ident;
child_shard.number = child.shard_number;
child_shard.count = child.shard_count;
let mut child_observed: HashMap<NodeId, ObservedStateLocation> = HashMap::new();
child_observed.insert(
pageserver,
ObservedStateLocation {
conf: Some(attached_location_conf(generation, &child_shard, &config)),
},
);
let mut child_state = TenantState::new(child, child_shard, policy.clone());
child_state.intent = IntentState::single(Some(pageserver));
child_state.observed = ObservedState {
locations: child_observed,
};
child_state.generation = generation;
child_state.config = config.clone();
child_locations.push((child, pageserver));
locked.tenants.insert(child, child_state);
response.new_shards.push(child);
}
}
}
// Send compute notifications for all the new shards
let mut failed_notifications = Vec::new();
for (child_id, child_ps) in child_locations {
if let Err(e) = compute_hook.notify(child_id, child_ps, &cancel).await {
tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
child_id, child_ps);
failed_notifications.push(child_id);
}
}
// If we failed any compute notifications, make a note to retry later.
if !failed_notifications.is_empty() {
let mut locked = self.inner.write().unwrap();
for failed in failed_notifications {
if let Some(shard) = locked.tenants.get_mut(&failed) {
shard.pending_compute_notification = true;
}
}
}
Ok(response)
}
pub(crate) async fn tenant_shard_migrate(
&self,
tenant_shard_id: TenantShardId,


@@ -193,6 +193,13 @@ impl IntentState {
result
}
pub(crate) fn single(node_id: Option<NodeId>) -> Self {
Self {
attached: node_id,
secondary: vec![],
}
}
/// When a node goes offline, we update intents to avoid using it
/// as their attached pageserver.
///
@@ -286,6 +293,9 @@ impl TenantState {
// self.intent refers to pageservers that are offline, and pick other
// pageservers if so.
// TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
// change their attach location.
// Build the set of pageservers already in use by this tenant, to avoid scheduling
// more work on the same pageservers we're already using.
let mut used_pageservers = self.intent.all_pageservers();


@@ -8,7 +8,10 @@ use diesel::{
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
use hyper::Method;
use pageserver_api::{
models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo},
models::{
ShardParameters, TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
TimelineCreateRequest, TimelineInfo,
},
shard::TenantShardId,
};
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
@@ -648,7 +651,7 @@ impl AttachmentService {
) -> anyhow::Result<TenantShardMigrateResponse> {
self.dispatch(
Method::PUT,
format!("tenant/{tenant_shard_id}/migrate"),
format!("control/v1/tenant/{tenant_shard_id}/migrate"),
Some(TenantShardMigrateRequest {
tenant_shard_id,
node_id,
@@ -657,6 +660,20 @@ impl AttachmentService {
.await
}
#[instrument(skip(self), fields(%tenant_id, %new_shard_count))]
pub async fn tenant_split(
&self,
tenant_id: TenantId,
new_shard_count: u8,
) -> anyhow::Result<TenantShardSplitResponse> {
self.dispatch(
Method::PUT,
format!("control/v1/tenant/{tenant_id}/shard_split"),
Some(TenantShardSplitRequest { new_shard_count }),
)
.await
}
#[instrument(skip_all, fields(node_id=%req.node_id))]
pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))


@@ -72,7 +72,6 @@ where
let log_path = datadir.join(format!("{process_name}.log"));
let process_log_file = fs::OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open(&log_path)
.with_context(|| {


@@ -575,6 +575,26 @@ async fn handle_tenant(
println!("{tenant_table}");
println!("{shard_table}");
}
Some(("shard-split", matches)) => {
let tenant_id = get_tenant_id(matches, env)?;
let shard_count: u8 = matches.get_one::<u8>("shard-count").cloned().unwrap_or(0);
let attachment_service = AttachmentService::from_env(env);
let result = attachment_service
.tenant_split(tenant_id, shard_count)
.await?;
println!(
"Split tenant {} into shards {}",
tenant_id,
result
.new_shards
.iter()
.map(|s| format!("{:?}", s))
.collect::<Vec<_>>()
.join(",")
);
}
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
None => bail!("no tenant subcommand provided"),
}
@@ -1524,6 +1544,11 @@ fn cli() -> Command {
.subcommand(Command::new("status")
.about("Human readable summary of the tenant's shards and attachment locations")
.arg(tenant_id_arg.clone()))
.subcommand(Command::new("shard-split")
.about("Increase the number of shards in the tenant")
.arg(tenant_id_arg.clone())
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
)
)
.subcommand(
Command::new("pageserver")


@@ -90,11 +90,6 @@ pub enum ComputeFeature {
/// track short-lived connections as user activity.
ActivityMonitorExperimental,
// Use latest version of remote extensions
// This is needed to allow us to test new versions of extensions before
// they are merged into the main branch.
RemoteExtensionsUseLatest,
/// This is a special feature flag that is used to represent unknown feature flags.
/// Basically all unknown to enum flags are represented as this one. See unit test
/// `parse_unknown_features()` for more details.
@@ -157,12 +152,8 @@ impl RemoteExtSpec {
//
// Keep it in sync with path generation in
// https://github.com/neondatabase/build-custom-extensions/tree/main
//
// if ComputeFeature::RemoteExtensionsUseLatest is enabled
// use "latest" as the build_tag
let archive_path_str =
format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst");
Ok((
real_ext_name.to_string(),
RemotePath::from_string(&archive_path_str)?,


@@ -192,6 +192,16 @@ pub struct TimelineCreateRequest {
pub pg_version: Option<u32>,
}
#[derive(Serialize, Deserialize)]
pub struct TenantShardSplitRequest {
pub new_shard_count: u8,
}
#[derive(Serialize, Deserialize)]
pub struct TenantShardSplitResponse {
pub new_shards: Vec<TenantShardId>,
}
/// Parameters that apply to all shards in a tenant. Used during tenant creation.
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]


@@ -88,12 +88,36 @@ impl TenantShardId {
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
}
/// Convenience for dropping the tenant_id and just getting the ShardIndex: this
/// is useful when logging from code that is already in a span that includes tenant ID, to
/// keep messages reasonably terse.
pub fn to_index(&self) -> ShardIndex {
ShardIndex {
shard_number: self.shard_number,
shard_count: self.shard_count,
}
}
/// Calculate the children of this TenantShardId when splitting the overall tenant into
/// the given number of shards.
pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
let mut child_shards = Vec::new();
for shard_number in 0..ShardNumber(new_shard_count.0).0 {
// Key mapping is based on a round robin mapping of key hash modulo shard count,
// so our child shards are the ones which the same keys would map to.
if shard_number % effective_old_shard_count == self.shard_number.0 {
child_shards.push(TenantShardId {
tenant_id: self.tenant_id,
shard_number: ShardNumber(shard_number),
shard_count: new_shard_count,
})
}
}
child_shards
}
}
/// Formatting helper
@@ -793,4 +817,108 @@ mod tests {
let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
assert_eq!(shard, ShardNumber(8));
}
#[test]
fn shard_id_split() {
let tenant_id = TenantId::generate();
let parent = TenantShardId::unsharded(tenant_id);
// Unsharded into 2
assert_eq!(
parent.split(ShardCount(2)),
vec![
TenantShardId {
tenant_id,
shard_count: ShardCount(2),
shard_number: ShardNumber(0)
},
TenantShardId {
tenant_id,
shard_count: ShardCount(2),
shard_number: ShardNumber(1)
}
]
);
// Unsharded into 4
assert_eq!(
parent.split(ShardCount(4)),
vec![
TenantShardId {
tenant_id,
shard_count: ShardCount(4),
shard_number: ShardNumber(0)
},
TenantShardId {
tenant_id,
shard_count: ShardCount(4),
shard_number: ShardNumber(1)
},
TenantShardId {
tenant_id,
shard_count: ShardCount(4),
shard_number: ShardNumber(2)
},
TenantShardId {
tenant_id,
shard_count: ShardCount(4),
shard_number: ShardNumber(3)
}
]
);
// count=1 into 2 (check this works the same as unsharded.)
let parent = TenantShardId {
tenant_id,
shard_count: ShardCount(1),
shard_number: ShardNumber(0),
};
assert_eq!(
parent.split(ShardCount(2)),
vec![
TenantShardId {
tenant_id,
shard_count: ShardCount(2),
shard_number: ShardNumber(0)
},
TenantShardId {
tenant_id,
shard_count: ShardCount(2),
shard_number: ShardNumber(1)
}
]
);
// count=2 into count=8
let parent = TenantShardId {
tenant_id,
shard_count: ShardCount(2),
shard_number: ShardNumber(1),
};
assert_eq!(
parent.split(ShardCount(8)),
vec![
TenantShardId {
tenant_id,
shard_count: ShardCount(8),
shard_number: ShardNumber(1)
},
TenantShardId {
tenant_id,
shard_count: ShardCount(8),
shard_number: ShardNumber(3)
},
TenantShardId {
tenant_id,
shard_count: ShardCount(8),
shard_number: ShardNumber(5)
},
TenantShardId {
tenant_id,
shard_count: ShardCount(8),
shard_number: ShardNumber(7)
},
]
);
}
}


@@ -27,6 +27,11 @@ impl Barrier {
b.wait().await
}
}
/// Return true if a call to wait() would complete immediately
pub fn is_ready(&self) -> bool {
futures::future::FutureExt::now_or_never(self.0.wait()).is_some()
}
}
impl PartialEq for Barrier {


@@ -69,37 +69,44 @@ impl<T> OnceCell<T> {
F: FnOnce(InitPermit) -> Fut,
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
{
let sem = {
loop {
let sem = {
let guard = self.inner.write().await;
if guard.value.is_some() {
return Ok(GuardMut(guard));
}
guard.init_semaphore.clone()
};
{
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire().await
};
let Ok(permit) = permit else {
let guard = self.inner.write().await;
if !Arc::ptr_eq(&sem, &guard.init_semaphore) {
// there was a take_and_deinit in between
continue;
}
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(GuardMut(guard));
};
permit.forget();
}
let permit = InitPermit(sem);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.write().await;
if guard.value.is_some() {
return Ok(GuardMut(guard));
}
guard.init_semaphore.clone()
};
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire_owned().await
};
match permit {
Ok(permit) => {
let permit = InitPermit(permit);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.write().await;
Ok(Self::set0(value, guard))
}
Err(_closed) => {
let guard = self.inner.write().await;
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(GuardMut(guard));
}
return Ok(Self::set0(value, guard));
}
}
@@ -112,37 +119,44 @@ impl<T> OnceCell<T> {
F: FnOnce(InitPermit) -> Fut,
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
{
let sem = {
let guard = self.inner.read().await;
if guard.value.is_some() {
return Ok(GuardRef(guard));
}
guard.init_semaphore.clone()
};
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire_owned().await
};
match permit {
Ok(permit) => {
let permit = InitPermit(permit);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.write().await;
Ok(Self::set0(value, guard).downgrade())
}
Err(_closed) => {
loop {
let sem = {
let guard = self.inner.read().await;
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(GuardRef(guard));
if guard.value.is_some() {
return Ok(GuardRef(guard));
}
guard.init_semaphore.clone()
};
{
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire().await
};
let Ok(permit) = permit else {
let guard = self.inner.read().await;
if !Arc::ptr_eq(&sem, &guard.init_semaphore) {
// there was a take_and_deinit in between
continue;
}
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(GuardRef(guard));
};
permit.forget();
}
let permit = InitPermit(sem);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.write().await;
return Ok(Self::set0(value, guard).downgrade());
}
}
@@ -250,15 +264,12 @@ impl<'a, T> GuardMut<'a, T> {
/// [`OnceCell::get_or_init`] will wait on it to complete.
pub fn take_and_deinit(&mut self) -> (T, InitPermit) {
let mut swapped = Inner::default();
let permit = swapped
.init_semaphore
.clone()
.try_acquire_owned()
.expect("we just created this");
let sem = swapped.init_semaphore.clone();
sem.try_acquire().expect("we just created this").forget();
std::mem::swap(&mut *self.0, &mut swapped);
swapped
.value
.map(|v| (v, InitPermit(permit)))
.map(|v| (v, InitPermit(sem)))
.expect("guard is not created unless value has been initialized")
}
@@ -282,13 +293,23 @@ impl<T> std::ops::Deref for GuardRef<'_, T> {
}
/// Type held by OnceCell (de)initializing task.
pub struct InitPermit(tokio::sync::OwnedSemaphorePermit);
pub struct InitPermit(Arc<tokio::sync::Semaphore>);
impl Drop for InitPermit {
fn drop(&mut self) {
debug_assert_eq!(self.0.available_permits(), 0);
self.0.add_permits(1);
}
}
#[cfg(test)]
mod tests {
use futures::Future;
use super::*;
use std::{
convert::Infallible,
pin::{pin, Pin},
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
@@ -455,4 +476,94 @@ mod tests {
.unwrap();
assert_eq!(*g, "now initialized");
}
#[tokio::test(start_paused = true)]
async fn reproduce_init_take_deinit_race() {
init_take_deinit_scenario(|cell, factory| {
Box::pin(async {
cell.get_or_init(factory).await.unwrap();
})
})
.await;
}
#[tokio::test(start_paused = true)]
async fn reproduce_init_take_deinit_race_mut() {
init_take_deinit_scenario(|cell, factory| {
Box::pin(async {
cell.get_mut_or_init(factory).await.unwrap();
})
})
.await;
}
type BoxedInitFuture<T, E> = Pin<Box<dyn Future<Output = Result<(T, InitPermit), E>>>>;
type BoxedInitFunction<T, E> = Box<dyn Fn(InitPermit) -> BoxedInitFuture<T, E>>;
/// Reproduce an assertion failure with both initialization methods.
///
/// This has interesting generics to be generic between `get_or_init` and `get_mut_or_init`.
/// Alternative would be a macro_rules! but that is the last resort.
async fn init_take_deinit_scenario<F>(init_way: F)
where
F: for<'a> Fn(
&'a OnceCell<&'static str>,
BoxedInitFunction<&'static str, Infallible>,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>,
{
let cell = OnceCell::default();
// acquire the init_semaphore only permit to drive initializing tasks in order to waiting
// on the same semaphore.
let permit = cell
.inner
.read()
.await
.init_semaphore
.clone()
.try_acquire_owned()
.unwrap();
let mut t1 = pin!(init_way(
&cell,
Box::new(|permit| Box::pin(async move { Ok(("t1", permit)) })),
));
let mut t2 = pin!(init_way(
&cell,
Box::new(|permit| Box::pin(async move { Ok(("t2", permit)) })),
));
// drive t2 first to the init_semaphore
tokio::select! {
_ = &mut t2 => unreachable!("it cannot get permit"),
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
}
// followed by t1 in the init_semaphore
tokio::select! {
_ = &mut t1 => unreachable!("it cannot get permit"),
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
}
// now let t2 proceed and initialize
drop(permit);
t2.await;
let (s, permit) = { cell.get_mut().await.unwrap().take_and_deinit() };
assert_eq!("t2", s);
// now originally t1 would see the semaphore it has as closed. it cannot yet get a permit from
// the new one.
tokio::select! {
_ = &mut t1 => unreachable!("it cannot get permit"),
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
}
// only now we get to initialize it
drop(permit);
t1.await;
assert_eq!("t1", *cell.get().await.unwrap());
}
}


@@ -56,10 +56,18 @@ pub enum ForceAwaitLogicalSize {
impl Client {
pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
}
pub fn from_client(
client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<&str>,
) -> Self {
Self {
mgmt_api_endpoint,
authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
client: reqwest::Client::new(),
client,
}
}
@@ -310,6 +318,22 @@ impl Client {
.map_err(Error::ReceiveBody)
}
pub async fn tenant_shard_split(
&self,
tenant_shard_id: TenantShardId,
req: TenantShardSplitRequest,
) -> Result<TenantShardSplitResponse> {
let uri = format!(
"{}/v1/tenant/{}/shard_split",
self.mgmt_api_endpoint, tenant_shard_id
);
self.request(Method::PUT, &uri, req)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn timeline_list(
&self,
tenant_shard_id: &TenantShardId,


@@ -274,6 +274,10 @@ fn start_pageserver(
set_launch_timestamp_metric(launch_ts);
#[cfg(target_os = "linux")]
metrics::register_internal(Box::new(metrics::more_process_metrics::Collector::new())).unwrap();
metrics::register_internal(Box::new(
pageserver::metrics::tokio_epoll_uring::Collector::new(),
))
.unwrap();
pageserver::preinitialize_metrics();
// If any failpoints were set from FAILPOINTS environment variable,


@@ -623,6 +623,7 @@ impl std::fmt::Display for EvictionLayer {
}
}
#[derive(Default)]
pub(crate) struct DiskUsageEvictionInfo {
/// Timeline's largest layer (remote or resident)
pub max_layer_size: Option<u64>,
@@ -854,19 +855,27 @@ async fn collect_eviction_candidates(
let total = tenant_candidates.len();
for (i, mut candidate) in tenant_candidates.into_iter().enumerate() {
// as we iterate this reverse sorted list, the most recently accessed layer will always
// be 1.0; this is for us to evict it last.
candidate.relative_last_activity = eviction_order.relative_last_activity(total, i);
let tenant_candidates =
tenant_candidates
.into_iter()
.enumerate()
.map(|(i, mut candidate)| {
// as we iterate this reverse sorted list, the most recently accessed layer will always
// be 1.0; this is for us to evict it last.
candidate.relative_last_activity =
eviction_order.relative_last_activity(total, i);
let partition = if cumsum > min_resident_size as i128 {
MinResidentSizePartition::Above
} else {
MinResidentSizePartition::Below
};
cumsum += i128::from(candidate.layer.get_file_size());
candidates.push((partition, candidate));
}
let partition = if cumsum > min_resident_size as i128 {
MinResidentSizePartition::Above
} else {
MinResidentSizePartition::Below
};
cumsum += i128::from(candidate.layer.get_file_size());
(partition, candidate)
});
candidates.extend(tenant_candidates);
}
// Note: the same tenant ID might be hit twice, if it transitions from attached to
@@ -882,21 +891,41 @@ async fn collect_eviction_candidates(
);
for secondary_tenant in secondary_tenants {
let mut layer_info = secondary_tenant.get_layers_for_eviction();
// for secondary tenants we use a sum of on_disk layers and already evicted layers. this is
// to prevent repeated disk usage based evictions from completely draining less often
// updating secondaries.
let (mut layer_info, total_layers) = secondary_tenant.get_layers_for_eviction();
debug_assert!(
total_layers >= layer_info.resident_layers.len(),
"total_layers ({total_layers}) must be at least the resident_layers.len() ({})",
layer_info.resident_layers.len()
);
layer_info
.resident_layers
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
candidates.extend(layer_info.resident_layers.into_iter().map(|candidate| {
(
// Secondary locations' layers are always considered above the min resident size,
// i.e. secondary locations are permitted to be trimmed to zero layers if all
// the layers have sufficiently old access times.
MinResidentSizePartition::Above,
candidate,
)
}));
let tenant_candidates =
layer_info
.resident_layers
.into_iter()
.enumerate()
.map(|(i, mut candidate)| {
candidate.relative_last_activity =
eviction_order.relative_last_activity(total_layers, i);
(
// Secondary locations' layers are always considered above the min resident size,
// i.e. secondary locations are permitted to be trimmed to zero layers if all
// the layers have sufficiently old access times.
MinResidentSizePartition::Above,
candidate,
)
});
candidates.extend(tenant_candidates);
tokio::task::yield_now().await;
}
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,


@@ -19,11 +19,14 @@ use pageserver_api::models::ShardParameters;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::TenantLocationConfigResponse;
use pageserver_api::models::TenantShardLocation;
use pageserver_api::models::TenantShardSplitRequest;
use pageserver_api::models::TenantShardSplitResponse;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
};
use pageserver_api::shard::ShardCount;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeTravelError;
@@ -875,7 +878,7 @@ async fn tenant_reset_handler(
let state = get_state(&request);
state
.tenant_manager
.reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), ctx)
.reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), &ctx)
.await
.map_err(ApiError::InternalServerError)?;
@@ -1104,6 +1107,25 @@ async fn tenant_size_handler(
)
}
async fn tenant_shard_split_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let req: TenantShardSplitRequest = json_request(&mut request).await?;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let state = get_state(&request);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let new_shards = state
.tenant_manager
.shard_split(tenant_shard_id, ShardCount(req.new_shard_count), &ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, TenantShardSplitResponse { new_shards })
}
async fn layer_map_info_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -2063,6 +2085,9 @@ pub fn make_router(
.put("/v1/tenant/config", |r| {
api_handler(r, update_tenant_config_handler)
})
.put("/v1/tenant/:tenant_shard_id/shard_split", |r| {
api_handler(r, tenant_shard_split_handler)
})
.get("/v1/tenant/:tenant_shard_id/config", |r| {
api_handler(r, get_tenant_config_handler)
})
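For illustration, a hypothetical client call against the route registered above. Only the URL path and the `new_shard_count`/`new_shards` field names come from this diff; the port, the string form of the shard ids, and the use of `reqwest`/`serde_json` are assumptions:

```rust
use anyhow::Result;

async fn split_tenant(tenant_shard_id: &str, new_shard_count: u8) -> Result<Vec<String>> {
    let resp: serde_json::Value = reqwest::Client::new()
        .put(format!(
            "http://localhost:9898/v1/tenant/{tenant_shard_id}/shard_split"
        ))
        .json(&serde_json::json!({ "new_shard_count": new_shard_count }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    // The handler responds with TenantShardSplitResponse { new_shards }.
    Ok(resp["new_shards"]
        .as_array()
        .map(|a| a.iter().filter_map(|v| v.as_str().map(str::to_owned)).collect())
        .unwrap_or_default())
}
```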

View File

@@ -2400,6 +2400,72 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
}
}
pub mod tokio_epoll_uring {
use metrics::UIntGauge;
pub struct Collector {
descs: Vec<metrics::core::Desc>,
systems_created: UIntGauge,
systems_destroyed: UIntGauge,
}
const NMETRICS: usize = 2;
impl metrics::core::Collector for Collector {
fn desc(&self) -> Vec<&metrics::core::Desc> {
self.descs.iter().collect()
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut mfs = Vec::with_capacity(NMETRICS);
let tokio_epoll_uring::metrics::Metrics {
systems_created,
systems_destroyed,
} = tokio_epoll_uring::metrics::global();
self.systems_created.set(systems_created);
mfs.extend(self.systems_created.collect());
self.systems_destroyed.set(systems_destroyed);
mfs.extend(self.systems_destroyed.collect());
mfs
}
}
impl Collector {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let mut descs = Vec::new();
let systems_created = UIntGauge::new(
"pageserver_tokio_epoll_uring_systems_created",
"counter of tokio-epoll-uring systems that were created",
)
.unwrap();
descs.extend(
metrics::core::Collector::desc(&systems_created)
.into_iter()
.cloned(),
);
let systems_destroyed = UIntGauge::new(
"pageserver_tokio_epoll_uring_systems_destroyed",
"counter of tokio-epoll-uring systems that were destroyed",
)
.unwrap();
descs.extend(
metrics::core::Collector::desc(&systems_destroyed)
.into_iter()
.cloned(),
);
Self {
descs,
systems_created,
systems_destroyed,
}
}
}
}
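A sketch of how this collector would be wired up, assuming the `metrics` crate used here re-exports the prometheus `Registry` API (an assumption, not something shown in this diff); `collect()` then runs on every scrape and reads fresh counters from `tokio_epoll_uring::metrics::global()`:

```rust
fn register_uring_collector(registry: &prometheus::Registry) -> prometheus::Result<()> {
    // The collector pulls fresh counts lazily in collect(), so no
    // background task is needed to keep the gauges up to date.
    registry.register(Box::new(Collector::new()))
}
```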
pub fn preinitialize_metrics() {
// Python tests need these, and on some of them we do alerting.
//

View File

@@ -989,6 +989,17 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub async fn create_rel_dir(&mut self, spcnode: Oid, dbnode: Oid) -> anyhow::Result<()> {
let buf = RelDirectory::ser(&RelDirectory {
rels: HashSet::new(),
})?;
self.put(
rel_dir_to_key(spcnode, dbnode),
Value::Image(Bytes::from(buf)),
);
Ok(())
}
/// Store a relmapper file (pg_filenode.map) in the repository
pub async fn put_relmap_file(
&mut self,
@@ -1171,9 +1182,6 @@ impl<'a> DatadirModification<'a> {
// Update relation size cache
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
// Update relation size cache
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
// Update logical database size.
self.pending_nblocks -= old_size as i64 - nblocks as i64;
}

View File

@@ -53,6 +53,7 @@ use self::metadata::TimelineMetadata;
use self::mgr::GetActiveTenantError;
use self::mgr::GetTenantError;
use self::mgr::TenantsMap;
use self::remote_timeline_client::upload::upload_index_part;
use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::uninit::TimelineExclusionError;
use self::timeline::uninit::TimelineUninitMark;
@@ -2397,6 +2398,67 @@ impl Tenant {
pub(crate) fn get_generation(&self) -> Generation {
self.generation
}
/// This function partially shuts down the tenant (it shuts down the Timelines) and is fallible:
/// it can leave the tenant in a bad state if it fails. The caller is responsible for
/// resetting the tenant to a valid state on failure.
pub(crate) async fn split_prepare(
&self,
child_shards: &Vec<TenantShardId>,
) -> anyhow::Result<()> {
let timelines = self.timelines.lock().unwrap().clone();
for timeline in timelines.values() {
let Some(tl_client) = &timeline.remote_client else {
anyhow::bail!("Remote storage is mandatory");
};
let Some(remote_storage) = &self.remote_storage else {
anyhow::bail!("Remote storage is mandatory");
};
// We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
// to ensure that they do not start a split if currently in the process of doing these.
// Upload an index from the parent: this is partly to provide freshness for the
// child tenants that will copy it, and partly for general ease-of-debugging: there will
// always be a parent shard index in the same generation as we wrote the child shard index.
tl_client.schedule_index_upload_for_file_changes()?;
tl_client.wait_completion().await?;
// Shut down the timeline's remote client: this means that the indices we write
// for child shards will not be invalidated by the parent shard deleting layers.
tl_client.shutdown().await?;
// Download methods can still be used after shutdown, as they don't flow through the remote client's
// queue. In principle the RemoteTimelineClient could provide this without downloading it, but this
// operation is rare, so it's simpler to just download it (and robustly guarantees that the index
// we use here really is the remotely persistent one).
let result = tl_client
.download_index_file(self.cancel.clone())
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
.await?;
let index_part = match result {
MaybeDeletedIndexPart::Deleted(_) => {
anyhow::bail!("Timeline deletion happened concurrently with split")
}
MaybeDeletedIndexPart::IndexPart(p) => p,
};
for child_shard in child_shards {
upload_index_part(
remote_storage,
child_shard,
&timeline.timeline_id,
self.generation,
&index_part,
&self.cancel,
)
.await?;
}
}
Ok(())
}
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
@@ -3732,6 +3794,10 @@ impl Tenant {
Ok(())
}
pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
self.tenant_conf.read().unwrap().tenant_conf
}
}
fn remove_timeline_and_uninit_mark(

View File

@@ -2,6 +2,7 @@
//! page server.
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::models::ShardParameters;
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
@@ -22,7 +23,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use remote_storage::GenericRemoteStorage;
use utils::crashsafe;
use utils::{completion, crashsafe};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
@@ -644,8 +645,6 @@ pub(crate) async fn shutdown_all_tenants() {
}
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
use utils::completion;
let mut join_set = JoinSet::new();
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
@@ -1200,7 +1199,7 @@ impl TenantManager {
&self,
tenant_shard_id: TenantShardId,
drop_cache: bool,
ctx: RequestContext,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let Some(old_slot) = slot_guard.get_old_value() else {
@@ -1253,7 +1252,7 @@ impl TenantManager {
None,
self.tenants,
SpawnMode::Normal,
&ctx,
ctx,
)?;
slot_guard.upsert(TenantSlot::Attached(tenant))?;
@@ -1375,6 +1374,164 @@ impl TenantManager {
slot_guard.revert();
result
}
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), new_shard_count=%new_shard_count.0))]
pub(crate) async fn shard_split(
&self,
tenant_shard_id: TenantShardId,
new_shard_count: ShardCount,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let tenant = get_tenant(tenant_shard_id, true)?;
// Plan: identify what the new child shards will be
let effective_old_shard_count = std::cmp::max(tenant_shard_id.shard_count.0, 1);
if new_shard_count <= ShardCount(effective_old_shard_count) {
anyhow::bail!("Requested shard count is not an increase");
}
let expansion_factor = new_shard_count.0 / effective_old_shard_count;
if !expansion_factor.is_power_of_two() {
anyhow::bail!("Requested split is not a power of two");
}
let parent_shard_identity = tenant.shard_identity;
let parent_tenant_conf = tenant.get_tenant_conf();
let parent_generation = tenant.generation;
let child_shards = tenant_shard_id.split(new_shard_count);
tracing::info!(
"Shard {} splits into: {}",
tenant_shard_id.to_index(),
child_shards
.iter()
.map(|id| format!("{}", id.to_index()))
.join(",")
);
// Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
if let Err(e) = tenant.split_prepare(&child_shards).await {
// If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
// have been left in a partially-shut-down state.
tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
self.reset_tenant(tenant_shard_id, false, ctx).await?;
return Err(e);
}
self.resources.deletion_queue_client.flush_advisory();
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
drop(tenant);
let mut parent_slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let parent = match parent_slot_guard.get_old_value() {
Some(TenantSlot::Attached(t)) => t,
Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
Some(TenantSlot::InProgress(_)) => {
// tenant_map_acquire_slot never returns InProgress; if a slot was InProgress
// it would have returned an error instead.
unreachable!()
}
None => {
// We don't actually need the parent shard to still be attached to do our work, but it's
// a weird enough situation that the caller probably didn't want us to continue working
// if they had detached the tenant they requested the split on.
anyhow::bail!("Detached parent shard in the middle of split!")
}
};
// TODO: hardlink layers from the parent into the child shard directories so that they don't immediately re-download
// TODO: erase the dentries from the parent
// Take a snapshot of where the parent's WAL ingest had got to: we will wait for
// child shards to reach this point.
let mut target_lsns = HashMap::new();
for timeline in parent.timelines.lock().unwrap().clone().values() {
target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
}
// TODO: we should have the parent shard stop its WAL ingest here, it's a waste of resources
// and could slow down the children trying to catch up.
// Phase 3: Spawn the child shards
for child_shard in &child_shards {
let mut child_shard_identity = parent_shard_identity;
child_shard_identity.count = child_shard.shard_count;
child_shard_identity.number = child_shard.shard_number;
let child_location_conf = LocationConf {
mode: LocationMode::Attached(AttachedLocationConfig {
generation: parent_generation,
attach_mode: AttachmentMode::Single,
}),
shard: child_shard_identity,
tenant_conf: parent_tenant_conf,
};
self.upsert_location(
*child_shard,
child_location_conf,
None,
SpawnMode::Normal,
ctx,
)
.await?;
}
// Phase 4: wait for child shards' WAL ingest to catch up to the target LSN
for child_shard_id in &child_shards {
let child_shard = {
let locked = TENANTS.read().unwrap();
let peek_slot =
tenant_map_peek_slot(&locked, child_shard_id, TenantSlotPeekMode::Read)?;
peek_slot.and_then(|s| s.get_attached()).cloned()
};
if let Some(t) = child_shard {
let timelines = t.timelines.lock().unwrap().clone();
for timeline in timelines.values() {
let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
continue;
};
tracing::info!(
"Waiting for child shard {}/{} to reach target lsn {}...",
child_shard_id,
timeline.timeline_id,
target_lsn
);
if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
// Failure here might mean shutdown; in any case this part is an optimization,
// so we shouldn't hold up the split operation.
tracing::warn!(
"Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
timeline.timeline_id
);
} else {
tracing::info!(
"Child shard {}/{} reached target lsn {}",
child_shard_id,
timeline.timeline_id,
target_lsn
);
}
}
}
}
// Phase 5: Shut down the parent shard.
let (_guard, progress) = completion::channel();
match parent.shutdown(progress, false).await {
Ok(()) => {}
Err(other) => {
other.wait().await;
}
}
parent_slot_guard.drop_old_value()?;
// Phase 6: Release the InProgress on the parent shard
drop(parent_slot_guard);
Ok(child_shards)
}
}
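The shape of a power-of-two split can be sketched independently of the pageserver types. Because the old shard count divides the new one, `hash % new_count % old_count == hash % old_count`, so the children of parent number `i` are exactly the numbers congruent to `i` modulo the old count. A sketch mirroring the behaviour of `tenant_shard_id.split()` used above (the exact helper is this diff's, not this function):

```rust
fn child_shard_numbers(parent_number: u8, old_count: u8, new_count: u8) -> Vec<u8> {
    (0..new_count)
        .filter(|n| n % old_count == parent_number)
        .collect()
}

fn main() {
    // Splitting shard 1 of 2 up to a count of 8 yields children 1, 3, 5, 7.
    assert_eq!(child_shard_numbers(1, 2, 8), vec![1, 3, 5, 7]);
}
```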
#[derive(Debug, thiserror::Error)]
@@ -2209,8 +2366,6 @@ async fn remove_tenant_from_memory<V, F>(
where
F: std::future::Future<Output = anyhow::Result<V>>,
{
use utils::completion;
let mut slot_guard =
tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;

View File

@@ -27,7 +27,7 @@ use super::index::LayerFileMetadata;
use tracing::info;
/// Serializes and uploads the given index part data to the remote storage.
pub(super) async fn upload_index_part<'a>(
pub(crate) async fn upload_index_part<'a>(
storage: &'a GenericRemoteStorage,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,

View File

@@ -160,7 +160,7 @@ impl SecondaryTenant {
&self.tenant_shard_id
}
pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> DiskUsageEvictionInfo {
pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> (DiskUsageEvictionInfo, usize) {
self.detail.lock().unwrap().get_layers_for_eviction(self)
}

View File

@@ -146,14 +146,15 @@ impl SecondaryDetail {
}
}
/// Additionally returns the total number of layers, used for more stable relative access time
/// based eviction.
pub(super) fn get_layers_for_eviction(
&self,
parent: &Arc<SecondaryTenant>,
) -> DiskUsageEvictionInfo {
let mut result = DiskUsageEvictionInfo {
max_layer_size: None,
resident_layers: Vec::new(),
};
) -> (DiskUsageEvictionInfo, usize) {
let mut result = DiskUsageEvictionInfo::default();
let mut total_layers = 0;
for (timeline_id, timeline_detail) in &self.timelines {
result
.resident_layers
@@ -169,6 +170,10 @@ impl SecondaryDetail {
relative_last_activity: finite_f32::FiniteF32::ZERO,
}
}));
// total might be missing layers that are currently downloading, but as a
// lower-than-actual value it is a good enough approximation.
total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
}
result.max_layer_size = result
.resident_layers
@@ -183,7 +188,7 @@ impl SecondaryDetail {
result.resident_layers.len()
);
result
(result, total_layers)
}
}
@@ -312,9 +317,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
.tenant_manager
.get_secondary_tenant_shard(*tenant_shard_id);
let Some(tenant) = tenant else {
{
return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
}
return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
};
Ok(PendingDownload {
@@ -389,9 +392,9 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
}
CompleteDownload {
secondary_state,
completed_at: Instant::now(),
}
secondary_state,
completed_at: Instant::now(),
}
}.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
}
}

View File

@@ -156,6 +156,10 @@ impl WalIngest {
}
} else if pg_version == 15 {
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
let createdb = XlCreateDatabaseFromWal::decode(&mut buf);
modification
.create_rel_dir(createdb.tablespace_id, createdb.db_id)
.await?;
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
@@ -176,6 +180,10 @@ impl WalIngest {
}
} else if pg_version == 16 {
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
let createdb = XlCreateDatabaseFromWal::decode(&mut buf);
modification
.create_rel_dir(createdb.tablespace_id, createdb.db_id)
.await?;
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,

View File

@@ -521,6 +521,22 @@ impl XlCreateDatabase {
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlCreateDatabaseFromWal {
pub db_id: Oid,
pub tablespace_id: Oid,
}
impl XlCreateDatabaseFromWal {
pub fn decode(buf: &mut Bytes) -> XlCreateDatabaseFromWal {
XlCreateDatabaseFromWal {
db_id: buf.get_u32_le(),
tablespace_id: buf.get_u32_le(),
}
}
}
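A small usage sketch of the decoder; the oid values are illustrative (1663 is `pg_default`'s tablespace oid):

```rust
use bytes::Bytes;

fn main() {
    // Fields are read in record order, little-endian: db_id first, then
    // tablespace_id.
    let mut buf = Bytes::from_static(&[5, 0, 0, 0, 0x7F, 0x06, 0, 0]);
    let rec = XlCreateDatabaseFromWal::decode(&mut buf);
    assert_eq!((rec.db_id, rec.tablespace_id), (5, 1663));
}
```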
#[repr(C)]
#[derive(Debug)]
pub struct XlDropDatabase {

View File

@@ -314,6 +314,9 @@ lfc_change_limit_hook(int newval, void *extra)
lfc_ctl->used -= 1;
}
lfc_ctl->limit = new_size;
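/*
 * Disabling the cache (new_size == 0) bumps the generation so that any
 * in-flight operation still holding the old generation sees the cache as
 * disabled instead of tripping the LFC_ENABLED() assertion.
 */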
if (new_size == 0) {
lfc_ctl->generation += 1;
}
neon_log(DEBUG1, "set local file cache limit to %d", new_size);
LWLockRelease(lfc_lock);

View File

@@ -11,16 +11,23 @@
#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "catalog/pg_type.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/pg_lsn.h"
#include "utils/guc.h"
#include "utils/wait_event.h"
#include "neon.h"
#include "walproposer.h"
@@ -30,6 +37,130 @@
PG_MODULE_MAGIC;
void _PG_init(void);
static int logical_replication_max_time_lag = 3600;
static void
InitLogicalReplicationMonitor(void)
{
BackgroundWorker bgw;
DefineCustomIntVariable(
"neon.logical_replication_max_time_lag",
"Threshold for dropping unused logical replication slots",
NULL,
&logical_replication_max_time_lag,
3600, 0, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_S,
NULL, NULL, NULL);
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LogicalSlotsMonitorMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "Logical replication monitor");
snprintf(bgw.bgw_type, BGW_MAXLEN, "Logical replication monitor");
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
}
typedef struct
{
NameData name;
bool dropped;
XLogRecPtr confirmed_flush_lsn;
TimestampTz last_updated;
} SlotStatus;
/*
* Unused logical replication slots pin WAL and prevent deletion of snapshots.
*/
PGDLLEXPORT void
LogicalSlotsMonitorMain(Datum main_arg)
{
SlotStatus* slots;
TimestampTz now, last_checked;
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
slots = (SlotStatus*)calloc(max_replication_slots, sizeof(SlotStatus));
last_checked = GetCurrentTimestamp();
for (;;)
{
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
logical_replication_max_time_lag*1000/2,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
now = GetCurrentTimestamp();
if (now - last_checked > logical_replication_max_time_lag*USECS_PER_SEC)
{
int n_active_slots = 0;
last_checked = now;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (int i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
/* Consider only logical replication slots */
if (!s->in_use || !SlotIsLogical(s))
continue;
if (s->active_pid != 0)
{
n_active_slots += 1;
continue;
}
/* Check if there was some activity with the slot since last check */
if (s->data.confirmed_flush != slots[i].confirmed_flush_lsn)
{
slots[i].confirmed_flush_lsn = s->data.confirmed_flush;
slots[i].last_updated = now;
}
else if (now - slots[i].last_updated > logical_replication_max_time_lag*USECS_PER_SEC)
{
slots[i].name = s->data.name;
slots[i].dropped = true;
}
}
LWLockRelease(ReplicationSlotControlLock);
/*
* If there are no active subscriptions, then no new snapshots are generated,
* so there is no need to force slot deletion.
*/
if (n_active_slots != 0)
{
for (int i = 0; i < max_replication_slots; i++)
{
if (slots[i].dropped)
{
elog(LOG, "Drop logical replication slot because it was not update more than %ld seconds",
(now - slots[i].last_updated)/USECS_PER_SEC);
ReplicationSlotDrop(slots[i].name.data, true);
slots[i].dropped = false;
}
}
}
}
}
}
void
_PG_init(void)
{
@@ -44,6 +175,8 @@ _PG_init(void)
pg_init_libpagestore();
pg_init_walproposer();
InitLogicalReplicationMonitor();
InitControlPlaneConnector();
pg_init_extension_server();

View File

@@ -19,6 +19,7 @@ chrono.workspace = true
clap.workspace = true
consumption_metrics.workspace = true
dashmap.workspace = true
env_logger.workspace = true
futures.workspace = true
git-version.workspace = true
hashbrown.workspace = true

View File

@@ -68,6 +68,7 @@ pub trait TestBackend: Send + Sync + 'static {
fn get_allowed_ips_and_secret(
&self,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), console::errors::GetAuthInfoError>;
fn get_role_secret(&self) -> Result<CachedRoleSecret, console::errors::GetAuthInfoError>;
}
impl std::fmt::Display for BackendType<'_, ()> {
@@ -358,6 +359,17 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
}
impl BackendType<'_, ComputeUserInfo> {
pub async fn get_role_secret(
&self,
ctx: &mut RequestMonitoring,
) -> Result<CachedRoleSecret, GetAuthInfoError> {
use BackendType::*;
match self {
Console(api, user_info) => api.get_role_secret(ctx, user_info).await,
Link(_) => Ok(Cached::new_uncached(None)),
}
}
pub async fn get_allowed_ips_and_secret(
&self,
ctx: &mut RequestMonitoring,

View File

@@ -167,7 +167,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
}
}
pub(super) fn validate_password_and_exchange(
pub(crate) fn validate_password_and_exchange(
password: &[u8],
secret: AuthSecret,
) -> super::Result<sasl::Outcome<ComputeCredentialKeys>> {

View File

@@ -88,6 +88,9 @@ struct ProxyCliArgs {
/// path to directory with TLS certificates for client postgres connections
#[clap(long)]
certs_dir: Option<String>,
/// timeout for the TLS handshake
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
handshake_timeout: tokio::time::Duration,
/// http endpoint to receive periodic metric updates
#[clap(long)]
metric_collection_endpoint: Option<String>,
@@ -165,6 +168,10 @@ struct SqlOverHttpArgs {
#[clap(long, default_value_t = 20)]
sql_over_http_pool_max_conns_per_endpoint: usize,
/// How many connections to pool in total, across all endpoints. Excess connections are discarded
#[clap(long, default_value_t = 20000)]
sql_over_http_pool_max_total_conns: usize,
/// How long pooled connections should remain idle for before closing
#[clap(long, default_value = "5m", value_parser = humantime::parse_duration)]
sql_over_http_idle_timeout: tokio::time::Duration,
@@ -387,6 +394,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
pool_shards: args.sql_over_http.sql_over_http_pool_shards,
idle_timeout: args.sql_over_http.sql_over_http_idle_timeout,
opt_in: args.sql_over_http.sql_over_http_pool_opt_in,
max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns,
},
};
let authentication_config = AuthenticationConfig {
@@ -406,6 +414,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
require_client_ip: args.require_client_ip,
disable_ip_check_for_http: args.disable_ip_check_for_http,
endpoint_rps_limit,
handshake_timeout: args.handshake_timeout,
// TODO: add this argument
region: args.region.clone(),
}));

View File

@@ -22,6 +22,7 @@ pub struct ProxyConfig {
pub disable_ip_check_for_http: bool,
pub endpoint_rps_limit: Vec<RateBucketInfo>,
pub region: String,
pub handshake_timeout: Duration,
}
#[derive(Debug)]

View File

@@ -188,6 +188,7 @@ impl super::Api for Api {
ep,
Arc::new(auth_info.allowed_ips),
);
ctx.set_project_id(project_id);
}
// When we just got a secret, we don't need to invalidate it.
Ok(Cached::new_uncached(auth_info.secret))
@@ -221,6 +222,7 @@ impl super::Api for Api {
self.caches
.project_info
.insert_allowed_ips(&project_id, ep, allowed_ips.clone());
ctx.set_project_id(project_id);
}
Ok((
Cached::new_uncached(allowed_ips),

View File

@@ -89,6 +89,10 @@ impl RequestMonitoring {
self.project = Some(x.project_id);
}
pub fn set_project_id(&mut self, project_id: ProjectId) {
self.project = Some(project_id);
}
pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
crate::metrics::CONNECTING_ENDPOINTS
.with_label_values(&[self.protocol])

View File

@@ -1,8 +1,10 @@
use ::metrics::{
exponential_buckets, register_histogram, register_histogram_vec, register_hll_vec,
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge_vec, Histogram,
HistogramVec, HyperLogLogVec, IntCounterPairVec, IntCounterVec, IntGaugeVec,
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge,
register_int_gauge_vec, Histogram, HistogramVec, HyperLogLogVec, IntCounterPairVec,
IntCounterVec, IntGauge, IntGaugeVec,
};
use metrics::{register_int_counter_pair, IntCounterPair};
use once_cell::sync::Lazy;
use tokio::time;
@@ -112,6 +114,44 @@ pub static ALLOWED_IPS_NUMBER: Lazy<Histogram> = Lazy::new(|| {
.unwrap()
});
pub static HTTP_CONTENT_LENGTH: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"proxy_http_conn_content_length_bytes",
"Time it took for proxy to establish a connection to the compute endpoint",
// largest bucket = 3^16 * 0.05ms = 2.15s
exponential_buckets(8.0, 2.0, 20).unwrap()
)
.unwrap()
});
pub static GC_LATENCY: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"proxy_http_pool_reclaimation_lag_seconds",
"Time it takes to reclaim unused connection pools",
// 1us -> 65ms
exponential_buckets(1e-6, 2.0, 16).unwrap(),
)
.unwrap()
});
pub static ENDPOINT_POOLS: Lazy<IntCounterPair> = Lazy::new(|| {
register_int_counter_pair!(
"proxy_http_pool_endpoints_registered_total",
"Number of endpoints we have registered pools for",
"proxy_http_pool_endpoints_unregistered_total",
"Number of endpoints we have unregistered pools for",
)
.unwrap()
});
pub static NUM_OPEN_CLIENTS_IN_HTTP_POOL: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"proxy_http_pool_opened_connections",
"Number of opened connections to a database.",
)
.unwrap()
});
#[derive(Clone)]
pub struct LatencyTimer {
// time since the stopwatch was started

View File

@@ -194,10 +194,11 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let pause = ctx.latency_timer.pause();
let do_handshake = handshake(stream, mode.handshake_tls(tls), &cancel_map);
let (mut stream, params) = match do_handshake.await? {
Some(x) => x,
None => return Ok(()), // it's a cancellation request
};
let (mut stream, params) =
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
Some(x) => x,
None => return Ok(()), // it's a cancellation request
};
drop(pause);
let hostname = mode.hostname(stream.get_ref());
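The double `?` above does two jobs; here is a minimal standalone sketch of the pattern (names are illustrative, not this PR's):

```rust
use std::time::Duration;

// The outer `?` surfaces tokio::time::error::Elapsed when the deadline
// fires before the future resolves; the inner result (via map_err here)
// surfaces the wrapped operation's own error.
async fn with_deadline<T, E>(
    fut: impl std::future::Future<Output = Result<T, E>>,
    deadline: Duration,
) -> anyhow::Result<T>
where
    E: Into<anyhow::Error>,
{
    let inner = tokio::time::timeout(deadline, fut).await?;
    inner.map_err(Into::into)
}
```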

View File

@@ -34,21 +34,6 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg
node_info.invalidate().config
}
/// Try to connect to the compute node once.
#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)]
async fn connect_to_compute_once(
ctx: &mut RequestMonitoring,
node_info: &console::CachedNodeInfo,
timeout: time::Duration,
) -> Result<PostgresConnection, compute::ConnectionError> {
let allow_self_signed_compute = node_info.allow_self_signed_compute;
node_info
.config
.connect(ctx, allow_self_signed_compute, timeout)
.await
}
#[async_trait]
pub trait ConnectMechanism {
type Connection;
@@ -75,13 +60,18 @@ impl ConnectMechanism for TcpMechanism<'_> {
type ConnectError = compute::ConnectionError;
type Error = compute::ConnectionError;
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
async fn connect_once(
&self,
ctx: &mut RequestMonitoring,
node_info: &console::CachedNodeInfo,
timeout: time::Duration,
) -> Result<PostgresConnection, Self::Error> {
connect_to_compute_once(ctx, node_info, timeout).await
let allow_self_signed_compute = node_info.allow_self_signed_compute;
node_info
.config
.connect(ctx, allow_self_signed_compute, timeout)
.await
}
fn update_connect_config(&self, config: &mut compute::ConnCfg) {

View File

@@ -478,6 +478,9 @@ impl TestBackend for TestConnectMechanism {
{
unimplemented!("not used in tests")
}
fn get_role_secret(&self) -> Result<CachedRoleSecret, console::errors::GetAuthInfoError> {
unimplemented!("not used in tests")
}
}
fn helper_create_cached_node_info() -> CachedNodeInfo {

View File

@@ -2,6 +2,7 @@
//!
//! Handles both SQL over HTTP and SQL over Websockets.
mod backend;
mod conn_pool;
mod json;
mod sql_over_http;
@@ -18,11 +19,11 @@ pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio_util::task::TaskTracker;
use crate::config::TlsConfig;
use crate::context::RequestMonitoring;
use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE;
use crate::protocol2::{ProxyProtocolAccept, WithClientIp};
use crate::rate_limiter::EndpointRateLimiter;
use crate::serverless::backend::PoolingBackend;
use crate::{cancellation::CancelMap, config::ProxyConfig};
use futures::StreamExt;
use hyper::{
@@ -54,12 +55,13 @@ pub async fn task_main(
info!("websocket server has shut down");
}
let conn_pool = conn_pool::GlobalConnPool::new(config);
let conn_pool2 = Arc::clone(&conn_pool);
tokio::spawn(async move {
conn_pool2.gc_worker(StdRng::from_entropy()).await;
});
let conn_pool = conn_pool::GlobalConnPool::new(&config.http_config);
{
let conn_pool = Arc::clone(&conn_pool);
tokio::spawn(async move {
conn_pool.gc_worker(StdRng::from_entropy()).await;
});
}
// shutdown the connection pool
tokio::spawn({
@@ -73,6 +75,11 @@ pub async fn task_main(
}
});
let backend = Arc::new(PoolingBackend {
pool: Arc::clone(&conn_pool),
config,
});
let tls_config = match config.tls_config.as_ref() {
Some(config) => config,
None => {
@@ -106,7 +113,7 @@ pub async fn task_main(
let client_addr = io.client_addr();
let remote_addr = io.inner.remote_addr();
let sni_name = tls.server_name().map(|s| s.to_string());
let conn_pool = conn_pool.clone();
let backend = backend.clone();
let ws_connections = ws_connections.clone();
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
@@ -119,7 +126,7 @@ pub async fn task_main(
Ok(MetricService::new(hyper::service::service_fn(
move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
let backend = backend.clone();
let ws_connections = ws_connections.clone();
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
@@ -130,8 +137,7 @@ pub async fn task_main(
request_handler(
req,
config,
tls_config,
conn_pool,
backend,
ws_connections,
cancel_map,
session_id,
@@ -200,8 +206,7 @@ where
async fn request_handler(
mut request: Request<Body>,
config: &'static ProxyConfig,
tls: &'static TlsConfig,
conn_pool: Arc<conn_pool::GlobalConnPool>,
backend: Arc<PoolingBackend>,
ws_connections: TaskTracker,
cancel_map: Arc<CancelMap>,
session_id: uuid::Uuid,
@@ -248,15 +253,7 @@ async fn request_handler(
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
let mut ctx = RequestMonitoring::new(session_id, peer_addr, "http", &config.region);
sql_over_http::handle(
tls,
&config.http_config,
&mut ctx,
request,
sni_hostname,
conn_pool,
)
.await
sql_over_http::handle(config, &mut ctx, request, sni_hostname, backend).await
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
Response::builder()
.header("Allow", "OPTIONS, POST")

View File

@@ -0,0 +1,157 @@
use std::{sync::Arc, time::Duration};
use anyhow::Context;
use async_trait::async_trait;
use tracing::info;
use crate::{
auth::{backend::ComputeCredentialKeys, check_peer_addr_is_in_list, AuthError},
compute,
config::ProxyConfig,
console::CachedNodeInfo,
context::RequestMonitoring,
proxy::connect_compute::ConnectMechanism,
};
use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool, APP_NAME};
pub struct PoolingBackend {
pub pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
pub config: &'static ProxyConfig,
}
impl PoolingBackend {
pub async fn authenticate(
&self,
ctx: &mut RequestMonitoring,
conn_info: &ConnInfo,
) -> Result<ComputeCredentialKeys, AuthError> {
let user_info = conn_info.user_info.clone();
let backend = self.config.auth_backend.as_ref().map(|_| user_info.clone());
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
return Err(AuthError::ip_address_not_allowed());
}
let cached_secret = match maybe_secret {
Some(secret) => secret,
None => backend.get_role_secret(ctx).await?,
};
let secret = match cached_secret.value.clone() {
Some(secret) => secret,
None => {
// If we don't have an authentication secret, the HTTP flow can just return an error.
info!("authentication info not found");
return Err(AuthError::auth_failed(&*user_info.user));
}
};
let auth_outcome =
crate::auth::validate_password_and_exchange(conn_info.password.as_bytes(), secret)?;
match auth_outcome {
crate::sasl::Outcome::Success(key) => Ok(key),
crate::sasl::Outcome::Failure(reason) => {
info!("auth backend failed with an error: {reason}");
Err(AuthError::auth_failed(&*conn_info.user_info.user))
}
}
}
// Wake up the destination if needed. The code here is a bit involved because
// we reuse code from the regular proxy path and need to prepare a few
// structures that it expects.
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
pub async fn connect_to_compute(
&self,
ctx: &mut RequestMonitoring,
conn_info: ConnInfo,
keys: ComputeCredentialKeys,
force_new: bool,
) -> anyhow::Result<Client<tokio_postgres::Client>> {
let maybe_client = if !force_new {
info!("pool: looking for an existing connection");
self.pool.get(ctx, &conn_info).await?
} else {
info!("pool: pool is disabled");
None
};
if let Some(client) = maybe_client {
return Ok(client);
}
let conn_id = uuid::Uuid::new_v4();
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
ctx.set_application(Some(APP_NAME));
let backend = self
.config
.auth_backend
.as_ref()
.map(|_| conn_info.user_info.clone());
let mut node_info = backend
.wake_compute(ctx)
.await?
.context("missing cache entry from wake_compute")?;
match keys {
#[cfg(any(test, feature = "testing"))]
ComputeCredentialKeys::Password(password) => node_info.config.password(password),
ComputeCredentialKeys::AuthKeys(auth_keys) => node_info.config.auth_keys(auth_keys),
};
ctx.set_project(node_info.aux.clone());
crate::proxy::connect_compute::connect_to_compute(
ctx,
&TokioMechanism {
conn_id,
conn_info,
pool: self.pool.clone(),
},
node_info,
&backend,
)
.await
}
}
struct TokioMechanism {
pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,
}
#[async_trait]
impl ConnectMechanism for TokioMechanism {
type Connection = Client<tokio_postgres::Client>;
type ConnectError = tokio_postgres::Error;
type Error = anyhow::Error;
async fn connect_once(
&self,
ctx: &mut RequestMonitoring,
node_info: &CachedNodeInfo,
timeout: Duration,
) -> Result<Self::Connection, Self::ConnectError> {
let mut config = (*node_info.config).clone();
let config = config
.user(&self.conn_info.user_info.user)
.password(&*self.conn_info.password)
.dbname(&self.conn_info.dbname)
.connect_timeout(timeout);
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
Ok(poll_client(
self.pool.clone(),
ctx,
self.conn_info.clone(),
client,
connection,
self.conn_id,
node_info.aux.clone(),
))
}
fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
}

File diff suppressed because it is too large

View File

@@ -9,23 +9,23 @@ use tokio_postgres::Row;
// as parameters.
//
pub fn json_to_pg_text(json: Vec<Value>) -> Vec<Option<String>> {
json.iter()
.map(|value| {
match value {
// special care for nulls
Value::Null => None,
json.iter().map(json_value_to_pg_text).collect()
}
// convert to text with escaping
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),
fn json_value_to_pg_text(value: &Value) -> Option<String> {
match value {
// special care for nulls
Value::Null => None,
// avoid escaping here, as we pass this as a parameter
Value::String(s) => Some(s.to_string()),
// convert to text with escaping
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),
// special care for arrays
Value::Array(_) => json_array_to_pg_array(value),
}
})
.collect()
// avoid escaping here, as we pass this as a parameter
Value::String(s) => Some(s.to_string()),
// special care for arrays
Value::Array(_) => json_array_to_pg_array(value),
}
}
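A usage sketch of the refactored mapping; the exact array rendering is an assumption based on `json_array_to_pg_array`'s name:

```rust
fn main() {
    let params = json_to_pg_text(vec![
        serde_json::json!(null),        // -> None
        serde_json::json!(true),        // -> Some("true")
        serde_json::json!("it's"),      // -> Some("it's"), no escaping
        serde_json::json!([1, [2, 3]]), // -> Some("{1,{2,3}}") (assumed)
    ]);
    assert_eq!(params[0], None);
}
```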
//

View File

@@ -13,6 +13,7 @@ use hyper::StatusCode;
use hyper::{Body, HeaderMap, Request};
use serde_json::json;
use serde_json::Value;
use tokio::join;
use tokio_postgres::error::DbError;
use tokio_postgres::error::ErrorPosition;
use tokio_postgres::GenericClient;
@@ -20,6 +21,7 @@ use tokio_postgres::IsolationLevel;
use tokio_postgres::ReadyForQueryStatus;
use tokio_postgres::Transaction;
use tracing::error;
use tracing::info;
use tracing::instrument;
use url::Url;
use utils::http::error::ApiError;
@@ -27,22 +29,25 @@ use utils::http::json::json_response;
use crate::auth::backend::ComputeUserInfo;
use crate::auth::endpoint_sni;
use crate::config::HttpConfig;
use crate::config::ProxyConfig;
use crate::config::TlsConfig;
use crate::context::RequestMonitoring;
use crate::metrics::HTTP_CONTENT_LENGTH;
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
use crate::proxy::NeonOptions;
use crate::RoleName;
use super::backend::PoolingBackend;
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
use super::json::{json_to_pg_text, pg_text_row_to_json};
use super::json::json_to_pg_text;
use super::json::pg_text_row_to_json;
use super::SERVERLESS_DRIVER_SNI;
#[derive(serde::Deserialize)]
struct QueryData {
query: String,
params: Vec<serde_json::Value>,
#[serde(deserialize_with = "bytes_to_pg_text")]
params: Vec<Option<String>>,
}
#[derive(serde::Deserialize)]
@@ -69,6 +74,15 @@ static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrab
static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
fn bytes_to_pg_text<'de, D>(deserializer: D) -> Result<Vec<Option<String>>, D::Error>
where
D: serde::de::Deserializer<'de>,
{
// TODO: consider avoiding the allocation here.
let json: Vec<Value> = serde::de::Deserialize::deserialize(deserializer)?;
Ok(json_to_pg_text(json))
}
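With the `deserialize_with` hook above, parameters arrive already converted to their pg text form when the body is parsed. A hedged usage sketch (`QueryData` is this module's private type):

```rust
fn main() -> anyhow::Result<()> {
    let q: QueryData = serde_json::from_str(
        r#"{"query": "select $1::int", "params": [42, null]}"#,
    )?;
    assert_eq!(q.params, vec![Some("42".to_string()), None]);
    Ok(())
}
```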
fn get_conn_info(
ctx: &mut RequestMonitoring,
headers: &HeaderMap,
@@ -171,16 +185,15 @@ fn check_matches(sni_hostname: &str, hostname: &str) -> Result<bool, anyhow::Err
// TODO: return different http error codes
pub async fn handle(
tls: &'static TlsConfig,
config: &'static HttpConfig,
config: &'static ProxyConfig,
ctx: &mut RequestMonitoring,
request: Request<Body>,
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
backend: Arc<PoolingBackend>,
) -> Result<Response<Body>, ApiError> {
let result = tokio::time::timeout(
config.request_timeout,
handle_inner(tls, config, ctx, request, sni_hostname, conn_pool),
config.http_config.request_timeout,
handle_inner(config, ctx, request, sni_hostname, backend),
)
.await;
let mut response = match result {
@@ -265,7 +278,7 @@ pub async fn handle(
Err(_) => {
let message = format!(
"HTTP-Connection timed out, execution time exeeded {} seconds",
config.request_timeout.as_secs()
config.http_config.request_timeout.as_secs()
);
error!(message);
json_response(
@@ -283,22 +296,36 @@ pub async fn handle(
#[instrument(name = "sql-over-http", fields(pid = tracing::field::Empty), skip_all)]
async fn handle_inner(
tls: &'static TlsConfig,
config: &'static HttpConfig,
config: &'static ProxyConfig,
ctx: &mut RequestMonitoring,
request: Request<Body>,
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
backend: Arc<PoolingBackend>,
) -> anyhow::Result<Response<Body>> {
let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE
.with_label_values(&["http"])
.with_label_values(&[ctx.protocol])
.guard();
info!(
protocol = ctx.protocol,
"handling interactive connection from client"
);
//
// Determine the destination and connection params
//
let headers = request.headers();
let conn_info = get_conn_info(ctx, headers, sni_hostname, tls)?;
// TLS config should be there.
let conn_info = get_conn_info(
ctx,
headers,
sni_hostname,
config.tls_config.as_ref().unwrap(),
)?;
info!(
user = conn_info.user_info.user.as_str(),
project = conn_info.user_info.endpoint.as_str(),
"credentials"
);
// Determine the output options. Default behaviour is 'false'. Anything that is not
// strictly 'true' assumed to be false.
@@ -307,8 +334,8 @@ async fn handle_inner(
// Allow connection pooling only if explicitly requested
// or if we have decided that http pool is no longer opt-in
let allow_pool =
!config.pool_options.opt_in || headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
let allow_pool = !config.http_config.pool_options.opt_in
|| headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
// isolation level, read only and deferrable
@@ -333,6 +360,8 @@ async fn handle_inner(
None => MAX_REQUEST_SIZE + 1,
};
drop(paused);
info!(request_content_length, "request size in bytes");
HTTP_CONTENT_LENGTH.observe(request_content_length as f64);
// we don't have a streaming request support yet so this is to prevent OOM
// from a malicious user sending an extremely large request body
@@ -342,13 +371,28 @@ async fn handle_inner(
));
}
//
// Read the query and query params from the request body
//
let body = hyper::body::to_bytes(request.into_body()).await?;
let payload: Payload = serde_json::from_slice(&body)?;
let fetch_and_process_request = async {
let body = hyper::body::to_bytes(request.into_body())
.await
.map_err(anyhow::Error::from)?;
let payload: Payload = serde_json::from_slice(&body)?;
Ok::<Payload, anyhow::Error>(payload) // Adjust error type accordingly
};
let mut client = conn_pool.get(ctx, conn_info, !allow_pool).await?;
let authenticate_and_connect = async {
let keys = backend.authenticate(ctx, &conn_info).await?;
backend
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
.await
};
// Run both operations in parallel
let (payload_result, auth_and_connect_result) =
join!(fetch_and_process_request, authenticate_and_connect,);
// Handle the results
let payload = payload_result?; // Handle errors appropriately
let mut client = auth_and_connect_result?; // Handle errors appropriately
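The pattern above is worth isolating: `tokio::join!` polls both futures concurrently on the current task, so no spawning or `'static` bounds are needed, and errors are inspected only after both sides complete. A minimal sketch with hypothetical stand-ins for the two halves:

```rust
use anyhow::Result;

// Illustrative stand-ins, not this PR's functions.
async fn read_body() -> Result<String> {
    Ok(r#"{"query": "select 1"}"#.to_owned())
}
async fn authenticate_and_connect() -> Result<u32> {
    Ok(42) // e.g. a backend pid
}

async fn handle() -> Result<(String, u32)> {
    let (body, conn) = tokio::join!(read_body(), authenticate_and_connect());
    Ok((body?, conn?))
}
```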
let mut response = Response::builder()
.status(StatusCode::OK)
@@ -482,7 +526,7 @@ async fn query_to_json<T: GenericClient>(
raw_output: bool,
array_mode: bool,
) -> anyhow::Result<(ReadyForQueryStatus, Value)> {
let query_params = json_to_pg_text(data.params);
let query_params = data.params;
let row_stream = client.query_raw_txt(&data.query, query_params).await?;
// Manually drain the stream into a vector to leave row_stream hanging

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.75.0"
channel = "1.76.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -1949,6 +1949,15 @@ class NeonAttachmentService:
return headers
def ready(self) -> bool:
resp = self.request("GET", f"{self.env.attachment_service_api}/ready")
if resp.status_code == 503:
return False
elif resp.status_code == 200:
return True
else:
raise RuntimeError(f"Unexpected status {resp.status_code} from readiness endpoint")
def attach_hook_issue(
self, tenant_shard_id: Union[TenantId, TenantShardId], pageserver_id: int
) -> int:
@@ -4054,7 +4063,7 @@ def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -
def tenant_get_shards(
env: NeonEnv, tenant_id: TenantId, pageserver_id: Optional[int]
env: NeonEnv, tenant_id: TenantId, pageserver_id: Optional[int] = None
) -> list[tuple[TenantShardId, NeonPageserver]]:
"""
Helper for when you want to talk to one or more pageservers, and the

View File

@@ -393,11 +393,11 @@ def test_sql_over_http_batch(static_proxy: NeonProxy):
def test_sql_over_http_pool(static_proxy: NeonProxy):
static_proxy.safe_psql("create user http_auth with password 'http' superuser")
def get_pid(status: int, pw: str) -> Any:
def get_pid(status: int, pw: str, user="http_auth") -> Any:
return static_proxy.http_query(
GET_CONNECTION_PID_QUERY,
[],
user="http_auth",
user=user,
password=pw,
expected_code=status,
)
@@ -418,20 +418,14 @@ def test_sql_over_http_pool(static_proxy: NeonProxy):
static_proxy.safe_psql("alter user http_auth with password 'http2'")
# after password change, should open a new connection to verify it
pid2 = get_pid(200, "http2")["rows"][0]["pid"]
assert pid1 != pid2
# after the password change, shouldn't open a new connection because the proxy checks the password itself.
rows = get_pid(200, "http2")["rows"]
assert rows == [{"pid": pid1}]
time.sleep(0.02)
# query should be on an existing connection
pid = get_pid(200, "http2")["rows"][0]["pid"]
assert pid in [pid1, pid2]
time.sleep(0.02)
# old password should not work
res = get_pid(400, "http")
# an incorrect user shouldn't reveal that the user doesn't exist
res = get_pid(400, "http", user="http_auth2")
assert "password authentication failed for user" in res["message"]

View File

@@ -1,6 +1,7 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
tenant_get_shards,
)
from fixtures.remote_storage import s3_storage
from fixtures.types import TimelineId
@@ -82,4 +83,130 @@ def test_sharding_smoke(
)
assert timelines == {env.initial_timeline, timeline_b}
# TODO: test timeline deletion and tenant deletion (depends on change in attachment_service)
def test_sharding_split_smoke(
neon_env_builder: NeonEnvBuilder,
):
"""
Test the basics of shard splitting:
- The API results in more shards than we started with
- The tenant's data remains readable
"""
# We will start with 4 shards and split into 8, then migrate all those
# 8 shards onto separate pageservers
shard_count = 4
split_shard_count = 8
neon_env_builder.num_pageservers = split_shard_count
# 1MiB stripes: enable getting some meaningful data distribution without
# writing large quantities of data in this test. The stripe size is given
# in number of 8KiB pages.
stripe_size = 128
# Use S3-compatible remote storage so that we can scrub: this test validates
# that the scrubber doesn't barf when it sees a sharded tenant.
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
neon_env_builder.preserve_database_files = True
env = neon_env_builder.init_start(
initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
workload = Workload(env, tenant_id, timeline_id, branch_name="main")
workload.init()
# Initial data
workload.write_rows(256)
# Note which pageservers initially hold a shard after tenant creation
pre_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)]
# For pageservers holding a shard, validate their ingest statistics
# reflect a proper splitting of the WAL.
for pageserver in env.pageservers:
if pageserver.id not in pre_split_pageserver_ids:
continue
metrics = pageserver.http_client().get_metrics_values(
[
"pageserver_wal_ingest_records_received_total",
"pageserver_wal_ingest_records_committed_total",
"pageserver_wal_ingest_records_filtered_total",
]
)
log.info(f"Pageserver {pageserver.id} metrics: {metrics}")
# Not everything received was committed
assert (
metrics["pageserver_wal_ingest_records_received_total"]
> metrics["pageserver_wal_ingest_records_committed_total"]
)
# Something was committed
assert metrics["pageserver_wal_ingest_records_committed_total"] > 0
# Counts are self consistent
assert (
metrics["pageserver_wal_ingest_records_received_total"]
== metrics["pageserver_wal_ingest_records_committed_total"]
+ metrics["pageserver_wal_ingest_records_filtered_total"]
)
# TODO: validate that shards have different sizes
workload.validate()
assert len(pre_split_pageserver_ids) == 4
env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count)
post_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)]
# We should have split into 8 shards, on the same 4 pageservers we started on.
assert len(post_split_pageserver_ids) == split_shard_count
assert len(set(post_split_pageserver_ids)) == shard_count
assert set(post_split_pageserver_ids) == set(pre_split_pageserver_ids)
workload.validate()
workload.churn_rows(256)
workload.validate()
# Run GC on all new shards, to check they don't barf or delete anything that breaks reads
# (compaction was already run as part of churn_rows)
all_shards = tenant_get_shards(env, tenant_id)
for tenant_shard_id, pageserver in all_shards:
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
# Restart all nodes, to check that the newly created shards are durable
for ps in env.pageservers:
ps.restart()
workload.validate()
migrate_to_pageserver_ids = list(
set(p.id for p in env.pageservers) - set(pre_split_pageserver_ids)
)
assert len(migrate_to_pageserver_ids) == split_shard_count - shard_count
# Migrate shards away from the node where the split happened
for ps_id in pre_split_pageserver_ids:
shards_here = [
tenant_shard_id
for (tenant_shard_id, pageserver) in all_shards
if pageserver.id == ps_id
]
assert len(shards_here) == 2
migrate_shard = shards_here[0]
destination = migrate_to_pageserver_ids.pop()
log.info(f"Migrating shard {migrate_shard} from {ps_id} to {destination}")
env.neon_cli.tenant_migrate(migrate_shard, destination, timeout_secs=10)
workload.validate()

View File

@@ -128,6 +128,38 @@ def test_sharding_service_smoke(
assert counts[env.pageservers[2].id] == tenant_shard_count // 2
def test_node_status_after_restart(
neon_env_builder: NeonEnvBuilder,
):
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_start()
# Initially we have two online pageservers
nodes = env.attachment_service.node_list()
assert len(nodes) == 2
env.pageservers[1].stop()
env.attachment_service.stop()
env.attachment_service.start()
# Initially the readiness check should fail because we're still trying to connect to the offline node
assert env.attachment_service.ready() is False
def is_ready():
assert env.attachment_service.ready() is True
wait_until(30, 1, is_ready)
# We loaded nodes from database on restart
nodes = env.attachment_service.node_list()
assert len(nodes) == 2
# We should still be able to create a tenant, because the pageserver which is still online
# should have had its availability state set to Active.
env.attachment_service.tenant_create(TenantId.generate())
def test_sharding_service_passthrough(
neon_env_builder: NeonEnvBuilder,
):