Compare commits

...

11 Commits

Author SHA1 Message Date
Conrad Ludgate
e108ba8904 fix feature flags 2024-01-16 16:01:02 +00:00
Conrad Ludgate
a1cc1a6373 add tokio console subscriber 2024-01-16 13:51:34 +00:00
Arpad Müller
4b0204ede5 Add copy operation tests and implement them for azure blobs (#6362)
This implements the `copy` operation for azure blobs, added to S3 by
#6091, and adds tests both to s3 and azure ensuring that the copy
operation works.
2024-01-16 12:07:20 +00:00
John Spray
bf4e708646 pageserver: eviction for secondary mode tenants (#6225)
Follows #6123 

Closes: https://github.com/neondatabase/neon/issues/5342

The approach here is to avoid using `Layer` from secondary tenants, and
instead make the eviction types (e.g. `EvictionCandidate`) have a
variant that carries a Layer for attached tenants, and a different
variant for secondary tenants.

Other changes:
- EvictionCandidate no longer carries a `Timeline`: this was only used
for providing a witness reference to remote timeline client.
- The types for returning eviction candidates are all in
disk_usage_eviction_task.rs now, whereas some of them were in
timeline.rs before.
- The EvictionCandidate type replaces LocalLayerInfoForDiskUsageEviction
type, which was basically the same thing.
2024-01-16 10:29:26 +00:00
John Spray
887e94d7da page_service: more efficient page_service -> shard lookup (#6037)
## Problem

In #5980 the page service connection handler gets a simple piece of
logic for finding the right Timeline: at connection time, it picks an
arbitrary Timeline, and then when handling individual page requests it
checks if the original timeline is the correct shard, and if not looks
one up.

This is pretty slow in the case where we have to go look up the other
timeline, because we take the big tenants manager lock.

## Summary of changes

- Add a `shard_timelines` map of ShardIndex to Timeline on the page
service connection handler
- When looking up a Timeline for a particular ShardIndex, consult
`shard_timelines` to avoid hitting the TenantsManager unless we really
need to.
- Re-work the CancellationToken handling, because the handler now holds
gateguards on multiple timelines, and so must respect cancellation of
_any_ timeline it has in its cache, not just the timeline related to the
request it is currently servicing.

---------

Co-authored-by: Vlad Lazar <vlad@neon.tech>
2024-01-16 09:39:19 +00:00
John Spray
df9e9de541 pageserver: API updates for sharding (#6330)
The theme of the changes in this PR is that they're enablers for #6251
which are superficial struct/api changes.

This is a spinoff from #6251:
- Various APIs + clients thereof take TenantShardId rather than TenantId
- The creation API gets a ShardParameters member, which may be used to
configure shard count and stripe size. This enables the attachment
service to present a "virtual pageserver" creation endpoint that creates
multiple shards.
- The attachment service will use tenant size information to drive shard
splitting. Make a version of `TenantHistorySize` that is usable for
decoding these API responses.
- ComputeSpec includes a shard stripe size.
2024-01-16 09:21:00 +00:00
Anna Khanova
3f2187eb92 Proxy relax sni check (#6323)
## Problem

Using the same domain name () for serverless driver can help with
connection caching.
https://github.com/neondatabase/neon/issues/6290

## Summary of changes

Relax SNI check.
2024-01-16 08:42:13 +00:00
John Khvatov
2a3cfc9665 Remove PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME histogram. (#6356)
Fixes #6343.

## Problem

PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME is used on hot path and it adds
noticeable latency to GetPage@LSN.

## Refs

https://discordapp.com/channels/1176467419317940276/1195022264115151001/1196370689268125716
2024-01-15 17:19:19 +01:00
Cihan Demirci
d34adf46b4 do not provide disclaimer input for the deploy-prod workflow (#6360)
We've removed this input from the deploy-prod workflow.
2024-01-15 16:15:34 +00:00
Conrad Ludgate
0bac8ddd76 proxy: fix serverless error message info (#6279)
## Problem


https://github.com/neondatabase/serverless/issues/51#issuecomment-1878677318

## Summary of changes

1. When we have a db_error, use db_error.message() as the message.
2. include error position.
3. line should be a string (weird?)
4. `datatype` -> `dataType`
2024-01-15 16:43:19 +01:00
Christian Schwarz
0e1ef3713e fix(pagebench): #6325 broke running without --runtime (#6351)
After PR #6325, when running without --runtime, we wouldn't wait for
start_work_barrier, causing the benchmark to not start at all.
2024-01-15 08:54:19 +00:00
37 changed files with 1235 additions and 364 deletions

View File

@@ -1131,7 +1131,7 @@ jobs:
# TODO: move deployPreprodRegion to release (`"$GITHUB_REF_NAME" == "release"` block), once Staging support different compute tag prefixes for different regions
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f disclamerAcknowledged=true
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}}
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
exit 1

120
Cargo.lock generated
View File

@@ -1226,6 +1226,43 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console-api"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787"
dependencies = [
"futures-core",
"prost 0.12.3",
"prost-types 0.12.3",
"tonic 0.10.2",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures-task",
"hdrhistogram",
"humantime",
"prost-types 0.12.3",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic 0.10.2",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "const-oid"
version = "0.9.5"
@@ -3094,7 +3131,7 @@ dependencies = [
"opentelemetry",
"opentelemetry-http",
"opentelemetry-proto",
"prost",
"prost 0.11.9",
"reqwest",
"thiserror",
]
@@ -3108,7 +3145,7 @@ dependencies = [
"futures",
"futures-util",
"opentelemetry",
"prost",
"prost 0.11.9",
"tonic 0.8.3",
]
@@ -3823,7 +3860,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.11.9",
]
[[package]]
name = "prost"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a"
dependencies = [
"bytes",
"prost-derive 0.12.3",
]
[[package]]
@@ -3840,8 +3887,8 @@ dependencies = [
"multimap",
"petgraph",
"prettyplease 0.1.25",
"prost",
"prost-types",
"prost 0.11.9",
"prost-types 0.11.9",
"regex",
"syn 1.0.109",
"tempfile",
@@ -3861,13 +3908,35 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "prost-derive"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn 2.0.32",
]
[[package]]
name = "prost-types"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
dependencies = [
"prost",
"prost 0.11.9",
]
[[package]]
name = "prost-types"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e"
dependencies = [
"prost 0.12.3",
]
[[package]]
@@ -3883,6 +3952,7 @@ dependencies = [
"camino-tempfile",
"chrono",
"clap",
"console-subscriber",
"consumption_metrics",
"dashmap",
"futures",
@@ -5148,7 +5218,7 @@ dependencies = [
"metrics",
"once_cell",
"parking_lot 0.12.1",
"prost",
"prost 0.11.9",
"tokio",
"tokio-stream",
"tonic 0.9.2",
@@ -5467,6 +5537,7 @@ dependencies = [
"signal-hook-registry",
"socket2 0.5.5",
"tokio-macros",
"tracing",
"windows-sys 0.48.0",
]
@@ -5657,8 +5728,8 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"prost 0.11.9",
"prost-derive 0.11.9",
"tokio",
"tokio-stream",
"tokio-util",
@@ -5689,7 +5760,7 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost 0.11.9",
"rustls-native-certs",
"rustls-pemfile",
"tokio",
@@ -5701,6 +5772,33 @@ dependencies = [
"tracing",
]
[[package]]
name = "tonic"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.21.1",
"bytes",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost 0.12.3",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.9.2"
@@ -6606,7 +6704,7 @@ dependencies = [
"num-traits",
"once_cell",
"parquet",
"prost",
"prost 0.11.9",
"rand 0.8.5",
"regex",
"regex-automata 0.4.3",

View File

@@ -64,6 +64,7 @@ cfg-if = "1.0.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
clap = { version = "4.0", features = ["derive"] }
close_fds = "0.3.2"
console-subscriber = "0.2.0"
comfy-table = "6.1"
const_format = "0.2"
crc32c = "0.6"

View File

@@ -35,6 +35,7 @@ ARG BUILD_TAG
ARG RUSTC_WRAPPER=cachepot
ENV AWS_REGION=eu-central-1
ENV CACHEPOT_S3_KEY_PREFIX=cachepot
ENV RUSTFLAGS="--cfg tokio_unstable"
ARG CACHEPOT_BUCKET=neon-github-dev
#ARG AWS_ACCESS_KEY_ID
#ARG AWS_SECRET_ACCESS_KEY

View File

@@ -288,6 +288,21 @@ async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiErr
)
}
async fn handle_tenant_create(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
let inspect_req = json_request::<InspectRequest>(&mut req).await?;
let state = get_state(&req).inner.clone();
let locked = state.write().await;
let tenant_state = locked.tenants.get(&inspect_req.tenant_id);
json_response(
StatusCode::OK,
InspectResponse {
attachment: tenant_state.and_then(|s| s.pageserver.map(|ps| (s.generation, ps))),
},
)
}
fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
endpoint::make_router()
.data(Arc::new(State::new(persistent_state)))
@@ -295,6 +310,9 @@ fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body,
.post("/validate", |r| request_span(r, handle_validate))
.post("/attach-hook", |r| request_span(r, handle_attach_hook))
.post("/inspect", |r| request_span(r, handle_inspect))
.post("/tenant/:tenant_id", |r| {
request_span(r, handle_tenant_create)
})
}
#[tokio::main]

View File

@@ -16,6 +16,7 @@ use control_plane::safekeeper::SafekeeperNode;
use control_plane::tenant_migration::migrate_tenant;
use control_plane::{broker, local_env};
use pageserver_api::models::TimelineInfo;
use pageserver_api::shard::TenantShardId;
use pageserver_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
@@ -279,7 +280,7 @@ async fn get_timeline_infos(
tenant_id: &TenantId,
) -> Result<HashMap<TimelineId, TimelineInfo>> {
Ok(get_default_pageserver(env)
.timeline_list(tenant_id)
.timeline_list(&TenantShardId::unsharded(*tenant_id))
.await?
.into_iter()
.map(|timeline_info| (timeline_info.timeline_id, timeline_info))
@@ -490,7 +491,9 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
match timeline_match.subcommand() {
Some(("list", list_match)) => {
let tenant_id = get_tenant_id(list_match, env)?;
let timelines = pageserver.timeline_list(&tenant_id).await?;
let timelines = pageserver
.timeline_list(&TenantShardId::unsharded(tenant_id))
.await?;
print_timelines_tree(timelines, env.timeline_name_mappings())?;
}
Some(("create", create_match)) => {

View File

@@ -48,6 +48,7 @@ use anyhow::{anyhow, bail, Context, Result};
use compute_api::spec::RemoteExtSpec;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use pageserver_api::models::ShardParameters;
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId, TimelineId};
@@ -543,6 +544,7 @@ impl Endpoint {
storage_auth_token: auth_token.clone(),
remote_extensions,
pgbouncer_settings: None,
shard_stripe_size: Some(ShardParameters::DEFAULT_STRIPE_SIZE.0 as usize),
};
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;

View File

@@ -17,7 +17,7 @@ use std::time::Duration;
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use futures::SinkExt;
use pageserver_api::models::{self, LocationConfig, TenantInfo, TimelineInfo};
use pageserver_api::models::{self, LocationConfig, ShardParameters, TenantInfo, TimelineInfo};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use postgres_backend::AuthType;
@@ -376,6 +376,7 @@ impl PageServerNode {
new_tenant_id: TenantShardId::unsharded(new_tenant_id),
generation,
config,
shard_parameters: ShardParameters::default(),
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -471,18 +472,21 @@ impl PageServerNode {
pub async fn location_config(
&self,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
config: LocationConfig,
flush_ms: Option<Duration>,
) -> anyhow::Result<()> {
Ok(self
.http_client
.location_config(tenant_id, config, flush_ms)
.location_config(tenant_shard_id, config, flush_ms)
.await?)
}
pub async fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result<Vec<TimelineInfo>> {
Ok(self.http_client.list_timelines(*tenant_id).await?)
pub async fn timeline_list(
&self,
tenant_shard_id: &TenantShardId,
) -> anyhow::Result<Vec<TimelineInfo>> {
Ok(self.http_client.list_timelines(*tenant_shard_id).await?)
}
pub async fn tenant_secondary_download(&self, tenant_id: &TenantShardId) -> anyhow::Result<()> {

View File

@@ -24,7 +24,9 @@ async fn get_lsns(
tenant_id: TenantId,
pageserver: &PageServerNode,
) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
let timelines = pageserver.timeline_list(&tenant_id).await?;
let timelines = pageserver
.timeline_list(&TenantShardId::unsharded(tenant_id))
.await?;
Ok(timelines
.into_iter()
.map(|t| (t.timeline_id, t.last_record_lsn))
@@ -120,7 +122,9 @@ pub async fn migrate_tenant(
.attach_hook(tenant_id, dest_ps.conf.id)
.await?;
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
dest_ps.location_config(tenant_id, dest_conf, None).await?;
dest_ps
.location_config(TenantShardId::unsharded(tenant_id), dest_conf, None)
.await?;
println!("✅ Migration complete");
return Ok(());
}
@@ -130,7 +134,11 @@ pub async fn migrate_tenant(
let stale_conf =
build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None);
origin_ps
.location_config(tenant_id, stale_conf, Some(Duration::from_secs(10)))
.location_config(
TenantShardId::unsharded(tenant_id),
stale_conf,
Some(Duration::from_secs(10)),
)
.await?;
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps).await?);
@@ -156,7 +164,9 @@ pub async fn migrate_tenant(
let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None);
println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
dest_ps.location_config(tenant_id, dest_conf, None).await?;
dest_ps
.location_config(TenantShardId::unsharded(tenant_id), dest_conf, None)
.await?;
if let Some(baseline) = baseline_lsns {
println!("🕑 Waiting for LSN to catch up...");
@@ -203,7 +213,7 @@ pub async fn migrate_tenant(
other_ps.conf.id
);
other_ps
.location_config(tenant_id, secondary_conf, None)
.location_config(TenantShardId::unsharded(tenant_id), secondary_conf, None)
.await?;
}
@@ -212,7 +222,9 @@ pub async fn migrate_tenant(
dest_ps.conf.id
);
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
dest_ps.location_config(tenant_id, dest_conf, None).await?;
dest_ps
.location_config(TenantShardId::unsharded(tenant_id), dest_conf, None)
.await?;
println!("✅ Migration complete");

View File

@@ -75,6 +75,10 @@ pub struct ComputeSpec {
pub remote_extensions: Option<RemoteExtSpec>,
pub pgbouncer_settings: Option<HashMap<String, String>>,
// Stripe size for pageserver sharding, in pages
#[serde(default)]
pub shard_stripe_size: Option<usize>,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.

View File

@@ -18,7 +18,10 @@ use utils::{
lsn::Lsn,
};
use crate::{reltag::RelTag, shard::TenantShardId};
use crate::{
reltag::RelTag,
shard::{ShardCount, ShardStripeSize, TenantShardId},
};
use anyhow::bail;
use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -188,6 +191,31 @@ pub struct TimelineCreateRequest {
pub pg_version: Option<u32>,
}
/// Parameters that apply to all shards in a tenant. Used during tenant creation.
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct ShardParameters {
pub count: ShardCount,
pub stripe_size: ShardStripeSize,
}
impl ShardParameters {
pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
pub fn is_unsharded(&self) -> bool {
self.count == ShardCount(0)
}
}
impl Default for ShardParameters {
fn default() -> Self {
Self {
count: ShardCount(0),
stripe_size: Self::DEFAULT_STRIPE_SIZE,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
@@ -195,6 +223,12 @@ pub struct TenantCreateRequest {
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,
// If omitted, create a single shard with TenantShardId::unsharded()
#[serde(default)]
#[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
pub shard_parameters: ShardParameters,
#[serde(flatten)]
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
@@ -297,7 +331,7 @@ pub struct StatusResponse {
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLocationConfigRequest {
pub tenant_id: TenantId,
pub tenant_id: TenantShardId,
#[serde(flatten)]
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
}
@@ -660,6 +694,17 @@ pub struct PagestreamDbSizeResponse {
pub db_size: i64,
}
// This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
// that require pageserver-internal types. It is sufficient to get the total size.
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantHistorySize {
pub id: TenantId,
/// Size is a mixture of WAL and logical size, so the unit is bytes.
///
/// Will be none if `?inputs_only=true` was given.
pub size: Option<u64>,
}
impl PagestreamFeMessage {
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();

View File

@@ -1,6 +1,9 @@
use std::{ops::RangeInclusive, str::FromStr};
use crate::key::{is_rel_block_key, Key};
use crate::{
key::{is_rel_block_key, Key},
models::ShardParameters,
};
use hex::FromHex;
use serde::{Deserialize, Serialize};
use thiserror;
@@ -85,6 +88,12 @@ impl TenantShardId {
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
}
pub fn to_index(&self) -> ShardIndex {
ShardIndex {
shard_number: self.shard_number,
shard_count: self.shard_count,
}
}
}
/// Formatting helper
@@ -403,6 +412,17 @@ impl ShardIdentity {
}
}
/// For use when creating ShardIdentity instances for new shards, where a creation request
/// specifies the ShardParameters that apply to all shards.
pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
Self {
number,
count: params.count,
layout: LAYOUT_V1,
stripe_size: params.stripe_size,
}
}
fn is_broken(&self) -> bool {
self.layout == LAYOUT_BROKEN
}

View File

@@ -5,7 +5,9 @@ use std::collections::HashMap;
use std::env;
use std::num::NonZeroU32;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
use anyhow::Result;
@@ -13,12 +15,14 @@ use azure_core::request_options::{MaxResults, Metadata, Range};
use azure_core::RetryOptions;
use azure_identity::DefaultAzureCredential;
use azure_storage::StorageCredentials;
use azure_storage_blobs::blob::CopyStatus;
use azure_storage_blobs::prelude::ClientBuilder;
use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerClient};
use bytes::Bytes;
use futures::stream::Stream;
use futures_util::StreamExt;
use http_types::StatusCode;
use http_types::{StatusCode, Url};
use tokio::time::Instant;
use tracing::debug;
use crate::s3_bucket::RequestKind;
@@ -323,10 +327,49 @@ impl RemoteStorage for AzureBlobStorage {
Ok(())
}
async fn copy(&self, _from: &RemotePath, _to: &RemotePath) -> anyhow::Result<()> {
Err(anyhow::anyhow!(
"copy for azure blob storage is not implemented"
))
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
let _permit = self.permit(RequestKind::Copy).await;
let blob_client = self.client.blob_client(self.relative_path_to_name(to));
let source_url = format!(
"{}/{}",
self.client.url()?,
self.relative_path_to_name(from)
);
let builder = blob_client.copy(Url::from_str(&source_url)?);
let result = builder.into_future().await?;
let mut copy_status = result.copy_status;
let start_time = Instant::now();
const MAX_WAIT_TIME: Duration = Duration::from_secs(60);
loop {
match copy_status {
CopyStatus::Aborted => {
anyhow::bail!("Received abort for copy from {from} to {to}.");
}
CopyStatus::Failed => {
anyhow::bail!("Received failure response for copy from {from} to {to}.");
}
CopyStatus::Success => return Ok(()),
CopyStatus::Pending => (),
}
// The copy is taking longer. Waiting a second and then re-trying.
// TODO estimate time based on copy_progress and adjust time based on that
tokio::time::sleep(Duration::from_millis(1000)).await;
let properties = blob_client.get_properties().into_future().await?;
let Some(status) = properties.blob.properties.copy_status else {
tracing::warn!("copy_status for copy is None!, from={from}, to={to}");
return Ok(());
};
if start_time.elapsed() > MAX_WAIT_TIME {
anyhow::bail!("Copy from from {from} to {to} took longer than limit MAX_WAIT_TIME={}s. copy_pogress={:?}.",
MAX_WAIT_TIME.as_secs_f32(),
properties.blob.properties.copy_progress,
);
}
copy_status = status;
}
}
}

View File

@@ -263,6 +263,44 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
Ok(())
}
#[test_context(MaybeEnabledAzure)]
#[tokio::test]
async fn azure_copy_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> {
let MaybeEnabledAzure::Enabled(ctx) = ctx else {
return Ok(());
};
let path = RemotePath::new(Utf8Path::new(
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
))
.with_context(|| "RemotePath conversion")?;
let path_dest = RemotePath::new(Utf8Path::new(
format!("{}/file_dest", ctx.base_prefix).as_str(),
))
.with_context(|| "RemotePath conversion")?;
let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());
let (data, len) = wrap_stream(orig.clone());
ctx.client.upload(data, len, &path, None).await?;
// Normal download request
ctx.client.copy_object(&path, &path_dest).await?;
let dl = ctx.client.download(&path_dest).await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig);
debug!("Cleanup: deleting file at path {path:?}");
ctx.client
.delete_objects(&[path.clone(), path_dest.clone()])
.await
.with_context(|| format!("{path:?} removal"))?;
Ok(())
}
struct EnabledAzure {
client: Arc<GenericRemoteStorage>,
base_prefix: &'static str,

View File

@@ -259,6 +259,44 @@ async fn s3_upload_download_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()
Ok(())
}
#[test_context(MaybeEnabledS3)]
#[tokio::test]
async fn s3_copy_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
let MaybeEnabledS3::Enabled(ctx) = ctx else {
return Ok(());
};
let path = RemotePath::new(Utf8Path::new(
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
))
.with_context(|| "RemotePath conversion")?;
let path_dest = RemotePath::new(Utf8Path::new(
format!("{}/file_dest", ctx.base_prefix).as_str(),
))
.with_context(|| "RemotePath conversion")?;
let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());
let (data, len) = wrap_stream(orig.clone());
ctx.client.upload(data, len, &path, None).await?;
// Normal download request
ctx.client.copy_object(&path, &path_dest).await?;
let dl = ctx.client.download(&path_dest).await?;
let buf = download_to_vec(dl).await?;
assert_eq!(&buf, &orig);
debug!("Cleanup: deleting file at path {path:?}");
ctx.client
.delete_objects(&[path.clone(), path_dest.clone()])
.await
.with_context(|| format!("{path:?} removal"))?;
Ok(())
}
struct EnabledS3 {
client: Arc<GenericRemoteStorage>,
base_prefix: &'static str,

View File

@@ -1,5 +1,5 @@
use pageserver_api::{models::*, shard::TenantShardId};
use reqwest::{IntoUrl, Method};
use reqwest::{IntoUrl, Method, StatusCode};
use utils::{
http::error::HttpErrorBody,
id::{TenantId, TimelineId},
@@ -22,8 +22,8 @@ pub enum Error {
#[error("receive error body: {0}")]
ReceiveErrorBody(String),
#[error("pageserver API: {0}")]
ApiError(String),
#[error("pageserver API: {1}")]
ApiError(StatusCode, String),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -41,7 +41,7 @@ impl ResponseErrorMessageExt for reqwest::Response {
let url = self.url().to_owned();
Err(match self.json::<HttpErrorBody>().await {
Ok(HttpErrorBody { msg }) => Error::ApiError(msg),
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
Err(_) => {
Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
}
@@ -71,9 +71,9 @@ impl Client {
pub async fn tenant_details(
&self,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
) -> Result<pageserver_api::models::TenantDetails> {
let uri = format!("{}/v1/tenant/{tenant_id}", self.mgmt_api_endpoint);
let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
self.get(uri)
.await?
.json()
@@ -83,9 +83,12 @@ impl Client {
pub async fn list_timelines(
&self,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
let uri = format!("{}/v1/tenant/{tenant_id}/timeline", self.mgmt_api_endpoint);
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/timeline",
self.mgmt_api_endpoint
);
self.get(&uri)
.await?
.json()
@@ -179,23 +182,23 @@ impl Client {
"{}/v1/tenant/{}/secondary/download",
self.mgmt_api_endpoint, tenant_id
);
self.request(Method::POST, &uri, ())
.await?
.error_for_status()
.map(|_| ())
.map_err(|e| Error::ApiError(format!("{}", e)))
self.request(Method::POST, &uri, ()).await?;
Ok(())
}
pub async fn location_config(
&self,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
config: LocationConfig,
flush_ms: Option<std::time::Duration>,
) -> Result<()> {
let req_body = TenantLocationConfigRequest { tenant_id, config };
let req_body = TenantLocationConfigRequest {
tenant_id: tenant_shard_id,
config,
};
let path = format!(
"{}/v1/tenant/{}/location_config",
self.mgmt_api_endpoint, tenant_id
self.mgmt_api_endpoint, tenant_shard_id
);
let path = if let Some(flush_ms) = flush_ms {
format!("{}?flush_ms={}", path, flush_ms.as_millis())
@@ -233,4 +236,19 @@ impl Client {
.await
.map_err(Error::ReceiveBody)
}
pub async fn tenant_synthetic_size(
&self,
tenant_shard_id: TenantShardId,
) -> Result<TenantHistorySize> {
let uri = format!(
"{}/v1/tenant/{}/synthetic_size",
self.mgmt_api_endpoint, tenant_shard_id
);
self.get(&uri)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}

View File

@@ -2,6 +2,7 @@
use std::sync::Arc;
use pageserver_api::shard::TenantShardId;
use tokio::task::JoinSet;
use utils::id::{TenantId, TenantTimelineId};
@@ -31,7 +32,10 @@ pub async fn get_pageserver_tenant_timelines_unsharded(
async move {
(
tenant_id,
mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
mgmt_api_client
.tenant_details(TenantShardId::unsharded(tenant_id))
.await
.unwrap(),
)
}
});

View File

@@ -349,10 +349,10 @@ async fn main_impl(
let work_sender_task = tokio::spawn(work_sender);
info!("waiting for everything to become ready");
start_work_barrier.wait().await;
info!("work started");
if let Some(runtime) = args.runtime {
info!("waiting for everything to become ready");
start_work_barrier.wait().await;
info!("work started");
tokio::time::sleep(runtime.into()).await;
info!("runtime over, signalling cancellation");
cancel.cancel();

View File

@@ -527,6 +527,7 @@ fn start_pageserver(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
tenant_manager.clone(),
background_jobs_barrier.clone(),
)?;
}

View File

@@ -47,21 +47,24 @@ use std::{
};
use anyhow::Context;
use camino::Utf8Path;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument};
use utils::completion;
use utils::serde_percent::Percent;
use utils::{completion, id::TimelineId};
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
self,
storage_layer::{AsLayerDesc, EvictionError, Layer},
mgr::TenantManager,
remote_timeline_client::LayerFileMetadata,
secondary::SecondaryTenant,
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerFileName},
Timeline,
},
};
@@ -125,6 +128,7 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
tenant_manager: Arc<TenantManager>,
background_jobs_barrier: completion::Barrier,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
@@ -150,8 +154,7 @@ pub fn launch_disk_usage_global_eviction_task(
_ = background_jobs_barrier.wait() => { }
};
disk_usage_eviction_task(&state, task_config, &storage, &conf.tenants_path(), cancel)
.await;
disk_usage_eviction_task(&state, task_config, &storage, tenant_manager, cancel).await;
Ok(())
},
);
@@ -164,7 +167,7 @@ async fn disk_usage_eviction_task(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: &GenericRemoteStorage,
tenants_dir: &Utf8Path,
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
) {
scopeguard::defer! {
@@ -191,7 +194,7 @@ async fn disk_usage_eviction_task(
state,
task_config,
storage,
tenants_dir,
&tenant_manager,
&cancel,
)
.await;
@@ -226,15 +229,17 @@ async fn disk_usage_eviction_task_iteration(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: &GenericRemoteStorage,
tenants_dir: &Utf8Path,
tenant_manager: &Arc<TenantManager>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
let tenants_dir = tenant_manager.get_conf().tenants_path();
let usage_pre = filesystem_level_usage::get(&tenants_dir, task_config)
.context("get filesystem-level disk usage before evictions")?;
let res = disk_usage_eviction_task_iteration_impl(
state,
storage,
usage_pre,
tenant_manager,
task_config.eviction_order,
cancel,
)
@@ -248,7 +253,7 @@ async fn disk_usage_eviction_task_iteration(
}
IterationOutcome::Finished(outcome) => {
// Verify with statvfs whether we made any real progress
let after = filesystem_level_usage::get(tenants_dir, task_config)
let after = filesystem_level_usage::get(&tenants_dir, task_config)
// It's quite unlikely to hit the error here. Keep the code simple and bail out.
.context("get filesystem-level disk usage after evictions")?;
@@ -324,6 +329,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
state: &State,
_storage: &GenericRemoteStorage,
usage_pre: U,
tenant_manager: &Arc<TenantManager>,
eviction_order: EvictionOrder,
cancel: &CancellationToken,
) -> anyhow::Result<IterationOutcome<U>> {
@@ -344,29 +350,29 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
"running disk usage based eviction due to pressure"
);
let candidates = match collect_eviction_candidates(eviction_order, cancel).await? {
EvictionCandidates::Cancelled => {
return Ok(IterationOutcome::Cancelled);
}
EvictionCandidates::Finished(partitioned) => partitioned,
};
let candidates =
match collect_eviction_candidates(tenant_manager, eviction_order, cancel).await? {
EvictionCandidates::Cancelled => {
return Ok(IterationOutcome::Cancelled);
}
EvictionCandidates::Finished(partitioned) => partitioned,
};
// Debug-log the list of candidates
let now = SystemTime::now();
for (i, (partition, candidate)) in candidates.iter().enumerate() {
let nth = i + 1;
let desc = candidate.layer.layer_desc();
let total_candidates = candidates.len();
let size = desc.file_size;
let size = candidate.layer.get_file_size();
let rel = candidate.relative_last_activity;
debug!(
"cand {nth}/{total_candidates}: size={size}, rel_last_activity={rel}, no_access_for={}us, partition={partition:?}, {}/{}/{}",
now.duration_since(candidate.last_activity_ts)
.unwrap()
.as_micros(),
desc.tenant_shard_id,
desc.timeline_id,
candidate.layer,
candidate.layer.get_tenant_shard_id(),
candidate.layer.get_timeline_id(),
candidate.layer.get_name(),
);
}
@@ -398,7 +404,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
warned = Some(usage_planned);
}
usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);
usage_planned.add_available_bytes(candidate.layer.get_file_size());
evicted_amount += 1;
}
@@ -463,19 +469,30 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
continue;
};
js.spawn(async move {
let rtc = candidate.timeline.remote_client.as_ref().expect(
"holding the witness, all timelines must have a remote timeline client",
);
let file_size = candidate.layer.layer_desc().file_size;
candidate
.layer
.evict_and_wait(rtc)
.await
.map(|()| file_size)
.map_err(|e| (file_size, e))
});
match candidate.layer {
EvictionLayer::Attached(layer) => {
let file_size = layer.layer_desc().file_size;
js.spawn(async move {
layer
.evict_and_wait()
.await
.map(|()| file_size)
.map_err(|e| (file_size, e))
});
}
EvictionLayer::Secondary(layer) => {
let file_size = layer.metadata.file_size();
let tenant_manager = tenant_manager.clone();
js.spawn(async move {
layer
.secondary_tenant
.evict_layer(tenant_manager.get_conf(), layer.timeline_id, layer.name)
.await;
Ok(file_size)
});
}
}
tokio::task::yield_now().await;
}
@@ -502,11 +519,100 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
}
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
layer: Layer,
last_activity_ts: SystemTime,
relative_last_activity: finite_f32::FiniteF32,
pub(crate) struct EvictionSecondaryLayer {
pub(crate) secondary_tenant: Arc<SecondaryTenant>,
pub(crate) timeline_id: TimelineId,
pub(crate) name: LayerFileName,
pub(crate) metadata: LayerFileMetadata,
}
/// Full [`Layer`] objects are specific to tenants in attached mode. This type is a layer
/// of indirection to store either a `Layer`, or a reference to a secondary tenant and a layer name.
#[derive(Clone)]
pub(crate) enum EvictionLayer {
Attached(Layer),
#[allow(dead_code)]
Secondary(EvictionSecondaryLayer),
}
impl From<Layer> for EvictionLayer {
fn from(value: Layer) -> Self {
Self::Attached(value)
}
}
impl EvictionLayer {
pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
match self {
Self::Attached(l) => &l.layer_desc().tenant_shard_id,
Self::Secondary(sl) => sl.secondary_tenant.get_tenant_shard_id(),
}
}
pub(crate) fn get_timeline_id(&self) -> &TimelineId {
match self {
Self::Attached(l) => &l.layer_desc().timeline_id,
Self::Secondary(sl) => &sl.timeline_id,
}
}
pub(crate) fn get_name(&self) -> LayerFileName {
match self {
Self::Attached(l) => l.layer_desc().filename(),
Self::Secondary(sl) => sl.name.clone(),
}
}
pub(crate) fn get_file_size(&self) -> u64 {
match self {
Self::Attached(l) => l.layer_desc().file_size,
Self::Secondary(sl) => sl.metadata.file_size(),
}
}
}
#[derive(Clone)]
pub(crate) struct EvictionCandidate {
pub(crate) layer: EvictionLayer,
pub(crate) last_activity_ts: SystemTime,
pub(crate) relative_last_activity: finite_f32::FiniteF32,
}
impl std::fmt::Display for EvictionLayer {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::Attached(l) => l.fmt(f),
Self::Secondary(sl) => {
write!(f, "{}/{}", sl.timeline_id, sl.name)
}
}
}
}
pub(crate) struct DiskUsageEvictionInfo {
/// Timeline's largest layer (remote or resident)
pub max_layer_size: Option<u64>,
/// Timeline's resident layers
pub resident_layers: Vec<EvictionCandidate>,
}
impl std::fmt::Debug for EvictionCandidate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it
// having to allocate a string to this is bad, but it will rarely be formatted
let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
struct DisplayIsDebug<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
f.debug_struct("LocalLayerInfoForDiskUsageEviction")
.field("layer", &DisplayIsDebug(&self.layer))
.field("last_activity", &ts)
.finish()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
@@ -623,6 +729,7 @@ enum EvictionCandidates {
/// - tenant B 1 layer
/// - tenant C 8 layers
async fn collect_eviction_candidates(
tenant_manager: &Arc<TenantManager>,
eviction_order: EvictionOrder,
cancel: &CancellationToken,
) -> anyhow::Result<EvictionCandidates> {
@@ -631,13 +738,16 @@ async fn collect_eviction_candidates(
.await
.context("get list of tenants")?;
// TODO: avoid listing every layer in every tenant: this loop can block the executor,
// and the resulting data structure can be huge.
// (https://github.com/neondatabase/neon/issues/6224)
let mut candidates = Vec::new();
for (tenant_id, _state, _gen) in &tenants {
for (tenant_id, _state, _gen) in tenants {
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}
let tenant = match tenant::mgr::get_tenant(*tenant_id, true) {
let tenant = match tenant::mgr::get_tenant(tenant_id, true) {
Ok(tenant) => tenant,
Err(e) => {
// this can happen if tenant has lifecycle transition after we fetched it
@@ -665,11 +775,7 @@ async fn collect_eviction_candidates(
}
let info = tl.get_local_layers_for_disk_usage_eviction().await;
debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
tenant_candidates.extend(
info.resident_layers
.into_iter()
.map(|layer_infos| (tl.clone(), layer_infos)),
);
tenant_candidates.extend(info.resident_layers.into_iter());
max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
if cancel.is_cancelled() {
@@ -707,7 +813,7 @@ async fn collect_eviction_candidates(
// Sort layers most-recently-used first, then partition by
// cumsum above/below min_resident_size.
tenant_candidates
.sort_unstable_by_key(|(_, layer_info)| std::cmp::Reverse(layer_info.last_activity_ts));
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
let mut cumsum: i128 = 0;
// keeping the -1 or not decides if every tenant should lose their least recently accessed
@@ -741,12 +847,10 @@ async fn collect_eviction_candidates(
.unwrap_or(1);
let divider = total as f32;
for (i, (timeline, layer_info)) in tenant_candidates.into_iter().enumerate() {
let file_size = layer_info.file_size();
for (i, mut candidate) in tenant_candidates.into_iter().enumerate() {
// as we iterate this reverse sorted list, the most recently accessed layer will always
// be 1.0; this is for us to evict it last.
let relative_last_activity = if matches!(
candidate.relative_last_activity = if matches!(
eviction_order,
EvictionOrder::RelativeAccessed { .. }
) {
@@ -761,22 +865,46 @@ async fn collect_eviction_candidates(
finite_f32::FiniteF32::ZERO
};
let candidate = EvictionCandidate {
timeline,
last_activity_ts: layer_info.last_activity_ts,
layer: layer_info.layer,
relative_last_activity,
};
let partition = if cumsum > min_resident_size as i128 {
MinResidentSizePartition::Above
} else {
MinResidentSizePartition::Below
};
cumsum += i128::from(candidate.layer.get_file_size());
candidates.push((partition, candidate));
cumsum += i128::from(file_size);
}
}
// Note: the same tenant ID might be hit twice, if it transitions from attached to
// secondary while we run. That is okay: when we eventually try and run the eviction,
// the `Gate` on the object will ensure that whichever one has already been shut down
// will not delete anything.
let mut secondary_tenants = Vec::new();
tenant_manager.foreach_secondary_tenants(
|_tenant_shard_id: &TenantShardId, state: &Arc<SecondaryTenant>| {
secondary_tenants.push(state.clone());
},
);
for secondary_tenant in secondary_tenants {
let mut layer_info = secondary_tenant.get_layers_for_eviction();
layer_info
.resident_layers
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
candidates.extend(layer_info.resident_layers.into_iter().map(|candidate| {
(
// Secondary locations' layers are always considered above the min resident size,
// i.e. secondary locations are permitted to be trimmed to zero layers if all
// the layers have sufficiently old access times.
MinResidentSizePartition::Above,
candidate,
)
}));
}
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
"as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
@@ -821,7 +949,7 @@ impl std::ops::Deref for TimelineKey {
}
/// A totally ordered f32 subset we can use with sorting functions.
mod finite_f32 {
pub(crate) mod finite_f32 {
/// A totally ordered f32 subset we can use with sorting functions.
#[derive(Clone, Copy, PartialEq)]

View File

@@ -14,6 +14,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::ShardParameters;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
@@ -265,7 +266,7 @@ impl From<SetNewTenantConfigError> for ApiError {
SetNewTenantConfigError::GetTenant(tid) => {
ApiError::NotFound(anyhow!("tenant {}", tid).into())
}
e @ SetNewTenantConfigError::Persist(_) => {
e @ (SetNewTenantConfigError::Persist(_) | SetNewTenantConfigError::Other(_)) => {
ApiError::InternalServerError(anyhow::Error::new(e))
}
}
@@ -704,7 +705,9 @@ async fn tenant_attach_handler(
}
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let location_conf = LocationConf::attached_single(tenant_conf, generation);
let shard_params = ShardParameters::default();
let location_conf = LocationConf::attached_single(tenant_conf, generation, &shard_params);
let tenant = state
.tenant_manager
.upsert_location(
@@ -1194,7 +1197,8 @@ async fn tenant_create_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let location_conf = LocationConf::attached_single(tenant_conf, generation);
let location_conf =
LocationConf::attached_single(tenant_conf, generation, &request_data.shard_parameters);
let new_tenant = state
.tenant_manager
@@ -1213,7 +1217,6 @@ async fn tenant_create_handler(
"Upsert succeeded but didn't return tenant!"
)));
};
// We created the tenant. Existing API semantics are that the tenant
// is Active when this function returns.
if let res @ Err(_) = new_tenant
@@ -1652,12 +1655,13 @@ async fn disk_usage_eviction_run(
)));
};
let state = state.disk_usage_eviction_state.clone();
let eviction_state = state.disk_usage_eviction_state.clone();
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&state,
&eviction_state,
storage,
usage,
&state.tenant_manager,
config.eviction_order,
&cancel,
)

View File

@@ -337,15 +337,6 @@ pub(crate) mod page_cache_eviction_metrics {
}
}
pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_page_cache_acquire_pinned_slot_seconds",
"Time spent acquiring a pinned slot in the page cache",
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
});
static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"page_cache_errors_total",

View File

@@ -550,7 +550,6 @@ impl PageCache {
// not require changes.
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
match tokio::time::timeout(
// Choose small timeout, neon_smgr does its own retries.
// https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
@@ -563,7 +562,6 @@ impl PageCache {
res.expect("this semaphore is never closed"),
)),
Err(_timeout) => {
timer.stop_and_discard();
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
);

View File

@@ -13,7 +13,10 @@ use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use futures::Stream;
use futures::StreamExt;
use pageserver_api::key::Key;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -21,11 +24,14 @@ use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use pageserver_api::shard::ShardIndex;
use pageserver_api::shard::{ShardCount, ShardNumber};
use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
use pq_proto::framed::ConnectionError;
use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::borrow::Cow;
use std::collections::HashMap;
use std::io;
use std::net::TcpListener;
use std::pin::pin;
@@ -40,6 +46,7 @@ use tokio_util::sync::CancellationToken;
use tracing::field;
use tracing::*;
use utils::id::ConnectionId;
use utils::sync::gate::GateGuard;
use utils::{
auth::{Claims, Scope, SwappableJwtAuth},
id::{TenantId, TimelineId},
@@ -274,6 +281,13 @@ async fn page_service_conn_main(
}
}
/// While a handler holds a reference to a Timeline, it also holds a the
/// timeline's Gate open.
struct HandlerTimeline {
timeline: Arc<Timeline>,
_guard: GateGuard,
}
struct PageServerHandler {
_conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
@@ -285,6 +299,14 @@ struct PageServerHandler {
/// For each query received over the connection,
/// `process_query` creates a child context from this one.
connection_ctx: RequestContext,
/// See [`Self::cache_timeline`] for usage.
///
/// Note on size: the typical size of this map is 1. The largest size we expect
/// to see is the number of shards divided by the number of pageservers (typically < 2),
/// or the ratio used when splitting shards (i.e. how many children created from one)
/// parent shard, where a "large" number might be ~8.
shard_timelines: HashMap<ShardIndex, HandlerTimeline>,
}
#[derive(thiserror::Error, Debug)]
@@ -358,13 +380,46 @@ impl PageServerHandler {
auth,
claims: None,
connection_ctx,
shard_timelines: HashMap::new(),
}
}
/// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
/// this rather than naked flush() in order to shut down promptly. Without this, we would
/// block shutdown of a tenant if a postgres client was failing to consume bytes we send
/// in the flush.
/// Analogous to calling cancelled() on a Timeline's cancellation token: waits for cancellation.
///
/// We use many Timeline objects, and hold GateGuards on all of them. We must therefore respect
/// all of their cancellation tokens.
async fn timeline_cancelled(&self) {
// A short wait before we expend the cycles to walk our timeline map. This avoids incurring
// that cost every time we check for cancellation.
tokio::time::sleep(Duration::from_millis(10)).await;
// This function is never called concurrently with code that adds timelines to shard_timelines,
// which is enforced by the borrow checker (the future returned by this function carries the
// immutable &self). So it's fine to evaluate shard_timelines after the sleep, we don't risk
// missing any inserts to the map.
let mut futs = self
.shard_timelines
.values()
.map(|ht| ht.timeline.cancel.cancelled())
.collect::<FuturesUnordered<_>>();
futs.next().await;
}
/// Analogous to calling is_cancelled() on a Timeline's cancellation token
fn timeline_is_cancelled(&self) -> bool {
self.shard_timelines
.values()
.any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping())
}
/// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
/// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
/// cancellation if there aren't any timelines in the cache.
///
/// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the
/// timeline cancellation token.
async fn flush_cancellable<IO>(
&self,
pgb: &mut PostgresBackend<IO>,
@@ -377,6 +432,9 @@ impl PageServerHandler {
flush_r = pgb.flush() => {
Ok(flush_r?)
},
_ = self.timeline_cancelled() => {
Err(QueryError::Shutdown)
}
_ = cancel.cancelled() => {
Err(QueryError::Shutdown)
}
@@ -452,7 +510,7 @@ impl PageServerHandler {
#[instrument(skip_all)]
async fn handle_pagerequests<IO>(
&self,
&mut self,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -463,10 +521,6 @@ impl PageServerHandler {
{
debug_assert_current_span_has_tenant_and_timeline_id();
// Note that since one connection may contain getpage requests that target different
// shards (e.g. during splitting when the compute is not yet aware of the split), the tenant
// that we look up here may not be the one that serves all the actual requests: we will double
// check the mapping of key->shard later before calling into Timeline for getpage requests.
let tenant = mgr::get_active_tenant_with_timeout(
tenant_id,
ShardSelector::First,
@@ -487,19 +541,9 @@ impl PageServerHandler {
None
};
// Check that the timeline exists
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| QueryError::NotFound(format!("{e}").into()))?;
// Avoid starting new requests if the timeline has already started shutting down,
// and block timeline shutdown until this request is complete, or drops out due
// to cancellation.
let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
// switch client to COPYBOTH
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
self.flush_cancellable(pgb, &tenant.cancel).await?;
let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
@@ -507,7 +551,7 @@ impl PageServerHandler {
let msg = tokio::select! {
biased;
_ = timeline.cancel.cancelled() => {
_ = self.timeline_cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
@@ -544,7 +588,7 @@ impl PageServerHandler {
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelExists);
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
(
self.handle_get_rel_exists_request(&timeline, &req, &ctx)
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
@@ -554,7 +598,7 @@ impl PageServerHandler {
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelSize);
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
(
self.handle_get_nblocks_request(&timeline, &req, &ctx)
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
@@ -564,7 +608,7 @@ impl PageServerHandler {
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
(
self.handle_get_page_at_lsn_request(&timeline, &req, &ctx)
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
@@ -574,7 +618,7 @@ impl PageServerHandler {
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetDbSize);
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
(
self.handle_db_size_request(&timeline, &req, &ctx)
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
@@ -594,7 +638,7 @@ impl PageServerHandler {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
Err(e) if timeline.cancel.is_cancelled() || timeline.is_stopping() => {
Err(e) if self.timeline_is_cancelled() => {
// This branch accomodates code within request handlers that returns an anyhow::Error instead of a clean
// shutdown error, this may be buried inside a PageReconstructError::Other for example.
//
@@ -617,7 +661,7 @@ impl PageServerHandler {
});
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
self.flush_cancellable(pgb, &tenant.cancel).await?;
}
}
}
@@ -814,11 +858,14 @@ impl PageServerHandler {
}
async fn handle_get_rel_exists_request(
&self,
timeline: &Timeline,
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamExistsRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
@@ -834,11 +881,13 @@ impl PageServerHandler {
}
async fn handle_get_nblocks_request(
&self,
timeline: &Timeline,
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamNblocksRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
@@ -854,11 +903,13 @@ impl PageServerHandler {
}
async fn handle_db_size_request(
&self,
timeline: &Timeline,
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamDbSizeRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
@@ -880,16 +931,160 @@ impl PageServerHandler {
}))
}
async fn do_handle_get_page_at_lsn_request(
&self,
timeline: &Timeline,
/// For most getpage requests, we will already have a Timeline to serve the request: this function
/// looks up such a Timeline synchronously and without touching any global state.
fn get_cached_timeline_for_page(
&mut self,
req: &PagestreamGetPageRequest,
) -> Result<&Arc<Timeline>, Key> {
let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() {
// Fastest path: single sharded case
if first_idx.shard_count < ShardCount(2) {
return Ok(&first_timeline.timeline);
}
let key = rel_block_to_key(req.rel, req.blkno);
let shard_num = first_timeline
.timeline
.get_shard_identity()
.get_shard_number(&key);
// Fast path: matched the first timeline in our local handler map. This case is common if
// only one shard per tenant is attached to this pageserver.
if first_timeline.timeline.get_shard_identity().number == shard_num {
return Ok(&first_timeline.timeline);
}
let shard_index = ShardIndex {
shard_number: shard_num,
shard_count: first_timeline.timeline.get_shard_identity().count,
};
// Fast-ish path: timeline is in the connection handler's local cache
if let Some(found) = self.shard_timelines.get(&shard_index) {
return Ok(&found.timeline);
}
key
} else {
rel_block_to_key(req.rel, req.blkno)
};
Err(key)
}
/// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable
/// use in future requests without having to traverse [`crate::tenant::mgr::TenantManager`]
/// again.
///
/// Note that all the Timelines in this cache are for the same timeline_id: they're differ
/// in which shard they belong to. When we serve a getpage@lsn request, we choose a shard
/// based on key.
///
/// The typical size of this cache is 1, as we generally create shards to distribute work
/// across pageservers, so don't tend to have multiple shards for the same tenant on the
/// same pageserver.
fn cache_timeline(
&mut self,
timeline: Arc<Timeline>,
) -> Result<&Arc<Timeline>, GetActiveTimelineError> {
let gate_guard = timeline
.gate
.enter()
.map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?;
let shard_index = timeline.tenant_shard_id.to_index();
let entry = self
.shard_timelines
.entry(shard_index)
.or_insert(HandlerTimeline {
timeline,
_guard: gate_guard,
});
Ok(&entry.timeline)
}
/// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with
/// a Timeline to serve requests for this key, if such a Timeline is present on this pageserver. If no such
/// Timeline is found, then we will return an error (this indicates that the client is talking to the wrong node).
async fn load_timeline_for_page(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
key: Key,
) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
// Slow path: we must call out to the TenantManager to find the timeline for this Key
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key))
.await?;
self.cache_timeline(timeline)
}
async fn get_timeline_shard_zero(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
// This is a borrow-checker workaround: we can't return from inside of the `if let Some` because
// that would be an immutable-borrow-self return, whereas later in the function we will use a mutable
// ref to salf. So instead, we first build a bool, and then return while not borrowing self.
let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() {
idx.shard_number == ShardNumber(0)
} else {
false
};
if have_cached {
let entry = self.shard_timelines.iter().next().unwrap();
Ok(&entry.1.timeline)
} else {
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
Ok(self.cache_timeline(timeline)?)
}
}
async fn handle_get_page_at_lsn_request(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamGetPageRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = match self.get_cached_timeline_for_page(req) {
Ok(tl) => tl,
Err(key) => {
match self
.load_timeline_for_page(tenant_id, timeline_id, key)
.await
{
Ok(t) => t,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
}
Err(e) => return Err(e.into()),
}
}
};
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
.await?;
@@ -899,60 +1094,6 @@ impl PageServerHandler {
}))
}
async fn handle_get_page_at_lsn_request(
&self,
timeline: &Timeline,
req: &PagestreamGetPageRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let key = rel_block_to_key(req.rel, req.blkno);
if timeline.get_shard_identity().is_key_local(&key) {
self.do_handle_get_page_at_lsn_request(timeline, req, ctx)
.await
} else {
// The Tenant shard we looked up at connection start does not hold this particular
// key: look for other shards in this tenant. This scenario occurs if a pageserver
// has multiple shards for the same tenant.
//
// TODO: optimize this (https://github.com/neondatabase/neon/pull/6037)
let timeline = match self
.get_active_tenant_timeline(
timeline.tenant_shard_id.tenant_id,
timeline.timeline_id,
ShardSelector::Page(key),
)
.await
{
Ok(t) => t,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
tracing::info!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}",
timeline.get_shard_identity(), timeline.get_shard_identity().get_shard_number(&key).0, key);
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
}
Err(e) => return Err(e.into()),
};
// Take a GateGuard for the duration of this request. If we were using our main Timeline object,
// the GateGuard was already held over the whole connection.
let _timeline_guard = timeline
.gate
.enter()
.map_err(|_| PageStreamError::Shutdown)?;
self.do_handle_get_page_at_lsn_request(&timeline, req, ctx)
.await
}
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
async fn handle_basebackup_request<IO>(

View File

@@ -18,6 +18,7 @@ use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use pageserver_api::models::ShardParameters;
use pageserver_api::models::TimelineState;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::TenantShardId;
@@ -2674,10 +2675,11 @@ impl Tenant {
}
}
// Legacy configs are implicitly in attached state
// Legacy configs are implicitly in attached state, and do not support sharding
Ok(LocationConf::attached_single(
tenant_conf,
Generation::none(),
&ShardParameters::default(),
))
} else {
// FIXME If the config file is not found, assume that we're attaching
@@ -3962,6 +3964,7 @@ pub(crate) mod harness {
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt::from(self.tenant_conf),
self.generation,
&ShardParameters::default(),
))
.unwrap(),
// This is a legacy/test code path: sharding isn't supported here.

View File

@@ -9,7 +9,7 @@
//! may lead to a data loss.
//!
use anyhow::bail;
use pageserver_api::models;
use pageserver_api::models::{self, ShardParameters};
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
use serde::de::IntoDeserializer;
use serde::{Deserialize, Serialize};
@@ -167,14 +167,17 @@ impl LocationConf {
/// For use when loading from a legacy configuration: presence of a tenant
/// implies it is in AttachmentMode::Single, which used to be the only
/// possible state. This function should eventually be removed.
pub(crate) fn attached_single(tenant_conf: TenantConfOpt, generation: Generation) -> Self {
pub(crate) fn attached_single(
tenant_conf: TenantConfOpt,
generation: Generation,
shard_params: &ShardParameters,
) -> Self {
Self {
mode: LocationMode::Attached(AttachedLocationConfig {
generation,
attach_mode: AttachmentMode::Single,
}),
// Legacy configuration loads are always from tenants created before sharding existed.
shard: ShardIdentity::unsharded(),
shard: ShardIdentity::from_params(ShardNumber(0), shard_params),
tenant_conf,
}
}

View File

@@ -3,7 +3,8 @@
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use pageserver_api::key::Key;
use pageserver_api::shard::{ShardIdentity, ShardNumber, TenantShardId};
use pageserver_api::models::ShardParameters;
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
@@ -760,6 +761,8 @@ pub(crate) enum SetNewTenantConfigError {
GetTenant(#[from] GetTenantError),
#[error(transparent)]
Persist(anyhow::Error),
#[error(transparent)]
Other(anyhow::Error),
}
pub(crate) async fn set_new_tenant_config(
@@ -773,10 +776,21 @@ pub(crate) async fn set_new_tenant_config(
info!("configuring tenant {tenant_id}");
let tenant = get_tenant(tenant_shard_id, true)?;
if tenant.tenant_shard_id().shard_count > ShardCount(0) {
// Note that we use ShardParameters::default below.
return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
"This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
)));
}
// This is a legacy API that only operates on attached tenants: the preferred
// API to use is the location_config/ endpoint, which lets the caller provide
// the full LocationConf.
let location_conf = LocationConf::attached_single(new_tenant_conf, tenant.generation);
let location_conf = LocationConf::attached_single(
new_tenant_conf,
tenant.generation,
&ShardParameters::default(),
);
Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf)
.await

View File

@@ -3,22 +3,31 @@ pub mod heatmap;
mod heatmap_uploader;
mod scheduler;
use std::sync::Arc;
use std::{sync::Arc, time::SystemTime};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::{
config::PageServerConf,
disk_usage_eviction_task::DiskUsageEvictionInfo,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
virtual_file::MaybeFatalIo,
};
use self::{
downloader::{downloader_task, SecondaryDetail},
heatmap_uploader::heatmap_uploader_task,
};
use super::{config::SecondaryLocationConfig, mgr::TenantManager};
use super::{
config::SecondaryLocationConfig, mgr::TenantManager,
span::debug_assert_current_span_has_tenant_id, storage_layer::LayerFileName,
};
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use utils::{completion::Barrier, sync::gate::Gate};
use tracing::instrument;
use utils::{completion::Barrier, fs_ext, id::TimelineId, sync::gate::Gate};
enum DownloadCommand {
Download(TenantShardId),
@@ -107,9 +116,67 @@ impl SecondaryTenant {
self.detail.lock().unwrap().config = config.clone();
}
fn get_tenant_shard_id(&self) -> &TenantShardId {
pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
&self.tenant_shard_id
}
pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> DiskUsageEvictionInfo {
self.detail.lock().unwrap().get_layers_for_eviction(self)
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))]
pub(crate) async fn evict_layer(
&self,
conf: &PageServerConf,
timeline_id: TimelineId,
name: LayerFileName,
) {
debug_assert_current_span_has_tenant_id();
let _guard = match self.gate.enter() {
Ok(g) => g,
Err(_) => {
tracing::debug!("Dropping layer evictions, secondary tenant shutting down",);
return;
}
};
let now = SystemTime::now();
let path = conf
.timeline_path(&self.tenant_shard_id, &timeline_id)
.join(name.file_name());
// We tolerate ENOENT, because between planning eviction and executing
// it, the secondary downloader could have seen an updated heatmap that
// resulted in a layer being deleted.
// Other local I/O errors are process-fatal: these should never happen.
tokio::fs::remove_file(path)
.await
.or_else(fs_ext::ignore_not_found)
.fatal_err("Deleting layer during eviction");
// Update the timeline's state. This does not have to be synchronized with
// the download process, because:
// - If downloader is racing with us to remove a file (e.g. because it is
// removed from heatmap), then our mutual .remove() operations will both
// succeed.
// - If downloader is racing with us to download the object (this would require
// multiple eviction iterations to race with multiple download iterations), then
// if we remove it from the state, the worst that happens is the downloader
// downloads it again before re-inserting, or we delete the file but it remains
// in the state map (in which case it will be downloaded if this secondary
// tenant transitions to attached and tries to access it)
//
// The important assumption here is that the secondary timeline state does not
// have to 100% match what is on disk, because it's a best-effort warming
// of the cache.
let mut detail = self.detail.lock().unwrap();
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
timeline_detail.on_disk_layers.remove(&name);
timeline_detail.evicted_at.insert(name, now);
}
}
}
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,

View File

@@ -8,6 +8,9 @@ use std::{
use crate::{
config::PageServerConf,
disk_usage_eviction_task::{
finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
},
metrics::SECONDARY_MODE,
tenant::{
config::SecondaryLocationConfig,
@@ -142,6 +145,46 @@ impl SecondaryDetail {
timelines: HashMap::new(),
}
}
pub(super) fn get_layers_for_eviction(
&self,
parent: &Arc<SecondaryTenant>,
) -> DiskUsageEvictionInfo {
let mut result = DiskUsageEvictionInfo {
max_layer_size: None,
resident_layers: Vec::new(),
};
for (timeline_id, timeline_detail) in &self.timelines {
result
.resident_layers
.extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
EvictionCandidate {
layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
secondary_tenant: parent.clone(),
timeline_id: *timeline_id,
name: name.clone(),
metadata: ods.metadata.clone(),
}),
last_activity_ts: ods.access_time,
relative_last_activity: finite_f32::FiniteF32::ZERO,
}
}));
}
result.max_layer_size = result
.resident_layers
.iter()
.map(|l| l.layer.get_file_size())
.max();
tracing::debug!(
"eviction: secondary tenant {} found {} timelines, {} layers",
parent.get_tenant_shard_id(),
self.timelines.len(),
result.resident_layers.len()
);
result
}
}
struct PendingDownload {

View File

@@ -15,7 +15,7 @@ use utils::sync::heavier_once_cell;
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::{remote_timeline_client::LayerFileMetadata, RemoteTimelineClient, Timeline};
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
use super::delta_layer::{self, DeltaEntry};
use super::image_layer;
@@ -204,17 +204,14 @@ impl Layer {
///
/// Technically cancellation safe, but cancelling might shift the viewpoint of what generation
/// of download-evict cycle on retry.
pub(crate) async fn evict_and_wait(
&self,
rtc: &RemoteTimelineClient,
) -> Result<(), EvictionError> {
self.0.evict_and_wait(rtc).await
pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> {
self.0.evict_and_wait().await
}
/// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload
/// then.
///
/// On drop, this will cause a call to [`RemoteTimelineClient::schedule_deletion_of_unlinked`].
/// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
/// This means that the unlinking by [gc] or [compaction] must have happened strictly before
/// the value this is called on gets dropped.
///
@@ -606,10 +603,7 @@ impl LayerInner {
/// Cancellation safe, however dropping the future and calling this method again might result
/// in a new attempt to evict OR join the previously started attempt.
pub(crate) async fn evict_and_wait(
&self,
_: &RemoteTimelineClient,
) -> Result<(), EvictionError> {
pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> {
use tokio::sync::broadcast::error::RecvError;
assert!(self.have_remote_client);

View File

@@ -42,15 +42,6 @@ use std::{
ops::ControlFlow,
};
use crate::context::{
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
};
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
ValueReconstructState,
};
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
@@ -58,7 +49,22 @@ use crate::tenant::{
metadata::{save_metadata, TimelineMetadata},
par_fsync,
};
use crate::{
context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
disk_usage_eviction_task::DiskUsageEvictionInfo,
};
use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
use crate::{
disk_usage_eviction_task::finite_f32,
tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
ValueReconstructState,
},
};
use crate::{
disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
};
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
@@ -1133,12 +1139,7 @@ impl Timeline {
return Ok(None);
};
let rtc = self
.remote_client
.as_ref()
.ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
match local_layer.evict_and_wait(rtc).await {
match local_layer.evict_and_wait().await {
Ok(()) => Ok(Some(true)),
Err(EvictionError::NotFound) => Ok(Some(false)),
Err(EvictionError::Downloaded) => Ok(Some(false)),
@@ -2102,7 +2103,7 @@ impl Timeline {
let layer_file_names = eviction_info
.resident_layers
.iter()
.map(|l| l.layer.layer_desc().filename())
.map(|l| l.layer.get_name())
.collect::<Vec<_>>();
let decorated = match remote_client.get_layers_metadata(layer_file_names) {
@@ -2120,7 +2121,7 @@ impl Timeline {
.filter_map(|(layer, remote_info)| {
remote_info.map(|remote_info| {
HeatMapLayer::new(
layer.layer.layer_desc().filename(),
layer.layer.get_name(),
IndexLayerMetadata::from(remote_info),
layer.last_activity_ts,
)
@@ -4423,43 +4424,6 @@ impl Timeline {
}
}
pub(crate) struct DiskUsageEvictionInfo {
/// Timeline's largest layer (remote or resident)
pub max_layer_size: Option<u64>,
/// Timeline's resident layers
pub resident_layers: Vec<LocalLayerInfoForDiskUsageEviction>,
}
pub(crate) struct LocalLayerInfoForDiskUsageEviction {
pub layer: Layer,
pub last_activity_ts: SystemTime,
}
impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it
// having to allocate a string to this is bad, but it will rarely be formatted
let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
struct DisplayIsDebug<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
f.debug_struct("LocalLayerInfoForDiskUsageEviction")
.field("layer", &DisplayIsDebug(&self.layer))
.field("last_activity", &ts)
.finish()
}
}
impl LocalLayerInfoForDiskUsageEviction {
pub fn file_size(&self) -> u64 {
self.layer.layer_desc().file_size
}
}
impl Timeline {
/// Returns non-remote layers for eviction.
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
@@ -4493,9 +4457,10 @@ impl Timeline {
SystemTime::now()
});
resident_layers.push(LocalLayerInfoForDiskUsageEviction {
layer: l.drop_eviction_guard(),
resident_layers.push(EvictionCandidate {
layer: l.drop_eviction_guard().into(),
last_activity_ts,
relative_last_activity: finite_f32::FiniteF32::ZERO,
});
}
@@ -4652,11 +4617,6 @@ mod tests {
.await
.unwrap();
let rtc = timeline
.remote_client
.clone()
.expect("just configured this");
let layer = find_some_layer(&timeline).await;
let layer = layer
.keep_resident()
@@ -4665,8 +4625,8 @@ mod tests {
.expect("should had been resident")
.drop_eviction_guard();
let first = async { layer.evict_and_wait(&rtc).await };
let second = async { layer.evict_and_wait(&rtc).await };
let first = async { layer.evict_and_wait().await };
let second = async { layer.evict_and_wait().await };
let (first, second) = tokio::join!(first, second);

View File

@@ -215,13 +215,10 @@ impl Timeline {
// So, we just need to deal with this.
let remote_client = match self.remote_client.as_ref() {
Some(c) => c,
None => {
error!("no remote storage configured, cannot evict layers");
return ControlFlow::Continue(());
}
};
if self.remote_client.is_none() {
error!("no remote storage configured, cannot evict layers");
return ControlFlow::Continue(());
}
let mut js = tokio::task::JoinSet::new();
{
@@ -274,9 +271,8 @@ impl Timeline {
};
let layer = guard.drop_eviction_guard();
if no_activity_for > p.threshold {
let remote_client = remote_client.clone();
// this could cause a lot of allocations in some cases
js.spawn(async move { layer.evict_and_wait(&remote_client).await });
js.spawn(async move { layer.evict_and_wait().await });
stats.candidates += 1;
}
}

View File

@@ -5,8 +5,9 @@ edition.workspace = true
license.workspace = true
[features]
default = ["testing"]
default = ["tokio-console"]
testing = []
tokio-console = ["tokio/tracing", "dep:console-subscriber"]
[dependencies]
anyhow.workspace = true
@@ -17,6 +18,7 @@ bytes = { workspace = true, features = ["serde"] }
camino.workspace = true
chrono.workspace = true
clap.workspace = true
console-subscriber = { workspace = true, optional = true }
consumption_metrics.workspace = true
dashmap.workspace = true
futures.workspace = true

View File

@@ -26,8 +26,12 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
.await
.map(OpenTelemetryLayer::new);
tracing_subscriber::registry()
.with(env_filter)
let reg = tracing_subscriber::registry();
#[cfg(all(tokio_unstable, feature = "tokio-console"))]
let reg = reg.with(console_subscriber::spawn());
reg.with(env_filter)
.with(otlp_layer)
.with(fmt_layer)
.try_init()?;

View File

@@ -15,6 +15,7 @@ use serde_json::Map;
use serde_json::Value;
use smol_str::SmolStr;
use tokio_postgres::error::DbError;
use tokio_postgres::error::ErrorPosition;
use tokio_postgres::types::Kind;
use tokio_postgres::types::Type;
use tokio_postgres::GenericClient;
@@ -59,6 +60,7 @@ enum Payload {
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
const SERVERLESS_DRIVER_SNI_HOSTNAME_FIRST_PART: &str = "api";
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
@@ -176,10 +178,11 @@ fn get_conn_info(
.and_then(|h| h.to_str().ok())
.and_then(|h| h.split(':').next());
if hostname != sni_hostname {
// sni_hostname has to be either the same as hostname or the one used in serverless driver.
if !check_matches(&sni_hostname, hostname)? {
return Err(anyhow::anyhow!("mismatched SNI hostname and hostname"));
} else if let Some(h) = host_header {
if h != hostname {
if h != sni_hostname {
return Err(anyhow::anyhow!("mismatched host header and hostname"));
}
}
@@ -213,6 +216,20 @@ fn get_conn_info(
})
}
fn check_matches(sni_hostname: &str, hostname: &str) -> Result<bool, anyhow::Error> {
if sni_hostname == hostname {
return Ok(true);
}
let (sni_hostname_first, sni_hostname_rest) = sni_hostname
.split_once('.')
.ok_or_else(|| anyhow::anyhow!("Unexpected sni format."))?;
let (_, hostname_rest) = hostname
.split_once('.')
.ok_or_else(|| anyhow::anyhow!("Unexpected hostname format."))?;
Ok(sni_hostname_rest == hostname_rest
&& sni_hostname_first == SERVERLESS_DRIVER_SNI_HOSTNAME_FIRST_PART)
}
// TODO: return different http error codes
pub async fn handle(
tls: &'static TlsConfig,
@@ -231,7 +248,7 @@ pub async fn handle(
Ok(r) => match r {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let mut message = format!("{:?}", e);
let db_error = e
.downcast_ref::<tokio_postgres::Error>()
.and_then(|e| e.as_db_error());
@@ -244,7 +261,25 @@ pub async fn handle(
.unwrap_or_default()
}
// TODO(conrad): db_error.position()
if let Some(db_error) = db_error {
db_error.message().clone_into(&mut message);
}
let position = db_error.and_then(|db| db.position());
let (position, internal_position, internal_query) = match position {
Some(ErrorPosition::Original(position)) => (
Value::String(position.to_string()),
Value::Null,
Value::Null,
),
Some(ErrorPosition::Internal { position, query }) => (
Value::Null,
Value::String(position.to_string()),
Value::String(query.clone()),
),
None => (Value::Null, Value::Null, Value::Null),
};
let code = get(db_error, |db| db.code().code());
let severity = get(db_error, |db| db.severity());
let detail = get(db_error, |db| db.detail());
@@ -256,7 +291,7 @@ pub async fn handle(
let datatype = get(db_error, |db| db.datatype());
let constraint = get(db_error, |db| db.constraint());
let file = get(db_error, |db| db.file());
let line = get(db_error, |db| db.line());
let line = get(db_error, |db| db.line().map(|l| l.to_string()));
let routine = get(db_error, |db| db.routine());
error!(
@@ -271,12 +306,15 @@ pub async fn handle(
"code": code,
"detail": detail,
"hint": hint,
"position": position,
"internalPosition": internal_position,
"internalQuery": internal_query,
"severity": severity,
"where": where_,
"table": table,
"column": column,
"schema": schema,
"datatype": datatype,
"dataType": datatype,
"constraint": constraint,
"file": file,
"line": line,

View File

@@ -9,6 +9,7 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
PgBin,
wait_for_last_flush_lsn,
)
@@ -75,9 +76,15 @@ class EvictionOrder(str, enum.Enum):
if self == EvictionOrder.ABSOLUTE_ORDER:
return {"type": "AbsoluteAccessed"}
elif self == EvictionOrder.RELATIVE_ORDER_EQUAL:
return {"type": "RelativeAccessed", "args": {"highest_layer_count_loses_first": False}}
return {
"type": "RelativeAccessed",
"args": {"highest_layer_count_loses_first": False},
}
elif self == EvictionOrder.RELATIVE_ORDER_SPARE:
return {"type": "RelativeAccessed", "args": {"highest_layer_count_loses_first": True}}
return {
"type": "RelativeAccessed",
"args": {"highest_layer_count_loses_first": True},
}
else:
raise RuntimeError(f"not implemented: {self}")
@@ -91,14 +98,24 @@ class EvictionEnv:
layer_size: int
pgbench_init_lsns: Dict[TenantId, Lsn]
def timelines_du(self) -> Tuple[int, int, int]:
@property
def pageserver(self):
"""
Shortcut for tests that only use one pageserver.
"""
return self.neon_env.pageserver
def timelines_du(self, pageserver: NeonPageserver) -> Tuple[int, int, int]:
return poor_mans_du(
self.neon_env, [(tid, tlid) for tid, tlid in self.timelines], verbose=False
self.neon_env,
[(tid, tlid) for tid, tlid in self.timelines],
pageserver,
verbose=False,
)
def du_by_timeline(self) -> Dict[Tuple[TenantId, TimelineId], int]:
def du_by_timeline(self, pageserver: NeonPageserver) -> Dict[Tuple[TenantId, TimelineId], int]:
return {
(tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)], verbose=True)[0]
(tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)], pageserver, verbose=True)[0]
for tid, tlid in self.timelines
}
@@ -126,7 +143,13 @@ class EvictionEnv:
_avg = cur.fetchone()
def pageserver_start_with_disk_usage_eviction(
self, period, max_usage_pct, min_avail_bytes, mock_behavior, eviction_order: EvictionOrder
self,
pageserver: NeonPageserver,
period,
max_usage_pct,
min_avail_bytes,
mock_behavior,
eviction_order: EvictionOrder,
):
disk_usage_config = {
"period": period,
@@ -138,7 +161,12 @@ class EvictionEnv:
enc = toml.TomlEncoder()
self.neon_env.pageserver.start(
# these can sometimes happen during startup before any tenants have been
# loaded, so nothing can be evicted, we just wait for next iteration which
# is able to evict.
pageserver.allowed_errors.append(".*WARN.* disk usage still high.*")
pageserver.start(
overrides=(
"--pageserver-config-override=disk_usage_based_eviction="
+ enc.dump_inline_table(disk_usage_config).replace("\n", " "),
@@ -152,15 +180,10 @@ class EvictionEnv:
)
def statvfs_called():
assert self.neon_env.pageserver.log_contains(".*running mocked statvfs.*")
assert pageserver.log_contains(".*running mocked statvfs.*")
wait_until(10, 1, statvfs_called)
# these can sometimes happen during startup before any tenants have been
# loaded, so nothing can be evicted, we just wait for next iteration which
# is able to evict.
self.neon_env.pageserver.allowed_errors.append(".*WARN.* disk usage still high.*")
def human_bytes(amt: float) -> str:
suffixes = ["", "Ki", "Mi", "Gi"]
@@ -175,23 +198,28 @@ def human_bytes(amt: float) -> str:
raise RuntimeError("unreachable")
@pytest.fixture
def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
def _eviction_env(
request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, num_pageservers: int
) -> EvictionEnv:
"""
Creates two tenants, one somewhat larger than the other.
"""
log.info(f"setting up eviction_env for test {request.node.name}")
neon_env_builder.num_pageservers = num_pageservers
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
# initial tenant will not be present on this pageserver
env = neon_env_builder.init_configs()
env.start()
pageserver_http = env.pageserver.http_client()
# We will create all tenants on the 0th pageserver
pageserver_http = env.pageservers[0].http_client()
# allow because we are invoking this manually; we always warn on executing disk based eviction
env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
for ps in env.pageservers:
ps.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
# Choose small layer_size so that we can use low pgbench_scales and still get a large count of layers.
# Large count of layers and small layer size is good for testing because it makes evictions predictable.
@@ -216,7 +244,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
pg_bin.run(["pgbench", "-i", f"-s{scale}", endpoint.connstr()])
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id, pageserver_id=1)
timelines.append((tenant_id, timeline_id))
@@ -252,6 +280,20 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
return eviction_env
@pytest.fixture
def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
return _eviction_env(request, neon_env_builder, pg_bin, num_pageservers=1)
@pytest.fixture
def eviction_env_ha(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
"""
Variant of the eviction environment with two pageservers for testing eviction on
HA configurations with a secondary location.
"""
return _eviction_env(request, neon_env_builder, pg_bin, num_pageservers=2)
def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
env = eviction_env
@@ -264,10 +306,16 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
healthy_tenant_id, healthy_timeline_id = env.timelines[1]
broken_size_pre, _, _ = poor_mans_du(
env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True
env.neon_env,
[(broken_tenant_id, broken_timeline_id)],
env.pageserver,
verbose=True,
)
healthy_size_pre, _, _ = poor_mans_du(
env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True
env.neon_env,
[(healthy_tenant_id, healthy_timeline_id)],
env.pageserver,
verbose=True,
)
# try to evict everything, then validate that broken tenant wasn't touched
@@ -277,10 +325,16 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
log.info(f"{response}")
broken_size_post, _, _ = poor_mans_du(
env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True
env.neon_env,
[(broken_tenant_id, broken_timeline_id)],
env.pageserver,
verbose=True,
)
healthy_size_post, _, _ = poor_mans_du(
env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True
env.neon_env,
[(healthy_tenant_id, healthy_timeline_id)],
env.pageserver,
verbose=True,
)
assert broken_size_pre == broken_size_post, "broken tenant should not be touched"
@@ -302,7 +356,7 @@ def test_pageserver_evicts_until_pressure_is_relieved(
env = eviction_env
pageserver_http = env.pageserver_http
(total_on_disk, _, _) = env.timelines_du()
(total_on_disk, _, _) = env.timelines_du(env.pageserver)
target = total_on_disk // 2
@@ -311,7 +365,7 @@ def test_pageserver_evicts_until_pressure_is_relieved(
)
log.info(f"{response}")
(later_total_on_disk, _, _) = env.timelines_du()
(later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
actual_change = total_on_disk - later_total_on_disk
@@ -336,8 +390,8 @@ def test_pageserver_respects_overridden_resident_size(
env = eviction_env
ps_http = env.pageserver_http
(total_on_disk, _, _) = env.timelines_du()
du_by_timeline = env.du_by_timeline()
(total_on_disk, _, _) = env.timelines_du(env.pageserver)
du_by_timeline = env.du_by_timeline(env.pageserver)
log.info("du_by_timeline: %s", du_by_timeline)
assert len(du_by_timeline) == 2, "this test assumes two tenants"
@@ -379,8 +433,8 @@ def test_pageserver_respects_overridden_resident_size(
GLOBAL_LRU_LOG_LINE,
), "this test is pointless if it fell back to global LRU"
(later_total_on_disk, _, _) = env.timelines_du()
later_du_by_timeline = env.du_by_timeline()
(later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
later_du_by_timeline = env.du_by_timeline(env.pageserver)
log.info("later_du_by_timeline: %s", later_du_by_timeline)
actual_change = total_on_disk - later_total_on_disk
@@ -412,7 +466,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E
env = eviction_env
ps_http = env.pageserver_http
(total_on_disk, _, _) = env.timelines_du()
(total_on_disk, _, _) = env.timelines_du(env.pageserver)
target = total_on_disk
response = ps_http.disk_usage_eviction_run(
@@ -420,7 +474,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E
)
log.info(f"{response}")
(later_total_on_disk, _, _) = env.timelines_du()
(later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
actual_change = total_on_disk - later_total_on_disk
assert 0 <= actual_change, "nothing can load layers during this test"
assert actual_change >= target, "eviction must always evict more than target"
@@ -448,8 +502,8 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
env = eviction_env
ps_http = env.pageserver_http
(total_on_disk, _, _) = env.timelines_du()
du_by_timeline = env.du_by_timeline()
(total_on_disk, _, _) = env.timelines_du(env.pageserver)
du_by_timeline = env.du_by_timeline(env.pageserver)
# pick smaller or greater (iteration order is insertion order of scale=4 and scale=6)
[warm, cold] = list(du_by_timeline.keys())
@@ -467,12 +521,12 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
)
log.info(f"{response}")
(later_total_on_disk, _, _) = env.timelines_du()
(later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
actual_change = total_on_disk - later_total_on_disk
assert 0 <= actual_change, "nothing can load layers during this test"
assert actual_change >= target, "eviction must always evict more than target"
later_du_by_timeline = env.du_by_timeline()
later_du_by_timeline = env.du_by_timeline(env.pageserver)
for tenant, later_tenant_usage in later_du_by_timeline.items():
assert (
later_tenant_usage < du_by_timeline[tenant]
@@ -508,7 +562,10 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
def poor_mans_du(
env: NeonEnv, timelines: list[Tuple[TenantId, TimelineId]], verbose: bool = False
env: NeonEnv,
timelines: list[Tuple[TenantId, TimelineId]],
pageserver: NeonPageserver,
verbose: bool = False,
) -> Tuple[int, int, int]:
"""
Disk usage, largest, smallest layer for layer files over the given (tenant, timeline) tuples;
@@ -518,7 +575,7 @@ def poor_mans_du(
largest_layer = 0
smallest_layer = None
for tenant_id, timeline_id in timelines:
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
timeline_dir = pageserver.timeline_dir(tenant_id, timeline_id)
assert timeline_dir.exists(), f"timeline dir does not exist: {timeline_dir}"
total = 0
for file in timeline_dir.iterdir():
@@ -549,6 +606,7 @@ def test_statvfs_error_handling(eviction_env: EvictionEnv):
env = eviction_env
env.neon_env.pageserver.stop()
env.pageserver_start_with_disk_usage_eviction(
env.pageserver,
period="1s",
max_usage_pct=90,
min_avail_bytes=0,
@@ -573,11 +631,12 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
env.neon_env.pageserver.stop()
# make it seem like we're at 100% utilization by setting total bytes to the used bytes
total_size, _, _ = env.timelines_du()
total_size, _, _ = env.timelines_du(env.pageserver)
blocksize = 512
total_blocks = (total_size + (blocksize - 1)) // blocksize
env.pageserver_start_with_disk_usage_eviction(
env.pageserver,
period="1s",
max_usage_pct=33,
min_avail_bytes=0,
@@ -597,7 +656,7 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
wait_until(10, 1, relieved_log_message)
post_eviction_total_size, _, _ = env.timelines_du()
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
assert post_eviction_total_size <= 0.33 * total_size, "we requested max 33% usage"
@@ -612,13 +671,14 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
env.neon_env.pageserver.stop()
# make it seem like we're at 100% utilization by setting total bytes to the used bytes
total_size, _, _ = env.timelines_du()
total_size, _, _ = env.timelines_du(env.pageserver)
blocksize = 512
total_blocks = (total_size + (blocksize - 1)) // blocksize
min_avail_bytes = total_size // 3
env.pageserver_start_with_disk_usage_eviction(
env.pageserver,
period="1s",
max_usage_pct=100,
min_avail_bytes=min_avail_bytes,
@@ -638,7 +698,67 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
wait_until(10, 1, relieved_log_message)
post_eviction_total_size, _, _ = env.timelines_du()
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
assert (
total_size - post_eviction_total_size >= min_avail_bytes
), "we requested at least min_avail_bytes worth of free space"
def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):
env = eviction_env_ha
tenant_ids = [t[0] for t in env.timelines]
log.info("Setting up secondary location...")
ps_attached = env.neon_env.pageservers[0]
ps_secondary = env.neon_env.pageservers[1]
for tenant_id in tenant_ids:
ps_secondary.tenant_location_configure(
tenant_id,
{
"mode": "Secondary",
"secondary_conf": {"warm": True},
"tenant_conf": {},
},
)
readback_conf = ps_secondary.read_tenant_location_conf(tenant_id)
log.info(f"Read back conf: {readback_conf}")
# Request secondary location to download all layers that the attached location has
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
ps_secondary.http_client().tenant_secondary_download(tenant_id)
# Configure the secondary pageserver to have a phony small disk size
ps_secondary.stop()
total_size, _, _ = env.timelines_du(ps_secondary)
blocksize = 512
total_blocks = (total_size + (blocksize - 1)) // blocksize
min_avail_bytes = total_size // 3
env.pageserver_start_with_disk_usage_eviction(
ps_secondary,
period="1s",
max_usage_pct=100,
min_avail_bytes=min_avail_bytes,
mock_behavior={
"type": "Success",
"blocksize": blocksize,
"total_blocks": total_blocks,
# Only count layer files towards used bytes in the mock_statvfs.
# This avoids accounting for metadata files & tenant conf in the tests.
"name_filter": ".*__.*",
},
eviction_order=EvictionOrder.ABSOLUTE_ORDER,
)
def relieved_log_message():
assert ps_secondary.log_contains(".*disk usage pressure relieved")
wait_until(10, 1, relieved_log_message)
post_eviction_total_size, _, _ = env.timelines_du(ps_secondary)
assert (
total_size - post_eviction_total_size >= min_avail_bytes

View File

@@ -203,6 +203,21 @@ def test_close_on_connections_exit(static_proxy: NeonProxy):
static_proxy.wait_for_exit()
def test_sql_over_http_serverless_driver(static_proxy: NeonProxy):
static_proxy.safe_psql("create role http with login password 'http' superuser")
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
response = requests.post(
f"https://api.localtest.me:{static_proxy.external_http_port}/sql",
data=json.dumps({"query": "select 42 as answer", "params": []}),
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
verify=str(static_proxy.test_output_dir / "proxy.crt"),
)
assert response.status_code == 200, response.text
rows = response.json()["rows"]
assert rows == [{"answer": 42}]
def test_sql_over_http(static_proxy: NeonProxy):
static_proxy.safe_psql("create role http with login password 'http' superuser")