mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 19:40:39 +00:00
Compare commits
11 Commits
releases/2
...
tokio-cons
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e108ba8904 | ||
|
|
a1cc1a6373 | ||
|
|
4b0204ede5 | ||
|
|
bf4e708646 | ||
|
|
887e94d7da | ||
|
|
df9e9de541 | ||
|
|
3f2187eb92 | ||
|
|
2a3cfc9665 | ||
|
|
d34adf46b4 | ||
|
|
0bac8ddd76 | ||
|
|
0e1ef3713e |
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -1131,7 +1131,7 @@ jobs:
|
||||
# TODO: move deployPreprodRegion to release (`"$GITHUB_REF_NAME" == "release"` block), once Staging support different compute tag prefixes for different regions
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
|
||||
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f disclamerAcknowledged=true
|
||||
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
else
|
||||
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
|
||||
exit 1
|
||||
|
||||
120
Cargo.lock
generated
120
Cargo.lock
generated
@@ -1226,6 +1226,43 @@ dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "console-api"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"prost 0.12.3",
|
||||
"prost-types 0.12.3",
|
||||
"tonic 0.10.2",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "console-subscriber"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e"
|
||||
dependencies = [
|
||||
"console-api",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-utils",
|
||||
"futures-task",
|
||||
"hdrhistogram",
|
||||
"humantime",
|
||||
"prost-types 0.12.3",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thread_local",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic 0.10.2",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const-oid"
|
||||
version = "0.9.5"
|
||||
@@ -3094,7 +3131,7 @@ dependencies = [
|
||||
"opentelemetry",
|
||||
"opentelemetry-http",
|
||||
"opentelemetry-proto",
|
||||
"prost",
|
||||
"prost 0.11.9",
|
||||
"reqwest",
|
||||
"thiserror",
|
||||
]
|
||||
@@ -3108,7 +3145,7 @@ dependencies = [
|
||||
"futures",
|
||||
"futures-util",
|
||||
"opentelemetry",
|
||||
"prost",
|
||||
"prost 0.11.9",
|
||||
"tonic 0.8.3",
|
||||
]
|
||||
|
||||
@@ -3823,7 +3860,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive",
|
||||
"prost-derive 0.11.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3840,8 +3887,8 @@ dependencies = [
|
||||
"multimap",
|
||||
"petgraph",
|
||||
"prettyplease 0.1.25",
|
||||
"prost",
|
||||
"prost-types",
|
||||
"prost 0.11.9",
|
||||
"prost-types 0.11.9",
|
||||
"regex",
|
||||
"syn 1.0.109",
|
||||
"tempfile",
|
||||
@@ -3861,13 +3908,35 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.32",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-types"
|
||||
version = "0.11.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
|
||||
dependencies = [
|
||||
"prost",
|
||||
"prost 0.11.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-types"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e"
|
||||
dependencies = [
|
||||
"prost 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3883,6 +3952,7 @@ dependencies = [
|
||||
"camino-tempfile",
|
||||
"chrono",
|
||||
"clap",
|
||||
"console-subscriber",
|
||||
"consumption_metrics",
|
||||
"dashmap",
|
||||
"futures",
|
||||
@@ -5148,7 +5218,7 @@ dependencies = [
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"prost",
|
||||
"prost 0.11.9",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic 0.9.2",
|
||||
@@ -5467,6 +5537,7 @@ dependencies = [
|
||||
"signal-hook-registry",
|
||||
"socket2 0.5.5",
|
||||
"tokio-macros",
|
||||
"tracing",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
@@ -5657,8 +5728,8 @@ dependencies = [
|
||||
"hyper-timeout",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost",
|
||||
"prost-derive",
|
||||
"prost 0.11.9",
|
||||
"prost-derive 0.11.9",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
@@ -5689,7 +5760,7 @@ dependencies = [
|
||||
"hyper-timeout",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost",
|
||||
"prost 0.11.9",
|
||||
"rustls-native-certs",
|
||||
"rustls-pemfile",
|
||||
"tokio",
|
||||
@@ -5701,6 +5772,33 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64 0.21.1",
|
||||
"bytes",
|
||||
"h2",
|
||||
"http",
|
||||
"http-body",
|
||||
"hyper",
|
||||
"hyper-timeout",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost 0.12.3",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic-build"
|
||||
version = "0.9.2"
|
||||
@@ -6606,7 +6704,7 @@ dependencies = [
|
||||
"num-traits",
|
||||
"once_cell",
|
||||
"parquet",
|
||||
"prost",
|
||||
"prost 0.11.9",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
"regex-automata 0.4.3",
|
||||
|
||||
@@ -64,6 +64,7 @@ cfg-if = "1.0.0"
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock"] }
|
||||
clap = { version = "4.0", features = ["derive"] }
|
||||
close_fds = "0.3.2"
|
||||
console-subscriber = "0.2.0"
|
||||
comfy-table = "6.1"
|
||||
const_format = "0.2"
|
||||
crc32c = "0.6"
|
||||
|
||||
@@ -35,6 +35,7 @@ ARG BUILD_TAG
|
||||
ARG RUSTC_WRAPPER=cachepot
|
||||
ENV AWS_REGION=eu-central-1
|
||||
ENV CACHEPOT_S3_KEY_PREFIX=cachepot
|
||||
ENV RUSTFLAGS="--cfg tokio_unstable"
|
||||
ARG CACHEPOT_BUCKET=neon-github-dev
|
||||
#ARG AWS_ACCESS_KEY_ID
|
||||
#ARG AWS_SECRET_ACCESS_KEY
|
||||
|
||||
@@ -288,6 +288,21 @@ async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiErr
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenant_create(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let inspect_req = json_request::<InspectRequest>(&mut req).await?;
|
||||
|
||||
let state = get_state(&req).inner.clone();
|
||||
let locked = state.write().await;
|
||||
let tenant_state = locked.tenants.get(&inspect_req.tenant_id);
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
InspectResponse {
|
||||
attachment: tenant_state.and_then(|s| s.pageserver.map(|ps| (s.generation, ps))),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
endpoint::make_router()
|
||||
.data(Arc::new(State::new(persistent_state)))
|
||||
@@ -295,6 +310,9 @@ fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body,
|
||||
.post("/validate", |r| request_span(r, handle_validate))
|
||||
.post("/attach-hook", |r| request_span(r, handle_attach_hook))
|
||||
.post("/inspect", |r| request_span(r, handle_inspect))
|
||||
.post("/tenant/:tenant_id", |r| {
|
||||
request_span(r, handle_tenant_create)
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
|
||||
@@ -16,6 +16,7 @@ use control_plane::safekeeper::SafekeeperNode;
|
||||
use control_plane::tenant_migration::migrate_tenant;
|
||||
use control_plane::{broker, local_env};
|
||||
use pageserver_api::models::TimelineInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::{
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
|
||||
@@ -279,7 +280,7 @@ async fn get_timeline_infos(
|
||||
tenant_id: &TenantId,
|
||||
) -> Result<HashMap<TimelineId, TimelineInfo>> {
|
||||
Ok(get_default_pageserver(env)
|
||||
.timeline_list(tenant_id)
|
||||
.timeline_list(&TenantShardId::unsharded(*tenant_id))
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|timeline_info| (timeline_info.timeline_id, timeline_info))
|
||||
@@ -490,7 +491,9 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
match timeline_match.subcommand() {
|
||||
Some(("list", list_match)) => {
|
||||
let tenant_id = get_tenant_id(list_match, env)?;
|
||||
let timelines = pageserver.timeline_list(&tenant_id).await?;
|
||||
let timelines = pageserver
|
||||
.timeline_list(&TenantShardId::unsharded(tenant_id))
|
||||
.await?;
|
||||
print_timelines_tree(timelines, env.timeline_name_mappings())?;
|
||||
}
|
||||
Some(("create", create_match)) => {
|
||||
|
||||
@@ -48,6 +48,7 @@ use anyhow::{anyhow, bail, Context, Result};
|
||||
use compute_api::spec::RemoteExtSpec;
|
||||
use nix::sys::signal::kill;
|
||||
use nix::sys::signal::Signal;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
@@ -543,6 +544,7 @@ impl Endpoint {
|
||||
storage_auth_token: auth_token.clone(),
|
||||
remote_extensions,
|
||||
pgbouncer_settings: None,
|
||||
shard_stripe_size: Some(ShardParameters::DEFAULT_STRIPE_SIZE.0 as usize),
|
||||
};
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::time::Duration;
|
||||
use anyhow::{bail, Context};
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::SinkExt;
|
||||
use pageserver_api::models::{self, LocationConfig, TenantInfo, TimelineInfo};
|
||||
use pageserver_api::models::{self, LocationConfig, ShardParameters, TenantInfo, TimelineInfo};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
use postgres_backend::AuthType;
|
||||
@@ -376,6 +376,7 @@ impl PageServerNode {
|
||||
new_tenant_id: TenantShardId::unsharded(new_tenant_id),
|
||||
generation,
|
||||
config,
|
||||
shard_parameters: ShardParameters::default(),
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
@@ -471,18 +472,21 @@ impl PageServerNode {
|
||||
|
||||
pub async fn location_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
config: LocationConfig,
|
||||
flush_ms: Option<Duration>,
|
||||
) -> anyhow::Result<()> {
|
||||
Ok(self
|
||||
.http_client
|
||||
.location_config(tenant_id, config, flush_ms)
|
||||
.location_config(tenant_shard_id, config, flush_ms)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result<Vec<TimelineInfo>> {
|
||||
Ok(self.http_client.list_timelines(*tenant_id).await?)
|
||||
pub async fn timeline_list(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
) -> anyhow::Result<Vec<TimelineInfo>> {
|
||||
Ok(self.http_client.list_timelines(*tenant_shard_id).await?)
|
||||
}
|
||||
|
||||
pub async fn tenant_secondary_download(&self, tenant_id: &TenantShardId) -> anyhow::Result<()> {
|
||||
|
||||
@@ -24,7 +24,9 @@ async fn get_lsns(
|
||||
tenant_id: TenantId,
|
||||
pageserver: &PageServerNode,
|
||||
) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
|
||||
let timelines = pageserver.timeline_list(&tenant_id).await?;
|
||||
let timelines = pageserver
|
||||
.timeline_list(&TenantShardId::unsharded(tenant_id))
|
||||
.await?;
|
||||
Ok(timelines
|
||||
.into_iter()
|
||||
.map(|t| (t.timeline_id, t.last_record_lsn))
|
||||
@@ -120,7 +122,9 @@ pub async fn migrate_tenant(
|
||||
.attach_hook(tenant_id, dest_ps.conf.id)
|
||||
.await?;
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
|
||||
dest_ps.location_config(tenant_id, dest_conf, None).await?;
|
||||
dest_ps
|
||||
.location_config(TenantShardId::unsharded(tenant_id), dest_conf, None)
|
||||
.await?;
|
||||
println!("✅ Migration complete");
|
||||
return Ok(());
|
||||
}
|
||||
@@ -130,7 +134,11 @@ pub async fn migrate_tenant(
|
||||
let stale_conf =
|
||||
build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None);
|
||||
origin_ps
|
||||
.location_config(tenant_id, stale_conf, Some(Duration::from_secs(10)))
|
||||
.location_config(
|
||||
TenantShardId::unsharded(tenant_id),
|
||||
stale_conf,
|
||||
Some(Duration::from_secs(10)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps).await?);
|
||||
@@ -156,7 +164,9 @@ pub async fn migrate_tenant(
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None);
|
||||
|
||||
println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
|
||||
dest_ps.location_config(tenant_id, dest_conf, None).await?;
|
||||
dest_ps
|
||||
.location_config(TenantShardId::unsharded(tenant_id), dest_conf, None)
|
||||
.await?;
|
||||
|
||||
if let Some(baseline) = baseline_lsns {
|
||||
println!("🕑 Waiting for LSN to catch up...");
|
||||
@@ -203,7 +213,7 @@ pub async fn migrate_tenant(
|
||||
other_ps.conf.id
|
||||
);
|
||||
other_ps
|
||||
.location_config(tenant_id, secondary_conf, None)
|
||||
.location_config(TenantShardId::unsharded(tenant_id), secondary_conf, None)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -212,7 +222,9 @@ pub async fn migrate_tenant(
|
||||
dest_ps.conf.id
|
||||
);
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
|
||||
dest_ps.location_config(tenant_id, dest_conf, None).await?;
|
||||
dest_ps
|
||||
.location_config(TenantShardId::unsharded(tenant_id), dest_conf, None)
|
||||
.await?;
|
||||
|
||||
println!("✅ Migration complete");
|
||||
|
||||
|
||||
@@ -75,6 +75,10 @@ pub struct ComputeSpec {
|
||||
pub remote_extensions: Option<RemoteExtSpec>,
|
||||
|
||||
pub pgbouncer_settings: Option<HashMap<String, String>>,
|
||||
|
||||
// Stripe size for pageserver sharding, in pages
|
||||
#[serde(default)]
|
||||
pub shard_stripe_size: Option<usize>,
|
||||
}
|
||||
|
||||
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
|
||||
|
||||
@@ -18,7 +18,10 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::{reltag::RelTag, shard::TenantShardId};
|
||||
use crate::{
|
||||
reltag::RelTag,
|
||||
shard::{ShardCount, ShardStripeSize, TenantShardId},
|
||||
};
|
||||
use anyhow::bail;
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
@@ -188,6 +191,31 @@ pub struct TimelineCreateRequest {
|
||||
pub pg_version: Option<u32>,
|
||||
}
|
||||
|
||||
/// Parameters that apply to all shards in a tenant. Used during tenant creation.
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct ShardParameters {
|
||||
pub count: ShardCount,
|
||||
pub stripe_size: ShardStripeSize,
|
||||
}
|
||||
|
||||
impl ShardParameters {
|
||||
pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
|
||||
|
||||
pub fn is_unsharded(&self) -> bool {
|
||||
self.count == ShardCount(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ShardParameters {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
count: ShardCount(0),
|
||||
stripe_size: Self::DEFAULT_STRIPE_SIZE,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TenantCreateRequest {
|
||||
@@ -195,6 +223,12 @@ pub struct TenantCreateRequest {
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generation: Option<u32>,
|
||||
|
||||
// If omitted, create a single shard with TenantShardId::unsharded()
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
|
||||
pub shard_parameters: ShardParameters,
|
||||
|
||||
#[serde(flatten)]
|
||||
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
|
||||
}
|
||||
@@ -297,7 +331,7 @@ pub struct StatusResponse {
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TenantLocationConfigRequest {
|
||||
pub tenant_id: TenantId,
|
||||
pub tenant_id: TenantShardId,
|
||||
#[serde(flatten)]
|
||||
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
|
||||
}
|
||||
@@ -660,6 +694,17 @@ pub struct PagestreamDbSizeResponse {
|
||||
pub db_size: i64,
|
||||
}
|
||||
|
||||
// This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
|
||||
// that require pageserver-internal types. It is sufficient to get the total size.
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantHistorySize {
|
||||
pub id: TenantId,
|
||||
/// Size is a mixture of WAL and logical size, so the unit is bytes.
|
||||
///
|
||||
/// Will be none if `?inputs_only=true` was given.
|
||||
pub size: Option<u64>,
|
||||
}
|
||||
|
||||
impl PagestreamFeMessage {
|
||||
pub fn serialize(&self) -> Bytes {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
use std::{ops::RangeInclusive, str::FromStr};
|
||||
|
||||
use crate::key::{is_rel_block_key, Key};
|
||||
use crate::{
|
||||
key::{is_rel_block_key, Key},
|
||||
models::ShardParameters,
|
||||
};
|
||||
use hex::FromHex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror;
|
||||
@@ -85,6 +88,12 @@ impl TenantShardId {
|
||||
pub fn is_unsharded(&self) -> bool {
|
||||
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
|
||||
}
|
||||
pub fn to_index(&self) -> ShardIndex {
|
||||
ShardIndex {
|
||||
shard_number: self.shard_number,
|
||||
shard_count: self.shard_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Formatting helper
|
||||
@@ -403,6 +412,17 @@ impl ShardIdentity {
|
||||
}
|
||||
}
|
||||
|
||||
/// For use when creating ShardIdentity instances for new shards, where a creation request
|
||||
/// specifies the ShardParameters that apply to all shards.
|
||||
pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
|
||||
Self {
|
||||
number,
|
||||
count: params.count,
|
||||
layout: LAYOUT_V1,
|
||||
stripe_size: params.stripe_size,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_broken(&self) -> bool {
|
||||
self.layout == LAYOUT_BROKEN
|
||||
}
|
||||
|
||||
@@ -5,7 +5,9 @@ use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::num::NonZeroU32;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
|
||||
use anyhow::Result;
|
||||
@@ -13,12 +15,14 @@ use azure_core::request_options::{MaxResults, Metadata, Range};
|
||||
use azure_core::RetryOptions;
|
||||
use azure_identity::DefaultAzureCredential;
|
||||
use azure_storage::StorageCredentials;
|
||||
use azure_storage_blobs::blob::CopyStatus;
|
||||
use azure_storage_blobs::prelude::ClientBuilder;
|
||||
use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerClient};
|
||||
use bytes::Bytes;
|
||||
use futures::stream::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use http_types::StatusCode;
|
||||
use http_types::{StatusCode, Url};
|
||||
use tokio::time::Instant;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::s3_bucket::RequestKind;
|
||||
@@ -323,10 +327,49 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn copy(&self, _from: &RemotePath, _to: &RemotePath) -> anyhow::Result<()> {
|
||||
Err(anyhow::anyhow!(
|
||||
"copy for azure blob storage is not implemented"
|
||||
))
|
||||
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
|
||||
let _permit = self.permit(RequestKind::Copy).await;
|
||||
let blob_client = self.client.blob_client(self.relative_path_to_name(to));
|
||||
|
||||
let source_url = format!(
|
||||
"{}/{}",
|
||||
self.client.url()?,
|
||||
self.relative_path_to_name(from)
|
||||
);
|
||||
let builder = blob_client.copy(Url::from_str(&source_url)?);
|
||||
|
||||
let result = builder.into_future().await?;
|
||||
|
||||
let mut copy_status = result.copy_status;
|
||||
let start_time = Instant::now();
|
||||
const MAX_WAIT_TIME: Duration = Duration::from_secs(60);
|
||||
loop {
|
||||
match copy_status {
|
||||
CopyStatus::Aborted => {
|
||||
anyhow::bail!("Received abort for copy from {from} to {to}.");
|
||||
}
|
||||
CopyStatus::Failed => {
|
||||
anyhow::bail!("Received failure response for copy from {from} to {to}.");
|
||||
}
|
||||
CopyStatus::Success => return Ok(()),
|
||||
CopyStatus::Pending => (),
|
||||
}
|
||||
// The copy is taking longer. Waiting a second and then re-trying.
|
||||
// TODO estimate time based on copy_progress and adjust time based on that
|
||||
tokio::time::sleep(Duration::from_millis(1000)).await;
|
||||
let properties = blob_client.get_properties().into_future().await?;
|
||||
let Some(status) = properties.blob.properties.copy_status else {
|
||||
tracing::warn!("copy_status for copy is None!, from={from}, to={to}");
|
||||
return Ok(());
|
||||
};
|
||||
if start_time.elapsed() > MAX_WAIT_TIME {
|
||||
anyhow::bail!("Copy from from {from} to {to} took longer than limit MAX_WAIT_TIME={}s. copy_pogress={:?}.",
|
||||
MAX_WAIT_TIME.as_secs_f32(),
|
||||
properties.blob.properties.copy_progress,
|
||||
);
|
||||
}
|
||||
copy_status = status;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -263,6 +263,44 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_context(MaybeEnabledAzure)]
|
||||
#[tokio::test]
|
||||
async fn azure_copy_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> {
|
||||
let MaybeEnabledAzure::Enabled(ctx) = ctx else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let path = RemotePath::new(Utf8Path::new(
|
||||
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
|
||||
))
|
||||
.with_context(|| "RemotePath conversion")?;
|
||||
let path_dest = RemotePath::new(Utf8Path::new(
|
||||
format!("{}/file_dest", ctx.base_prefix).as_str(),
|
||||
))
|
||||
.with_context(|| "RemotePath conversion")?;
|
||||
|
||||
let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());
|
||||
|
||||
let (data, len) = wrap_stream(orig.clone());
|
||||
|
||||
ctx.client.upload(data, len, &path, None).await?;
|
||||
|
||||
// Normal download request
|
||||
ctx.client.copy_object(&path, &path_dest).await?;
|
||||
|
||||
let dl = ctx.client.download(&path_dest).await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig);
|
||||
|
||||
debug!("Cleanup: deleting file at path {path:?}");
|
||||
ctx.client
|
||||
.delete_objects(&[path.clone(), path_dest.clone()])
|
||||
.await
|
||||
.with_context(|| format!("{path:?} removal"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct EnabledAzure {
|
||||
client: Arc<GenericRemoteStorage>,
|
||||
base_prefix: &'static str,
|
||||
|
||||
@@ -259,6 +259,44 @@ async fn s3_upload_download_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_context(MaybeEnabledS3)]
|
||||
#[tokio::test]
|
||||
async fn s3_copy_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
|
||||
let MaybeEnabledS3::Enabled(ctx) = ctx else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let path = RemotePath::new(Utf8Path::new(
|
||||
format!("{}/file_to_copy", ctx.base_prefix).as_str(),
|
||||
))
|
||||
.with_context(|| "RemotePath conversion")?;
|
||||
let path_dest = RemotePath::new(Utf8Path::new(
|
||||
format!("{}/file_dest", ctx.base_prefix).as_str(),
|
||||
))
|
||||
.with_context(|| "RemotePath conversion")?;
|
||||
|
||||
let orig = bytes::Bytes::from_static("remote blob data content".as_bytes());
|
||||
|
||||
let (data, len) = wrap_stream(orig.clone());
|
||||
|
||||
ctx.client.upload(data, len, &path, None).await?;
|
||||
|
||||
// Normal download request
|
||||
ctx.client.copy_object(&path, &path_dest).await?;
|
||||
|
||||
let dl = ctx.client.download(&path_dest).await?;
|
||||
let buf = download_to_vec(dl).await?;
|
||||
assert_eq!(&buf, &orig);
|
||||
|
||||
debug!("Cleanup: deleting file at path {path:?}");
|
||||
ctx.client
|
||||
.delete_objects(&[path.clone(), path_dest.clone()])
|
||||
.await
|
||||
.with_context(|| format!("{path:?} removal"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct EnabledS3 {
|
||||
client: Arc<GenericRemoteStorage>,
|
||||
base_prefix: &'static str,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use pageserver_api::{models::*, shard::TenantShardId};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use reqwest::{IntoUrl, Method, StatusCode};
|
||||
use utils::{
|
||||
http::error::HttpErrorBody,
|
||||
id::{TenantId, TimelineId},
|
||||
@@ -22,8 +22,8 @@ pub enum Error {
|
||||
#[error("receive error body: {0}")]
|
||||
ReceiveErrorBody(String),
|
||||
|
||||
#[error("pageserver API: {0}")]
|
||||
ApiError(String),
|
||||
#[error("pageserver API: {1}")]
|
||||
ApiError(StatusCode, String),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -41,7 +41,7 @@ impl ResponseErrorMessageExt for reqwest::Response {
|
||||
|
||||
let url = self.url().to_owned();
|
||||
Err(match self.json::<HttpErrorBody>().await {
|
||||
Ok(HttpErrorBody { msg }) => Error::ApiError(msg),
|
||||
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
|
||||
Err(_) => {
|
||||
Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
|
||||
}
|
||||
@@ -71,9 +71,9 @@ impl Client {
|
||||
|
||||
pub async fn tenant_details(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> Result<pageserver_api::models::TenantDetails> {
|
||||
let uri = format!("{}/v1/tenant/{tenant_id}", self.mgmt_api_endpoint);
|
||||
let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
|
||||
self.get(uri)
|
||||
.await?
|
||||
.json()
|
||||
@@ -83,9 +83,12 @@ impl Client {
|
||||
|
||||
pub async fn list_timelines(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
|
||||
let uri = format!("{}/v1/tenant/{tenant_id}/timeline", self.mgmt_api_endpoint);
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{tenant_shard_id}/timeline",
|
||||
self.mgmt_api_endpoint
|
||||
);
|
||||
self.get(&uri)
|
||||
.await?
|
||||
.json()
|
||||
@@ -179,23 +182,23 @@ impl Client {
|
||||
"{}/v1/tenant/{}/secondary/download",
|
||||
self.mgmt_api_endpoint, tenant_id
|
||||
);
|
||||
self.request(Method::POST, &uri, ())
|
||||
.await?
|
||||
.error_for_status()
|
||||
.map(|_| ())
|
||||
.map_err(|e| Error::ApiError(format!("{}", e)))
|
||||
self.request(Method::POST, &uri, ()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn location_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
config: LocationConfig,
|
||||
flush_ms: Option<std::time::Duration>,
|
||||
) -> Result<()> {
|
||||
let req_body = TenantLocationConfigRequest { tenant_id, config };
|
||||
let req_body = TenantLocationConfigRequest {
|
||||
tenant_id: tenant_shard_id,
|
||||
config,
|
||||
};
|
||||
let path = format!(
|
||||
"{}/v1/tenant/{}/location_config",
|
||||
self.mgmt_api_endpoint, tenant_id
|
||||
self.mgmt_api_endpoint, tenant_shard_id
|
||||
);
|
||||
let path = if let Some(flush_ms) = flush_ms {
|
||||
format!("{}?flush_ms={}", path, flush_ms.as_millis())
|
||||
@@ -233,4 +236,19 @@ impl Client {
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn tenant_synthetic_size(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> Result<TenantHistorySize> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/synthetic_size",
|
||||
self.mgmt_api_endpoint, tenant_shard_id
|
||||
);
|
||||
self.get(&uri)
|
||||
.await?
|
||||
.json()
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tokio::task::JoinSet;
|
||||
use utils::id::{TenantId, TenantTimelineId};
|
||||
|
||||
@@ -31,7 +32,10 @@ pub async fn get_pageserver_tenant_timelines_unsharded(
|
||||
async move {
|
||||
(
|
||||
tenant_id,
|
||||
mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
|
||||
mgmt_api_client
|
||||
.tenant_details(TenantShardId::unsharded(tenant_id))
|
||||
.await
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
});
|
||||
|
||||
@@ -349,10 +349,10 @@ async fn main_impl(
|
||||
|
||||
let work_sender_task = tokio::spawn(work_sender);
|
||||
|
||||
info!("waiting for everything to become ready");
|
||||
start_work_barrier.wait().await;
|
||||
info!("work started");
|
||||
if let Some(runtime) = args.runtime {
|
||||
info!("waiting for everything to become ready");
|
||||
start_work_barrier.wait().await;
|
||||
info!("work started");
|
||||
tokio::time::sleep(runtime.into()).await;
|
||||
info!("runtime over, signalling cancellation");
|
||||
cancel.cancel();
|
||||
|
||||
@@ -527,6 +527,7 @@ fn start_pageserver(
|
||||
conf,
|
||||
remote_storage.clone(),
|
||||
disk_usage_eviction_state.clone(),
|
||||
tenant_manager.clone(),
|
||||
background_jobs_barrier.clone(),
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -47,21 +47,24 @@ use std::{
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, instrument, warn, Instrument};
|
||||
use utils::completion;
|
||||
use utils::serde_percent::Percent;
|
||||
use utils::{completion, id::TimelineId};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
self,
|
||||
storage_layer::{AsLayerDesc, EvictionError, Layer},
|
||||
mgr::TenantManager,
|
||||
remote_timeline_client::LayerFileMetadata,
|
||||
secondary::SecondaryTenant,
|
||||
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerFileName},
|
||||
Timeline,
|
||||
},
|
||||
};
|
||||
@@ -125,6 +128,7 @@ pub fn launch_disk_usage_global_eviction_task(
|
||||
conf: &'static PageServerConf,
|
||||
storage: GenericRemoteStorage,
|
||||
state: Arc<State>,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
background_jobs_barrier: completion::Barrier,
|
||||
) -> anyhow::Result<()> {
|
||||
let Some(task_config) = &conf.disk_usage_based_eviction else {
|
||||
@@ -150,8 +154,7 @@ pub fn launch_disk_usage_global_eviction_task(
|
||||
_ = background_jobs_barrier.wait() => { }
|
||||
};
|
||||
|
||||
disk_usage_eviction_task(&state, task_config, &storage, &conf.tenants_path(), cancel)
|
||||
.await;
|
||||
disk_usage_eviction_task(&state, task_config, &storage, tenant_manager, cancel).await;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
@@ -164,7 +167,7 @@ async fn disk_usage_eviction_task(
|
||||
state: &State,
|
||||
task_config: &DiskUsageEvictionTaskConfig,
|
||||
storage: &GenericRemoteStorage,
|
||||
tenants_dir: &Utf8Path,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
cancel: CancellationToken,
|
||||
) {
|
||||
scopeguard::defer! {
|
||||
@@ -191,7 +194,7 @@ async fn disk_usage_eviction_task(
|
||||
state,
|
||||
task_config,
|
||||
storage,
|
||||
tenants_dir,
|
||||
&tenant_manager,
|
||||
&cancel,
|
||||
)
|
||||
.await;
|
||||
@@ -226,15 +229,17 @@ async fn disk_usage_eviction_task_iteration(
|
||||
state: &State,
|
||||
task_config: &DiskUsageEvictionTaskConfig,
|
||||
storage: &GenericRemoteStorage,
|
||||
tenants_dir: &Utf8Path,
|
||||
tenant_manager: &Arc<TenantManager>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
|
||||
let tenants_dir = tenant_manager.get_conf().tenants_path();
|
||||
let usage_pre = filesystem_level_usage::get(&tenants_dir, task_config)
|
||||
.context("get filesystem-level disk usage before evictions")?;
|
||||
let res = disk_usage_eviction_task_iteration_impl(
|
||||
state,
|
||||
storage,
|
||||
usage_pre,
|
||||
tenant_manager,
|
||||
task_config.eviction_order,
|
||||
cancel,
|
||||
)
|
||||
@@ -248,7 +253,7 @@ async fn disk_usage_eviction_task_iteration(
|
||||
}
|
||||
IterationOutcome::Finished(outcome) => {
|
||||
// Verify with statvfs whether we made any real progress
|
||||
let after = filesystem_level_usage::get(tenants_dir, task_config)
|
||||
let after = filesystem_level_usage::get(&tenants_dir, task_config)
|
||||
// It's quite unlikely to hit the error here. Keep the code simple and bail out.
|
||||
.context("get filesystem-level disk usage after evictions")?;
|
||||
|
||||
@@ -324,6 +329,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
state: &State,
|
||||
_storage: &GenericRemoteStorage,
|
||||
usage_pre: U,
|
||||
tenant_manager: &Arc<TenantManager>,
|
||||
eviction_order: EvictionOrder,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<IterationOutcome<U>> {
|
||||
@@ -344,29 +350,29 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
"running disk usage based eviction due to pressure"
|
||||
);
|
||||
|
||||
let candidates = match collect_eviction_candidates(eviction_order, cancel).await? {
|
||||
EvictionCandidates::Cancelled => {
|
||||
return Ok(IterationOutcome::Cancelled);
|
||||
}
|
||||
EvictionCandidates::Finished(partitioned) => partitioned,
|
||||
};
|
||||
let candidates =
|
||||
match collect_eviction_candidates(tenant_manager, eviction_order, cancel).await? {
|
||||
EvictionCandidates::Cancelled => {
|
||||
return Ok(IterationOutcome::Cancelled);
|
||||
}
|
||||
EvictionCandidates::Finished(partitioned) => partitioned,
|
||||
};
|
||||
|
||||
// Debug-log the list of candidates
|
||||
let now = SystemTime::now();
|
||||
for (i, (partition, candidate)) in candidates.iter().enumerate() {
|
||||
let nth = i + 1;
|
||||
let desc = candidate.layer.layer_desc();
|
||||
let total_candidates = candidates.len();
|
||||
let size = desc.file_size;
|
||||
let size = candidate.layer.get_file_size();
|
||||
let rel = candidate.relative_last_activity;
|
||||
debug!(
|
||||
"cand {nth}/{total_candidates}: size={size}, rel_last_activity={rel}, no_access_for={}us, partition={partition:?}, {}/{}/{}",
|
||||
now.duration_since(candidate.last_activity_ts)
|
||||
.unwrap()
|
||||
.as_micros(),
|
||||
desc.tenant_shard_id,
|
||||
desc.timeline_id,
|
||||
candidate.layer,
|
||||
candidate.layer.get_tenant_shard_id(),
|
||||
candidate.layer.get_timeline_id(),
|
||||
candidate.layer.get_name(),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -398,7 +404,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
warned = Some(usage_planned);
|
||||
}
|
||||
|
||||
usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);
|
||||
usage_planned.add_available_bytes(candidate.layer.get_file_size());
|
||||
evicted_amount += 1;
|
||||
}
|
||||
|
||||
@@ -463,19 +469,30 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
continue;
|
||||
};
|
||||
|
||||
js.spawn(async move {
|
||||
let rtc = candidate.timeline.remote_client.as_ref().expect(
|
||||
"holding the witness, all timelines must have a remote timeline client",
|
||||
);
|
||||
let file_size = candidate.layer.layer_desc().file_size;
|
||||
candidate
|
||||
.layer
|
||||
.evict_and_wait(rtc)
|
||||
.await
|
||||
.map(|()| file_size)
|
||||
.map_err(|e| (file_size, e))
|
||||
});
|
||||
match candidate.layer {
|
||||
EvictionLayer::Attached(layer) => {
|
||||
let file_size = layer.layer_desc().file_size;
|
||||
js.spawn(async move {
|
||||
layer
|
||||
.evict_and_wait()
|
||||
.await
|
||||
.map(|()| file_size)
|
||||
.map_err(|e| (file_size, e))
|
||||
});
|
||||
}
|
||||
EvictionLayer::Secondary(layer) => {
|
||||
let file_size = layer.metadata.file_size();
|
||||
let tenant_manager = tenant_manager.clone();
|
||||
|
||||
js.spawn(async move {
|
||||
layer
|
||||
.secondary_tenant
|
||||
.evict_layer(tenant_manager.get_conf(), layer.timeline_id, layer.name)
|
||||
.await;
|
||||
Ok(file_size)
|
||||
});
|
||||
}
|
||||
}
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
|
||||
@@ -502,11 +519,100 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct EvictionCandidate {
|
||||
timeline: Arc<Timeline>,
|
||||
layer: Layer,
|
||||
last_activity_ts: SystemTime,
|
||||
relative_last_activity: finite_f32::FiniteF32,
|
||||
pub(crate) struct EvictionSecondaryLayer {
|
||||
pub(crate) secondary_tenant: Arc<SecondaryTenant>,
|
||||
pub(crate) timeline_id: TimelineId,
|
||||
pub(crate) name: LayerFileName,
|
||||
pub(crate) metadata: LayerFileMetadata,
|
||||
}
|
||||
|
||||
/// Full [`Layer`] objects are specific to tenants in attached mode. This type is a layer
|
||||
/// of indirection to store either a `Layer`, or a reference to a secondary tenant and a layer name.
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum EvictionLayer {
|
||||
Attached(Layer),
|
||||
#[allow(dead_code)]
|
||||
Secondary(EvictionSecondaryLayer),
|
||||
}
|
||||
|
||||
impl From<Layer> for EvictionLayer {
|
||||
fn from(value: Layer) -> Self {
|
||||
Self::Attached(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl EvictionLayer {
|
||||
pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
|
||||
match self {
|
||||
Self::Attached(l) => &l.layer_desc().tenant_shard_id,
|
||||
Self::Secondary(sl) => sl.secondary_tenant.get_tenant_shard_id(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_timeline_id(&self) -> &TimelineId {
|
||||
match self {
|
||||
Self::Attached(l) => &l.layer_desc().timeline_id,
|
||||
Self::Secondary(sl) => &sl.timeline_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_name(&self) -> LayerFileName {
|
||||
match self {
|
||||
Self::Attached(l) => l.layer_desc().filename(),
|
||||
Self::Secondary(sl) => sl.name.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_file_size(&self) -> u64 {
|
||||
match self {
|
||||
Self::Attached(l) => l.layer_desc().file_size,
|
||||
Self::Secondary(sl) => sl.metadata.file_size(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct EvictionCandidate {
|
||||
pub(crate) layer: EvictionLayer,
|
||||
pub(crate) last_activity_ts: SystemTime,
|
||||
pub(crate) relative_last_activity: finite_f32::FiniteF32,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for EvictionLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Attached(l) => l.fmt(f),
|
||||
Self::Secondary(sl) => {
|
||||
write!(f, "{}/{}", sl.timeline_id, sl.name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct DiskUsageEvictionInfo {
|
||||
/// Timeline's largest layer (remote or resident)
|
||||
pub max_layer_size: Option<u64>,
|
||||
/// Timeline's resident layers
|
||||
pub resident_layers: Vec<EvictionCandidate>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for EvictionCandidate {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it
|
||||
// having to allocate a string to this is bad, but it will rarely be formatted
|
||||
let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
|
||||
let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
|
||||
struct DisplayIsDebug<'a, T>(&'a T);
|
||||
impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
f.debug_struct("LocalLayerInfoForDiskUsageEviction")
|
||||
.field("layer", &DisplayIsDebug(&self.layer))
|
||||
.field("last_activity", &ts)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||||
@@ -623,6 +729,7 @@ enum EvictionCandidates {
|
||||
/// - tenant B 1 layer
|
||||
/// - tenant C 8 layers
|
||||
async fn collect_eviction_candidates(
|
||||
tenant_manager: &Arc<TenantManager>,
|
||||
eviction_order: EvictionOrder,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<EvictionCandidates> {
|
||||
@@ -631,13 +738,16 @@ async fn collect_eviction_candidates(
|
||||
.await
|
||||
.context("get list of tenants")?;
|
||||
|
||||
// TODO: avoid listing every layer in every tenant: this loop can block the executor,
|
||||
// and the resulting data structure can be huge.
|
||||
// (https://github.com/neondatabase/neon/issues/6224)
|
||||
let mut candidates = Vec::new();
|
||||
|
||||
for (tenant_id, _state, _gen) in &tenants {
|
||||
for (tenant_id, _state, _gen) in tenants {
|
||||
if cancel.is_cancelled() {
|
||||
return Ok(EvictionCandidates::Cancelled);
|
||||
}
|
||||
let tenant = match tenant::mgr::get_tenant(*tenant_id, true) {
|
||||
let tenant = match tenant::mgr::get_tenant(tenant_id, true) {
|
||||
Ok(tenant) => tenant,
|
||||
Err(e) => {
|
||||
// this can happen if tenant has lifecycle transition after we fetched it
|
||||
@@ -665,11 +775,7 @@ async fn collect_eviction_candidates(
|
||||
}
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction().await;
|
||||
debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
|
||||
tenant_candidates.extend(
|
||||
info.resident_layers
|
||||
.into_iter()
|
||||
.map(|layer_infos| (tl.clone(), layer_infos)),
|
||||
);
|
||||
tenant_candidates.extend(info.resident_layers.into_iter());
|
||||
max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
|
||||
|
||||
if cancel.is_cancelled() {
|
||||
@@ -707,7 +813,7 @@ async fn collect_eviction_candidates(
|
||||
// Sort layers most-recently-used first, then partition by
|
||||
// cumsum above/below min_resident_size.
|
||||
tenant_candidates
|
||||
.sort_unstable_by_key(|(_, layer_info)| std::cmp::Reverse(layer_info.last_activity_ts));
|
||||
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
|
||||
let mut cumsum: i128 = 0;
|
||||
|
||||
// keeping the -1 or not decides if every tenant should lose their least recently accessed
|
||||
@@ -741,12 +847,10 @@ async fn collect_eviction_candidates(
|
||||
.unwrap_or(1);
|
||||
let divider = total as f32;
|
||||
|
||||
for (i, (timeline, layer_info)) in tenant_candidates.into_iter().enumerate() {
|
||||
let file_size = layer_info.file_size();
|
||||
|
||||
for (i, mut candidate) in tenant_candidates.into_iter().enumerate() {
|
||||
// as we iterate this reverse sorted list, the most recently accessed layer will always
|
||||
// be 1.0; this is for us to evict it last.
|
||||
let relative_last_activity = if matches!(
|
||||
candidate.relative_last_activity = if matches!(
|
||||
eviction_order,
|
||||
EvictionOrder::RelativeAccessed { .. }
|
||||
) {
|
||||
@@ -761,22 +865,46 @@ async fn collect_eviction_candidates(
|
||||
finite_f32::FiniteF32::ZERO
|
||||
};
|
||||
|
||||
let candidate = EvictionCandidate {
|
||||
timeline,
|
||||
last_activity_ts: layer_info.last_activity_ts,
|
||||
layer: layer_info.layer,
|
||||
relative_last_activity,
|
||||
};
|
||||
let partition = if cumsum > min_resident_size as i128 {
|
||||
MinResidentSizePartition::Above
|
||||
} else {
|
||||
MinResidentSizePartition::Below
|
||||
};
|
||||
cumsum += i128::from(candidate.layer.get_file_size());
|
||||
candidates.push((partition, candidate));
|
||||
cumsum += i128::from(file_size);
|
||||
}
|
||||
}
|
||||
|
||||
// Note: the same tenant ID might be hit twice, if it transitions from attached to
|
||||
// secondary while we run. That is okay: when we eventually try and run the eviction,
|
||||
// the `Gate` on the object will ensure that whichever one has already been shut down
|
||||
// will not delete anything.
|
||||
|
||||
let mut secondary_tenants = Vec::new();
|
||||
tenant_manager.foreach_secondary_tenants(
|
||||
|_tenant_shard_id: &TenantShardId, state: &Arc<SecondaryTenant>| {
|
||||
secondary_tenants.push(state.clone());
|
||||
},
|
||||
);
|
||||
|
||||
for secondary_tenant in secondary_tenants {
|
||||
let mut layer_info = secondary_tenant.get_layers_for_eviction();
|
||||
|
||||
layer_info
|
||||
.resident_layers
|
||||
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
|
||||
|
||||
candidates.extend(layer_info.resident_layers.into_iter().map(|candidate| {
|
||||
(
|
||||
// Secondary locations' layers are always considered above the min resident size,
|
||||
// i.e. secondary locations are permitted to be trimmed to zero layers if all
|
||||
// the layers have sufficiently old access times.
|
||||
MinResidentSizePartition::Above,
|
||||
candidate,
|
||||
)
|
||||
}));
|
||||
}
|
||||
|
||||
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
|
||||
"as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
|
||||
|
||||
@@ -821,7 +949,7 @@ impl std::ops::Deref for TimelineKey {
|
||||
}
|
||||
|
||||
/// A totally ordered f32 subset we can use with sorting functions.
|
||||
mod finite_f32 {
|
||||
pub(crate) mod finite_f32 {
|
||||
|
||||
/// A totally ordered f32 subset we can use with sorting functions.
|
||||
#[derive(Clone, Copy, PartialEq)]
|
||||
|
||||
@@ -14,6 +14,7 @@ use hyper::header;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::models::TenantDetails;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{
|
||||
@@ -265,7 +266,7 @@ impl From<SetNewTenantConfigError> for ApiError {
|
||||
SetNewTenantConfigError::GetTenant(tid) => {
|
||||
ApiError::NotFound(anyhow!("tenant {}", tid).into())
|
||||
}
|
||||
e @ SetNewTenantConfigError::Persist(_) => {
|
||||
e @ (SetNewTenantConfigError::Persist(_) | SetNewTenantConfigError::Other(_)) => {
|
||||
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||
}
|
||||
}
|
||||
@@ -704,7 +705,9 @@ async fn tenant_attach_handler(
|
||||
}
|
||||
|
||||
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||
let location_conf = LocationConf::attached_single(tenant_conf, generation);
|
||||
let shard_params = ShardParameters::default();
|
||||
let location_conf = LocationConf::attached_single(tenant_conf, generation, &shard_params);
|
||||
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.upsert_location(
|
||||
@@ -1194,7 +1197,8 @@ async fn tenant_create_handler(
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let location_conf = LocationConf::attached_single(tenant_conf, generation);
|
||||
let location_conf =
|
||||
LocationConf::attached_single(tenant_conf, generation, &request_data.shard_parameters);
|
||||
|
||||
let new_tenant = state
|
||||
.tenant_manager
|
||||
@@ -1213,7 +1217,6 @@ async fn tenant_create_handler(
|
||||
"Upsert succeeded but didn't return tenant!"
|
||||
)));
|
||||
};
|
||||
|
||||
// We created the tenant. Existing API semantics are that the tenant
|
||||
// is Active when this function returns.
|
||||
if let res @ Err(_) = new_tenant
|
||||
@@ -1652,12 +1655,13 @@ async fn disk_usage_eviction_run(
|
||||
)));
|
||||
};
|
||||
|
||||
let state = state.disk_usage_eviction_state.clone();
|
||||
let eviction_state = state.disk_usage_eviction_state.clone();
|
||||
|
||||
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
|
||||
&state,
|
||||
&eviction_state,
|
||||
storage,
|
||||
usage,
|
||||
&state.tenant_manager,
|
||||
config.eviction_order,
|
||||
&cancel,
|
||||
)
|
||||
|
||||
@@ -337,15 +337,6 @@ pub(crate) mod page_cache_eviction_metrics {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_page_cache_acquire_pinned_slot_seconds",
|
||||
"Time spent acquiring a pinned slot in the page cache",
|
||||
CRITICAL_OP_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"page_cache_errors_total",
|
||||
|
||||
@@ -550,7 +550,6 @@ impl PageCache {
|
||||
// not require changes.
|
||||
|
||||
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
|
||||
let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
|
||||
match tokio::time::timeout(
|
||||
// Choose small timeout, neon_smgr does its own retries.
|
||||
// https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
|
||||
@@ -563,7 +562,6 @@ impl PageCache {
|
||||
res.expect("this semaphore is never closed"),
|
||||
)),
|
||||
Err(_timeout) => {
|
||||
timer.stop_and_discard();
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
|
||||
);
|
||||
|
||||
@@ -13,7 +13,10 @@ use anyhow::Context;
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
use bytes::Bytes;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::Stream;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
@@ -21,11 +24,14 @@ use pageserver_api::models::{
|
||||
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
|
||||
PagestreamNblocksRequest, PagestreamNblocksResponse,
|
||||
};
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber};
|
||||
use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::FeStartupPacket;
|
||||
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::net::TcpListener;
|
||||
use std::pin::pin;
|
||||
@@ -40,6 +46,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::field;
|
||||
use tracing::*;
|
||||
use utils::id::ConnectionId;
|
||||
use utils::sync::gate::GateGuard;
|
||||
use utils::{
|
||||
auth::{Claims, Scope, SwappableJwtAuth},
|
||||
id::{TenantId, TimelineId},
|
||||
@@ -274,6 +281,13 @@ async fn page_service_conn_main(
|
||||
}
|
||||
}
|
||||
|
||||
/// While a handler holds a reference to a Timeline, it also holds a the
|
||||
/// timeline's Gate open.
|
||||
struct HandlerTimeline {
|
||||
timeline: Arc<Timeline>,
|
||||
_guard: GateGuard,
|
||||
}
|
||||
|
||||
struct PageServerHandler {
|
||||
_conf: &'static PageServerConf,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
@@ -285,6 +299,14 @@ struct PageServerHandler {
|
||||
/// For each query received over the connection,
|
||||
/// `process_query` creates a child context from this one.
|
||||
connection_ctx: RequestContext,
|
||||
|
||||
/// See [`Self::cache_timeline`] for usage.
|
||||
///
|
||||
/// Note on size: the typical size of this map is 1. The largest size we expect
|
||||
/// to see is the number of shards divided by the number of pageservers (typically < 2),
|
||||
/// or the ratio used when splitting shards (i.e. how many children created from one)
|
||||
/// parent shard, where a "large" number might be ~8.
|
||||
shard_timelines: HashMap<ShardIndex, HandlerTimeline>,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
@@ -358,13 +380,46 @@ impl PageServerHandler {
|
||||
auth,
|
||||
claims: None,
|
||||
connection_ctx,
|
||||
shard_timelines: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
|
||||
/// this rather than naked flush() in order to shut down promptly. Without this, we would
|
||||
/// block shutdown of a tenant if a postgres client was failing to consume bytes we send
|
||||
/// in the flush.
|
||||
/// Analogous to calling cancelled() on a Timeline's cancellation token: waits for cancellation.
|
||||
///
|
||||
/// We use many Timeline objects, and hold GateGuards on all of them. We must therefore respect
|
||||
/// all of their cancellation tokens.
|
||||
async fn timeline_cancelled(&self) {
|
||||
// A short wait before we expend the cycles to walk our timeline map. This avoids incurring
|
||||
// that cost every time we check for cancellation.
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
|
||||
// This function is never called concurrently with code that adds timelines to shard_timelines,
|
||||
// which is enforced by the borrow checker (the future returned by this function carries the
|
||||
// immutable &self). So it's fine to evaluate shard_timelines after the sleep, we don't risk
|
||||
// missing any inserts to the map.
|
||||
|
||||
let mut futs = self
|
||||
.shard_timelines
|
||||
.values()
|
||||
.map(|ht| ht.timeline.cancel.cancelled())
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
futs.next().await;
|
||||
}
|
||||
|
||||
/// Analogous to calling is_cancelled() on a Timeline's cancellation token
|
||||
fn timeline_is_cancelled(&self) -> bool {
|
||||
self.shard_timelines
|
||||
.values()
|
||||
.any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping())
|
||||
}
|
||||
|
||||
/// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
|
||||
/// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
|
||||
/// cancellation if there aren't any timelines in the cache.
|
||||
///
|
||||
/// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the
|
||||
/// timeline cancellation token.
|
||||
async fn flush_cancellable<IO>(
|
||||
&self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -377,6 +432,9 @@ impl PageServerHandler {
|
||||
flush_r = pgb.flush() => {
|
||||
Ok(flush_r?)
|
||||
},
|
||||
_ = self.timeline_cancelled() => {
|
||||
Err(QueryError::Shutdown)
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
Err(QueryError::Shutdown)
|
||||
}
|
||||
@@ -452,7 +510,7 @@ impl PageServerHandler {
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_pagerequests<IO>(
|
||||
&self,
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
@@ -463,10 +521,6 @@ impl PageServerHandler {
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// Note that since one connection may contain getpage requests that target different
|
||||
// shards (e.g. during splitting when the compute is not yet aware of the split), the tenant
|
||||
// that we look up here may not be the one that serves all the actual requests: we will double
|
||||
// check the mapping of key->shard later before calling into Timeline for getpage requests.
|
||||
let tenant = mgr::get_active_tenant_with_timeout(
|
||||
tenant_id,
|
||||
ShardSelector::First,
|
||||
@@ -487,19 +541,9 @@ impl PageServerHandler {
|
||||
None
|
||||
};
|
||||
|
||||
// Check that the timeline exists
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| QueryError::NotFound(format!("{e}").into()))?;
|
||||
|
||||
// Avoid starting new requests if the timeline has already started shutting down,
|
||||
// and block timeline shutdown until this request is complete, or drops out due
|
||||
// to cancellation.
|
||||
let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
|
||||
|
||||
// switch client to COPYBOTH
|
||||
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
|
||||
self.flush_cancellable(pgb, &timeline.cancel).await?;
|
||||
self.flush_cancellable(pgb, &tenant.cancel).await?;
|
||||
|
||||
let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
|
||||
|
||||
@@ -507,7 +551,7 @@ impl PageServerHandler {
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
|
||||
_ = timeline.cancel.cancelled() => {
|
||||
_ = self.timeline_cancelled() => {
|
||||
// We were requested to shut down.
|
||||
info!("shutdown request received in page handler");
|
||||
return Err(QueryError::Shutdown)
|
||||
@@ -544,7 +588,7 @@ impl PageServerHandler {
|
||||
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelExists);
|
||||
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
|
||||
(
|
||||
self.handle_get_rel_exists_request(&timeline, &req, &ctx)
|
||||
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
span,
|
||||
@@ -554,7 +598,7 @@ impl PageServerHandler {
|
||||
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelSize);
|
||||
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
|
||||
(
|
||||
self.handle_get_nblocks_request(&timeline, &req, &ctx)
|
||||
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
span,
|
||||
@@ -564,7 +608,7 @@ impl PageServerHandler {
|
||||
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
|
||||
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
|
||||
(
|
||||
self.handle_get_page_at_lsn_request(&timeline, &req, &ctx)
|
||||
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
span,
|
||||
@@ -574,7 +618,7 @@ impl PageServerHandler {
|
||||
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetDbSize);
|
||||
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
|
||||
(
|
||||
self.handle_db_size_request(&timeline, &req, &ctx)
|
||||
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
span,
|
||||
@@ -594,7 +638,7 @@ impl PageServerHandler {
|
||||
span.in_scope(|| info!("handler requested reconnect: {reason}"));
|
||||
return Err(QueryError::Reconnect);
|
||||
}
|
||||
Err(e) if timeline.cancel.is_cancelled() || timeline.is_stopping() => {
|
||||
Err(e) if self.timeline_is_cancelled() => {
|
||||
// This branch accomodates code within request handlers that returns an anyhow::Error instead of a clean
|
||||
// shutdown error, this may be buried inside a PageReconstructError::Other for example.
|
||||
//
|
||||
@@ -617,7 +661,7 @@ impl PageServerHandler {
|
||||
});
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
|
||||
self.flush_cancellable(pgb, &timeline.cancel).await?;
|
||||
self.flush_cancellable(pgb, &tenant.cancel).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -814,11 +858,14 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
async fn handle_get_rel_exists_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamExistsRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
@@ -834,11 +881,13 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
async fn handle_get_nblocks_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamNblocksRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
@@ -854,11 +903,13 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
async fn handle_db_size_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamDbSizeRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
@@ -880,16 +931,160 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
async fn do_handle_get_page_at_lsn_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
/// For most getpage requests, we will already have a Timeline to serve the request: this function
|
||||
/// looks up such a Timeline synchronously and without touching any global state.
|
||||
fn get_cached_timeline_for_page(
|
||||
&mut self,
|
||||
req: &PagestreamGetPageRequest,
|
||||
) -> Result<&Arc<Timeline>, Key> {
|
||||
let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() {
|
||||
// Fastest path: single sharded case
|
||||
if first_idx.shard_count < ShardCount(2) {
|
||||
return Ok(&first_timeline.timeline);
|
||||
}
|
||||
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
let shard_num = first_timeline
|
||||
.timeline
|
||||
.get_shard_identity()
|
||||
.get_shard_number(&key);
|
||||
|
||||
// Fast path: matched the first timeline in our local handler map. This case is common if
|
||||
// only one shard per tenant is attached to this pageserver.
|
||||
if first_timeline.timeline.get_shard_identity().number == shard_num {
|
||||
return Ok(&first_timeline.timeline);
|
||||
}
|
||||
|
||||
let shard_index = ShardIndex {
|
||||
shard_number: shard_num,
|
||||
shard_count: first_timeline.timeline.get_shard_identity().count,
|
||||
};
|
||||
|
||||
// Fast-ish path: timeline is in the connection handler's local cache
|
||||
if let Some(found) = self.shard_timelines.get(&shard_index) {
|
||||
return Ok(&found.timeline);
|
||||
}
|
||||
|
||||
key
|
||||
} else {
|
||||
rel_block_to_key(req.rel, req.blkno)
|
||||
};
|
||||
|
||||
Err(key)
|
||||
}
|
||||
|
||||
/// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable
|
||||
/// use in future requests without having to traverse [`crate::tenant::mgr::TenantManager`]
|
||||
/// again.
|
||||
///
|
||||
/// Note that all the Timelines in this cache are for the same timeline_id: they're differ
|
||||
/// in which shard they belong to. When we serve a getpage@lsn request, we choose a shard
|
||||
/// based on key.
|
||||
///
|
||||
/// The typical size of this cache is 1, as we generally create shards to distribute work
|
||||
/// across pageservers, so don't tend to have multiple shards for the same tenant on the
|
||||
/// same pageserver.
|
||||
fn cache_timeline(
|
||||
&mut self,
|
||||
timeline: Arc<Timeline>,
|
||||
) -> Result<&Arc<Timeline>, GetActiveTimelineError> {
|
||||
let gate_guard = timeline
|
||||
.gate
|
||||
.enter()
|
||||
.map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?;
|
||||
|
||||
let shard_index = timeline.tenant_shard_id.to_index();
|
||||
let entry = self
|
||||
.shard_timelines
|
||||
.entry(shard_index)
|
||||
.or_insert(HandlerTimeline {
|
||||
timeline,
|
||||
_guard: gate_guard,
|
||||
});
|
||||
|
||||
Ok(&entry.timeline)
|
||||
}
|
||||
|
||||
/// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with
|
||||
/// a Timeline to serve requests for this key, if such a Timeline is present on this pageserver. If no such
|
||||
/// Timeline is found, then we will return an error (this indicates that the client is talking to the wrong node).
|
||||
async fn load_timeline_for_page(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
|
||||
// Slow path: we must call out to the TenantManager to find the timeline for this Key
|
||||
let timeline = self
|
||||
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key))
|
||||
.await?;
|
||||
|
||||
self.cache_timeline(timeline)
|
||||
}
|
||||
|
||||
async fn get_timeline_shard_zero(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
|
||||
// This is a borrow-checker workaround: we can't return from inside of the `if let Some` because
|
||||
// that would be an immutable-borrow-self return, whereas later in the function we will use a mutable
|
||||
// ref to salf. So instead, we first build a bool, and then return while not borrowing self.
|
||||
let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() {
|
||||
idx.shard_number == ShardNumber(0)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if have_cached {
|
||||
let entry = self.shard_timelines.iter().next().unwrap();
|
||||
Ok(&entry.1.timeline)
|
||||
} else {
|
||||
let timeline = self
|
||||
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
Ok(self.cache_timeline(timeline)?)
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_get_page_at_lsn_request(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamGetPageRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let timeline = match self.get_cached_timeline_for_page(req) {
|
||||
Ok(tl) => tl,
|
||||
Err(key) => {
|
||||
match self
|
||||
.load_timeline_for_page(tenant_id, timeline_id, key)
|
||||
.await
|
||||
{
|
||||
Ok(t) => t,
|
||||
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
//
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
return Err(PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into(),
|
||||
));
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let page = timeline
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
|
||||
.await?;
|
||||
@@ -899,60 +1094,6 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
async fn handle_get_page_at_lsn_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamGetPageRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
if timeline.get_shard_identity().is_key_local(&key) {
|
||||
self.do_handle_get_page_at_lsn_request(timeline, req, ctx)
|
||||
.await
|
||||
} else {
|
||||
// The Tenant shard we looked up at connection start does not hold this particular
|
||||
// key: look for other shards in this tenant. This scenario occurs if a pageserver
|
||||
// has multiple shards for the same tenant.
|
||||
//
|
||||
// TODO: optimize this (https://github.com/neondatabase/neon/pull/6037)
|
||||
let timeline = match self
|
||||
.get_active_tenant_timeline(
|
||||
timeline.tenant_shard_id.tenant_id,
|
||||
timeline.timeline_id,
|
||||
ShardSelector::Page(key),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(t) => t,
|
||||
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
tracing::info!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}",
|
||||
timeline.get_shard_identity(), timeline.get_shard_identity().get_shard_number(&key).0, key);
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
return Err(PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into(),
|
||||
));
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
// Take a GateGuard for the duration of this request. If we were using our main Timeline object,
|
||||
// the GateGuard was already held over the whole connection.
|
||||
let _timeline_guard = timeline
|
||||
.gate
|
||||
.enter()
|
||||
.map_err(|_| PageStreamError::Shutdown)?;
|
||||
|
||||
self.do_handle_get_page_at_lsn_request(&timeline, req, ctx)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
|
||||
async fn handle_basebackup_request<IO>(
|
||||
|
||||
@@ -18,6 +18,7 @@ use enumset::EnumSet;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::FutureExt;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -2674,10 +2675,11 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
// Legacy configs are implicitly in attached state
|
||||
// Legacy configs are implicitly in attached state, and do not support sharding
|
||||
Ok(LocationConf::attached_single(
|
||||
tenant_conf,
|
||||
Generation::none(),
|
||||
&ShardParameters::default(),
|
||||
))
|
||||
} else {
|
||||
// FIXME If the config file is not found, assume that we're attaching
|
||||
@@ -3962,6 +3964,7 @@ pub(crate) mod harness {
|
||||
AttachedTenantConf::try_from(LocationConf::attached_single(
|
||||
TenantConfOpt::from(self.tenant_conf),
|
||||
self.generation,
|
||||
&ShardParameters::default(),
|
||||
))
|
||||
.unwrap(),
|
||||
// This is a legacy/test code path: sharding isn't supported here.
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
//! may lead to a data loss.
|
||||
//!
|
||||
use anyhow::bail;
|
||||
use pageserver_api::models;
|
||||
use pageserver_api::models::{self, ShardParameters};
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
|
||||
use serde::de::IntoDeserializer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -167,14 +167,17 @@ impl LocationConf {
|
||||
/// For use when loading from a legacy configuration: presence of a tenant
|
||||
/// implies it is in AttachmentMode::Single, which used to be the only
|
||||
/// possible state. This function should eventually be removed.
|
||||
pub(crate) fn attached_single(tenant_conf: TenantConfOpt, generation: Generation) -> Self {
|
||||
pub(crate) fn attached_single(
|
||||
tenant_conf: TenantConfOpt,
|
||||
generation: Generation,
|
||||
shard_params: &ShardParameters,
|
||||
) -> Self {
|
||||
Self {
|
||||
mode: LocationMode::Attached(AttachedLocationConfig {
|
||||
generation,
|
||||
attach_mode: AttachmentMode::Single,
|
||||
}),
|
||||
// Legacy configuration loads are always from tenants created before sharding existed.
|
||||
shard: ShardIdentity::unsharded(),
|
||||
shard: ShardIdentity::from_params(ShardNumber(0), shard_params),
|
||||
tenant_conf,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,8 @@
|
||||
|
||||
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::shard::{ShardIdentity, ShardNumber, TenantShardId};
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use std::borrow::Cow;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
@@ -760,6 +761,8 @@ pub(crate) enum SetNewTenantConfigError {
|
||||
GetTenant(#[from] GetTenantError),
|
||||
#[error(transparent)]
|
||||
Persist(anyhow::Error),
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) async fn set_new_tenant_config(
|
||||
@@ -773,10 +776,21 @@ pub(crate) async fn set_new_tenant_config(
|
||||
info!("configuring tenant {tenant_id}");
|
||||
let tenant = get_tenant(tenant_shard_id, true)?;
|
||||
|
||||
if tenant.tenant_shard_id().shard_count > ShardCount(0) {
|
||||
// Note that we use ShardParameters::default below.
|
||||
return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
|
||||
"This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
|
||||
)));
|
||||
}
|
||||
|
||||
// This is a legacy API that only operates on attached tenants: the preferred
|
||||
// API to use is the location_config/ endpoint, which lets the caller provide
|
||||
// the full LocationConf.
|
||||
let location_conf = LocationConf::attached_single(new_tenant_conf, tenant.generation);
|
||||
let location_conf = LocationConf::attached_single(
|
||||
new_tenant_conf,
|
||||
tenant.generation,
|
||||
&ShardParameters::default(),
|
||||
);
|
||||
|
||||
Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf)
|
||||
.await
|
||||
|
||||
@@ -3,22 +3,31 @@ pub mod heatmap;
|
||||
mod heatmap_uploader;
|
||||
mod scheduler;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::{sync::Arc, time::SystemTime};
|
||||
|
||||
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
disk_usage_eviction_task::DiskUsageEvictionInfo,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
virtual_file::MaybeFatalIo,
|
||||
};
|
||||
|
||||
use self::{
|
||||
downloader::{downloader_task, SecondaryDetail},
|
||||
heatmap_uploader::heatmap_uploader_task,
|
||||
};
|
||||
|
||||
use super::{config::SecondaryLocationConfig, mgr::TenantManager};
|
||||
use super::{
|
||||
config::SecondaryLocationConfig, mgr::TenantManager,
|
||||
span::debug_assert_current_span_has_tenant_id, storage_layer::LayerFileName,
|
||||
};
|
||||
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{completion::Barrier, sync::gate::Gate};
|
||||
use tracing::instrument;
|
||||
use utils::{completion::Barrier, fs_ext, id::TimelineId, sync::gate::Gate};
|
||||
|
||||
enum DownloadCommand {
|
||||
Download(TenantShardId),
|
||||
@@ -107,9 +116,67 @@ impl SecondaryTenant {
|
||||
self.detail.lock().unwrap().config = config.clone();
|
||||
}
|
||||
|
||||
fn get_tenant_shard_id(&self) -> &TenantShardId {
|
||||
pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
|
||||
&self.tenant_shard_id
|
||||
}
|
||||
|
||||
pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> DiskUsageEvictionInfo {
|
||||
self.detail.lock().unwrap().get_layers_for_eviction(self)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))]
|
||||
pub(crate) async fn evict_layer(
|
||||
&self,
|
||||
conf: &PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
name: LayerFileName,
|
||||
) {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let _guard = match self.gate.enter() {
|
||||
Ok(g) => g,
|
||||
Err(_) => {
|
||||
tracing::debug!("Dropping layer evictions, secondary tenant shutting down",);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let now = SystemTime::now();
|
||||
|
||||
let path = conf
|
||||
.timeline_path(&self.tenant_shard_id, &timeline_id)
|
||||
.join(name.file_name());
|
||||
|
||||
// We tolerate ENOENT, because between planning eviction and executing
|
||||
// it, the secondary downloader could have seen an updated heatmap that
|
||||
// resulted in a layer being deleted.
|
||||
// Other local I/O errors are process-fatal: these should never happen.
|
||||
tokio::fs::remove_file(path)
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.fatal_err("Deleting layer during eviction");
|
||||
|
||||
// Update the timeline's state. This does not have to be synchronized with
|
||||
// the download process, because:
|
||||
// - If downloader is racing with us to remove a file (e.g. because it is
|
||||
// removed from heatmap), then our mutual .remove() operations will both
|
||||
// succeed.
|
||||
// - If downloader is racing with us to download the object (this would require
|
||||
// multiple eviction iterations to race with multiple download iterations), then
|
||||
// if we remove it from the state, the worst that happens is the downloader
|
||||
// downloads it again before re-inserting, or we delete the file but it remains
|
||||
// in the state map (in which case it will be downloaded if this secondary
|
||||
// tenant transitions to attached and tries to access it)
|
||||
//
|
||||
// The important assumption here is that the secondary timeline state does not
|
||||
// have to 100% match what is on disk, because it's a best-effort warming
|
||||
// of the cache.
|
||||
let mut detail = self.detail.lock().unwrap();
|
||||
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
|
||||
timeline_detail.on_disk_layers.remove(&name);
|
||||
timeline_detail.evicted_at.insert(name, now);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
|
||||
|
||||
@@ -8,6 +8,9 @@ use std::{
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
disk_usage_eviction_task::{
|
||||
finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
|
||||
},
|
||||
metrics::SECONDARY_MODE,
|
||||
tenant::{
|
||||
config::SecondaryLocationConfig,
|
||||
@@ -142,6 +145,46 @@ impl SecondaryDetail {
|
||||
timelines: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn get_layers_for_eviction(
|
||||
&self,
|
||||
parent: &Arc<SecondaryTenant>,
|
||||
) -> DiskUsageEvictionInfo {
|
||||
let mut result = DiskUsageEvictionInfo {
|
||||
max_layer_size: None,
|
||||
resident_layers: Vec::new(),
|
||||
};
|
||||
for (timeline_id, timeline_detail) in &self.timelines {
|
||||
result
|
||||
.resident_layers
|
||||
.extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| {
|
||||
EvictionCandidate {
|
||||
layer: EvictionLayer::Secondary(EvictionSecondaryLayer {
|
||||
secondary_tenant: parent.clone(),
|
||||
timeline_id: *timeline_id,
|
||||
name: name.clone(),
|
||||
metadata: ods.metadata.clone(),
|
||||
}),
|
||||
last_activity_ts: ods.access_time,
|
||||
relative_last_activity: finite_f32::FiniteF32::ZERO,
|
||||
}
|
||||
}));
|
||||
}
|
||||
result.max_layer_size = result
|
||||
.resident_layers
|
||||
.iter()
|
||||
.map(|l| l.layer.get_file_size())
|
||||
.max();
|
||||
|
||||
tracing::debug!(
|
||||
"eviction: secondary tenant {} found {} timelines, {} layers",
|
||||
parent.get_tenant_shard_id(),
|
||||
self.timelines.len(),
|
||||
result.resident_layers.len()
|
||||
);
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
struct PendingDownload {
|
||||
|
||||
@@ -15,7 +15,7 @@ use utils::sync::heavier_once_cell;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::{remote_timeline_client::LayerFileMetadata, RemoteTimelineClient, Timeline};
|
||||
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
|
||||
|
||||
use super::delta_layer::{self, DeltaEntry};
|
||||
use super::image_layer;
|
||||
@@ -204,17 +204,14 @@ impl Layer {
|
||||
///
|
||||
/// Technically cancellation safe, but cancelling might shift the viewpoint of what generation
|
||||
/// of download-evict cycle on retry.
|
||||
pub(crate) async fn evict_and_wait(
|
||||
&self,
|
||||
rtc: &RemoteTimelineClient,
|
||||
) -> Result<(), EvictionError> {
|
||||
self.0.evict_and_wait(rtc).await
|
||||
pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> {
|
||||
self.0.evict_and_wait().await
|
||||
}
|
||||
|
||||
/// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload
|
||||
/// then.
|
||||
///
|
||||
/// On drop, this will cause a call to [`RemoteTimelineClient::schedule_deletion_of_unlinked`].
|
||||
/// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`].
|
||||
/// This means that the unlinking by [gc] or [compaction] must have happened strictly before
|
||||
/// the value this is called on gets dropped.
|
||||
///
|
||||
@@ -606,10 +603,7 @@ impl LayerInner {
|
||||
|
||||
/// Cancellation safe, however dropping the future and calling this method again might result
|
||||
/// in a new attempt to evict OR join the previously started attempt.
|
||||
pub(crate) async fn evict_and_wait(
|
||||
&self,
|
||||
_: &RemoteTimelineClient,
|
||||
) -> Result<(), EvictionError> {
|
||||
pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> {
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
assert!(self.have_remote_client);
|
||||
|
||||
@@ -42,15 +42,6 @@ use std::{
|
||||
ops::ControlFlow,
|
||||
};
|
||||
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
|
||||
LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
|
||||
ValueReconstructState,
|
||||
};
|
||||
use crate::tenant::tasks::BackgroundLoopKind;
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
@@ -58,7 +49,22 @@ use crate::tenant::{
|
||||
metadata::{save_metadata, TimelineMetadata},
|
||||
par_fsync,
|
||||
};
|
||||
use crate::{
|
||||
context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
|
||||
disk_usage_eviction_task::DiskUsageEvictionInfo,
|
||||
};
|
||||
use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
|
||||
use crate::{
|
||||
disk_usage_eviction_task::finite_f32,
|
||||
tenant::storage_layer::{
|
||||
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
|
||||
LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
|
||||
ValueReconstructState,
|
||||
},
|
||||
};
|
||||
use crate::{
|
||||
disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
|
||||
};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
|
||||
@@ -1133,12 +1139,7 @@ impl Timeline {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let rtc = self
|
||||
.remote_client
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
|
||||
|
||||
match local_layer.evict_and_wait(rtc).await {
|
||||
match local_layer.evict_and_wait().await {
|
||||
Ok(()) => Ok(Some(true)),
|
||||
Err(EvictionError::NotFound) => Ok(Some(false)),
|
||||
Err(EvictionError::Downloaded) => Ok(Some(false)),
|
||||
@@ -2102,7 +2103,7 @@ impl Timeline {
|
||||
let layer_file_names = eviction_info
|
||||
.resident_layers
|
||||
.iter()
|
||||
.map(|l| l.layer.layer_desc().filename())
|
||||
.map(|l| l.layer.get_name())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let decorated = match remote_client.get_layers_metadata(layer_file_names) {
|
||||
@@ -2120,7 +2121,7 @@ impl Timeline {
|
||||
.filter_map(|(layer, remote_info)| {
|
||||
remote_info.map(|remote_info| {
|
||||
HeatMapLayer::new(
|
||||
layer.layer.layer_desc().filename(),
|
||||
layer.layer.get_name(),
|
||||
IndexLayerMetadata::from(remote_info),
|
||||
layer.last_activity_ts,
|
||||
)
|
||||
@@ -4423,43 +4424,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct DiskUsageEvictionInfo {
|
||||
/// Timeline's largest layer (remote or resident)
|
||||
pub max_layer_size: Option<u64>,
|
||||
/// Timeline's resident layers
|
||||
pub resident_layers: Vec<LocalLayerInfoForDiskUsageEviction>,
|
||||
}
|
||||
|
||||
pub(crate) struct LocalLayerInfoForDiskUsageEviction {
|
||||
pub layer: Layer,
|
||||
pub last_activity_ts: SystemTime,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it
|
||||
// having to allocate a string to this is bad, but it will rarely be formatted
|
||||
let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
|
||||
let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
|
||||
struct DisplayIsDebug<'a, T>(&'a T);
|
||||
impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
f.debug_struct("LocalLayerInfoForDiskUsageEviction")
|
||||
.field("layer", &DisplayIsDebug(&self.layer))
|
||||
.field("last_activity", &ts)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalLayerInfoForDiskUsageEviction {
|
||||
pub fn file_size(&self) -> u64 {
|
||||
self.layer.layer_desc().file_size
|
||||
}
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
/// Returns non-remote layers for eviction.
|
||||
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
|
||||
@@ -4493,9 +4457,10 @@ impl Timeline {
|
||||
SystemTime::now()
|
||||
});
|
||||
|
||||
resident_layers.push(LocalLayerInfoForDiskUsageEviction {
|
||||
layer: l.drop_eviction_guard(),
|
||||
resident_layers.push(EvictionCandidate {
|
||||
layer: l.drop_eviction_guard().into(),
|
||||
last_activity_ts,
|
||||
relative_last_activity: finite_f32::FiniteF32::ZERO,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -4652,11 +4617,6 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let rtc = timeline
|
||||
.remote_client
|
||||
.clone()
|
||||
.expect("just configured this");
|
||||
|
||||
let layer = find_some_layer(&timeline).await;
|
||||
let layer = layer
|
||||
.keep_resident()
|
||||
@@ -4665,8 +4625,8 @@ mod tests {
|
||||
.expect("should had been resident")
|
||||
.drop_eviction_guard();
|
||||
|
||||
let first = async { layer.evict_and_wait(&rtc).await };
|
||||
let second = async { layer.evict_and_wait(&rtc).await };
|
||||
let first = async { layer.evict_and_wait().await };
|
||||
let second = async { layer.evict_and_wait().await };
|
||||
|
||||
let (first, second) = tokio::join!(first, second);
|
||||
|
||||
|
||||
@@ -215,13 +215,10 @@ impl Timeline {
|
||||
|
||||
// So, we just need to deal with this.
|
||||
|
||||
let remote_client = match self.remote_client.as_ref() {
|
||||
Some(c) => c,
|
||||
None => {
|
||||
error!("no remote storage configured, cannot evict layers");
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
};
|
||||
if self.remote_client.is_none() {
|
||||
error!("no remote storage configured, cannot evict layers");
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
{
|
||||
@@ -274,9 +271,8 @@ impl Timeline {
|
||||
};
|
||||
let layer = guard.drop_eviction_guard();
|
||||
if no_activity_for > p.threshold {
|
||||
let remote_client = remote_client.clone();
|
||||
// this could cause a lot of allocations in some cases
|
||||
js.spawn(async move { layer.evict_and_wait(&remote_client).await });
|
||||
js.spawn(async move { layer.evict_and_wait().await });
|
||||
stats.candidates += 1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,9 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = ["testing"]
|
||||
default = ["tokio-console"]
|
||||
testing = []
|
||||
tokio-console = ["tokio/tracing", "dep:console-subscriber"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
@@ -17,6 +18,7 @@ bytes = { workspace = true, features = ["serde"] }
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
clap.workspace = true
|
||||
console-subscriber = { workspace = true, optional = true }
|
||||
consumption_metrics.workspace = true
|
||||
dashmap.workspace = true
|
||||
futures.workspace = true
|
||||
|
||||
@@ -26,8 +26,12 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
.await
|
||||
.map(OpenTelemetryLayer::new);
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(env_filter)
|
||||
let reg = tracing_subscriber::registry();
|
||||
|
||||
#[cfg(all(tokio_unstable, feature = "tokio-console"))]
|
||||
let reg = reg.with(console_subscriber::spawn());
|
||||
|
||||
reg.with(env_filter)
|
||||
.with(otlp_layer)
|
||||
.with(fmt_layer)
|
||||
.try_init()?;
|
||||
|
||||
@@ -15,6 +15,7 @@ use serde_json::Map;
|
||||
use serde_json::Value;
|
||||
use smol_str::SmolStr;
|
||||
use tokio_postgres::error::DbError;
|
||||
use tokio_postgres::error::ErrorPosition;
|
||||
use tokio_postgres::types::Kind;
|
||||
use tokio_postgres::types::Type;
|
||||
use tokio_postgres::GenericClient;
|
||||
@@ -59,6 +60,7 @@ enum Payload {
|
||||
|
||||
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
|
||||
const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
|
||||
const SERVERLESS_DRIVER_SNI_HOSTNAME_FIRST_PART: &str = "api";
|
||||
|
||||
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
|
||||
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
|
||||
@@ -176,10 +178,11 @@ fn get_conn_info(
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.and_then(|h| h.split(':').next());
|
||||
|
||||
if hostname != sni_hostname {
|
||||
// sni_hostname has to be either the same as hostname or the one used in serverless driver.
|
||||
if !check_matches(&sni_hostname, hostname)? {
|
||||
return Err(anyhow::anyhow!("mismatched SNI hostname and hostname"));
|
||||
} else if let Some(h) = host_header {
|
||||
if h != hostname {
|
||||
if h != sni_hostname {
|
||||
return Err(anyhow::anyhow!("mismatched host header and hostname"));
|
||||
}
|
||||
}
|
||||
@@ -213,6 +216,20 @@ fn get_conn_info(
|
||||
})
|
||||
}
|
||||
|
||||
fn check_matches(sni_hostname: &str, hostname: &str) -> Result<bool, anyhow::Error> {
|
||||
if sni_hostname == hostname {
|
||||
return Ok(true);
|
||||
}
|
||||
let (sni_hostname_first, sni_hostname_rest) = sni_hostname
|
||||
.split_once('.')
|
||||
.ok_or_else(|| anyhow::anyhow!("Unexpected sni format."))?;
|
||||
let (_, hostname_rest) = hostname
|
||||
.split_once('.')
|
||||
.ok_or_else(|| anyhow::anyhow!("Unexpected hostname format."))?;
|
||||
Ok(sni_hostname_rest == hostname_rest
|
||||
&& sni_hostname_first == SERVERLESS_DRIVER_SNI_HOSTNAME_FIRST_PART)
|
||||
}
|
||||
|
||||
// TODO: return different http error codes
|
||||
pub async fn handle(
|
||||
tls: &'static TlsConfig,
|
||||
@@ -231,7 +248,7 @@ pub async fn handle(
|
||||
Ok(r) => match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
let mut message = format!("{:?}", e);
|
||||
let db_error = e
|
||||
.downcast_ref::<tokio_postgres::Error>()
|
||||
.and_then(|e| e.as_db_error());
|
||||
@@ -244,7 +261,25 @@ pub async fn handle(
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
// TODO(conrad): db_error.position()
|
||||
if let Some(db_error) = db_error {
|
||||
db_error.message().clone_into(&mut message);
|
||||
}
|
||||
|
||||
let position = db_error.and_then(|db| db.position());
|
||||
let (position, internal_position, internal_query) = match position {
|
||||
Some(ErrorPosition::Original(position)) => (
|
||||
Value::String(position.to_string()),
|
||||
Value::Null,
|
||||
Value::Null,
|
||||
),
|
||||
Some(ErrorPosition::Internal { position, query }) => (
|
||||
Value::Null,
|
||||
Value::String(position.to_string()),
|
||||
Value::String(query.clone()),
|
||||
),
|
||||
None => (Value::Null, Value::Null, Value::Null),
|
||||
};
|
||||
|
||||
let code = get(db_error, |db| db.code().code());
|
||||
let severity = get(db_error, |db| db.severity());
|
||||
let detail = get(db_error, |db| db.detail());
|
||||
@@ -256,7 +291,7 @@ pub async fn handle(
|
||||
let datatype = get(db_error, |db| db.datatype());
|
||||
let constraint = get(db_error, |db| db.constraint());
|
||||
let file = get(db_error, |db| db.file());
|
||||
let line = get(db_error, |db| db.line());
|
||||
let line = get(db_error, |db| db.line().map(|l| l.to_string()));
|
||||
let routine = get(db_error, |db| db.routine());
|
||||
|
||||
error!(
|
||||
@@ -271,12 +306,15 @@ pub async fn handle(
|
||||
"code": code,
|
||||
"detail": detail,
|
||||
"hint": hint,
|
||||
"position": position,
|
||||
"internalPosition": internal_position,
|
||||
"internalQuery": internal_query,
|
||||
"severity": severity,
|
||||
"where": where_,
|
||||
"table": table,
|
||||
"column": column,
|
||||
"schema": schema,
|
||||
"datatype": datatype,
|
||||
"dataType": datatype,
|
||||
"constraint": constraint,
|
||||
"file": file,
|
||||
"line": line,
|
||||
|
||||
@@ -9,6 +9,7 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
PgBin,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
@@ -75,9 +76,15 @@ class EvictionOrder(str, enum.Enum):
|
||||
if self == EvictionOrder.ABSOLUTE_ORDER:
|
||||
return {"type": "AbsoluteAccessed"}
|
||||
elif self == EvictionOrder.RELATIVE_ORDER_EQUAL:
|
||||
return {"type": "RelativeAccessed", "args": {"highest_layer_count_loses_first": False}}
|
||||
return {
|
||||
"type": "RelativeAccessed",
|
||||
"args": {"highest_layer_count_loses_first": False},
|
||||
}
|
||||
elif self == EvictionOrder.RELATIVE_ORDER_SPARE:
|
||||
return {"type": "RelativeAccessed", "args": {"highest_layer_count_loses_first": True}}
|
||||
return {
|
||||
"type": "RelativeAccessed",
|
||||
"args": {"highest_layer_count_loses_first": True},
|
||||
}
|
||||
else:
|
||||
raise RuntimeError(f"not implemented: {self}")
|
||||
|
||||
@@ -91,14 +98,24 @@ class EvictionEnv:
|
||||
layer_size: int
|
||||
pgbench_init_lsns: Dict[TenantId, Lsn]
|
||||
|
||||
def timelines_du(self) -> Tuple[int, int, int]:
|
||||
@property
|
||||
def pageserver(self):
|
||||
"""
|
||||
Shortcut for tests that only use one pageserver.
|
||||
"""
|
||||
return self.neon_env.pageserver
|
||||
|
||||
def timelines_du(self, pageserver: NeonPageserver) -> Tuple[int, int, int]:
|
||||
return poor_mans_du(
|
||||
self.neon_env, [(tid, tlid) for tid, tlid in self.timelines], verbose=False
|
||||
self.neon_env,
|
||||
[(tid, tlid) for tid, tlid in self.timelines],
|
||||
pageserver,
|
||||
verbose=False,
|
||||
)
|
||||
|
||||
def du_by_timeline(self) -> Dict[Tuple[TenantId, TimelineId], int]:
|
||||
def du_by_timeline(self, pageserver: NeonPageserver) -> Dict[Tuple[TenantId, TimelineId], int]:
|
||||
return {
|
||||
(tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)], verbose=True)[0]
|
||||
(tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)], pageserver, verbose=True)[0]
|
||||
for tid, tlid in self.timelines
|
||||
}
|
||||
|
||||
@@ -126,7 +143,13 @@ class EvictionEnv:
|
||||
_avg = cur.fetchone()
|
||||
|
||||
def pageserver_start_with_disk_usage_eviction(
|
||||
self, period, max_usage_pct, min_avail_bytes, mock_behavior, eviction_order: EvictionOrder
|
||||
self,
|
||||
pageserver: NeonPageserver,
|
||||
period,
|
||||
max_usage_pct,
|
||||
min_avail_bytes,
|
||||
mock_behavior,
|
||||
eviction_order: EvictionOrder,
|
||||
):
|
||||
disk_usage_config = {
|
||||
"period": period,
|
||||
@@ -138,7 +161,12 @@ class EvictionEnv:
|
||||
|
||||
enc = toml.TomlEncoder()
|
||||
|
||||
self.neon_env.pageserver.start(
|
||||
# these can sometimes happen during startup before any tenants have been
|
||||
# loaded, so nothing can be evicted, we just wait for next iteration which
|
||||
# is able to evict.
|
||||
pageserver.allowed_errors.append(".*WARN.* disk usage still high.*")
|
||||
|
||||
pageserver.start(
|
||||
overrides=(
|
||||
"--pageserver-config-override=disk_usage_based_eviction="
|
||||
+ enc.dump_inline_table(disk_usage_config).replace("\n", " "),
|
||||
@@ -152,15 +180,10 @@ class EvictionEnv:
|
||||
)
|
||||
|
||||
def statvfs_called():
|
||||
assert self.neon_env.pageserver.log_contains(".*running mocked statvfs.*")
|
||||
assert pageserver.log_contains(".*running mocked statvfs.*")
|
||||
|
||||
wait_until(10, 1, statvfs_called)
|
||||
|
||||
# these can sometimes happen during startup before any tenants have been
|
||||
# loaded, so nothing can be evicted, we just wait for next iteration which
|
||||
# is able to evict.
|
||||
self.neon_env.pageserver.allowed_errors.append(".*WARN.* disk usage still high.*")
|
||||
|
||||
|
||||
def human_bytes(amt: float) -> str:
|
||||
suffixes = ["", "Ki", "Mi", "Gi"]
|
||||
@@ -175,23 +198,28 @@ def human_bytes(amt: float) -> str:
|
||||
raise RuntimeError("unreachable")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
|
||||
def _eviction_env(
|
||||
request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, num_pageservers: int
|
||||
) -> EvictionEnv:
|
||||
"""
|
||||
Creates two tenants, one somewhat larger than the other.
|
||||
"""
|
||||
|
||||
log.info(f"setting up eviction_env for test {request.node.name}")
|
||||
|
||||
neon_env_builder.num_pageservers = num_pageservers
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
# initial tenant will not be present on this pageserver
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# We will create all tenants on the 0th pageserver
|
||||
pageserver_http = env.pageservers[0].http_client()
|
||||
|
||||
# allow because we are invoking this manually; we always warn on executing disk based eviction
|
||||
env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
|
||||
for ps in env.pageservers:
|
||||
ps.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
|
||||
|
||||
# Choose small layer_size so that we can use low pgbench_scales and still get a large count of layers.
|
||||
# Large count of layers and small layer size is good for testing because it makes evictions predictable.
|
||||
@@ -216,7 +244,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
pg_bin.run(["pgbench", "-i", f"-s{scale}", endpoint.connstr()])
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id, pageserver_id=1)
|
||||
|
||||
timelines.append((tenant_id, timeline_id))
|
||||
|
||||
@@ -252,6 +280,20 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
|
||||
return eviction_env
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
|
||||
return _eviction_env(request, neon_env_builder, pg_bin, num_pageservers=1)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def eviction_env_ha(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
|
||||
"""
|
||||
Variant of the eviction environment with two pageservers for testing eviction on
|
||||
HA configurations with a secondary location.
|
||||
"""
|
||||
return _eviction_env(request, neon_env_builder, pg_bin, num_pageservers=2)
|
||||
|
||||
|
||||
def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
|
||||
env = eviction_env
|
||||
|
||||
@@ -264,10 +306,16 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
|
||||
healthy_tenant_id, healthy_timeline_id = env.timelines[1]
|
||||
|
||||
broken_size_pre, _, _ = poor_mans_du(
|
||||
env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True
|
||||
env.neon_env,
|
||||
[(broken_tenant_id, broken_timeline_id)],
|
||||
env.pageserver,
|
||||
verbose=True,
|
||||
)
|
||||
healthy_size_pre, _, _ = poor_mans_du(
|
||||
env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True
|
||||
env.neon_env,
|
||||
[(healthy_tenant_id, healthy_timeline_id)],
|
||||
env.pageserver,
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
# try to evict everything, then validate that broken tenant wasn't touched
|
||||
@@ -277,10 +325,16 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
|
||||
log.info(f"{response}")
|
||||
|
||||
broken_size_post, _, _ = poor_mans_du(
|
||||
env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True
|
||||
env.neon_env,
|
||||
[(broken_tenant_id, broken_timeline_id)],
|
||||
env.pageserver,
|
||||
verbose=True,
|
||||
)
|
||||
healthy_size_post, _, _ = poor_mans_du(
|
||||
env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True
|
||||
env.neon_env,
|
||||
[(healthy_tenant_id, healthy_timeline_id)],
|
||||
env.pageserver,
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
assert broken_size_pre == broken_size_post, "broken tenant should not be touched"
|
||||
@@ -302,7 +356,7 @@ def test_pageserver_evicts_until_pressure_is_relieved(
|
||||
env = eviction_env
|
||||
pageserver_http = env.pageserver_http
|
||||
|
||||
(total_on_disk, _, _) = env.timelines_du()
|
||||
(total_on_disk, _, _) = env.timelines_du(env.pageserver)
|
||||
|
||||
target = total_on_disk // 2
|
||||
|
||||
@@ -311,7 +365,7 @@ def test_pageserver_evicts_until_pressure_is_relieved(
|
||||
)
|
||||
log.info(f"{response}")
|
||||
|
||||
(later_total_on_disk, _, _) = env.timelines_du()
|
||||
(later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
|
||||
|
||||
actual_change = total_on_disk - later_total_on_disk
|
||||
|
||||
@@ -336,8 +390,8 @@ def test_pageserver_respects_overridden_resident_size(
|
||||
env = eviction_env
|
||||
ps_http = env.pageserver_http
|
||||
|
||||
(total_on_disk, _, _) = env.timelines_du()
|
||||
du_by_timeline = env.du_by_timeline()
|
||||
(total_on_disk, _, _) = env.timelines_du(env.pageserver)
|
||||
du_by_timeline = env.du_by_timeline(env.pageserver)
|
||||
log.info("du_by_timeline: %s", du_by_timeline)
|
||||
|
||||
assert len(du_by_timeline) == 2, "this test assumes two tenants"
|
||||
@@ -379,8 +433,8 @@ def test_pageserver_respects_overridden_resident_size(
|
||||
GLOBAL_LRU_LOG_LINE,
|
||||
), "this test is pointless if it fell back to global LRU"
|
||||
|
||||
(later_total_on_disk, _, _) = env.timelines_du()
|
||||
later_du_by_timeline = env.du_by_timeline()
|
||||
(later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
|
||||
later_du_by_timeline = env.du_by_timeline(env.pageserver)
|
||||
log.info("later_du_by_timeline: %s", later_du_by_timeline)
|
||||
|
||||
actual_change = total_on_disk - later_total_on_disk
|
||||
@@ -412,7 +466,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E
|
||||
env = eviction_env
|
||||
ps_http = env.pageserver_http
|
||||
|
||||
(total_on_disk, _, _) = env.timelines_du()
|
||||
(total_on_disk, _, _) = env.timelines_du(env.pageserver)
|
||||
target = total_on_disk
|
||||
|
||||
response = ps_http.disk_usage_eviction_run(
|
||||
@@ -420,7 +474,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E
|
||||
)
|
||||
log.info(f"{response}")
|
||||
|
||||
(later_total_on_disk, _, _) = env.timelines_du()
|
||||
(later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
|
||||
actual_change = total_on_disk - later_total_on_disk
|
||||
assert 0 <= actual_change, "nothing can load layers during this test"
|
||||
assert actual_change >= target, "eviction must always evict more than target"
|
||||
@@ -448,8 +502,8 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
|
||||
env = eviction_env
|
||||
ps_http = env.pageserver_http
|
||||
|
||||
(total_on_disk, _, _) = env.timelines_du()
|
||||
du_by_timeline = env.du_by_timeline()
|
||||
(total_on_disk, _, _) = env.timelines_du(env.pageserver)
|
||||
du_by_timeline = env.du_by_timeline(env.pageserver)
|
||||
|
||||
# pick smaller or greater (iteration order is insertion order of scale=4 and scale=6)
|
||||
[warm, cold] = list(du_by_timeline.keys())
|
||||
@@ -467,12 +521,12 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
|
||||
)
|
||||
log.info(f"{response}")
|
||||
|
||||
(later_total_on_disk, _, _) = env.timelines_du()
|
||||
(later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
|
||||
actual_change = total_on_disk - later_total_on_disk
|
||||
assert 0 <= actual_change, "nothing can load layers during this test"
|
||||
assert actual_change >= target, "eviction must always evict more than target"
|
||||
|
||||
later_du_by_timeline = env.du_by_timeline()
|
||||
later_du_by_timeline = env.du_by_timeline(env.pageserver)
|
||||
for tenant, later_tenant_usage in later_du_by_timeline.items():
|
||||
assert (
|
||||
later_tenant_usage < du_by_timeline[tenant]
|
||||
@@ -508,7 +562,10 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
|
||||
|
||||
|
||||
def poor_mans_du(
|
||||
env: NeonEnv, timelines: list[Tuple[TenantId, TimelineId]], verbose: bool = False
|
||||
env: NeonEnv,
|
||||
timelines: list[Tuple[TenantId, TimelineId]],
|
||||
pageserver: NeonPageserver,
|
||||
verbose: bool = False,
|
||||
) -> Tuple[int, int, int]:
|
||||
"""
|
||||
Disk usage, largest, smallest layer for layer files over the given (tenant, timeline) tuples;
|
||||
@@ -518,7 +575,7 @@ def poor_mans_du(
|
||||
largest_layer = 0
|
||||
smallest_layer = None
|
||||
for tenant_id, timeline_id in timelines:
|
||||
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
|
||||
timeline_dir = pageserver.timeline_dir(tenant_id, timeline_id)
|
||||
assert timeline_dir.exists(), f"timeline dir does not exist: {timeline_dir}"
|
||||
total = 0
|
||||
for file in timeline_dir.iterdir():
|
||||
@@ -549,6 +606,7 @@ def test_statvfs_error_handling(eviction_env: EvictionEnv):
|
||||
env = eviction_env
|
||||
env.neon_env.pageserver.stop()
|
||||
env.pageserver_start_with_disk_usage_eviction(
|
||||
env.pageserver,
|
||||
period="1s",
|
||||
max_usage_pct=90,
|
||||
min_avail_bytes=0,
|
||||
@@ -573,11 +631,12 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
|
||||
env.neon_env.pageserver.stop()
|
||||
|
||||
# make it seem like we're at 100% utilization by setting total bytes to the used bytes
|
||||
total_size, _, _ = env.timelines_du()
|
||||
total_size, _, _ = env.timelines_du(env.pageserver)
|
||||
blocksize = 512
|
||||
total_blocks = (total_size + (blocksize - 1)) // blocksize
|
||||
|
||||
env.pageserver_start_with_disk_usage_eviction(
|
||||
env.pageserver,
|
||||
period="1s",
|
||||
max_usage_pct=33,
|
||||
min_avail_bytes=0,
|
||||
@@ -597,7 +656,7 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
|
||||
|
||||
wait_until(10, 1, relieved_log_message)
|
||||
|
||||
post_eviction_total_size, _, _ = env.timelines_du()
|
||||
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
|
||||
|
||||
assert post_eviction_total_size <= 0.33 * total_size, "we requested max 33% usage"
|
||||
|
||||
@@ -612,13 +671,14 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
|
||||
env.neon_env.pageserver.stop()
|
||||
|
||||
# make it seem like we're at 100% utilization by setting total bytes to the used bytes
|
||||
total_size, _, _ = env.timelines_du()
|
||||
total_size, _, _ = env.timelines_du(env.pageserver)
|
||||
blocksize = 512
|
||||
total_blocks = (total_size + (blocksize - 1)) // blocksize
|
||||
|
||||
min_avail_bytes = total_size // 3
|
||||
|
||||
env.pageserver_start_with_disk_usage_eviction(
|
||||
env.pageserver,
|
||||
period="1s",
|
||||
max_usage_pct=100,
|
||||
min_avail_bytes=min_avail_bytes,
|
||||
@@ -638,7 +698,67 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
|
||||
|
||||
wait_until(10, 1, relieved_log_message)
|
||||
|
||||
post_eviction_total_size, _, _ = env.timelines_du()
|
||||
post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
|
||||
|
||||
assert (
|
||||
total_size - post_eviction_total_size >= min_avail_bytes
|
||||
), "we requested at least min_avail_bytes worth of free space"
|
||||
|
||||
|
||||
def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):
|
||||
env = eviction_env_ha
|
||||
|
||||
tenant_ids = [t[0] for t in env.timelines]
|
||||
|
||||
log.info("Setting up secondary location...")
|
||||
ps_attached = env.neon_env.pageservers[0]
|
||||
ps_secondary = env.neon_env.pageservers[1]
|
||||
for tenant_id in tenant_ids:
|
||||
ps_secondary.tenant_location_configure(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "Secondary",
|
||||
"secondary_conf": {"warm": True},
|
||||
"tenant_conf": {},
|
||||
},
|
||||
)
|
||||
readback_conf = ps_secondary.read_tenant_location_conf(tenant_id)
|
||||
log.info(f"Read back conf: {readback_conf}")
|
||||
|
||||
# Request secondary location to download all layers that the attached location has
|
||||
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
|
||||
ps_secondary.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
# Configure the secondary pageserver to have a phony small disk size
|
||||
ps_secondary.stop()
|
||||
total_size, _, _ = env.timelines_du(ps_secondary)
|
||||
blocksize = 512
|
||||
total_blocks = (total_size + (blocksize - 1)) // blocksize
|
||||
|
||||
min_avail_bytes = total_size // 3
|
||||
|
||||
env.pageserver_start_with_disk_usage_eviction(
|
||||
ps_secondary,
|
||||
period="1s",
|
||||
max_usage_pct=100,
|
||||
min_avail_bytes=min_avail_bytes,
|
||||
mock_behavior={
|
||||
"type": "Success",
|
||||
"blocksize": blocksize,
|
||||
"total_blocks": total_blocks,
|
||||
# Only count layer files towards used bytes in the mock_statvfs.
|
||||
# This avoids accounting for metadata files & tenant conf in the tests.
|
||||
"name_filter": ".*__.*",
|
||||
},
|
||||
eviction_order=EvictionOrder.ABSOLUTE_ORDER,
|
||||
)
|
||||
|
||||
def relieved_log_message():
|
||||
assert ps_secondary.log_contains(".*disk usage pressure relieved")
|
||||
|
||||
wait_until(10, 1, relieved_log_message)
|
||||
|
||||
post_eviction_total_size, _, _ = env.timelines_du(ps_secondary)
|
||||
|
||||
assert (
|
||||
total_size - post_eviction_total_size >= min_avail_bytes
|
||||
|
||||
@@ -203,6 +203,21 @@ def test_close_on_connections_exit(static_proxy: NeonProxy):
|
||||
static_proxy.wait_for_exit()
|
||||
|
||||
|
||||
def test_sql_over_http_serverless_driver(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create role http with login password 'http' superuser")
|
||||
|
||||
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
|
||||
response = requests.post(
|
||||
f"https://api.localtest.me:{static_proxy.external_http_port}/sql",
|
||||
data=json.dumps({"query": "select 42 as answer", "params": []}),
|
||||
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
|
||||
verify=str(static_proxy.test_output_dir / "proxy.crt"),
|
||||
)
|
||||
assert response.status_code == 200, response.text
|
||||
rows = response.json()["rows"]
|
||||
assert rows == [{"answer": 42}]
|
||||
|
||||
|
||||
def test_sql_over_http(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create role http with login password 'http' superuser")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user