Compare commits

..

23 Commits

Author SHA1 Message Date
John Spray
0baf91fcac pageserver: include secondary tenants in disk usage eviction 2023-10-26 20:33:47 +01:00
John Spray
cd27f42839 pageserver: pass TenantManager into disk usage eviction task 2023-10-26 20:33:47 +01:00
John Spray
edf303ff44 pageserver: add Layer::for_secondary 2023-10-26 20:33:47 +01:00
John Spray
ec189338eb tests: avoid deprecated log.warn() 2023-10-26 20:33:47 +01:00
John Spray
e22f9b9fbb tests: more logging from attachment_service 2023-10-26 20:33:47 +01:00
John Spray
83fc486636 tests: log in UTC 2023-10-26 20:33:47 +01:00
John Spray
3afec731dc pageserver: add a CancellationToken to Tenant 2023-10-26 20:33:47 +01:00
John Spray
559611f9a6 pageserver: add Tenant::heatmap_hook 2023-10-26 20:33:47 +01:00
John Spray
d56dad3824 tests: add test_pageserver_secondary 2023-10-26 20:33:47 +01:00
John Spray
547acb6653 pageserver: create timelines/ dir when configuring secondary location 2023-10-26 20:33:47 +01:00
John Spray
5264ffc2a9 pageserver/http: add testing routes for secondary mode 2023-10-26 20:33:47 +01:00
John Spray
ee841708ff pageserver: implement flush on location conf update 2023-10-26 20:33:47 +01:00
John Spray
f356df1860 pageserver: more logging during tenant shutdown 2023-10-26 20:33:47 +01:00
John Spray
fbe1da2981 tests: add helpers for location_conf 2023-10-26 20:33:47 +01:00
John Spray
22c94d99f5 tests: refactor a path helper 2023-10-26 20:33:47 +01:00
John Spray
6de414b5a5 tests: add multi attach test 2023-10-26 20:33:47 +01:00
John Spray
8c0ce9723a Refactor Workload into shared location 2023-10-26 20:33:47 +01:00
John Spray
5a4c371e94 pageserver: launch tasks for secondary mode 2023-10-26 20:33:47 +01:00
John Spray
65096ac992 pageserver: add secondary downloader & heatmaps 2023-10-26 20:33:47 +01:00
John Spray
049cb1fb4b Add Timeline::generate_heatmap, remote client heatmap upload 2023-10-26 20:33:47 +01:00
John Spray
640350a2c0 TenantManager: implement hooks for secondary downloads 2023-10-26 20:33:47 +01:00
John Spray
d422105d88 pageserver: start refactoring into TenantManager 2023-10-26 11:14:24 +01:00
John Spray
ead1931167 pageserver: add InProgress top level state & make TenantsMap lock synchronous 2023-10-26 11:13:16 +01:00
187 changed files with 8621 additions and 8281 deletions

View File

@@ -22,11 +22,5 @@ platforms = [
# "x86_64-pc-windows-msvc",
]
[final-excludes]
# vm_monitor benefits from the same Cargo.lock as the rest of our artifacts, but
# it is built primarly in separate repo neondatabase/autoscaling and thus is excluded
# from depending on workspace-hack because most of the dependencies are not used.
workspace-members = ["vm_monitor"]
# Write out exact versions rather than a semver range. (Defaults to false.)
# exact-versions = true

View File

@@ -723,7 +723,6 @@ jobs:
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
--context .
--build-arg GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
--build-arg BUILD_TAG=${{ needs.tag.outputs.build-tag }}
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}
--destination neondatabase/neon:${{needs.tag.outputs.build-tag}}
@@ -848,7 +847,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.18.5
VM_BUILDER_VERSION: v0.18.2
steps:
- name: Checkout

View File

@@ -2,7 +2,7 @@ name: Create Release Branch
on:
schedule:
- cron: '0 7 * * 5'
- cron: '0 7 * * 2'
workflow_dispatch:
jobs:

54
Cargo.lock generated
View File

@@ -170,12 +170,6 @@ dependencies = [
"backtrace",
]
[[package]]
name = "arc-swap"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "archery"
version = "0.5.0"
@@ -1615,6 +1609,16 @@ dependencies = [
"subtle",
]
[[package]]
name = "ctor"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096"
dependencies = [
"quote",
"syn 1.0.109",
]
[[package]]
name = "ctr"
version = "0.6.0"
@@ -2710,10 +2714,11 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.20"
version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if",
"value-bag",
]
@@ -3556,7 +3561,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=7434d9388965a17a6d113e5dfc0e65666a03b4c2#7434d9388965a17a6d113e5dfc0e65666a03b4c2"
dependencies = [
"bytes",
"fallible-iterator",
@@ -3569,7 +3574,7 @@ dependencies = [
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=7434d9388965a17a6d113e5dfc0e65666a03b4c2#7434d9388965a17a6d113e5dfc0e65666a03b4c2"
dependencies = [
"native-tls",
"tokio",
@@ -3580,7 +3585,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=7434d9388965a17a6d113e5dfc0e65666a03b4c2#7434d9388965a17a6d113e5dfc0e65666a03b4c2"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -3598,7 +3603,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=7434d9388965a17a6d113e5dfc0e65666a03b4c2#7434d9388965a17a6d113e5dfc0e65666a03b4c2"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4425,7 +4430,6 @@ dependencies = [
"itertools",
"pageserver",
"rand 0.8.5",
"remote_storage",
"reqwest",
"serde",
"serde_json",
@@ -4484,7 +4488,6 @@ dependencies = [
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"tokio-stream",
"toml_edit",
"tracing",
"url",
@@ -4687,16 +4690,6 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_assert"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eda563240c1288b044209be1f0d38bb4d15044fb3e00dc354fbc922ab4733e80"
dependencies = [
"hashbrown 0.13.2",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.183"
@@ -5414,7 +5407,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=7434d9388965a17a6d113e5dfc0e65666a03b4c2#7434d9388965a17a6d113e5dfc0e65666a03b4c2"
dependencies = [
"async-trait",
"byteorder",
@@ -5957,7 +5950,6 @@ name = "utils"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"async-trait",
"bincode",
"byteorder",
@@ -5984,7 +5976,6 @@ dependencies = [
"routerify",
"sentry",
"serde",
"serde_assert",
"serde_json",
"serde_with",
"signal-hook",
@@ -6020,9 +6011,13 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "value-bag"
version = "1.4.2"
version = "1.0.0-alpha.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a72e1902dde2bd6441347de2b70b7f5d59bf157c6c62f0c44572607a1d55bbe"
checksum = "2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55"
dependencies = [
"ctor",
"version_check",
]
[[package]]
name = "vcpkg"
@@ -6055,6 +6050,7 @@ dependencies = [
"tokio-util",
"tracing",
"tracing-subscriber",
"workspace_hack",
]
[[package]]

View File

@@ -36,7 +36,6 @@ license = "Apache-2.0"
## All dependency versions, used in the project
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
azure_core = "0.16"
azure_identity = "0.16"
@@ -125,7 +124,6 @@ sentry = { version = "0.31", default-features = false, features = ["backtrace",
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "2.0"
serde_assert = "0.5.0"
sha2 = "0.10.2"
signal-hook = "0.3"
smallvec = "1.11"
@@ -163,11 +161,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="7434d9388965a17a6d113e5dfc0e65666a03b4c2" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="7434d9388965a17a6d113e5dfc0e65666a03b4c2" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="7434d9388965a17a6d113e5dfc0e65666a03b4c2" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="7434d9388965a17a6d113e5dfc0e65666a03b4c2" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="7434d9388965a17a6d113e5dfc0e65666a03b4c2" }
## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -204,7 +202,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="7434d9388965a17a6d113e5dfc0e65666a03b4c2" }
################# Binary contents sections

View File

@@ -27,7 +27,6 @@ RUN set -e \
FROM $REPOSITORY/$IMAGE:$TAG AS build
WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
# Enable https://github.com/paritytech/cachepot to cache Rust crates' compilation results in Docker builds.
# Set up cachepot to use an AWS S3 bucket for cache results, to reuse it between `docker build` invocations.
@@ -79,9 +78,9 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pagectl /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/

View File

@@ -72,10 +72,6 @@ neon: postgres-headers walproposer-lib
#
$(POSTGRES_INSTALL_DIR)/build/%/config.status:
+@echo "Configuring Postgres $* build"
@test -s $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure || { \
echo "\nPostgres submodule not found in $(ROOT_PROJECT_DIR)/vendor/postgres-$*/, execute "; \
echo "'git submodule update --init --recursive --depth 2 --progress .' in project root.\n"; \
exit 1; }
mkdir -p $(POSTGRES_INSTALL_DIR)/build/$*
(cd $(POSTGRES_INSTALL_DIR)/build/$* && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure \

View File

@@ -156,7 +156,6 @@ fn main() -> Result<()> {
let path = Path::new(sp);
let file = File::open(path)?;
spec = Some(serde_json::from_reader(file)?);
live_config_allowed = true;
} else if let Some(id) = compute_id {
if let Some(cp_base) = control_plane_uri {
live_config_allowed = true;
@@ -278,26 +277,32 @@ fn main() -> Result<()> {
if #[cfg(target_os = "linux")] {
use std::env;
use tokio_util::sync::CancellationToken;
let vm_monitor_addr = matches
.get_one::<String>("vm-monitor-addr")
.expect("--vm-monitor-addr should always be set because it has a default arg");
use tracing::warn;
let vm_monitor_addr = matches.get_one::<String>("vm-monitor-addr");
let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
let cgroup = matches.get_one::<String>("cgroup");
let file_cache_on_disk = matches.get_flag("file-cache-on-disk");
// Only make a runtime if we need to.
// Note: it seems like you can make a runtime in an inner scope and
// if you start a task in it it won't be dropped. However, make it
// in the outermost scope just to be safe.
let rt = if env::var_os("AUTOSCALING").is_some() {
Some(
let rt = match (env::var_os("AUTOSCALING"), vm_monitor_addr) {
(None, None) => None,
(None, Some(_)) => {
warn!("--vm-monitor-addr option set but AUTOSCALING env var not present");
None
}
(Some(_), None) => {
panic!("AUTOSCALING env var present but --vm-monitor-addr option not set")
}
(Some(_), Some(_)) => Some(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.expect("failed to create tokio runtime for monitor")
)
} else {
None
.expect("failed to create tokio runtime for monitor"),
),
};
// This token is used internally by the monitor to clean up all threads
@@ -308,7 +313,8 @@ fn main() -> Result<()> {
Box::leak(Box::new(vm_monitor::Args {
cgroup: cgroup.cloned(),
pgconnstr: file_cache_connstr.cloned(),
addr: vm_monitor_addr.clone(),
addr: vm_monitor_addr.cloned().unwrap(),
file_cache_on_disk,
})),
token.clone(),
))
@@ -480,8 +486,6 @@ fn cli() -> clap::Command {
.value_name("FILECACHE_CONNSTR"),
)
.arg(
// DEPRECATED, NO LONGER DOES ANYTHING.
// See https://github.com/neondatabase/cloud/issues/7516
Arg::new("file-cache-on-disk")
.long("file-cache-on-disk")
.action(clap::ArgAction::SetTrue),

View File

@@ -710,12 +710,8 @@ impl ComputeNode {
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip_all)]
fn pg_reload_conf(&self) -> Result<()> {
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
Command::new(pgctl_bin)
.args(["reload", "-D", &self.pgdata])
.output()
.expect("cannot run pg_ctl process");
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
}
@@ -728,9 +724,9 @@ impl ComputeNode {
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
self.pg_reload_conf()?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.

View File

@@ -1,7 +1,7 @@
//!
//! Various tools and helpers to handle cluster / compute node (Postgres)
//! configuration.
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
//!
pub mod checker;
pub mod config;
pub mod configurator;

View File

@@ -193,16 +193,11 @@ impl Escaping for PgIdent {
/// Build a list of existing Postgres roles
pub fn get_existing_roles(xact: &mut Transaction<'_>) -> Result<Vec<Role>> {
let postgres_roles = xact
.query(
"SELECT rolname, rolpassword, rolreplication, rolbypassrls FROM pg_catalog.pg_authid",
&[],
)?
.query("SELECT rolname, rolpassword FROM pg_catalog.pg_authid", &[])?
.iter()
.map(|row| Role {
name: row.get("rolname"),
encrypted_password: row.get("rolpassword"),
replication: Some(row.get("rolreplication")),
bypassrls: Some(row.get("rolbypassrls")),
options: None,
})
.collect();

View File

@@ -24,7 +24,7 @@ fn do_control_plane_request(
) -> Result<ControlPlaneSpecResponse, (bool, String)> {
let resp = reqwest::blocking::Client::new()
.get(uri)
.header("Authorization", format!("Bearer {}", jwt))
.header("Authorization", jwt)
.send()
.map_err(|e| {
(
@@ -68,7 +68,7 @@ pub fn get_spec_from_control_plane(
base_uri: &str,
compute_id: &str,
) -> Result<Option<ComputeSpec>> {
let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
Ok(v) => v,
Err(_) => "".to_string(),
@@ -265,8 +265,6 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
let action = if let Some(r) = pg_role {
if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
|| (r.encrypted_password.is_some() && role.encrypted_password.is_none())
|| !r.bypassrls.unwrap_or(false)
|| !r.replication.unwrap_or(false)
{
RoleAction::Update
} else if let Some(pg_pwd) = &r.encrypted_password {
@@ -298,8 +296,7 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
match action {
RoleAction::None => {}
RoleAction::Update => {
let mut query: String =
format!("ALTER ROLE {} BYPASSRLS REPLICATION", name.pg_quote());
let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
query.push_str(&role.to_pg_options());
xact.execute(query.as_str(), &[])?;
}

View File

@@ -2,6 +2,7 @@ use crate::{background_process, local_env::LocalEnv};
use anyhow::anyhow;
use camino::Utf8PathBuf;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::{path::PathBuf, process::Child};
use utils::id::{NodeId, TenantId};
@@ -13,10 +14,12 @@ pub struct AttachmentService {
const COMMAND: &str = "attachment_service";
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
pub node_id: Option<NodeId>,
pub pageserver_id: Option<NodeId>,
}
#[derive(Serialize, Deserialize)]
@@ -82,7 +85,7 @@ impl AttachmentService {
.control_plane_api
.clone()
.unwrap()
.join("attach-hook")
.join("attach_hook")
.unwrap();
let client = reqwest::blocking::ClientBuilder::new()
.build()
@@ -90,7 +93,7 @@ impl AttachmentService {
let request = AttachHookRequest {
tenant_id,
node_id: Some(pageserver_id),
pageserver_id: Some(pageserver_id),
};
let response = client.post(url).json(&request).send()?;

View File

@@ -262,7 +262,7 @@ where
P: Into<Utf8PathBuf>,
{
let path: Utf8PathBuf = path.into();
// SAFETY:
// SAFETY
// pre_exec is marked unsafe because it runs between fork and exec.
// Why is that dangerous in various ways?
// Long answer: https://github.com/rust-lang/rust/issues/39575

View File

@@ -12,7 +12,6 @@ use hyper::{Body, Request, Response};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::{collections::HashMap, sync::Arc};
use utils::http::endpoint::request_span;
use utils::logging::{self, LogFormat};
use utils::signals::{ShutdownSignals, Signal};
@@ -172,7 +171,7 @@ async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiE
state.generation += 1;
response.tenants.push(ReAttachResponseTenant {
id: *t,
gen: state.generation,
generation: state.generation,
});
}
}
@@ -218,31 +217,27 @@ async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, Ap
.tenants
.entry(attach_req.tenant_id)
.or_insert_with(|| TenantState {
pageserver: attach_req.node_id,
pageserver: attach_req.pageserver_id,
generation: 0,
});
if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
if let Some(attaching_pageserver) = attach_req.pageserver_id.as_ref() {
tenant_state.generation += 1;
tracing::info!(
tenant_id = %attach_req.tenant_id,
ps_id = %attaching_pageserver,
generation = %tenant_state.generation,
"issuing",
"attach_hook: issuing generation {} to pageserver {}",
attaching_pageserver,
tenant_state.generation
);
} else if let Some(ps_id) = tenant_state.pageserver {
tracing::info!(
tenant_id = %attach_req.tenant_id,
%ps_id,
generation = %tenant_state.generation,
"dropping",
"attach_hook: dropping pageserver {} in generation {}",
ps_id,
tenant_state.generation
);
} else {
tracing::info!(
tenant_id = %attach_req.tenant_id,
"no-op: tenant already has no pageserver");
tracing::info!("attach_hook: no-op: tenant already has no pageserver");
}
tenant_state.pageserver = attach_req.node_id;
tenant_state.pageserver = attach_req.pageserver_id;
let generation = tenant_state.generation;
locked.save().await.map_err(ApiError::InternalServerError)?;
@@ -250,7 +245,7 @@ async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, Ap
json_response(
StatusCode::OK,
AttachHookResponse {
gen: attach_req.node_id.map(|_| generation),
gen: attach_req.pageserver_id.map(|_| generation),
},
)
}
@@ -258,9 +253,9 @@ async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, Ap
fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
endpoint::make_router()
.data(Arc::new(State::new(persistent_state)))
.post("/re-attach", |r| request_span(r, handle_re_attach))
.post("/validate", |r| request_span(r, handle_validate))
.post("/attach-hook", |r| request_span(r, handle_attach_hook))
.post("/re-attach", handle_re_attach)
.post("/validate", handle_validate)
.post("/attach_hook", handle_attach_hook)
}
#[tokio::main]

View File

@@ -798,24 +798,6 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
ep.start(&auth_token, safekeepers, remote_ext_config)?;
}
}
"reconfigure" => {
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID provided to reconfigure"))?;
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let pageserver_id =
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
Some(NodeId(
id_str.parse().context("while parsing pageserver id")?,
))
} else {
None
};
endpoint.reconfigure(pageserver_id)?;
}
"stop" => {
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
@@ -1387,12 +1369,6 @@ fn cli() -> Command {
.arg(safekeepers_arg)
.arg(remote_ext_config_args)
)
.subcommand(Command::new("reconfigure")
.about("Reconfigure the endpoint")
.arg(endpoint_pageserver_id_arg)
.arg(endpoint_id_arg.clone())
.arg(tenant_id_arg.clone())
)
.subcommand(
Command::new("stop")
.arg(endpoint_id_arg)

View File

@@ -46,6 +46,7 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::{NodeId, TenantId, TimelineId};
use crate::local_env::LocalEnv;
@@ -56,10 +57,13 @@ use compute_api::responses::{ComputeState, ComputeStatus};
use compute_api::spec::{Cluster, ComputeMode, ComputeSpec};
// contents of a endpoint.json file
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
endpoint_id: String,
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
timeline_id: TimelineId,
mode: ComputeMode,
pg_port: u16,
@@ -410,32 +414,16 @@ impl Endpoint {
);
}
Ok(())
}
fn wait_for_compute_ctl_to_exit(&self) -> Result<()> {
// Also wait for the compute_ctl process to die. It might have some cleanup
// work to do after postgres stops, like syncing safekeepers, etc.
//
// TODO use background_process::stop_process instead
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
let pid = nix::unistd::Pid::from_raw(pid as i32);
crate::background_process::wait_until_stopped("compute_ctl", pid)?;
Ok(())
}
fn read_postgresql_conf(&self) -> Result<String> {
// Slurp the endpoints/<endpoint id>/postgresql.conf file into
// memory. We will include it in the spec file that we pass to
// `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
// in the data directory.
let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
match std::fs::read(&postgresql_conf_path) {
Ok(content) => Ok(String::from_utf8(content)?),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
Err(e) => Err(anyhow::Error::new(e).context(format!(
"failed to read config file in {}",
postgresql_conf_path.to_str().unwrap()
))),
}
Ok(())
}
pub fn start(
@@ -448,7 +436,21 @@ impl Endpoint {
anyhow::bail!("The endpoint is already running");
}
let postgresql_conf = self.read_postgresql_conf()?;
// Slurp the endpoints/<endpoint id>/postgresql.conf file into
// memory. We will include it in the spec file that we pass to
// `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
// in the data directory.
let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
let postgresql_conf = match std::fs::read(&postgresql_conf_path) {
Ok(content) => String::from_utf8(content)?,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => "".to_string(),
Err(e) => {
return Err(anyhow::Error::new(e).context(format!(
"failed to read config file in {}",
postgresql_conf_path.to_str().unwrap()
)))
}
};
// We always start the compute node from scratch, so if the Postgres
// data dir exists from a previous launch, remove it first.
@@ -619,61 +621,6 @@ impl Endpoint {
}
}
pub fn reconfigure(&self, pageserver_id: Option<NodeId>) -> Result<()> {
let mut spec: ComputeSpec = {
let spec_path = self.endpoint_path().join("spec.json");
let file = std::fs::File::open(spec_path)?;
serde_json::from_reader(file)?
};
let postgresql_conf = self.read_postgresql_conf()?;
spec.cluster.postgresql_conf = Some(postgresql_conf);
if let Some(pageserver_id) = pageserver_id {
let endpoint_config_path = self.endpoint_path().join("endpoint.json");
let mut endpoint_conf: EndpointConf = {
let file = std::fs::File::open(&endpoint_config_path)?;
serde_json::from_reader(file)?
};
endpoint_conf.pageserver_id = pageserver_id;
std::fs::write(
endpoint_config_path,
serde_json::to_string_pretty(&endpoint_conf)?,
)?;
let pageserver =
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
let ps_http_conf = &pageserver.pg_connection_config;
let (host, port) = (ps_http_conf.host(), ps_http_conf.port());
spec.pageserver_connstring = Some(format!("postgresql://no_user@{host}:{port}"));
}
let client = reqwest::blocking::Client::new();
let response = client
.post(format!(
"http://{}:{}/configure",
self.http_address.ip(),
self.http_address.port()
))
.body(format!(
"{{\"spec\":{}}}",
serde_json::to_string_pretty(&spec)?
))
.send()?;
let status = response.status();
if !(status.is_client_error() || status.is_server_error()) {
Ok(())
} else {
let url = response.url().to_owned();
let msg = match response.text() {
Ok(err_body) => format!("Error: {}", err_body),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
};
Err(anyhow::anyhow!(msg))
}
}
pub fn stop(&self, destroy: bool) -> Result<()> {
// If we are going to destroy data directory,
// use immediate shutdown mode, otherwise,
@@ -682,25 +629,15 @@ impl Endpoint {
// Postgres is always started from scratch, so stop
// without destroy only used for testing and debugging.
//
self.pg_ctl(
if destroy {
&["-m", "immediate", "stop"]
} else {
&["stop"]
},
&None,
)?;
// Also wait for the compute_ctl process to die. It might have some cleanup
// work to do after postgres stops, like syncing safekeepers, etc.
//
self.wait_for_compute_ctl_to_exit()?;
if destroy {
self.pg_ctl(&["-m", "immediate", "stop"], &None)?;
println!(
"Destroying postgres data directory '{}'",
self.pgdata().to_str().unwrap()
);
std::fs::remove_dir_all(self.endpoint_path())?;
} else {
self.pg_ctl(&["stop"], &None)?;
}
Ok(())
}

View File

@@ -1,10 +1,11 @@
//! Local control plane.
//!
//! Can start, configure and stop postgres instances running as a local processes.
//!
//! Intended to be used in integration tests and in CLI tools for
//! local installations.
#![deny(clippy::undocumented_unsafe_blocks)]
//
// Local control plane.
//
// Can start, configure and stop postgres instances running as a local processes.
//
// Intended to be used in integration tests and in CLI tools for
// local installations.
//
pub mod attachment_service;
mod background_process;

View File

@@ -8,6 +8,7 @@ use anyhow::{bail, ensure, Context};
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::env;
use std::fs;
@@ -32,6 +33,7 @@ pub const DEFAULT_PG_VERSION: u32 = 15;
// to 'neon_local init --config=<path>' option. See control_plane/simple.conf for
// an example.
//
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct LocalEnv {
// Base directory for all the nodes (the pageserver, safekeepers and
@@ -57,6 +59,7 @@ pub struct LocalEnv {
// Default tenant ID to use with the 'neon_local' command line utility, when
// --tenant_id is not explicitly specified.
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub default_tenant_id: Option<TenantId>,
// used to issue tokens during e.g pg start
@@ -81,6 +84,7 @@ pub struct LocalEnv {
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
#[serde_as(as = "HashMap<_, Vec<(DisplayFromStr, DisplayFromStr)>>")]
branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
}

View File

@@ -1,108 +0,0 @@
# Updating Postgres
## Minor Versions
When upgrading to a new minor version of Postgres, please follow these steps:
_Example: 15.4 is the new minor version to upgrade to from 15.3._
1. Clone the Neon Postgres repository if you have not done so already.
```shell
git clone git@github.com:neondatabase/postgres.git
```
1. Add the Postgres upstream remote.
```shell
git remote add upstream https://git.postgresql.org/git/postgresql.git
```
1. Create a new branch based on the stable branch you are updating.
```shell
git checkout -b my-branch REL_15_STABLE_neon
```
1. Tag the last commit on the stable branch you are updating.
```shell
git tag REL_15_3_neon
```
1. Push the new tag to the Neon Postgres repository.
```shell
git push origin REL_15_3_neon
```
1. Find the release tags you're looking for. They are of the form `REL_X_Y`.
1. Rebase the branch you created on the tag and resolve any conflicts.
```shell
git fetch upstream REL_15_4
git rebase REL_15_4
```
1. Run the Postgres test suite to make sure our commits have not affected
Postgres in a negative way.
```shell
make check
# OR
meson test -C builddir
```
1. Push your branch to the Neon Postgres repository.
```shell
git push origin my-branch
```
1. Clone the Neon repository if you have not done so already.
```shell
git clone git@github.com:neondatabase/neon.git
```
1. Create a new branch.
1. Change the `revisions.json` file to point at the HEAD of your Postgres
branch.
1. Update the Git submodule.
```shell
git submodule set-branch --branch my-branch vendor/postgres-v15
git submodule update --remote vendor/postgres-v15
```
1. Run the Neon test suite to make sure that Neon is still good to go on this
minor Postgres release.
```shell
./scripts/poetry -k pg15
```
1. Commit your changes.
1. Create a pull request, and wait for CI to go green.
1. Force push the rebased Postgres branches into the Neon Postgres repository.
```shell
git push --force origin my-branch:REL_15_STABLE_neon
```
It may require disabling various branch protections.
1. Update your Neon PR to point at the branches.
```shell
git submodule set-branch --branch REL_15_STABLE_neon vendor/postgres-v15
git commit --amend --no-edit
git push --force origin
```
1. Merge the pull request after getting approval(s) and CI completion.

View File

@@ -1,5 +1,3 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod requests;
pub mod responses;
pub mod spec;

View File

@@ -6,6 +6,7 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -18,6 +19,7 @@ pub type PgIdent = String;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[serde_as]
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct ComputeSpec {
pub format_version: f32,
@@ -48,12 +50,12 @@ pub struct ComputeSpec {
// these, and instead set the "neon.tenant_id", "neon.timeline_id",
// etc. GUCs in cluster.settings. TODO: Once the control plane has been
// updated to fill these fields, we can make these non optional.
#[serde_as(as = "Option<DisplayFromStr>")]
pub tenant_id: Option<TenantId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub timeline_id: Option<TimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub pageserver_connstring: Option<String>,
#[serde(default)]
pub safekeeper_connstrings: Vec<String>,
@@ -138,13 +140,14 @@ impl RemoteExtSpec {
}
}
#[serde_as]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeMode {
/// A read-write node
#[default]
Primary,
/// A read-only node, pinned at a particular LSN
Static(Lsn),
Static(#[serde_as(as = "DisplayFromStr")] Lsn),
/// A read-only node that follows the tip of the branch in hot standby mode
///
/// Future versions may want to distinguish between replicas with hot standby
@@ -187,8 +190,6 @@ pub struct DeltaOp {
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
pub replication: Option<bool>,
pub bypassrls: Option<bool>,
pub options: GenericOptions,
}

View File

@@ -1,6 +1,6 @@
//!
//! Shared code for consumption metics collection
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
//!
use chrono::{DateTime, Utc};
use rand::Rng;
use serde::{Deserialize, Serialize};

View File

@@ -2,7 +2,6 @@
//! make sure that we use the same dep version everywhere.
//! Otherwise, we might not see all metrics registered via
//! a default registry.
#![deny(clippy::undocumented_unsafe_blocks)]
use once_cell::sync::Lazy;
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
pub use prometheus::opts;
@@ -90,14 +89,14 @@ pub const DISK_WRITE_SECONDS_BUCKETS: &[f64] = &[
0.000_050, 0.000_100, 0.000_500, 0.001, 0.003, 0.005, 0.01, 0.05, 0.1, 0.3, 0.5,
];
pub fn set_build_info_metric(revision: &str, build_tag: &str) {
pub fn set_build_info_metric(revision: &str) {
let metric = register_int_gauge_vec!(
"libmetrics_build_info",
"Build/version information",
&["revision", "build_tag"]
&["revision"]
)
.expect("Failed to register build info metric");
metric.with_label_values(&[revision, build_tag]).set(1);
metric.with_label_values(&[revision]).set(1);
}
// Records I/O stats in a "cross-platform" way.

View File

@@ -4,6 +4,7 @@
//! See docs/rfcs/025-generation-numbers.md
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::{NodeId, TenantId};
#[derive(Serialize, Deserialize)]
@@ -11,10 +12,12 @@ pub struct ReAttachRequest {
pub node_id: NodeId,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ReAttachResponseTenant {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub gen: u32,
pub generation: u32,
}
#[derive(Serialize, Deserialize)]
@@ -22,8 +25,10 @@ pub struct ReAttachResponse {
pub tenants: Vec<ReAttachResponseTenant>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ValidateRequestTenant {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub gen: u32,
}
@@ -38,8 +43,10 @@ pub struct ValidateResponse {
pub tenants: Vec<ValidateResponseTenant>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ValidateResponseTenant {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub valid: bool,
}

View File

@@ -1,5 +1,3 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use const_format::formatcp;
/// Public API types

View File

@@ -6,7 +6,7 @@ use std::{
use byteorder::{BigEndian, ReadBytesExt};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use serde_with::{serde_as, DisplayFromStr};
use strum_macros;
use utils::{
completion,
@@ -110,6 +110,7 @@ impl TenantState {
// So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
// tenant mgr startup distinguishes attaching from loading via marker file.
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
// We only reach Active after successful load / attach.
// So, call atttachment status Attached.
@@ -174,19 +175,25 @@ pub enum TimelineState {
Broken { reason: String, backtrace: String },
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub new_timeline_id: TimelineId,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_start_lsn: Option<Lsn>,
pub pg_version: Option<u32>,
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub new_tenant_id: TenantId,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
@@ -195,6 +202,7 @@ pub struct TenantCreateRequest {
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
#[serde_as]
#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLoadRequest {
@@ -271,26 +279,31 @@ pub struct LocationConfig {
pub tenant_conf: TenantConfig,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct TenantCreateResponse(pub TenantId);
pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub TenantId);
#[derive(Serialize)]
pub struct StatusResponse {
pub id: NodeId,
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLocationConfigRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde(flatten)]
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantConfigRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde(flatten)]
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
@@ -362,8 +375,10 @@ pub enum TenantAttachmentStatus {
Failed { reason: String },
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantInfo {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
// NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
pub state: TenantState,
@@ -374,22 +389,33 @@ pub struct TenantInfo {
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub last_record_lsn: Lsn,
#[serde_as(as = "Option<DisplayFromStr>")]
pub prev_record_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub latest_gc_cutoff_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
/// The LSN that we have succesfully uploaded to remote storage
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
/// The LSN that we are advertizing to safekeepers
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn_visible: Lsn,
pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
@@ -401,6 +427,7 @@ pub struct TimelineInfo {
pub timeline_dir_layer_file_size_sum: Option<u64>,
pub wal_source_connstr: Option<String>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub last_received_msg_lsn: Option<Lsn>,
/// the timestamp (in microseconds) of the last received message
pub last_received_msg_ts: Option<u128>,
@@ -497,13 +524,23 @@ pub struct LayerAccessStats {
pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
}
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
pub enum InMemoryLayerInfo {
Open { lsn_start: Lsn },
Frozen { lsn_start: Lsn, lsn_end: Lsn },
Open {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
},
Frozen {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
},
}
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
pub enum HistoricLayerInfo {
@@ -511,7 +548,9 @@ pub enum HistoricLayerInfo {
layer_file_name: String,
layer_file_size: u64,
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
remote: bool,
access_stats: LayerAccessStats,
@@ -520,6 +559,7 @@ pub enum HistoricLayerInfo {
layer_file_name: String,
layer_file_size: u64,
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
remote: bool,
access_stats: LayerAccessStats,

View File

@@ -2,8 +2,6 @@
//! To use, create PostgresBackend and run() it, passing the Handler
//! implementation determining how to process the queries. Currently its API
//! is rather narrow, but we can extend it once required.
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::Context;
use bytes::Bytes;
use futures::pin_mut;
@@ -244,7 +242,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> MaybeWriteOnly<IO> {
}
}
/// Cancellation safe as long as the underlying IO is cancellation safe.
async fn shutdown(&mut self) -> io::Result<()> {
match self {
MaybeWriteOnly::Full(framed) => framed.shutdown().await,
@@ -396,23 +393,13 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
shutdown_watcher: F,
) -> Result<(), QueryError>
where
F: Fn() -> S + Clone,
F: Fn() -> S,
S: Future,
{
let ret = self
.run_message_loop(handler, shutdown_watcher.clone())
.await;
tokio::select! {
_ = shutdown_watcher() => {
// do nothing; we most likely got already stopped by shutdown and will log it next.
}
_ = self.framed.shutdown() => {
// socket might be already closed, e.g. if previously received error,
// so ignore result.
},
}
let ret = self.run_message_loop(handler, shutdown_watcher).await;
// socket might be already closed, e.g. if previously received error,
// so ignore result.
self.framed.shutdown().await.ok();
match ret {
Ok(()) => Ok(()),
Err(QueryError::Shutdown) => {
@@ -730,17 +717,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
trace!("got query {query_string:?}");
if let Err(e) = handler.process_query(self, query_string).await {
match e {
QueryError::Shutdown => return Ok(ProcessMsgResult::Break),
e => {
log_query_error(query_string, &e);
let short_error = short_error(&e);
self.write_message_noflush(&BeMessage::ErrorResponse(
&short_error,
Some(e.pg_error_code()),
))?;
}
}
log_query_error(query_string, &e);
let short_error = short_error(&e);
self.write_message_noflush(&BeMessage::ErrorResponse(
&short_error,
Some(e.pg_error_code()),
))?;
}
self.write_message_noflush(&BeMessage::ReadyForQuery)?;
}

View File

@@ -1,5 +1,3 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::{bail, Context};
use itertools::Itertools;
use std::borrow::Cow;

View File

@@ -8,7 +8,6 @@
// modules included with the postgres_ffi macro depend on the types of the specific version's
// types, and trigger a too eager lint.
#![allow(clippy::duplicate_mod)]
#![deny(clippy::undocumented_unsafe_blocks)]
use bytes::Bytes;
use utils::bin_ser::SerializeError;
@@ -21,7 +20,6 @@ macro_rules! postgres_ffi {
pub mod bindings {
// bindgen generates bindings for a lot of stuff we don't need
#![allow(dead_code)]
#![allow(clippy::undocumented_unsafe_blocks)]
use serde::{Deserialize, Serialize};
include!(concat!(

View File

@@ -14,7 +14,6 @@ macro_rules! xlog_utils_test {
($version:ident) => {
#[path = "."]
mod $version {
#[allow(unused_imports)]
pub use postgres_ffi::$version::wal_craft_test_export::*;
#[allow(clippy::duplicate_mod)]
#[cfg(test)]

View File

@@ -214,24 +214,27 @@ where
}
}
/// Cancellation safe as long as the AsyncWrite is cancellation safe.
async fn flush<S: AsyncWrite + Unpin>(
stream: &mut S,
write_buf: &mut BytesMut,
) -> Result<(), io::Error> {
while write_buf.has_remaining() {
let bytes_written = stream.write_buf(write_buf).await?;
let bytes_written = stream.write(write_buf.chunk()).await?;
if bytes_written == 0 {
return Err(io::Error::new(
ErrorKind::WriteZero,
"failed to write message",
));
}
// The advanced part will be garbage collected, likely during shifting
// data left on next attempt to write to buffer when free space is not
// enough.
write_buf.advance(bytes_written);
}
write_buf.clear();
stream.flush().await
}
/// Cancellation safe as long as the AsyncWrite is cancellation safe.
async fn shutdown<S: AsyncWrite + Unpin>(
stream: &mut S,
write_buf: &mut BytesMut,

View File

@@ -1,7 +1,6 @@
//! Postgres protocol messages serialization-deserialization. See
//! <https://www.postgresql.org/docs/devel/protocol-message-formats.html>
//! on message formats.
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod framed;

View File

@@ -1,18 +1,21 @@
//! Azure Blob Storage wrapper
use std::collections::HashMap;
use std::env;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::{borrow::Cow, io::Cursor};
use std::{borrow::Cow, collections::HashMap, io::Cursor};
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
use anyhow::Result;
use azure_core::request_options::{MaxResults, Metadata, Range};
use azure_core::Header;
use azure_identity::DefaultAzureCredential;
use azure_storage::StorageCredentials;
use azure_storage_blobs::prelude::ClientBuilder;
use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerClient};
use azure_storage_blobs::{
blob::operations::GetBlobBuilder,
prelude::{BlobClient, ContainerClient},
};
use futures_util::StreamExt;
use http_types::StatusCode;
use tokio::io::AsyncRead;
@@ -20,8 +23,8 @@ use tracing::debug;
use crate::s3_bucket::RequestKind;
use crate::{
AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath,
RemoteStorage, StorageMetadata,
AzureConfig, ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage,
StorageMetadata,
};
pub struct AzureBlobStorage {
@@ -109,19 +112,16 @@ impl AzureBlobStorage {
async fn download_for_builder(
&self,
metadata: StorageMetadata,
builder: GetBlobBuilder,
) -> Result<Download, DownloadError> {
let mut response = builder.into_stream();
let mut metadata = HashMap::new();
// TODO give proper streaming response instead of buffering into RAM
// https://github.com/neondatabase/neon/issues/5563
let mut buf = Vec::new();
while let Some(part) = response.next().await {
let part = part.map_err(to_download_error)?;
if let Some(blob_meta) = part.blob.metadata {
metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned())));
}
let data = part
.data
.collect()
@@ -131,9 +131,28 @@ impl AzureBlobStorage {
}
Ok(Download {
download_stream: Box::pin(Cursor::new(buf)),
metadata: Some(StorageMetadata(metadata)),
metadata: Some(metadata),
})
}
// TODO get rid of this function once we have metadata included in the response
// https://github.com/Azure/azure-sdk-for-rust/issues/1439
async fn get_metadata(
&self,
blob_client: &BlobClient,
) -> Result<StorageMetadata, DownloadError> {
let builder = blob_client.get_metadata();
let response = builder.into_future().await.map_err(to_download_error)?;
let mut map = HashMap::new();
for md in response.metadata.iter() {
map.insert(
md.name().as_str().to_string(),
md.value().as_str().to_string(),
);
}
Ok(StorageMetadata(map))
}
async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
self.concurrency_limiter
@@ -165,11 +184,10 @@ fn to_download_error(error: azure_core::Error) -> DownloadError {
#[async_trait::async_trait]
impl RemoteStorage for AzureBlobStorage {
async fn list(
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
) -> anyhow::Result<Listing, DownloadError> {
) -> Result<Vec<RemotePath>, DownloadError> {
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
.map(|p| self.relative_path_to_name(p))
@@ -177,19 +195,16 @@ impl RemoteStorage for AzureBlobStorage {
.map(|mut p| {
// required to end with a separator
// otherwise request will return only the entry of a prefix
if matches!(mode, ListingMode::WithDelimiter)
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
{
if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
}
p
});
let mut builder = self.client.list_blobs();
if let ListingMode::WithDelimiter = mode {
builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
let mut builder = self
.client
.list_blobs()
.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
if let Some(prefix) = list_prefix {
builder = builder.prefix(Cow::from(prefix.to_owned()));
@@ -200,23 +215,46 @@ impl RemoteStorage for AzureBlobStorage {
}
let mut response = builder.into_stream();
let mut res = Listing::default();
while let Some(l) = response.next().await {
let entry = l.map_err(to_download_error)?;
let prefix_iter = entry
let mut res = Vec::new();
while let Some(entry) = response.next().await {
let entry = entry.map_err(to_download_error)?;
let name_iter = entry
.blobs
.prefixes()
.map(|prefix| self.name_to_relative_path(&prefix.name));
res.prefixes.extend(prefix_iter);
let blob_iter = entry
.blobs
.blobs()
.map(|k| self.name_to_relative_path(&k.name));
res.keys.extend(blob_iter);
res.extend(name_iter);
}
Ok(res)
}
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let folder_name = folder
.map(|p| self.relative_path_to_name(p))
.or_else(|| self.prefix_in_container.clone());
let mut builder = self.client.list_blobs();
if let Some(folder_name) = folder_name {
builder = builder.prefix(Cow::from(folder_name.to_owned()));
}
if let Some(limit) = self.max_keys_per_list_response {
builder = builder.max_results(MaxResults::new(limit));
}
let mut response = builder.into_stream();
let mut res = Vec::new();
while let Some(l) = response.next().await {
let entry = l.map_err(anyhow::Error::new)?;
let name_iter = entry
.blobs
.blobs()
.map(|bl| self.name_to_relative_path(&bl.name));
res.extend(name_iter);
}
Ok(res)
}
async fn upload(
&self,
mut from: impl AsyncRead + Unpin + Send + Sync + 'static,
@@ -250,9 +288,11 @@ impl RemoteStorage for AzureBlobStorage {
let _permit = self.permit(RequestKind::Get).await;
let blob_client = self.client.blob_client(self.relative_path_to_name(from));
let metadata = self.get_metadata(&blob_client).await?;
let builder = blob_client.get();
self.download_for_builder(builder).await
self.download_for_builder(metadata, builder).await
}
async fn download_byte_range(
@@ -264,6 +304,8 @@ impl RemoteStorage for AzureBlobStorage {
let _permit = self.permit(RequestKind::Get).await;
let blob_client = self.client.blob_client(self.relative_path_to_name(from));
let metadata = self.get_metadata(&blob_client).await?;
let mut builder = blob_client.get();
if let Some(end_exclusive) = end_exclusive {
@@ -278,7 +320,7 @@ impl RemoteStorage for AzureBlobStorage {
builder = builder.range(Range::new(start_inclusive, end_exclusive));
}
self.download_for_builder(builder).await
self.download_for_builder(metadata, builder).await
}
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
@@ -311,8 +353,4 @@ impl RemoteStorage for AzureBlobStorage {
}
Ok(())
}
async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
unimplemented!()
}
}

View File

@@ -6,8 +6,6 @@
//! * [`s3_bucket`] uses AWS S3 bucket as an external storage
//! * [`azure_blob`] allows to use Azure Blob storage as an external storage
//!
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
mod azure_blob;
mod local_fs;
@@ -114,7 +112,7 @@ impl RemotePath {
self.0.file_name()
}
pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
pub fn join(&self, segment: &Utf8Path) -> Self {
Self(self.0.join(segment))
}
@@ -131,22 +129,6 @@ impl RemotePath {
}
}
/// We don't need callers to be able to pass arbitrary delimiters: just control
/// whether listings will use a '/' separator or not.
///
/// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
/// NoDelimiter mode will only populate `keys`.
pub enum ListingMode {
WithDelimiter,
NoDelimiter,
}
#[derive(Default)]
pub struct Listing {
pub prefixes: Vec<RemotePath>,
pub keys: Vec<RemotePath>,
}
/// Storage (potentially remote) API to manage its state.
/// This storage tries to be unaware of any layered repository context,
/// providing basic CRUD operations for storage files.
@@ -159,13 +141,8 @@ pub trait RemoteStorage: Send + Sync + 'static {
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
let result = self
.list(prefix, ListingMode::WithDelimiter)
.await?
.prefixes;
Ok(result)
}
) -> Result<Vec<RemotePath>, DownloadError>;
/// Lists all files in directory "recursively"
/// (not really recursively, because AWS has a flat namespace)
/// Note: This is subtely different than list_prefixes,
@@ -177,16 +154,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
/// whereas,
/// list_prefixes("foo/bar/") = ["cat", "dog"]
/// See `test_real_s3.rs` for more details.
async fn list_files(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let result = self.list(prefix, ListingMode::NoDelimiter).await?.keys;
Ok(result)
}
async fn list(
&self,
prefix: Option<&RemotePath>,
_mode: ListingMode,
) -> anyhow::Result<Listing, DownloadError>;
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>>;
/// Streams the local file contents into remote into the remote storage entry.
async fn upload(
@@ -215,8 +183,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;
async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()>;
}
pub struct Download {
@@ -239,9 +205,6 @@ pub enum DownloadError {
BadInput(anyhow::Error),
/// The file was not found in the remote storage.
NotFound,
/// A cancellation token aborted the download, typically during
/// tenant detach or process shutdown.
Cancelled,
/// The file was found in the remote storage, but the download failed.
Other(anyhow::Error),
}
@@ -252,7 +215,6 @@ impl std::fmt::Display for DownloadError {
DownloadError::BadInput(e) => {
write!(f, "Failed to download a remote file due to user input: {e}")
}
DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
}
@@ -272,19 +234,6 @@ pub enum GenericRemoteStorage {
}
impl GenericRemoteStorage {
pub async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
) -> anyhow::Result<Listing, DownloadError> {
match self {
Self::LocalFs(s) => s.list(prefix, mode).await,
Self::AwsS3(s) => s.list(prefix, mode).await,
Self::AzureBlob(s) => s.list(prefix, mode).await,
Self::Unreliable(s) => s.list(prefix, mode).await,
}
}
// A function for listing all the files in a "directory"
// Example:
// list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
@@ -379,15 +328,6 @@ impl GenericRemoteStorage {
Self::Unreliable(s) => s.delete_objects(paths).await,
}
}
pub async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => s.copy_object(src, dst).await,
Self::AwsS3(s) => s.copy_object(src, dst).await,
Self::AzureBlob(s) => s.copy_object(src, dst).await,
Self::Unreliable(s) => s.copy_object(src, dst).await,
}
}
}
impl GenericRemoteStorage {

View File

@@ -15,7 +15,7 @@ use tokio::{
use tracing::*;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
use crate::{Download, DownloadError, Listing, ListingMode, RemotePath};
use crate::{Download, DownloadError, RemotePath};
use super::{RemoteStorage, StorageMetadata};
@@ -75,7 +75,7 @@ impl LocalFs {
}
#[cfg(test)]
async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
Ok(get_all_files(&self.storage_root, true)
.await?
.into_iter()
@@ -89,10 +89,52 @@ impl LocalFs {
})
.collect())
}
}
#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
let path = match prefix {
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
None => Cow::Borrowed(&self.storage_root),
};
let prefixes_to_filter = get_all_files(path.as_ref(), false)
.await
.map_err(DownloadError::Other)?;
let mut prefixes = Vec::with_capacity(prefixes_to_filter.len());
// filter out empty directories to mirror s3 behavior.
for prefix in prefixes_to_filter {
if prefix.is_dir()
&& is_directory_empty(&prefix)
.await
.map_err(DownloadError::Other)?
{
continue;
}
prefixes.push(
prefix
.strip_prefix(&self.storage_root)
.context("Failed to strip prefix")
.and_then(RemotePath::new)
.expect(
"We list files for storage root, hence should be able to remote the prefix",
),
)
}
Ok(prefixes)
}
// recursively lists all files in a directory,
// mirroring the `list_files` for `s3_bucket`
async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let full_path = match folder {
Some(folder) => folder.with_base(&self.storage_root),
None => self.storage_root.clone(),
@@ -144,70 +186,6 @@ impl LocalFs {
Ok(files)
}
}
#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
) -> Result<Listing, DownloadError> {
let mut result = Listing::default();
if let ListingMode::NoDelimiter = mode {
let keys = self
.list_recursive(prefix)
.await
.map_err(DownloadError::Other)?;
result.keys = keys
.into_iter()
.filter(|k| {
let path = k.with_base(&self.storage_root);
!path.is_dir()
})
.collect();
return Ok(result);
}
let path = match prefix {
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
None => Cow::Borrowed(&self.storage_root),
};
let prefixes_to_filter = get_all_files(path.as_ref(), false)
.await
.map_err(DownloadError::Other)?;
// filter out empty directories to mirror s3 behavior.
for prefix in prefixes_to_filter {
if prefix.is_dir()
&& is_directory_empty(&prefix)
.await
.map_err(DownloadError::Other)?
{
continue;
}
let stripped = prefix
.strip_prefix(&self.storage_root)
.context("Failed to strip prefix")
.and_then(RemotePath::new)
.expect(
"We list files for storage root, hence should be able to remote the prefix",
);
if prefix.is_dir() {
result.prefixes.push(stripped);
} else {
result.keys.push(stripped);
}
}
Ok(result)
}
async fn upload(
&self,
@@ -393,27 +371,6 @@ impl RemoteStorage for LocalFs {
}
Ok(())
}
async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
let src_path = src.with_base(&self.storage_root);
let dst_path = dst.with_base(&self.storage_root);
// If the destination file already exists, we need to delete it first.
if dst_path.exists() {
fs::remove_file(&dst_path).await?;
}
// Copy the file.
fs::copy(&src_path, &dst_path).await?;
// Copy the metadata.
let metadata_path = storage_metadata_path(&src_path);
if metadata_path.exists() {
fs::copy(&metadata_path, storage_metadata_path(&dst_path)).await?;
}
Ok(())
}
}
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
@@ -522,7 +479,7 @@ mod fs_tests {
let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
assert_eq!(
storage.list_all().await?,
storage.list().await?,
vec![target_path_1.clone()],
"Should list a single file after first upload"
);
@@ -710,7 +667,7 @@ mod fs_tests {
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
storage.delete(&upload_target).await?;
assert!(storage.list_all().await?.is_empty());
assert!(storage.list().await?.is_empty());
storage
.delete(&upload_target)
@@ -768,43 +725,6 @@ mod fs_tests {
Ok(())
}
#[tokio::test]
async fn list() -> anyhow::Result<()> {
// No delimiter: should recursively list everything
let storage = create_storage()?;
let child = upload_dummy_file(&storage, "grandparent/parent/child", None).await?;
let uncle = upload_dummy_file(&storage, "grandparent/uncle", None).await?;
let listing = storage.list(None, ListingMode::NoDelimiter).await?;
assert!(listing.prefixes.is_empty());
assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());
// Delimiter: should only go one deep
let listing = storage.list(None, ListingMode::WithDelimiter).await?;
assert_eq!(
listing.prefixes,
[RemotePath::from_string("timelines").unwrap()].to_vec()
);
assert!(listing.keys.is_empty());
// Delimiter & prefix
let listing = storage
.list(
Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
ListingMode::WithDelimiter,
)
.await?;
assert_eq!(
listing.prefixes,
[RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()]
.to_vec()
);
assert_eq!(listing.keys, [uncle.clone()].to_vec());
Ok(())
}
async fn upload_dummy_file(
storage: &LocalFs,
name: &str,
@@ -857,7 +777,7 @@ mod fs_tests {
}
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
let mut files = storage.list_all().await?;
let mut files = storage.list().await?;
files.sort_by(|a, b| a.0.cmp(&b.0));
Ok(files)
}

View File

@@ -30,8 +30,8 @@ use tracing::debug;
use super::StorageMetadata;
use crate::{
ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
S3Config, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage, S3Config,
MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
pub(super) mod metrics;
@@ -221,8 +221,6 @@ impl S3Bucket {
)),
}
}
}
pin_project_lite::pin_project! {
@@ -301,13 +299,13 @@ impl<S: AsyncRead> AsyncRead for TimedDownload<S> {
#[async_trait::async_trait]
impl RemoteStorage for S3Bucket {
async fn list(
/// See the doc for `RemoteStorage::list_prefixes`
/// Note: it wont include empty "directories"
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
) -> Result<Listing, DownloadError> {
) -> Result<Vec<RemotePath>, DownloadError> {
let kind = RequestKind::List;
let mut result = Listing::default();
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
@@ -316,33 +314,28 @@ impl RemoteStorage for S3Bucket {
.map(|mut p| {
// required to end with a separator
// otherwise request will return only the entry of a prefix
if matches!(mode, ListingMode::WithDelimiter)
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
{
if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
}
p
});
let mut document_keys = Vec::new();
let mut continuation_token = None;
loop {
let _guard = self.permit(kind).await;
let started_at = start_measuring_requests(kind);
let mut request = self
let fetch_response = self
.client
.list_objects_v2()
.bucket(self.bucket_name.clone())
.set_prefix(list_prefix.clone())
.set_continuation_token(continuation_token)
.set_max_keys(self.max_keys_per_list_response);
if let ListingMode::WithDelimiter = mode {
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
let response = request
.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string())
.set_max_keys(self.max_keys_per_list_response)
.send()
.await
.context("Failed to list S3 prefixes")
@@ -352,35 +345,71 @@ impl RemoteStorage for S3Bucket {
metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &response, started_at);
.observe_elapsed(kind, &fetch_response, started_at);
let response = response?;
let fetch_response = fetch_response?;
let keys = response.contents().unwrap_or_default();
let empty = Vec::new();
let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
tracing::info!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
for object in keys {
let object_path = object.key().expect("response does not contain a key");
let remote_path = self.s3_object_to_relative_path(object_path);
result.keys.push(remote_path);
}
result.prefixes.extend(
prefixes
.iter()
document_keys.extend(
fetch_response
.common_prefixes
.unwrap_or_default()
.into_iter()
.filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
);
continuation_token = match response.next_continuation_token {
continuation_token = match fetch_response.next_continuation_token {
Some(new_token) => Some(new_token),
None => break,
};
}
Ok(result)
Ok(document_keys)
}
/// See the doc for `RemoteStorage::list_files`
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let kind = RequestKind::List;
let folder_name = folder
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
// AWS may need to break the response into several parts
let mut continuation_token = None;
let mut all_files = vec![];
loop {
let _guard = self.permit(kind).await;
let started_at = start_measuring_requests(kind);
let response = self
.client
.list_objects_v2()
.bucket(self.bucket_name.clone())
.set_prefix(folder_name.clone())
.set_continuation_token(continuation_token)
.set_max_keys(self.max_keys_per_list_response)
.send()
.await
.context("Failed to list files in S3 bucket");
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &response, started_at);
let response = response?;
for object in response.contents().unwrap_or_default() {
let object_path = object.key().expect("response does not contain a key");
let remote_path = self.s3_object_to_relative_path(object_path);
all_files.push(remote_path);
}
match response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok(all_files)
}
async fn upload(
@@ -517,11 +546,6 @@ impl RemoteStorage for S3Bucket {
let paths = std::array::from_ref(path);
self.delete_objects(paths).await
}
async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
unimplemented!()
}
}
/// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
@@ -604,6 +628,4 @@ mod tests {
}
}
}
}

View File

@@ -5,9 +5,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Mutex;
use crate::{
Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
};
use crate::{Download, DownloadError, RemotePath, RemoteStorage, StorageMetadata};
pub struct UnreliableWrapper {
inner: crate::GenericRemoteStorage,
@@ -97,15 +95,6 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.list_files(folder).await
}
async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
) -> Result<Listing, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
self.inner.list(prefix, mode).await
}
async fn upload(
&self,
data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
@@ -160,8 +149,4 @@ impl RemoteStorage for UnreliableWrapper {
}
Ok(())
}
async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
unimplemented!()
}
}

View File

@@ -1,5 +1,3 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use const_format::formatcp;
/// Public API types

View File

@@ -1,18 +1,23 @@
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::{
id::{NodeId, TenantId, TimelineId},
lsn::Lsn,
};
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
pub peer_ids: Option<Vec<NodeId>>,
pub pg_version: u32,
pub system_id: Option<u64>,
pub wal_seg_size: Option<u32>,
#[serde_as(as = "DisplayFromStr")]
pub commit_lsn: Lsn,
// If not passed, it is assigned to the beginning of commit_lsn segment.
pub local_start_lsn: Option<Lsn>,
@@ -23,6 +28,7 @@ fn lsn_invalid() -> Lsn {
}
/// Data about safekeeper's timeline, mirrors broker.proto.
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkTimelineInfo {
/// Term.
@@ -30,19 +36,25 @@ pub struct SkTimelineInfo {
/// Term of the last entry.
pub last_log_term: Option<u64>,
/// LSN of the last record.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub flush_lsn: Lsn,
/// Up to which LSN safekeeper regards its WAL as committed.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub commit_lsn: Lsn,
/// LSN up to which safekeeper has backed WAL.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub backup_lsn: Lsn,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub remote_consistent_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub peer_horizon_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub local_start_lsn: Lsn,
/// A connection string to use for WAL receiving.

View File

@@ -1,6 +1,4 @@
//! Synthetic size calculation
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
mod calculation;
pub mod svg;

View File

@@ -32,8 +32,6 @@
//! .init();
//! }
//! ```
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use opentelemetry::sdk::Resource;
use opentelemetry::KeyValue;

View File

@@ -5,7 +5,6 @@ edition.workspace = true
license.workspace = true
[dependencies]
arc-swap.workspace = true
sentry.workspace = true
async-trait.workspace = true
anyhow.workspace = true
@@ -56,7 +55,6 @@ bytes.workspace = true
criterion.workspace = true
hex-literal.workspace = true
camino-tempfile.workspace = true
serde_assert.workspace = true
[[bench]]
name = "benchmarks"

View File

@@ -1,8 +1,7 @@
// For details about authentication see docs/authentication.md
use arc_swap::ArcSwap;
use serde;
use std::{fs, sync::Arc};
use std::fs;
use anyhow::Result;
use camino::Utf8Path;
@@ -10,6 +9,7 @@ use jsonwebtoken::{
decode, encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation,
};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use crate::id::TenantId;
@@ -32,9 +32,11 @@ pub enum Scope {
}
/// JWT payload. See docs/authentication.md for the format
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct Claims {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub tenant_id: Option<TenantId>,
pub scope: Scope,
}
@@ -45,88 +47,31 @@ impl Claims {
}
}
pub struct SwappableJwtAuth(ArcSwap<JwtAuth>);
impl SwappableJwtAuth {
pub fn new(jwt_auth: JwtAuth) -> Self {
SwappableJwtAuth(ArcSwap::new(Arc::new(jwt_auth)))
}
pub fn swap(&self, jwt_auth: JwtAuth) {
self.0.swap(Arc::new(jwt_auth));
}
pub fn decode(&self, token: &str) -> Result<TokenData<Claims>> {
self.0.load().decode(token)
}
}
impl std::fmt::Debug for SwappableJwtAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Swappable({:?})", self.0.load())
}
}
pub struct JwtAuth {
decoding_keys: Vec<DecodingKey>,
decoding_key: DecodingKey,
validation: Validation,
}
impl JwtAuth {
pub fn new(decoding_keys: Vec<DecodingKey>) -> Self {
pub fn new(decoding_key: DecodingKey) -> Self {
let mut validation = Validation::default();
validation.algorithms = vec![STORAGE_TOKEN_ALGORITHM];
// The default 'required_spec_claims' is 'exp'. But we don't want to require
// expiration.
validation.required_spec_claims = [].into();
Self {
decoding_keys,
decoding_key,
validation,
}
}
pub fn from_key_path(key_path: &Utf8Path) -> Result<Self> {
let metadata = key_path.metadata()?;
let decoding_keys = if metadata.is_dir() {
let mut keys = Vec::new();
for entry in fs::read_dir(key_path)? {
let path = entry?.path();
if !path.is_file() {
// Ignore directories (don't recurse)
continue;
}
let public_key = fs::read(path)?;
keys.push(DecodingKey::from_ed_pem(&public_key)?);
}
keys
} else if metadata.is_file() {
let public_key = fs::read(key_path)?;
vec![DecodingKey::from_ed_pem(&public_key)?]
} else {
anyhow::bail!("path is neither a directory or a file")
};
if decoding_keys.is_empty() {
anyhow::bail!("Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected.");
}
Ok(Self::new(decoding_keys))
let public_key = fs::read(key_path)?;
Ok(Self::new(DecodingKey::from_ed_pem(&public_key)?))
}
/// Attempt to decode the token with the internal decoding keys.
///
/// The function tries the stored decoding keys in succession,
/// and returns the first yielding a successful result.
/// If there is no working decoding key, it returns the last error.
pub fn decode(&self, token: &str) -> Result<TokenData<Claims>> {
let mut res = None;
for decoding_key in &self.decoding_keys {
res = Some(decode(token, decoding_key, &self.validation));
if let Some(Ok(res)) = res {
return Ok(res);
}
}
if let Some(res) = res {
res.map_err(anyhow::Error::new)
} else {
anyhow::bail!("no JWT decoding keys configured")
}
Ok(decode(token, &self.decoding_key, &self.validation)?)
}
}
@@ -187,7 +132,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded_eddsa = "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJzY29wZSI6InRlbmFudCIsInRlbmFudF9pZCI6IjNkMWY3NTk1YjQ2ODIzMDMwNGUwYjczY2VjYmNiMDgxIiwiaXNzIjoibmVvbi5jb250cm9scGxhbmUiLCJleHAiOjE3MDkyMDA4NzksImlhdCI6MTY3ODQ0MjQ3OX0.U3eA8j-uU-JnhzeO3EDHRuXLwkAUFCPxtGHEgw6p7Ccc3YRbFs2tmCdbD9PZEXP-XsxSeBQi1FY0YPcT3NXADw";
// Check it can be validated with the public key
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519)?]);
let auth = JwtAuth::new(DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519)?);
let claims_from_token = auth.decode(encoded_eddsa)?.claims;
assert_eq!(claims_from_token, expected_claims);
@@ -204,7 +149,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded = encode_from_key_file(&claims, TEST_PRIV_KEY_ED25519)?;
// decode it back
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519)?]);
let auth = JwtAuth::new(DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519)?);
let decoded = auth.decode(&encoded)?;
assert_eq!(decoded.claims, claims);

View File

@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
///
/// See docs/rfcs/025-generation-numbers.md for detail on how generation
/// numbers are used.
#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
pub enum Generation {
// Generations with this magic value will not add a suffix to S3 keys, and will not
// be included in persisted index_part.json. This value is only to be used

View File

@@ -1,41 +0,0 @@
/// Useful type for asserting that expected bytes match reporting the bytes more readable
/// array-syntax compatible hex bytes.
///
/// # Usage
///
/// ```
/// use utils::Hex;
///
/// let actual = serialize_something();
/// let expected = [0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64];
///
/// // the type implements PartialEq and on mismatch, both sides are printed in 16 wide multiline
/// // output suffixed with an array style length for easier comparisons.
/// assert_eq!(Hex(&actual), Hex(&expected));
///
/// // with `let expected = [0x68];` the error would had been:
/// // assertion `left == right` failed
/// // left: [0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64; 11]
/// // right: [0x68; 1]
/// # fn serialize_something() -> Vec<u8> { "hello world".as_bytes().to_vec() }
/// ```
#[derive(PartialEq)]
pub struct Hex<'a>(pub &'a [u8]);
impl std::fmt::Debug for Hex<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[")?;
for (i, c) in self.0.chunks(16).enumerate() {
if i > 0 && !c.is_empty() {
writeln!(f, ", ")?;
}
for (j, b) in c.iter().enumerate() {
if j > 0 {
write!(f, ", ")?;
}
write!(f, "0x{b:02x}")?;
}
}
write!(f, "; {}]", self.0.len())
}
}

View File

@@ -1,4 +1,4 @@
use crate::auth::{Claims, SwappableJwtAuth};
use crate::auth::{Claims, JwtAuth};
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
use anyhow::Context;
use hyper::header::{HeaderName, AUTHORIZATION};
@@ -14,11 +14,6 @@ use tracing::{self, debug, info, info_span, warn, Instrument};
use std::future::Future;
use std::str::FromStr;
use bytes::{Bytes, BytesMut};
use std::io::Write as _;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"libmetrics_metric_handler_requests_total",
@@ -151,89 +146,94 @@ impl Drop for RequestCancelled {
}
}
/// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
pub struct ChannelWriter {
buffer: BytesMut,
pub tx: mpsc::Sender<std::io::Result<Bytes>>,
written: usize,
}
impl ChannelWriter {
pub fn new(buf_len: usize, tx: mpsc::Sender<std::io::Result<Bytes>>) -> Self {
assert_ne!(buf_len, 0);
ChannelWriter {
// split about half off the buffer from the start, because we flush depending on
// capacity. first flush will come sooner than without this, but now resizes will
// have better chance of picking up the "other" half. not guaranteed of course.
buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
tx,
written: 0,
}
}
pub fn flush0(&mut self) -> std::io::Result<usize> {
let n = self.buffer.len();
if n == 0 {
return Ok(0);
}
tracing::trace!(n, "flushing");
let ready = self.buffer.split().freeze();
// not ideal to call from blocking code to block_on, but we are sure that this
// operation does not spawn_blocking other tasks
let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
self.tx.send(Ok(ready)).await.map_err(|_| ())?;
// throttle sending to allow reuse of our buffer in `write`.
self.tx.reserve().await.map_err(|_| ())?;
// now the response task has picked up the buffer and hopefully started
// sending it to the client.
Ok(())
});
if res.is_err() {
return Err(std::io::ErrorKind::BrokenPipe.into());
}
self.written += n;
Ok(n)
}
pub fn flushed_bytes(&self) -> usize {
self.written
}
}
impl std::io::Write for ChannelWriter {
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
let remaining = self.buffer.capacity() - self.buffer.len();
let out_of_space = remaining < buf.len();
let original_len = buf.len();
if out_of_space {
let can_still_fit = buf.len() - remaining;
self.buffer.extend_from_slice(&buf[..can_still_fit]);
buf = &buf[can_still_fit..];
self.flush0()?;
}
// assume that this will often under normal operation just move the pointer back to the
// beginning of allocation, because previous split off parts are already sent and
// dropped.
self.buffer.extend_from_slice(buf);
Ok(original_len)
}
fn flush(&mut self) -> std::io::Result<()> {
self.flush0().map(|_| ())
}
}
async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
use bytes::{Bytes, BytesMut};
use std::io::Write as _;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
SERVE_METRICS_COUNT.inc();
/// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
struct ChannelWriter {
buffer: BytesMut,
tx: mpsc::Sender<std::io::Result<Bytes>>,
written: usize,
}
impl ChannelWriter {
fn new(buf_len: usize, tx: mpsc::Sender<std::io::Result<Bytes>>) -> Self {
assert_ne!(buf_len, 0);
ChannelWriter {
// split about half off the buffer from the start, because we flush depending on
// capacity. first flush will come sooner than without this, but now resizes will
// have better chance of picking up the "other" half. not guaranteed of course.
buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
tx,
written: 0,
}
}
fn flush0(&mut self) -> std::io::Result<usize> {
let n = self.buffer.len();
if n == 0 {
return Ok(0);
}
tracing::trace!(n, "flushing");
let ready = self.buffer.split().freeze();
// not ideal to call from blocking code to block_on, but we are sure that this
// operation does not spawn_blocking other tasks
let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
self.tx.send(Ok(ready)).await.map_err(|_| ())?;
// throttle sending to allow reuse of our buffer in `write`.
self.tx.reserve().await.map_err(|_| ())?;
// now the response task has picked up the buffer and hopefully started
// sending it to the client.
Ok(())
});
if res.is_err() {
return Err(std::io::ErrorKind::BrokenPipe.into());
}
self.written += n;
Ok(n)
}
fn flushed_bytes(&self) -> usize {
self.written
}
}
impl std::io::Write for ChannelWriter {
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
let remaining = self.buffer.capacity() - self.buffer.len();
let out_of_space = remaining < buf.len();
let original_len = buf.len();
if out_of_space {
let can_still_fit = buf.len() - remaining;
self.buffer.extend_from_slice(&buf[..can_still_fit]);
buf = &buf[can_still_fit..];
self.flush0()?;
}
// assume that this will often under normal operation just move the pointer back to the
// beginning of allocation, because previous split off parts are already sent and
// dropped.
self.buffer.extend_from_slice(buf);
Ok(original_len)
}
fn flush(&mut self) -> std::io::Result<()> {
self.flush0().map(|_| ())
}
}
let started_at = std::time::Instant::now();
let (tx, rx) = mpsc::channel(1);
@@ -389,7 +389,7 @@ fn parse_token(header_value: &str) -> Result<&str, ApiError> {
}
pub fn auth_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
provide_auth: fn(&Request<Body>) -> Option<&SwappableJwtAuth>,
provide_auth: fn(&Request<Body>) -> Option<&JwtAuth>,
) -> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
if let Some(auth) = provide_auth(&req) {

View File

@@ -3,7 +3,6 @@ use std::{fmt, str::FromStr};
use anyhow::Context;
use hex::FromHex;
use rand::Rng;
use serde::de::Visitor;
use serde::{Deserialize, Serialize};
use thiserror::Error;
@@ -18,74 +17,12 @@ pub enum IdError {
///
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
///
/// Use `#[serde_as(as = "DisplayFromStr")]` to (de)serialize it as hex string instead: `ad50847381e248feaac9876cc71ae418`.
/// Check the `serde_with::serde_as` documentation for options for more complex types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
struct Id([u8; 16]);
impl Serialize for Id {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
self.0.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for Id {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> Visitor<'de> for IdVisitor {
type Value = Id;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 16])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 16] = Deserialize::deserialize(s)?;
Ok(Id::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Id::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
16,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}
impl Id {
pub fn get_from_buf(buf: &mut impl bytes::Buf) -> Id {
let mut arr = [0u8; 16];
@@ -120,8 +57,6 @@ impl Id {
chunk[0] = HEX[((b >> 4) & 0xf) as usize];
chunk[1] = HEX[(b & 0xf) as usize];
}
// SAFETY: vec constructed out of `HEX`, it can only be ascii
unsafe { String::from_utf8_unchecked(buf) }
}
}
@@ -373,112 +308,3 @@ impl fmt::Display for NodeId {
write!(f, "{}", self.0)
}
}
#[cfg(test)]
mod tests {
use serde_assert::{Deserializer, Serializer, Token, Tokens};
use crate::bin_ser::BeSer;
use super::*;
#[test]
fn test_id_serde_non_human_readable() {
let original_id = Id([
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
]);
let expected_tokens = Tokens(vec![
Token::Tuple { len: 16 },
Token::U8(173),
Token::U8(80),
Token::U8(132),
Token::U8(115),
Token::U8(129),
Token::U8(226),
Token::U8(72),
Token::U8(254),
Token::U8(170),
Token::U8(201),
Token::U8(135),
Token::U8(108),
Token::U8(199),
Token::U8(26),
Token::U8(228),
Token::U8(24),
Token::TupleEnd,
]);
let serializer = Serializer::builder().is_human_readable(false).build();
let serialized_tokens = original_id.serialize(&serializer).unwrap();
assert_eq!(serialized_tokens, expected_tokens);
let mut deserializer = Deserializer::builder()
.is_human_readable(false)
.tokens(serialized_tokens)
.build();
let deserialized_id = Id::deserialize(&mut deserializer).unwrap();
assert_eq!(deserialized_id, original_id);
}
#[test]
fn test_id_serde_human_readable() {
let original_id = Id([
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
]);
let expected_tokens = Tokens(vec![Token::Str(String::from(
"ad50847381e248feaac9876cc71ae418",
))]);
let serializer = Serializer::builder().is_human_readable(true).build();
let serialized_tokens = original_id.serialize(&serializer).unwrap();
assert_eq!(serialized_tokens, expected_tokens);
let mut deserializer = Deserializer::builder()
.is_human_readable(true)
.tokens(Tokens(vec![Token::Str(String::from(
"ad50847381e248feaac9876cc71ae418",
))]))
.build();
assert_eq!(Id::deserialize(&mut deserializer).unwrap(), original_id);
}
macro_rules! roundtrip_type {
($type:ty, $expected_bytes:expr) => {{
let expected_bytes: [u8; 16] = $expected_bytes;
let original_id = <$type>::from(expected_bytes);
let ser_bytes = original_id.ser().unwrap();
assert_eq!(ser_bytes, expected_bytes);
let des_id = <$type>::des(&ser_bytes).unwrap();
assert_eq!(des_id, original_id);
}};
}
#[test]
fn test_id_bincode_serde() {
let expected_bytes = [
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
];
roundtrip_type!(Id, expected_bytes);
}
#[test]
fn test_tenant_id_bincode_serde() {
let expected_bytes = [
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
];
roundtrip_type!(TenantId, expected_bytes);
}
#[test]
fn test_timeline_id_bincode_serde() {
let expected_bytes = [
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
];
roundtrip_type!(TimelineId, expected_bytes);
}
}

View File

@@ -1,6 +1,5 @@
//! `utils` is intended to be a place to put code that is shared
//! between other crates in this repository.
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod backoff;
@@ -25,10 +24,6 @@ pub mod auth;
// utility functions and helper traits for unified unique id generation/serialization etc.
pub mod id;
mod hex;
pub use hex::Hex;
// http endpoint utils
pub mod http;
@@ -78,9 +73,6 @@ pub mod completion;
/// Reporting utilities
pub mod error;
/// async timeout helper
pub mod timeout;
pub mod sync;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
@@ -138,21 +130,6 @@ macro_rules! project_git_version {
};
}
/// This is a shortcut to embed build tag into binaries and avoid copying the same build script to all packages
#[macro_export]
macro_rules! project_build_tag {
($const_identifier:ident) => {
const $const_identifier: &::core::primitive::str = {
const __ARG: &[&::core::primitive::str; 2] = &match ::core::option_env!("BUILD_TAG") {
::core::option::Option::Some(x) => ["build_tag-env:", x],
::core::option::Option::None => ["build_tag:", ""],
};
$crate::__const_format::concatcp!(__ARG[0], __ARG[1])
};
};
}
/// Re-export for `project_git_version` macro
#[doc(hidden)]
pub use const_format as __const_format;

View File

@@ -1,7 +1,7 @@
#![warn(missing_docs)]
use camino::Utf8Path;
use serde::{de::Visitor, Deserialize, Serialize};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{Add, AddAssign};
use std::str::FromStr;
@@ -13,114 +13,10 @@ use crate::seqwait::MonotonicCounter;
pub const XLOG_BLCKSZ: u32 = 8192;
/// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash)]
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Lsn(pub u64);
impl Serialize for Lsn {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
self.0.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for Lsn {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct LsnVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> Visitor<'de> for LsnVisitor {
type Value = Lsn;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str(
"value in form of hex string({upper_u32_hex}/{lower_u32_hex}) representing u64 integer",
)
} else {
formatter.write_str("value in form of integer(u64)")
}
}
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Lsn(v))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Lsn::from_str(v).map_err(|e| E::custom(e))
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(LsnVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_u64(LsnVisitor {
is_human_readable_deserializer: false,
})
}
}
}
/// Allows (de)serialization of an `Lsn` always as `u64`.
///
/// ### Example
///
/// ```rust
/// # use serde::{Serialize, Deserialize};
/// use utils::lsn::Lsn;
///
/// #[derive(PartialEq, Serialize, Deserialize, Debug)]
/// struct Foo {
/// #[serde(with = "utils::lsn::serde_as_u64")]
/// always_u64: Lsn,
/// }
///
/// let orig = Foo { always_u64: Lsn(1234) };
///
/// let res = serde_json::to_string(&orig).unwrap();
/// assert_eq!(res, r#"{"always_u64":1234}"#);
///
/// let foo = serde_json::from_str::<Foo>(&res).unwrap();
/// assert_eq!(foo, orig);
/// ```
///
pub mod serde_as_u64 {
use super::Lsn;
/// Serializes the Lsn as u64 disregarding the human readability of the format.
///
/// Meant to be used via `#[serde(with = "...")]` or `#[serde(serialize_with = "...")]`.
pub fn serialize<S: serde::Serializer>(lsn: &Lsn, serializer: S) -> Result<S::Ok, S::Error> {
use serde::Serialize;
lsn.0.serialize(serializer)
}
/// Deserializes the Lsn as u64 disregarding the human readability of the format.
///
/// Meant to be used via `#[serde(with = "...")]` or `#[serde(deserialize_with = "...")]`.
pub fn deserialize<'de, D: serde::Deserializer<'de>>(deserializer: D) -> Result<Lsn, D::Error> {
use serde::Deserialize;
u64::deserialize(deserializer).map(Lsn)
}
}
/// We tried to parse an LSN from a string, but failed
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
#[error("LsnParseError")]
@@ -368,13 +264,8 @@ impl MonotonicCounter<Lsn> for RecordLsn {
#[cfg(test)]
mod tests {
use crate::bin_ser::BeSer;
use super::*;
use serde::ser::Serialize;
use serde_assert::{Deserializer, Serializer, Token, Tokens};
#[test]
fn test_lsn_strings() {
assert_eq!("12345678/AAAA5555".parse(), Ok(Lsn(0x12345678AAAA5555)));
@@ -450,95 +341,4 @@ mod tests {
assert_eq!(lsn.fetch_max(Lsn(6000)), Lsn(5678));
assert_eq!(lsn.fetch_max(Lsn(5000)), Lsn(6000));
}
#[test]
fn test_lsn_serde() {
let original_lsn = Lsn(0x0123456789abcdef);
let expected_readable_tokens = Tokens(vec![Token::U64(0x0123456789abcdef)]);
let expected_non_readable_tokens =
Tokens(vec![Token::Str(String::from("1234567/89ABCDEF"))]);
// Testing human_readable ser/de
let serializer = Serializer::builder().is_human_readable(false).build();
let readable_ser_tokens = original_lsn.serialize(&serializer).unwrap();
assert_eq!(readable_ser_tokens, expected_readable_tokens);
let mut deserializer = Deserializer::builder()
.is_human_readable(false)
.tokens(readable_ser_tokens)
.build();
let des_lsn = Lsn::deserialize(&mut deserializer).unwrap();
assert_eq!(des_lsn, original_lsn);
// Testing NON human_readable ser/de
let serializer = Serializer::builder().is_human_readable(true).build();
let non_readable_ser_tokens = original_lsn.serialize(&serializer).unwrap();
assert_eq!(non_readable_ser_tokens, expected_non_readable_tokens);
let mut deserializer = Deserializer::builder()
.is_human_readable(true)
.tokens(non_readable_ser_tokens)
.build();
let des_lsn = Lsn::deserialize(&mut deserializer).unwrap();
assert_eq!(des_lsn, original_lsn);
// Testing mismatching ser/de
let serializer = Serializer::builder().is_human_readable(false).build();
let non_readable_ser_tokens = original_lsn.serialize(&serializer).unwrap();
let mut deserializer = Deserializer::builder()
.is_human_readable(true)
.tokens(non_readable_ser_tokens)
.build();
Lsn::deserialize(&mut deserializer).unwrap_err();
let serializer = Serializer::builder().is_human_readable(true).build();
let readable_ser_tokens = original_lsn.serialize(&serializer).unwrap();
let mut deserializer = Deserializer::builder()
.is_human_readable(false)
.tokens(readable_ser_tokens)
.build();
Lsn::deserialize(&mut deserializer).unwrap_err();
}
#[test]
fn test_lsn_ensure_roundtrip() {
let original_lsn = Lsn(0xaaaabbbb);
let serializer = Serializer::builder().is_human_readable(false).build();
let ser_tokens = original_lsn.serialize(&serializer).unwrap();
let mut deserializer = Deserializer::builder()
.is_human_readable(false)
.tokens(ser_tokens)
.build();
let des_lsn = Lsn::deserialize(&mut deserializer).unwrap();
assert_eq!(des_lsn, original_lsn);
}
#[test]
fn test_lsn_bincode_serde() {
let lsn = Lsn(0x0123456789abcdef);
let expected_bytes = [0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef];
let ser_bytes = lsn.ser().unwrap();
assert_eq!(ser_bytes, expected_bytes);
let des_lsn = Lsn::des(&ser_bytes).unwrap();
assert_eq!(des_lsn, lsn);
}
#[test]
fn test_lsn_bincode_ensure_roundtrip() {
let original_lsn = Lsn(0x01_02_03_04_05_06_07_08);
let expected_bytes = vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
let ser_bytes = original_lsn.ser().unwrap();
assert_eq!(ser_bytes, expected_bytes);
let des_lsn = Lsn::des(&ser_bytes).unwrap();
assert_eq!(des_lsn, original_lsn);
}
}

View File

@@ -3,6 +3,7 @@ use std::time::{Duration, SystemTime};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use pq_proto::{read_cstr, PG_EPOCH};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use tracing::{trace, warn};
use crate::lsn::Lsn;
@@ -14,17 +15,21 @@ use crate::lsn::Lsn;
///
/// serde Serialize is used only for human readable dump to json (e.g. in
/// safekeepers debug_dump).
#[serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PageserverFeedback {
/// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
/// LSN last received and ingested by the pageserver. Controls backpressure.
#[serde_as(as = "DisplayFromStr")]
pub last_received_lsn: Lsn,
/// LSN up to which data is persisted by the pageserver to its local disc.
/// Controls backpressure.
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
/// LSN up to which data is persisted by the pageserver on s3; safekeepers
/// consider WAL before it can be removed.
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
// Serialize with RFC3339 format.
#[serde(with = "serde_systemtime")]

View File

@@ -1,7 +1,6 @@
/// Immediately terminate the calling process without calling
/// atexit callbacks, C runtime destructors etc. We mainly use
/// this to protect coverage data from concurrent writes.
pub fn exit_now(code: u8) -> ! {
// SAFETY: exiting is safe, the ffi is not safe
pub fn exit_now(code: u8) {
unsafe { nix::libc::_exit(code as _) };
}

View File

@@ -1,3 +1 @@
pub mod heavier_once_cell;
pub mod gate;

View File

@@ -1,158 +0,0 @@
use std::{sync::Arc, time::Duration};
/// Gates are a concurrency helper, primarily used for implementing safe shutdown.
///
/// Users of a resource call `enter()` to acquire a GateGuard, and the owner of
/// the resource calls `close()` when they want to ensure that all holders of guards
/// have released them, and that no future guards will be issued.
pub struct Gate {
/// Each caller of enter() takes one unit from the semaphore. In close(), we
/// take all the units to ensure all GateGuards are destroyed.
sem: Arc<tokio::sync::Semaphore>,
/// For observability only: a name that will be used to log warnings if a particular
/// gate is holding up shutdown
name: String,
}
/// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will
/// not complete.
#[derive(Debug)]
pub struct GateGuard(tokio::sync::OwnedSemaphorePermit);
/// Observability helper: every `warn_period`, emit a log warning that we're still waiting on this gate
async fn warn_if_stuck<Fut: std::future::Future>(
fut: Fut,
name: &str,
warn_period: std::time::Duration,
) -> <Fut as std::future::Future>::Output {
let started = std::time::Instant::now();
let mut fut = std::pin::pin!(fut);
loop {
match tokio::time::timeout(warn_period, &mut fut).await {
Ok(ret) => return ret,
Err(_) => {
tracing::warn!(
gate = name,
elapsed_ms = started.elapsed().as_millis(),
"still waiting, taking longer than expected..."
);
}
}
}
}
#[derive(Debug)]
pub enum GateError {
GateClosed,
}
impl Gate {
const MAX_UNITS: u32 = u32::MAX;
pub fn new(name: String) -> Self {
Self {
sem: Arc::new(tokio::sync::Semaphore::new(Self::MAX_UNITS as usize)),
name,
}
}
/// Acquire a guard that will prevent close() calls from completing. If close()
/// was already called, this will return an error which should be interpreted
/// as "shutting down".
///
/// This function would typically be used from e.g. request handlers. While holding
/// the guard returned from this function, it is important to respect a CancellationToken
/// to avoid blocking close() indefinitely: typically types that contain a Gate will
/// also contain a CancellationToken.
pub fn enter(&self) -> Result<GateGuard, GateError> {
self.sem
.clone()
.try_acquire_owned()
.map(GateGuard)
.map_err(|_| GateError::GateClosed)
}
/// Types with a shutdown() method and a gate should call this method at the
/// end of shutdown, to ensure that all GateGuard holders are done.
///
/// This will wait for all guards to be destroyed. For this to complete promptly, it is
/// important that the holders of such guards are respecting a CancellationToken which has
/// been cancelled before entering this function.
pub async fn close(&self) {
warn_if_stuck(self.do_close(), &self.name, Duration::from_millis(1000)).await
}
/// Check if [`Self::close()`] has finished waiting for all [`Self::enter()`] users to finish. This
/// is usually analoguous for "Did shutdown finish?" for types that include a Gate, whereas checking
/// the CancellationToken on such types is analogous to "Did shutdown start?"
pub fn close_complete(&self) -> bool {
self.sem.is_closed()
}
async fn do_close(&self) {
tracing::debug!(gate = self.name, "Closing Gate...");
match self.sem.acquire_many(Self::MAX_UNITS).await {
Ok(_units) => {
// While holding all units, close the semaphore. All subsequent calls to enter() will fail.
self.sem.close();
}
Err(_) => {
// Semaphore closed: we are the only function that can do this, so it indicates a double-call.
// This is legal. Timeline::shutdown for example is not protected from being called more than
// once.
tracing::debug!(gate = self.name, "Double close")
}
}
tracing::debug!(gate = self.name, "Closed Gate.")
}
}
#[cfg(test)]
mod tests {
use futures::FutureExt;
use super::*;
#[tokio::test]
async fn test_idle_gate() {
// Having taken no gates, we should not be blocked in close
let gate = Gate::new("test".to_string());
gate.close().await;
// If a guard is dropped before entering, close should not be blocked
let gate = Gate::new("test".to_string());
let guard = gate.enter().unwrap();
drop(guard);
gate.close().await;
// Entering a closed guard fails
gate.enter().expect_err("enter should fail after close");
}
#[tokio::test]
async fn test_busy_gate() {
let gate = Gate::new("test".to_string());
let guard = gate.enter().unwrap();
let mut close_fut = std::pin::pin!(gate.close());
// Close should be blocked
assert!(close_fut.as_mut().now_or_never().is_none());
// Attempting to enter() should fail, even though close isn't done yet.
gate.enter()
.expect_err("enter should fail after entering close");
drop(guard);
// Guard is gone, close should finish
assert!(close_fut.as_mut().now_or_never().is_some());
// Attempting to enter() is still forbidden
gate.enter().expect_err("enter should fail finishing close");
}
}

View File

@@ -1,7 +1,4 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, MutexGuard,
};
use std::sync::{Arc, Mutex, MutexGuard};
use tokio::sync::Semaphore;
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
@@ -13,7 +10,6 @@ use tokio::sync::Semaphore;
/// [`OwnedSemaphorePermit`]: tokio::sync::OwnedSemaphorePermit
pub struct OnceCell<T> {
inner: Mutex<Inner<T>>,
initializers: AtomicUsize,
}
impl<T> Default for OnceCell<T> {
@@ -21,7 +17,6 @@ impl<T> Default for OnceCell<T> {
fn default() -> Self {
Self {
inner: Default::default(),
initializers: AtomicUsize::new(0),
}
}
}
@@ -54,7 +49,6 @@ impl<T> OnceCell<T> {
init_semaphore: Arc::new(sem),
value: Some(value),
}),
initializers: AtomicUsize::new(0),
}
}
@@ -66,8 +60,8 @@ impl<T> OnceCell<T> {
/// Initialization is panic-safe and cancellation-safe.
pub async fn get_or_init<F, Fut, E>(&self, factory: F) -> Result<Guard<'_, T>, E>
where
F: FnOnce(InitPermit) -> Fut,
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
{
let sem = {
let guard = self.inner.lock().unwrap();
@@ -77,61 +71,29 @@ impl<T> OnceCell<T> {
guard.init_semaphore.clone()
};
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire_owned().await
};
let permit = sem.acquire_owned().await;
if permit.is_err() {
let guard = self.inner.lock().unwrap();
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(Guard(guard));
} else {
// now we try
let value = factory().await?;
match permit {
Ok(permit) => {
let permit = InitPermit(permit);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.lock().unwrap();
Ok(Self::set0(value, guard))
}
Err(_closed) => {
let guard = self.inner.lock().unwrap();
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(Guard(guard));
}
let mut guard = self.inner.lock().unwrap();
assert!(
guard.value.is_none(),
"we won permit, must not be initialized"
);
guard.value = Some(value);
guard.init_semaphore.close();
Ok(Guard(guard))
}
}
/// Assuming a permit is held after previous call to [`Guard::take_and_deinit`], it can be used
/// to complete initializing the inner value.
///
/// # Panics
///
/// If the inner has already been initialized.
pub fn set(&self, value: T, _permit: InitPermit) -> Guard<'_, T> {
let guard = self.inner.lock().unwrap();
// cannot assert that this permit is for self.inner.semaphore, but we can assert it cannot
// give more permits right now.
if guard.init_semaphore.try_acquire().is_ok() {
drop(guard);
panic!("permit is of wrong origin");
}
Self::set0(value, guard)
}
fn set0(value: T, mut guard: std::sync::MutexGuard<'_, Inner<T>>) -> Guard<'_, T> {
if guard.value.is_some() {
drop(guard);
unreachable!("we won permit, must not be initialized");
}
guard.value = Some(value);
guard.init_semaphore.close();
Guard(guard)
}
/// Returns a guard to an existing initialized value, if any.
pub fn get(&self) -> Option<Guard<'_, T>> {
let guard = self.inner.lock().unwrap();
@@ -141,28 +103,6 @@ impl<T> OnceCell<T> {
None
}
}
/// Return the number of [`Self::get_or_init`] calls waiting for initialization to complete.
pub fn initializer_count(&self) -> usize {
self.initializers.load(Ordering::Relaxed)
}
}
/// DropGuard counter for queued tasks waiting to initialize, mainly accessible for the
/// initializing task for example at the end of initialization.
struct CountWaitingInitializers<'a, T>(&'a OnceCell<T>);
impl<'a, T> CountWaitingInitializers<'a, T> {
fn start(target: &'a OnceCell<T>) -> Self {
target.initializers.fetch_add(1, Ordering::Relaxed);
CountWaitingInitializers(target)
}
}
impl<'a, T> Drop for CountWaitingInitializers<'a, T> {
fn drop(&mut self) {
self.0.initializers.fetch_sub(1, Ordering::Relaxed);
}
}
/// Uninteresting guard object to allow short-lived access to inspect or clone the held,
@@ -195,7 +135,7 @@ impl<'a, T> Guard<'a, T> {
///
/// The permit will be on a semaphore part of the new internal value, and any following
/// [`OnceCell::get_or_init`] will wait on it to complete.
pub fn take_and_deinit(&mut self) -> (T, InitPermit) {
pub fn take_and_deinit(&mut self) -> (T, tokio::sync::OwnedSemaphorePermit) {
let mut swapped = Inner::default();
let permit = swapped
.init_semaphore
@@ -205,14 +145,11 @@ impl<'a, T> Guard<'a, T> {
std::mem::swap(&mut *self.0, &mut swapped);
swapped
.value
.map(|v| (v, InitPermit(permit)))
.map(|v| (v, permit))
.expect("guard is not created unless value has been initialized")
}
}
/// Type held by OnceCell (de)initializing task.
pub struct InitPermit(tokio::sync::OwnedSemaphorePermit);
#[cfg(test)]
mod tests {
use super::*;
@@ -248,11 +185,11 @@ mod tests {
barrier.wait().await;
let won = {
let g = cell
.get_or_init(|permit| {
.get_or_init(|| {
counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed);
async {
counters.future_polled.fetch_add(1, Ordering::Relaxed);
Ok::<_, Infallible>((i, permit))
Ok::<_, Infallible>(i)
}
})
.await
@@ -306,7 +243,7 @@ mod tests {
deinitialization_started.wait().await;
let started_at = tokio::time::Instant::now();
cell.get_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) })
cell.get_or_init(|| async { Ok::<_, Infallible>(reinit) })
.await
.unwrap();
@@ -321,32 +258,18 @@ mod tests {
assert_eq!(*cell.get().unwrap(), reinit);
}
#[test]
fn reinit_with_deinit_permit() {
let cell = Arc::new(OnceCell::new(42));
let (mol, permit) = cell.get().unwrap().take_and_deinit();
cell.set(5, permit);
assert_eq!(*cell.get().unwrap(), 5);
let (five, permit) = cell.get().unwrap().take_and_deinit();
assert_eq!(5, five);
cell.set(mol, permit);
assert_eq!(*cell.get().unwrap(), 42);
}
#[tokio::test]
async fn initialization_attemptable_until_ok() {
let cell = OnceCell::default();
for _ in 0..10 {
cell.get_or_init(|_permit| async { Err("whatever error") })
cell.get_or_init(|| async { Err("whatever error") })
.await
.unwrap_err();
}
let g = cell
.get_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) })
.get_or_init(|| async { Ok::<_, Infallible>("finally success") })
.await
.unwrap();
assert_eq!(*g, "finally success");
@@ -358,11 +281,11 @@ mod tests {
let barrier = tokio::sync::Barrier::new(2);
let initializer = cell.get_or_init(|permit| async {
let initializer = cell.get_or_init(|| async {
barrier.wait().await;
futures::future::pending::<()>().await;
Ok::<_, Infallible>(("never reached", permit))
Ok::<_, Infallible>("never reached")
});
tokio::select! {
@@ -375,7 +298,7 @@ mod tests {
assert!(cell.get().is_none());
let g = cell
.get_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) })
.get_or_init(|| async { Ok::<_, Infallible>("now initialized") })
.await
.unwrap();
assert_eq!(*g, "now initialized");

View File

@@ -1,37 +0,0 @@
use std::time::Duration;
use tokio_util::sync::CancellationToken;
pub enum TimeoutCancellableError {
Timeout,
Cancelled,
}
/// Wrap [`tokio::time::timeout`] with a CancellationToken.
///
/// This wrapper is appropriate for any long running operation in a task
/// that ought to respect a CancellationToken (which means most tasks).
///
/// The only time you should use a bare tokio::timeout is when the future `F`
/// itself respects a CancellationToken: otherwise, always use this wrapper
/// with your CancellationToken to ensure that your task does not hold up
/// graceful shutdown.
pub async fn timeout_cancellable<F>(
duration: Duration,
cancel: &CancellationToken,
future: F,
) -> Result<F::Output, TimeoutCancellableError>
where
F: std::future::Future,
{
tokio::select!(
r = tokio::time::timeout(duration, future) => {
r.map_err(|_| TimeoutCancellableError::Timeout)
},
_ = cancel.cancelled() => {
Err(TimeoutCancellableError::Cancelled)
}
)
}

View File

@@ -19,12 +19,13 @@ inotify.workspace = true
serde.workspace = true
serde_json.workspace = true
sysinfo.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio.workspace = true
tokio-postgres.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = "0.3.3"

View File

@@ -21,6 +21,11 @@ pub struct FileCacheState {
#[derive(Debug)]
pub struct FileCacheConfig {
/// Whether the file cache is *actually* stored in memory (e.g. by writing to
/// a tmpfs or shmem file). If true, the size of the file cache will be counted against the
/// memory available for the cgroup.
pub(crate) in_memory: bool,
/// The size of the file cache, in terms of the size of the resource it consumes
/// (currently: only memory)
///
@@ -54,9 +59,22 @@ pub struct FileCacheConfig {
spread_factor: f64,
}
impl Default for FileCacheConfig {
fn default() -> Self {
impl FileCacheConfig {
pub fn default_in_memory() -> Self {
Self {
in_memory: true,
// 75 %
resource_multiplier: 0.75,
// 640 MiB; (512 + 128)
min_remaining_after_cache: NonZeroU64::new(640 * MiB).unwrap(),
// ensure any increase in file cache size is split 90-10 with 10% to other memory
spread_factor: 0.1,
}
}
pub fn default_on_disk() -> Self {
Self {
in_memory: false,
resource_multiplier: 0.75,
// 256 MiB - lower than when in memory because overcommitting is safe; if we don't have
// memory, the kernel will just evict from its page cache, rather than e.g. killing
@@ -65,9 +83,7 @@ impl Default for FileCacheConfig {
spread_factor: 0.1,
}
}
}
impl FileCacheConfig {
/// Make sure fields of the config are consistent.
pub fn validate(&self) -> anyhow::Result<()> {
// Single field validity

View File

@@ -1,5 +1,3 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
#![cfg(target_os = "linux")]
use anyhow::Context;
@@ -41,6 +39,16 @@ pub struct Args {
#[arg(short, long)]
pub pgconnstr: Option<String>,
/// Flag to signal that the Postgres file cache is on disk (i.e. not in memory aside from the
/// kernel's page cache), and therefore should not count against available memory.
//
// NB: Ideally this flag would directly refer to whether the file cache is in memory (rather
// than a roundabout way, via whether it's on disk), but in order to be backwards compatible
// during the switch away from an in-memory file cache, we had to default to the previous
// behavior.
#[arg(long)]
pub file_cache_on_disk: bool,
/// The address we should listen on for connection requests. For the
/// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
#[arg(short, long)]

View File

@@ -156,7 +156,10 @@ impl Runner {
// memory limits.
if let Some(connstr) = &args.pgconnstr {
info!("initializing file cache");
let config = FileCacheConfig::default();
let config = match args.file_cache_on_disk {
true => FileCacheConfig::default_on_disk(),
false => FileCacheConfig::default_in_memory(),
};
let mut file_cache = FileCacheState::new(connstr, config, token.clone())
.await
@@ -184,7 +187,10 @@ impl Runner {
info!("file cache size actually got set to {actual_size}")
}
file_cache_disk_size = actual_size;
if args.file_cache_on_disk {
file_cache_disk_size = actual_size;
}
state.filecache = Some(file_cache);
}
@@ -233,11 +239,17 @@ impl Runner {
let requested_mem = target.mem;
let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
let expected_file_cache_size = self
let (expected_file_cache_size, expected_file_cache_disk_size) = self
.filecache
.as_ref()
.map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
.unwrap_or(0);
.map(|file_cache| {
let size = file_cache.config.calculate_cache_size(usable_system_memory);
match file_cache.config.in_memory {
true => (size, 0),
false => (size, size),
}
})
.unwrap_or((0, 0));
if let Some(cgroup) = &self.cgroup {
let (last_time, last_history) = *cgroup.watcher.borrow();
@@ -261,7 +273,7 @@ impl Runner {
let new_threshold = self
.config
.cgroup_threshold(usable_system_memory, expected_file_cache_size);
.cgroup_threshold(usable_system_memory, expected_file_cache_disk_size);
let current = last_history.avg_non_reclaimable;
@@ -288,10 +300,13 @@ impl Runner {
.set_file_cache_size(expected_file_cache_size)
.await
.context("failed to set file cache size")?;
file_cache_disk_size = actual_usage;
if !file_cache.config.in_memory {
file_cache_disk_size = actual_usage;
}
let message = format!(
"set file cache size to {} MiB",
"set file cache size to {} MiB (in memory = {})",
bytes_to_mebibytes(actual_usage),
file_cache.config.in_memory,
);
info!("downscale: {message}");
status.push(message);
@@ -342,7 +357,9 @@ impl Runner {
.set_file_cache_size(expected_usage)
.await
.context("failed to set file cache size")?;
file_cache_disk_size = actual_usage;
if !file_cache.config.in_memory {
file_cache_disk_size = actual_usage;
}
if actual_usage != expected_usage {
warn!(

View File

@@ -14,7 +14,7 @@ use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::tenant::TenantSharedResources;
use pageserver::tenant::{secondary, TenantSharedResources};
use remote_storage::GenericRemoteStorage;
use tokio::time::Instant;
use tracing::*;
@@ -34,15 +34,11 @@ use postgres_backend::AuthType;
use utils::logging::TracingErrorLayerEnablement;
use utils::signals::ShutdownSignals;
use utils::{
auth::{JwtAuth, SwappableJwtAuth},
logging, project_build_tag, project_git_version,
sentry_init::init_sentry,
signals::Signal,
auth::JwtAuth, logging, project_git_version, sentry_init::init_sentry, signals::Signal,
tcp_listener,
};
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
const PID_FILE_NAME: &str = "pageserver.pid";
@@ -262,12 +258,11 @@ fn start_pageserver(
// A changed version string indicates changed software.
// A changed launch timestamp indicates a pageserver restart.
info!(
"version: {} launch_timestamp: {} build_tag: {}",
"version: {} launch_timestamp: {}",
version(),
launch_ts.to_string(),
BUILD_TAG,
launch_ts.to_string()
);
set_build_info_metric(GIT_VERSION, BUILD_TAG);
set_build_info_metric(GIT_VERSION);
set_launch_timestamp_metric(launch_ts);
pageserver::preinitialize_metrics();
@@ -324,12 +319,13 @@ fn start_pageserver(
let http_auth;
let pg_auth;
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
// unwrap is ok because check is performed when creating config, so path is set and exists
// unwrap is ok because check is performed when creating config, so path is set and file exists
let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
info!("Loading public key(s) for verifying JWT tokens from {key_path:?}");
let jwt_auth = JwtAuth::from_key_path(key_path)?;
let auth: Arc<SwappableJwtAuth> = Arc::new(SwappableJwtAuth::new(jwt_auth));
info!(
"Loading public key for verifying JWT tokens from {:#?}",
key_path
);
let auth: Arc<JwtAuth> = Arc::new(JwtAuth::from_key_path(key_path)?);
http_auth = match &conf.http_auth_type {
AuthType::Trust => None,
@@ -528,6 +524,18 @@ fn start_pageserver(
}
});
let secondary_controller = if let Some(remote_storage) = &remote_storage {
secondary::spawn_tasks(
conf,
tenant_manager.clone(),
remote_storage.clone(),
background_jobs_barrier.clone(),
shutdown_pageserver.clone(),
)
} else {
secondary::null_controller()
};
// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
// is still accessible even if background task is not configured as long as remote storage has
@@ -539,6 +547,7 @@ fn start_pageserver(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
tenant_manager.clone(),
background_jobs_barrier.clone(),
)?;
}
@@ -557,6 +566,7 @@ fn start_pageserver(
broker_client.clone(),
disk_usage_eviction_state,
deletion_queue.new_client(),
secondary_controller,
)
.context("Failed to initialize router state")?,
);

View File

@@ -33,7 +33,8 @@ use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
use crate::tenant::config::TenantConf;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
TENANTS_SEGMENT_NAME, TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME,
TIMELINES_SEGMENT_NAME,
};
use crate::{
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME,
@@ -161,7 +162,7 @@ pub struct PageServerConf {
pub http_auth_type: AuthType,
/// authentication method for libpq connections from compute
pub pg_auth_type: AuthType,
/// Path to a file or directory containing public key(s) for verifying JWT tokens.
/// Path to a file containing public key for verifying JWT tokens.
/// Used for both mgmt and compute auth, if enabled.
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
@@ -632,6 +633,11 @@ impl PageServerConf {
self.tenants_path().join(tenant_id.to_string())
}
pub fn tenant_attaching_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id)
.join(TENANT_ATTACHING_MARKER_FILENAME)
}
pub fn tenant_ignore_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id).join(IGNORED_TENANT_FILE_NAME)
}

View File

@@ -3,6 +3,7 @@ use anyhow::Context;
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use serde_with::serde_as;
use std::{sync::Arc, time::SystemTime};
use utils::{
id::{TenantId, TimelineId},
@@ -41,10 +42,13 @@ pub(super) enum Name {
///
/// This is a denormalization done at the MetricsKey const methods; these should not be constructed
/// elsewhere.
#[serde_with::serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub(super) tenant_id: TenantId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) timeline_id: Option<TimelineId>,

View File

@@ -1,4 +1,5 @@
use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
use serde_with::serde_as;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
@@ -6,9 +7,12 @@ use super::{metrics::Name, Cache, MetricsKey, RawMetric};
use utils::id::{TenantId, TimelineId};
/// How the metrics from pageserver are identified.
#[serde_with::serde_as]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
struct Ids {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub(super) tenant_id: TenantId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) timeline_id: Option<TimelineId>,
}

View File

@@ -57,10 +57,7 @@ impl ControlPlaneClient {
if let Some(jwt) = &conf.control_plane_api_token {
let mut headers = hyper::HeaderMap::new();
headers.insert(
"Authorization",
format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
);
headers.insert("Authorization", jwt.get_contents().parse().unwrap());
client = client.default_headers(headers);
}
@@ -147,7 +144,7 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
Ok(response
.tenants
.into_iter()
.map(|t| (t.id, Generation::new(t.gen)))
.map(|t| (t.id, Generation::new(t.generation)))
.collect::<HashMap<_, _>>())
}

View File

@@ -10,7 +10,6 @@ use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::remote_timeline_path;
use crate::virtual_file::MaybeFatalIo;
use crate::virtual_file::VirtualFile;
use anyhow::Context;
use camino::Utf8PathBuf;
@@ -18,6 +17,7 @@ use hex::FromHex;
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::Deserialize;
use serde::Serialize;
use serde_with::serde_as;
use thiserror::Error;
use tokio;
use tokio_util::sync::CancellationToken;
@@ -214,6 +214,7 @@ where
/// during recovery as startup.
const TEMP_SUFFIX: &str = "tmp";
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
struct DeletionList {
/// Serialization version, for future use
@@ -242,6 +243,7 @@ struct DeletionList {
validated: bool,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
struct DeletionHeader {
/// Serialization version, for future use
@@ -269,9 +271,7 @@ impl DeletionHeader {
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes)
.await
.maybe_fatal_err("save deletion header")?;
Ok(())
.map_err(Into::into)
}
}
@@ -360,7 +360,6 @@ impl DeletionList {
let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes)
.await
.maybe_fatal_err("save deletion list")
.map_err(Into::into)
}
}

View File

@@ -0,0 +1,2 @@
Checking pageserver v0.1.0 (/home/neon/neon/pageserver)
Finished dev [optimized + debuginfo] target(s) in 7.62s

View File

@@ -55,24 +55,21 @@ impl Deleter {
/// Wrap the remote `delete_objects` with a failpoint
async fn remote_delete(&self) -> Result<(), anyhow::Error> {
fail::fail_point!("deletion-queue-before-execute", |_| {
info!("Skipping execution, failpoint set");
metrics::DELETION_QUEUE
.remote_errors
.with_label_values(&["failpoint"])
.inc();
Err(anyhow::anyhow!("failpoint hit"))
});
// A backoff::retry is used here for two reasons:
// - To provide a backoff rather than busy-polling the API on errors
// - To absorb transient 429/503 conditions without hitting our error
// logging path for issues deleting objects.
backoff::retry(
|| async {
fail::fail_point!("deletion-queue-before-execute", |_| {
info!("Skipping execution, failpoint set");
metrics::DELETION_QUEUE
.remote_errors
.with_label_values(&["failpoint"])
.inc();
Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
});
self.remote_storage.delete_objects(&self.accumulator).await
},
|| async { self.remote_storage.delete_objects(&self.accumulator).await },
|_| false,
3,
10,

View File

@@ -34,8 +34,6 @@ use crate::deletion_queue::TEMP_SUFFIX;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::storage_layer::LayerFileName;
use crate::virtual_file::on_fatal_io_error;
use crate::virtual_file::MaybeFatalIo;
// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to deliver objects of the order
@@ -197,7 +195,7 @@ impl ListWriter {
debug!("Deletion header {header_path} not found, first start?");
Ok(None)
} else {
on_fatal_io_error(&e, "reading deletion header");
Err(anyhow::anyhow!(e))
}
}
}
@@ -218,9 +216,16 @@ impl ListWriter {
self.pending.sequence = validated_sequence + 1;
let deletion_directory = self.conf.deletion_prefix();
let mut dir = tokio::fs::read_dir(&deletion_directory)
.await
.fatal_err("read deletion directory");
let mut dir = match tokio::fs::read_dir(&deletion_directory).await {
Ok(d) => d,
Err(e) => {
warn!("Failed to open deletion list directory {deletion_directory}: {e:#}");
// Give up: if we can't read the deletion list directory, we probably can't
// write lists into it later, so the queue won't work.
return Err(e.into());
}
};
let list_name_pattern =
Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
@@ -228,7 +233,7 @@ impl ListWriter {
let temp_extension = format!(".{TEMP_SUFFIX}");
let header_path = self.conf.deletion_header_path();
let mut seqs: Vec<u64> = Vec::new();
while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
while let Some(dentry) = dir.next_entry().await? {
let file_name = dentry.file_name();
let dentry_str = file_name.to_string_lossy();
@@ -241,9 +246,11 @@ impl ListWriter {
info!("Cleaning up temporary file {dentry_str}");
let absolute_path =
deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
tokio::fs::remove_file(&absolute_path)
.await
.fatal_err("delete temp file");
if let Err(e) = tokio::fs::remove_file(&absolute_path).await {
// Non-fatal error: we will just leave the file behind but not
// try and load it.
warn!("Failed to clean up temporary file {absolute_path}: {e:#}");
}
continue;
}
@@ -283,9 +290,7 @@ impl ListWriter {
for s in seqs {
let list_path = self.conf.deletion_list_path(s);
let list_bytes = tokio::fs::read(&list_path)
.await
.fatal_err("read deletion list");
let list_bytes = tokio::fs::read(&list_path).await?;
let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
Ok(l) => l,
@@ -344,7 +349,7 @@ impl ListWriter {
info!("Started deletion frontend worker");
// Synchronous, but we only do it once per process lifetime so it's tolerable
if let Err(e) = create_dir_all(self.conf.deletion_prefix()) {
if let Err(e) = create_dir_all(&self.conf.deletion_prefix()) {
tracing::error!(
"Failed to create deletion list directory {}, deletions will not be executed ({e})",
self.conf.deletion_prefix(),

View File

@@ -28,7 +28,6 @@ use crate::config::PageServerConf;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::control_plane_client::RetryForeverError;
use crate::metrics;
use crate::virtual_file::MaybeFatalIo;
use super::deleter::DeleterMessage;
use super::DeletionHeader;
@@ -288,9 +287,16 @@ where
async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
for list_path in list_paths {
debug!("Removing deletion list {list_path}");
tokio::fs::remove_file(&list_path)
.await
.fatal_err("remove deletion list");
if let Err(e) = tokio::fs::remove_file(&list_path).await {
// Unexpected: we should have permissions and nothing else should
// be touching these files. We will leave the file behind. Subsequent
// pageservers will try and load it again: hopefully whatever storage
// issue (probably permissions) has been fixed by then.
tracing::error!("Failed to delete {list_path}: {e:#}");
metrics::DELETION_QUEUE.unexpected_errors.inc();
break;
}
}
}

View File

@@ -48,20 +48,23 @@ use std::{
};
use anyhow::Context;
use camino::Utf8Path;
use remote_storage::GenericRemoteStorage;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument};
use utils::completion;
use utils::serde_percent::Percent;
use utils::{
completion,
id::{TenantId, TenantTimelineId},
};
use utils::{id::TimelineId, serde_percent::Percent};
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
self,
mgr::TenantManager,
secondary::SecondaryTenant,
storage_layer::{AsLayerDesc, EvictionError, Layer},
Timeline,
},
@@ -87,6 +90,7 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
tenant_manager: Arc<TenantManager>,
background_jobs_barrier: completion::Barrier,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
@@ -112,8 +116,7 @@ pub fn launch_disk_usage_global_eviction_task(
_ = background_jobs_barrier.wait() => { }
};
disk_usage_eviction_task(&state, task_config, &storage, &conf.tenants_path(), cancel)
.await;
disk_usage_eviction_task(&state, task_config, &storage, tenant_manager, cancel).await;
Ok(())
},
);
@@ -126,7 +129,7 @@ async fn disk_usage_eviction_task(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
_storage: &GenericRemoteStorage,
tenants_dir: &Utf8Path,
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
) {
scopeguard::defer! {
@@ -150,7 +153,8 @@ async fn disk_usage_eviction_task(
async {
let res =
disk_usage_eviction_task_iteration(state, task_config, tenants_dir, &cancel).await;
disk_usage_eviction_task_iteration(state, task_config, &tenant_manager, &cancel)
.await;
match res {
Ok(()) => {}
@@ -181,12 +185,14 @@ pub trait Usage: Clone + Copy + std::fmt::Debug {
async fn disk_usage_eviction_task_iteration(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
tenants_dir: &Utf8Path,
tenant_manager: &Arc<TenantManager>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
let tenants_dir = tenant_manager.get_conf().tenants_path();
let usage_pre = filesystem_level_usage::get(&tenants_dir, task_config)
.context("get filesystem-level disk usage before evictions")?;
let res = disk_usage_eviction_task_iteration_impl(state, usage_pre, cancel).await;
let res =
disk_usage_eviction_task_iteration_impl(state, usage_pre, tenant_manager, cancel).await;
match res {
Ok(outcome) => {
debug!(?outcome, "disk_usage_eviction_iteration finished");
@@ -196,7 +202,7 @@ async fn disk_usage_eviction_task_iteration(
}
IterationOutcome::Finished(outcome) => {
// Verify with statvfs whether we made any real progress
let after = filesystem_level_usage::get(tenants_dir, task_config)
let after = filesystem_level_usage::get(&tenants_dir, task_config)
// It's quite unlikely to hit the error here. Keep the code simple and bail out.
.context("get filesystem-level disk usage after evictions")?;
@@ -271,6 +277,7 @@ struct LayerCount {
pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
state: &State,
usage_pre: U,
tenant_manager: &Arc<TenantManager>,
cancel: &CancellationToken,
) -> anyhow::Result<IterationOutcome<U>> {
// use tokio's mutex to get a Sync guard (instead of std::sync::Mutex)
@@ -290,7 +297,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
"running disk usage based eviction due to pressure"
);
let candidates = match collect_eviction_candidates(cancel).await? {
let candidates = match collect_eviction_candidates(tenant_manager, cancel).await? {
EvictionCandidates::Cancelled => {
return Ok(IterationOutcome::Cancelled);
}
@@ -326,7 +333,13 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
// Evictions for attached tenants, batched by timeline
let mut batched: HashMap<_, Vec<_>> = HashMap::new();
// Evictions for secondary locations, batched by tenant
let mut secondary_by_tenant: HashMap<TenantId, Vec<(TimelineId, Layer)>> = HashMap::new();
let mut warned = None;
let mut usage_planned = usage_pre;
let mut max_batch_size = 0;
@@ -349,14 +362,22 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// FIXME: batching makes no sense anymore because of no layermap locking, should just spawn
// tasks to evict all seen layers until we have evicted enough
let batch = batched.entry(TimelineKey(candidate.timeline)).or_default();
match candidate.source {
EvictionCandidateSource::Attached(timeline) => {
let batch = batched.entry(TimelineKey(timeline)).or_default();
// semaphore will later be used to limit eviction concurrency, and we can express at
// most u32 number of permits. unlikely we would have u32::MAX layers to be evicted,
// but fail gracefully by not making batches larger.
if batch.len() < u32::MAX as usize {
batch.push(candidate.layer);
max_batch_size = max_batch_size.max(batch.len());
// semaphore will later be used to limit eviction concurrency, and we can express at
// most u32 number of permits. unlikely we would have u32::MAX layers to be evicted,
// but fail gracefully by not making batches larger.
if batch.len() < u32::MAX as usize {
batch.push(candidate.layer);
max_batch_size = max_batch_size.max(batch.len());
}
}
EvictionCandidateSource::Secondary(ttid) => {
let batch = secondary_by_tenant.entry(ttid.tenant_id).or_default();
batch.push((ttid.timeline_id, candidate.layer));
}
}
}
@@ -372,7 +393,20 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
};
debug!(?usage_planned, "usage planned");
// phase2: evict victims batched by timeline
// phase2 (secondary tenants): evict victims batched by tenant
for (tenant_id, timeline_layers) in secondary_by_tenant {
// Q: Why do we go via TenantManager again rather than just deleting files, or keeping
// an Arc ref to the secondary state?
// A: It's because a given tenant's local storage **belongs** to whoever is currently
// live in the TenantManager. We must avoid a race where we might plan an eviction
// for secondary, and then execute it when the tenant is actually in an attached state.
tenant_manager
.evict_tenant_layers(&tenant_id, timeline_layers)
.instrument(tracing::info_span!("evict_batch", %tenant_id))
.await;
}
// phase2 (attached tenants): evict victims batched by timeline
let mut js = tokio::task::JoinSet::new();
@@ -403,7 +437,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
return (evicted_bytes, evictions_failed);
};
let results = timeline.evict_layers(&batch).await;
let results = timeline.evict_layers(&batch, &cancel).await;
match results {
Ok(results) => {
@@ -480,9 +514,18 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
}))
}
// An eviction candidate might originate from either an attached tenant
// with a [`Tenant`] and [`Timeline`] object, or from a secondary tenant
// location. These differ in how we will execute the eviction.
#[derive(Clone)]
enum EvictionCandidateSource {
Attached(Arc<Timeline>),
Secondary(TenantTimelineId),
}
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
source: EvictionCandidateSource,
layer: Layer,
last_activity_ts: SystemTime,
}
@@ -532,32 +575,18 @@ enum EvictionCandidates {
/// after exhauting the `Above` partition.
/// So, we did not respect each tenant's min_resident_size.
async fn collect_eviction_candidates(
tenant_manager: &Arc<TenantManager>,
cancel: &CancellationToken,
) -> anyhow::Result<EvictionCandidates> {
// get a snapshot of the list of tenants
let tenants = tenant::mgr::list_tenants()
.await
.context("get list of tenants")?;
let mut candidates = Vec::new();
for (tenant_id, _state) in &tenants {
let tenants = tenant_manager.get_attached_tenants();
for tenant in tenants {
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}
let tenant = match tenant::mgr::get_tenant(*tenant_id, true) {
Ok(tenant) => tenant,
Err(e) => {
// this can happen if tenant has lifecycle transition after we fetched it
debug!("failed to get tenant: {e:#}");
continue;
}
};
if tenant.cancel.is_cancelled() {
info!(%tenant_id, "Skipping tenant for eviction, it is shutting down");
continue;
}
// collect layers from all timelines in this tenant
//
@@ -620,7 +649,7 @@ async fn collect_eviction_candidates(
for (timeline, layer_info) in tenant_candidates.into_iter() {
let file_size = layer_info.file_size();
let candidate = EvictionCandidate {
timeline,
source: EvictionCandidateSource::Attached(timeline),
last_activity_ts: layer_info.last_activity_ts,
layer: layer_info.layer,
};
@@ -634,6 +663,43 @@ async fn collect_eviction_candidates(
}
}
// FIXME: this is a long loop over all secondary locations. At the least, respect
// cancellation here, but really we need to break up the loop. We could extract the
// Arc<SecondaryTenant>s and iterate over them with some tokio yields in there. Ideally
// though we should just reduce the total amount of work: our eviction goals do not require
// listing absolutely every layer in every tenant: we could sample this.
tenant_manager.foreach_secondary_tenants(
|tenant_id: &TenantId, state: &Arc<SecondaryTenant>| {
let mut tenant_candidates = Vec::new();
for (timeline_id, layer_info) in state.get_layers_for_eviction() {
debug!(tenant_id=%tenant_id, timeline_id=%timeline_id, "timeline resident layers (secondary) count: {}", layer_info.resident_layers.len());
tenant_candidates.extend(
layer_info.resident_layers
.into_iter()
.map(|layer_infos| (timeline_id, layer_infos)),
);
}
tenant_candidates
.sort_unstable_by_key(|(_, layer_info)| std::cmp::Reverse(layer_info.last_activity_ts));
candidates.extend(tenant_candidates.into_iter().map(|(timeline_id, candidate)| {
(
// Secondary locations' layers are always considered above the min resident size,
// i.e. secondary locations are permitted to be trimmed to zero layers if all
// the layers have sufficiently old access times.
MinResidentSizePartition::Above,
EvictionCandidate {
source: EvictionCandidateSource::Secondary(TenantTimelineId { tenant_id: *tenant_id, timeline_id}),
last_activity_ts: candidate.last_activity_ts,
layer: candidate.layer,
}
)
}));
},
);
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
"as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
candidates

View File

@@ -52,31 +52,6 @@ paths:
schema:
type: object
/v1/reload_auth_validation_keys:
post:
description: Reloads the JWT public keys from their pre-configured location on disk.
responses:
"200":
description: The reload completed successfully.
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error (also hits if no keys were found)
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}:
parameters:
- name: tenant_id
@@ -594,17 +569,7 @@ paths:
schema:
$ref: "#/components/schemas/NotFoundError"
"409":
description: |
The tenant is already known to Pageserver in some way,
and hence this `/attach` call has been rejected.
Some examples of how this can happen:
- tenant was created on this pageserver
- tenant attachment was started by an earlier call to `/attach`.
Callers should poll the tenant status's `attachment_status` field,
like for status 202. See the longer description for `POST /attach`
for details.
description: Tenant download is already in progress
content:
application/json:
schema:
@@ -748,12 +713,6 @@ paths:
Errors if the tenant is absent on disk, already present in memory or fails to schedule its load.
Scheduling a load does not mean that the tenant would load successfully, check tenant status to ensure load correctness.
requestBody:
required: false
content:
application/json:
schema:
$ref: "#/components/schemas/TenantLoadRequest"
responses:
"202":
description: Tenant scheduled to load successfully
@@ -1244,15 +1203,6 @@ components:
new_tenant_id:
type: string
format: hex
generation:
type: integer
description: Attachment generation number.
TenantLoadRequest:
type: object
properties:
generation:
type: integer
description: Attachment generation number.
TenantAttachRequest:
type: object
required:

View File

@@ -4,6 +4,7 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Context, Result};
use futures::TryFutureExt;
@@ -17,10 +18,10 @@ use pageserver_api::models::{
TenantLoadRequest, TenantLocationConfigRequest,
};
use remote_storage::GenericRemoteStorage;
use serde_with::{serde_as, DisplayFromStr};
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
use utils::http::endpoint::request_span;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
@@ -39,6 +40,7 @@ use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
TenantSlotError, TenantSlotUpsertError, TenantStateError,
};
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::timeline::Timeline;
@@ -46,7 +48,7 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSha
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use utils::{
auth::SwappableJwtAuth,
auth::JwtAuth,
generation::Generation,
http::{
endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
@@ -65,23 +67,26 @@ use super::models::ConfigureFailpointsRequest;
pub struct State {
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
}
impl State {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
.iter()
@@ -96,6 +101,7 @@ impl State {
broker_client,
disk_usage_eviction_state,
deletion_queue_client,
secondary_controller,
})
}
@@ -165,7 +171,8 @@ impl From<TenantSlotError> for ApiError {
NotFound(tenant_id) => {
ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into())
}
e @ (AlreadyExists(_, _) | Conflict(_)) => ApiError::Conflict(format!("{e}")),
e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")),
e @ Conflict(_) => ApiError::Conflict(format!("{e}")),
InProgress => {
ApiError::ResourceUnavailable("Tenant is being modified concurrently".into())
}
@@ -224,7 +231,6 @@ impl From<GetTenantError> for ApiError {
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
ApiError::ResourceUnavailable("Tenant not yet active".into())
}
GetTenantError::MapState(e) => ApiError::ResourceUnavailable(format!("{e}").into()),
}
}
}
@@ -393,32 +399,6 @@ async fn status_handler(
json_response(StatusCode::OK, StatusResponse { id: config.id })
}
async fn reload_auth_validation_keys_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let config = get_config(&request);
let state = get_state(&request);
let Some(shared_auth) = &state.auth else {
return json_response(StatusCode::BAD_REQUEST, ());
};
// unwrap is ok because check is performed when creating config, so path is set and exists
let key_path = config.auth_validation_public_key_path.as_ref().unwrap();
info!("Reloading public key(s) for verifying JWT tokens from {key_path:?}");
match JwtAuth::from_key_path(key_path) {
Ok(new_auth) => {
shared_auth.swap(new_auth);
json_response(StatusCode::OK, ())
}
Err(e) => {
warn!("Error reloading public keys from {key_path:?}: {e:}");
json_response(StatusCode::INTERNAL_SERVER_ERROR, ())
}
}
}
async fn timeline_create_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
@@ -462,9 +442,6 @@ async fn timeline_create_handler(
Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
}
Err(tenant::CreateTimelineError::ShuttingDown) => {
json_response(StatusCode::SERVICE_UNAVAILABLE,HttpErrorBody::from_msg("tenant shutting down".to_string()))
}
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
}
}
@@ -567,8 +544,10 @@ async fn get_lsn_by_timestamp_handler(
let result = timeline.find_lsn_for_timestamp(timestamp_pg, &ctx).await?;
if version.unwrap_or(0) > 1 {
#[serde_as]
#[derive(serde::Serialize)]
struct Result {
#[serde_as(as = "DisplayFromStr")]
lsn: Lsn,
kind: &'static str,
}
@@ -877,8 +856,10 @@ async fn tenant_size_handler(
}
/// The type resides in the pageserver not to expose `ModelInputs`.
#[serde_with::serde_as]
#[derive(serde::Serialize)]
struct TenantHistorySize {
#[serde_as(as = "serde_with::DisplayFromStr")]
id: TenantId,
/// Size is a mixture of WAL and logical size, so the unit is bytes.
///
@@ -1143,6 +1124,9 @@ async fn put_tenant_location_config_handler(
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis);
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
@@ -1172,7 +1156,7 @@ async fn put_tenant_location_config_handler(
state
.tenant_manager
.upsert_location(tenant_id, location_conf, &ctx)
.upsert_location(tenant_id, location_conf, flush, &ctx)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
@@ -1563,11 +1547,12 @@ async fn disk_usage_eviction_run(
)));
}
let state = state.disk_usage_eviction_state.clone();
let eviction_state = state.disk_usage_eviction_state.clone();
let cancel = CancellationToken::new();
let child_cancel = cancel.clone();
let _g = cancel.drop_guard();
let tenant_manager = state.tenant_manager.clone();
crate::task_mgr::spawn(
crate::task_mgr::BACKGROUND_RUNTIME.handle(),
@@ -1578,8 +1563,9 @@ async fn disk_usage_eviction_run(
false,
async move {
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&state,
&eviction_state,
usage,
&tenant_manager,
&child_cancel,
)
.await;
@@ -1597,6 +1583,36 @@ async fn disk_usage_eviction_run(
json_response(StatusCode::OK, response)
}
async fn secondary_download_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let state = get_state(&request);
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
state
.secondary_controller
.download_tenant(tenant_id)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn secondary_upload_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let state = get_state(&request);
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
state
.secondary_controller
.upload_tenant(tenant_id)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(
StatusCode::NOT_FOUND,
@@ -1719,7 +1735,7 @@ where
pub fn make_router(
state: Arc<State>,
launch_ts: &'static LaunchTimestamp,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
let spec = include_bytes!("openapi_spec.yml");
let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
@@ -1748,9 +1764,6 @@ pub fn make_router(
.put("/v1/failpoints", |r| {
testing_api_handler("manage failpoints", r, failpoints_handler)
})
.post("/v1/reload_auth_validation_keys", |r| {
api_handler(r, reload_auth_validation_keys_handler)
})
.get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
.post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
.get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))
@@ -1836,6 +1849,16 @@ pub fn make_router(
.put("/v1/deletion_queue/flush", |r| {
api_handler(r, deletion_queue_flush)
})
.post("/v1/secondary/:tenant_id/upload", |r| {
testing_api_handler("force heatmap upload", r, secondary_upload_handler)
})
.post("/v1/secondary/:tenant_id/download", |r| {
testing_api_handler(
"force secondary layer download",
r,
secondary_download_handler,
)
})
.put("/v1/tenant/:tenant_id/break", |r| {
testing_api_handler("set tenant state to broken", r, handle_tenant_break)
})

View File

@@ -1,5 +1,3 @@
#![deny(clippy::undocumented_unsafe_blocks)]
mod auth;
pub mod basebackup;
pub mod config;
@@ -63,6 +61,14 @@ pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_cod
)
.await;
// Shut down any page service tasks.
timed(
task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
"shutdown PageRequestHandlers",
Duration::from_secs(1),
)
.await;
// Shut down all the tenants. This flushes everything to disk and kills
// the checkpoint and GC tasks.
timed(
@@ -72,15 +78,6 @@ pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_cod
)
.await;
// Shut down any page service tasks: any in-progress work for particular timelines or tenants
// should already have been canclled via mgr::shutdown_all_tenants
timed(
task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
"shutdown PageRequestHandlers",
Duration::from_secs(1),
)
.await;
// Best effort to persist any outstanding deletions, to avoid leaking objects
if let Some(mut deletion_queue) = deletion_queue {
deletion_queue.shutdown(Duration::from_secs(5)).await;

View File

@@ -962,32 +962,6 @@ static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy<IntCounterVec> = Lazy
.expect("failed to define a metric")
});
pub(crate) struct TenantManagerMetrics {
pub(crate) tenant_slots: UIntGauge,
pub(crate) tenant_slot_writes: IntCounter,
pub(crate) unexpected_errors: IntCounter,
}
pub(crate) static TENANT_MANAGER: Lazy<TenantManagerMetrics> = Lazy::new(|| {
TenantManagerMetrics {
tenant_slots: register_uint_gauge!(
"pageserver_tenant_manager_slots",
"How many slots currently exist, including all attached, secondary and in-progress operations",
)
.expect("failed to define a metric"),
tenant_slot_writes: register_int_counter!(
"pageserver_tenant_manager_slot_writes",
"Writes to a tenant slot, including all of create/attach/detach/delete"
)
.expect("failed to define a metric"),
unexpected_errors: register_int_counter!(
"pageserver_tenant_manager_unexpected_errors_total",
"Number of unexpected conditions encountered: nonzero value indicates a non-fatal bug."
)
.expect("failed to define a metric"),
}
});
pub(crate) struct DeletionQueueMetrics {
pub(crate) keys_submitted: IntCounter,
pub(crate) keys_dropped: IntCounter,
@@ -1910,9 +1884,6 @@ pub fn preinitialize_metrics() {
// Deletion queue stats
Lazy::force(&DELETION_QUEUE);
// Tenant manager stats
Lazy::force(&TENANT_MANAGER);
// countervecs
[&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT]
.into_iter()

View File

@@ -40,7 +40,7 @@ use tracing::field;
use tracing::*;
use utils::id::ConnectionId;
use utils::{
auth::{Claims, Scope, SwappableJwtAuth},
auth::{Claims, JwtAuth, Scope},
id::{TenantId, TimelineId},
lsn::Lsn,
simple_rcu::RcuReadGuard,
@@ -55,20 +55,16 @@ use crate::metrics;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::mgr;
use crate::tenant::mgr::get_active_tenant_with_timeout;
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::Timeline;
use crate::tenant::mgr::GetTenantError;
use crate::tenant::{Tenant, Timeline};
use crate::trace::Tracer;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
// How long we may block waiting for a [`TenantSlot::InProgress`]` and/or a [`Tenant`] which
// is not yet in state [`TenantState::Active`].
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
@@ -122,7 +118,7 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()
pub async fn libpq_listener_main(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
listener: TcpListener,
auth_type: AuthType,
listener_ctx: RequestContext,
@@ -190,7 +186,7 @@ pub async fn libpq_listener_main(
async fn page_service_conn_main(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
connection_ctx: RequestContext,
@@ -227,7 +223,13 @@ async fn page_service_conn_main(
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
let mut conn_handler = PageServerHandler::new(
conf,
broker_client,
auth,
connection_ctx,
task_mgr::shutdown_token(),
);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
@@ -253,7 +255,7 @@ async fn page_service_conn_main(
struct PageServerHandler {
_conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
claims: Option<Claims>,
/// The context created for the lifetime of the connection
@@ -261,14 +263,19 @@ struct PageServerHandler {
/// For each query received over the connection,
/// `process_query` creates a child context from this one.
connection_ctx: RequestContext,
/// A token that should fire when the tenant transitions from
/// attached state, or when the pageserver is shutting down.
cancel: CancellationToken,
}
impl PageServerHandler {
pub fn new(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
auth: Option<Arc<JwtAuth>>,
connection_ctx: RequestContext,
cancel: CancellationToken,
) -> Self {
PageServerHandler {
_conf: conf,
@@ -276,6 +283,7 @@ impl PageServerHandler {
auth,
claims: None,
connection_ctx,
cancel,
}
}
@@ -283,11 +291,7 @@ impl PageServerHandler {
/// this rather than naked flush() in order to shut down promptly. Without this, we would
/// block shutdown of a tenant if a postgres client was failing to consume bytes we send
/// in the flush.
async fn flush_cancellable<IO>(
&self,
pgb: &mut PostgresBackend<IO>,
cancel: &CancellationToken,
) -> Result<(), QueryError>
async fn flush_cancellable<IO>(&self, pgb: &mut PostgresBackend<IO>) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
@@ -295,7 +299,7 @@ impl PageServerHandler {
flush_r = pgb.flush() => {
Ok(flush_r?)
},
_ = cancel.cancelled() => {
_ = self.cancel.cancelled() => {
Err(QueryError::Shutdown)
}
)
@@ -304,7 +308,6 @@ impl PageServerHandler {
fn copyin_stream<'a, IO>(
&'a self,
pgb: &'a mut PostgresBackend<IO>,
cancel: &'a CancellationToken,
) -> impl Stream<Item = io::Result<Bytes>> + 'a
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -314,7 +317,7 @@ impl PageServerHandler {
let msg = tokio::select! {
biased;
_ = cancel.cancelled() => {
_ = self.cancel.cancelled() => {
// We were requested to shut down.
let msg = "pageserver is shutting down";
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
@@ -354,7 +357,7 @@ impl PageServerHandler {
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
self.flush_cancellable(pgb, cancel).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
self.flush_cancellable(pgb).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
}
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
@@ -381,13 +384,12 @@ impl PageServerHandler {
{
debug_assert_current_span_has_tenant_and_timeline_id();
// NOTE: pagerequests handler exits when connection is closed,
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Make request tracer if needed
let tenant = mgr::get_active_tenant_with_timeout(
tenant_id,
ACTIVE_TENANT_TIMEOUT,
&task_mgr::shutdown_token(),
)
.await?;
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let mut tracer = if tenant.get_trace_read_requests() {
let connection_id = ConnectionId::generate();
let path = tenant
@@ -403,14 +405,9 @@ impl PageServerHandler {
.get_timeline(timeline_id, true)
.map_err(|e| anyhow::anyhow!(e))?;
// Avoid starting new requests if the timeline has already started shutting down,
// and block timeline shutdown until this request is complete, or drops out due
// to cancellation.
let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
// switch client to COPYBOTH
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
self.flush_cancellable(pgb).await?;
let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
@@ -418,7 +415,7 @@ impl PageServerHandler {
let msg = tokio::select! {
biased;
_ = timeline.cancel.cancelled() => {
_ = self.cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
@@ -493,20 +490,9 @@ impl PageServerHandler {
}
};
if let Err(e) = &response {
if timeline.cancel.is_cancelled() {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropped response during shutdown: {e:#}"));
return Err(QueryError::Shutdown);
}
}
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
// error message is enough
span.in_scope(|| error!("error reading relation or page version: {:#}", e));
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
@@ -514,7 +500,7 @@ impl PageServerHandler {
});
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
self.flush_cancellable(pgb).await?;
}
Ok(())
}
@@ -536,14 +522,10 @@ impl PageServerHandler {
{
debug_assert_current_span_has_tenant_and_timeline_id();
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(
tenant_id,
ACTIVE_TENANT_TIMEOUT,
&task_mgr::shutdown_token(),
)
.await?;
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
@@ -561,9 +543,9 @@ impl PageServerHandler {
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
self.flush_cancellable(pgb, &tenant.cancel).await?;
self.flush_cancellable(pgb).await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel)));
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
timeline
.import_basebackup_from_tar(
&mut copyin_reader,
@@ -600,10 +582,9 @@ impl PageServerHandler {
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
debug_assert_current_span_has_tenant_and_timeline_id();
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id)
.await?;
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let last_record_lsn = timeline.get_last_record_lsn();
if last_record_lsn != start_lsn {
return Err(QueryError::Other(
@@ -617,8 +598,8 @@ impl PageServerHandler {
// Import wal provided via CopyData
info!("importing wal");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
self.flush_cancellable(pgb).await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
info!("wal import complete");
@@ -811,9 +792,7 @@ impl PageServerHandler {
let started = std::time::Instant::now();
// check that the timeline exists
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id)
.await?;
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {
// Backup was requested at a particular LSN. Wait for it to arrive.
@@ -828,7 +807,7 @@ impl PageServerHandler {
// switch client to COPYOUT
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
self.flush_cancellable(pgb).await?;
// Send a tarball of the latest layer on the timeline. Compress if not
// fullbackup. TODO Compress in that case too (tests need to be updated)
@@ -880,7 +859,7 @@ impl PageServerHandler {
}
pgb.write_message_noflush(&BeMessage::CopyDone)?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
self.flush_cancellable(pgb).await?;
let basebackup_after = started
.elapsed()
@@ -912,25 +891,6 @@ impl PageServerHandler {
.expect("claims presence already checked");
check_permission(claims, tenant_id)
}
/// Shorthand for getting a reference to a Timeline of an Active tenant.
async fn get_active_tenant_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
let tenant = get_active_tenant_with_timeout(
tenant_id,
ACTIVE_TENANT_TIMEOUT,
&task_mgr::shutdown_token(),
)
.await
.map_err(GetActiveTimelineError::Tenant)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
Ok(timeline)
}
}
#[async_trait::async_trait]
@@ -1088,9 +1048,7 @@ where
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_id))?;
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id)
.await?;
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let end_of_timeline = timeline.get_last_record_rlsn();
@@ -1274,12 +1232,7 @@ where
self.check_permission(Some(tenant_id))?;
let tenant = get_active_tenant_with_timeout(
tenant_id,
ACTIVE_TENANT_TIMEOUT,
&task_mgr::shutdown_token(),
)
.await?;
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
RowDescriptor::int8_col(b"checkpoint_timeout"),
@@ -1325,16 +1278,67 @@ where
}
}
#[derive(thiserror::Error, Debug)]
enum GetActiveTenantError {
#[error(
"Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
)]
WaitForActiveTimeout {
latest_state: TenantState,
wait_time: Duration,
},
#[error(transparent)]
NotFound(GetTenantError),
#[error(transparent)]
WaitTenantActive(tenant::WaitToBecomeActiveError),
}
impl From<GetActiveTenantError> for QueryError {
fn from(e: GetActiveTenantError) -> Self {
match e {
GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
),
GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
QueryError::Shutdown
GetActiveTenantError::WaitTenantActive(e) => QueryError::Other(anyhow::Error::new(e)),
GetActiveTenantError::NotFound(e) => QueryError::Other(anyhow::Error::new(e)),
}
}
}
/// Get active tenant.
///
/// If the tenant is Loading, waits for it to become Active, for up to 30 s. That
/// ensures that queries don't fail immediately after pageserver startup, because
/// all tenants are still loading.
async fn get_active_tenant_with_timeout(
tenant_id: TenantId,
_ctx: &RequestContext, /* require get a context to support cancellation in the future */
) -> Result<Arc<Tenant>, GetActiveTenantError> {
let tenant = match mgr::get_tenant(tenant_id, false) {
Ok(tenant) => tenant,
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
Err(GetTenantError::NotActive(_)) => {
unreachable!("we're calling get_tenant with active_only=false")
}
Err(GetTenantError::Broken(_)) => {
unreachable!("we're calling get_tenant with active_only=false")
}
};
let wait_time = Duration::from_secs(30);
match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
Ok(Ok(())) => Ok(tenant),
// no .context(), the error message is good enough and some tests depend on it
Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
Err(_) => {
let latest_state = tenant.current_state();
if latest_state == TenantState::Active {
Ok(tenant)
} else {
Err(GetActiveTenantError::WaitForActiveTimeout {
latest_state,
wait_time,
})
}
e => QueryError::Other(anyhow::anyhow!(e)),
}
}
}
@@ -1355,3 +1359,18 @@ impl From<GetActiveTimelineError> for QueryError {
}
}
}
/// Shorthand for getting a reference to a Timeline of an Active tenant.
async fn get_active_tenant_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
let tenant = get_active_tenant_with_timeout(tenant_id, ctx)
.await
.map_err(GetActiveTimelineError::Tenant)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
Ok(timeline)
}

View File

@@ -44,17 +44,6 @@ pub enum CalculateLogicalSizeError {
Other(#[from] anyhow::Error),
}
impl From<PageReconstructError> for CalculateLogicalSizeError {
fn from(pre: PageReconstructError) -> Self {
match pre {
PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
Self::Cancelled
}
_ => Self::Other(pre.into()),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum RelationError {
#[error("Relation Already Exists")]
@@ -563,8 +552,7 @@ impl Timeline {
Err(e) => Err(PageReconstructError::from(e)),
},
Err(e) => {
// This is expected: historical databases do not have the key.
debug!("Failed to get info about AUX files: {}", e);
warn!("Failed to get info about AUX files: {}", e);
Ok(HashMap::new())
}
}
@@ -584,17 +572,24 @@ impl Timeline {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?;
let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self.list_rels(*spcnode, *dbnode, lsn, ctx).await? {
for rel in self
.list_rels(*spcnode, *dbnode, lsn, ctx)
.await
.context("list rels")?
{
if cancel.is_cancelled() {
return Err(CalculateLogicalSizeError::Cancelled);
}
let relsize_key = rel_size_to_key(rel);
let mut buf = self.get(relsize_key, lsn, ctx).await?;
let mut buf = self
.get(relsize_key, lsn, ctx)
.await
.with_context(|| format!("read relation size of {rel:?}"))?;
let relsize = buf.get_u32_le();
total_size += relsize as u64;

View File

@@ -257,6 +257,12 @@ pub enum TaskKind {
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,
/// See [`crate::tenant::secondary`].
SecondaryDownloads,
/// See [`crate::tenant::secondary`].
SecondaryUploads,
// Initial logical size calculation
InitialLogicalSizeCalculation,
@@ -299,6 +305,10 @@ pub enum TaskKind {
#[derive(Default)]
struct MutableTaskState {
/// Tenant and timeline that this task is associated with.
tenant_id: Option<TenantId>,
timeline_id: Option<TimelineId>,
/// Handle for waiting for the task to exit. It can be None, if the
/// the task has already exited.
join_handle: Option<JoinHandle<()>>,
@@ -315,11 +325,6 @@ struct PageServerTask {
// To request task shutdown, just cancel this token.
cancel: CancellationToken,
/// Tasks may optionally be launched for a particular tenant/timeline, enabling
/// later cancelling tasks for that tenant/timeline in [`shutdown_tasks`]
tenant_id: Option<TenantId>,
timeline_id: Option<TimelineId>,
mutable: Mutex<MutableTaskState>,
}
@@ -345,9 +350,11 @@ where
kind,
name: name.to_string(),
cancel: cancel.clone(),
tenant_id,
timeline_id,
mutable: Mutex::new(MutableTaskState { join_handle: None }),
mutable: Mutex::new(MutableTaskState {
tenant_id,
timeline_id,
join_handle: None,
}),
});
TASKS.lock().unwrap().insert(task_id, Arc::clone(&task));
@@ -417,6 +424,8 @@ async fn task_finish(
let mut shutdown_process = false;
{
let task_mut = task.mutable.lock().unwrap();
match result {
Ok(Ok(())) => {
debug!("Task '{}' exited normally", task_name);
@@ -425,13 +434,13 @@ async fn task_finish(
if shutdown_process_on_error {
error!(
"Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
task_name, task.tenant_id, task.timeline_id, err
task_name, task_mut.tenant_id, task_mut.timeline_id, err
);
shutdown_process = true;
} else {
error!(
"Task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
task_name, task.tenant_id, task.timeline_id, err
task_name, task_mut.tenant_id, task_mut.timeline_id, err
);
}
}
@@ -439,13 +448,13 @@ async fn task_finish(
if shutdown_process_on_error {
error!(
"Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
task_name, task.tenant_id, task.timeline_id, err
task_name, task_mut.tenant_id, task_mut.timeline_id, err
);
shutdown_process = true;
} else {
error!(
"Task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
task_name, task.tenant_id, task.timeline_id, err
task_name, task_mut.tenant_id, task_mut.timeline_id, err
);
}
}
@@ -457,6 +466,17 @@ async fn task_finish(
}
}
// expected to be called from the task of the given id.
pub fn associate_with(tenant_id: Option<TenantId>, timeline_id: Option<TimelineId>) {
CURRENT_TASK.with(|ct| {
let mut task_mut = ct.mutable.lock().unwrap();
task_mut.tenant_id = tenant_id;
task_mut.timeline_id = timeline_id;
});
}
/// Is there a task running that matches the criteria
/// Signal and wait for tasks to shut down.
///
///
@@ -479,16 +499,17 @@ pub async fn shutdown_tasks(
{
let tasks = TASKS.lock().unwrap();
for task in tasks.values() {
let task_mut = task.mutable.lock().unwrap();
if (kind.is_none() || Some(task.kind) == kind)
&& (tenant_id.is_none() || task.tenant_id == tenant_id)
&& (timeline_id.is_none() || task.timeline_id == timeline_id)
&& (tenant_id.is_none() || task_mut.tenant_id == tenant_id)
&& (timeline_id.is_none() || task_mut.timeline_id == timeline_id)
{
task.cancel.cancel();
victim_tasks.push((
Arc::clone(task),
task.kind,
task.tenant_id,
task.timeline_id,
task_mut.tenant_id,
task_mut.timeline_id,
));
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -3,10 +3,10 @@ use std::sync::Arc;
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::TenantState;
use remote_storage::{GenericRemoteStorage, RemotePath};
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, warn, Instrument, Span};
use tracing::{error, info, instrument, warn, Instrument, Span};
use utils::{
backoff, completion, crashsafe, fs_ext,
@@ -25,9 +25,11 @@ use super::{
remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
span,
timeline::delete::DeleteTimelineFlow,
tree_sort_timelines, DeleteTimelineError, Tenant, TenantPreload,
tree_sort_timelines, DeleteTimelineError, Tenant,
};
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTenantError {
#[error("GetTenant {0}")]
@@ -67,7 +69,7 @@ fn remote_tenant_delete_mark_path(
.context("Failed to strip workdir prefix")
.and_then(RemotePath::new)
.context("tenant path")?;
Ok(tenant_remote_path.join(Utf8Path::new("timelines/deleted")))
Ok(tenant_remote_path.join(Utf8Path::new("deleted")))
}
async fn create_remote_delete_mark(
@@ -157,8 +159,7 @@ async fn ensure_timelines_dir_empty(timelines_path: &Utf8Path) -> Result<(), Del
// Assert timelines dir is empty.
if !fs_ext::is_directory_empty(timelines_path).await? {
// Display first 10 items in directory
let list = fs_ext::list_dir(timelines_path).await.context("list_dir")?;
let list = &list.into_iter().take(10).collect::<Vec<_>>();
let list = &fs_ext::list_dir(timelines_path).await.context("list_dir")?[..10];
return Err(DeleteTenantError::Other(anyhow::anyhow!(
"Timelines directory is not empty after all timelines deletion: {list:?}"
)));
@@ -247,6 +248,32 @@ async fn cleanup_remaining_fs_traces(
Ok(())
}
pub(crate) async fn remote_delete_mark_exists(
conf: &PageServerConf,
tenant_id: &TenantId,
remote_storage: &GenericRemoteStorage,
) -> anyhow::Result<bool> {
// If remote storage is there we rely on it
let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_id).context("path")?;
let result = backoff::retry(
|| async { remote_storage.download(&remote_mark_path).await },
|e| matches!(e, DownloadError::NotFound),
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
"fetch_tenant_deletion_mark",
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await;
match result {
Ok(_) => Ok(true),
Err(DownloadError::NotFound) => Ok(false),
Err(e) => Err(anyhow::anyhow!(e)).context("remote_delete_mark_exists")?,
}
}
/// Orchestrates tenant shut down of all tasks, removes its in-memory structures,
/// and deletes its data from both disk and s3.
/// The sequence of steps:
@@ -258,9 +285,10 @@ async fn cleanup_remaining_fs_traces(
/// 6. Remove remote mark
/// 7. Cleanup remaining fs traces, tenant dir, config, timelines dir, local delete mark
/// It is resumable from any step in case a crash/restart occurs.
/// There are two entrypoints to the process:
/// There are three entrypoints to the process:
/// 1. [`DeleteTenantFlow::run`] this is the main one called by a management api handler.
/// 2. [`DeleteTenantFlow::resume_from_attach`] is called when deletion is resumed tenant is found to be deleted during attach process.
/// 2. [`DeleteTenantFlow::resume_from_load`] is called during restarts when local or remote deletion marks are still there.
/// 3. [`DeleteTenantFlow::resume_from_attach`] is called when deletion is resumed tenant is found to be deleted during attach process.
/// Note the only other place that messes around timeline delete mark is the `Tenant::spawn_load` function.
#[derive(Default)]
pub enum DeleteTenantFlow {
@@ -359,7 +387,7 @@ impl DeleteTenantFlow {
pub(crate) async fn should_resume_deletion(
conf: &'static PageServerConf,
remote_mark_exists: bool,
remote_storage: Option<&GenericRemoteStorage>,
tenant: &Tenant,
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
let acquire = |t: &Tenant| {
@@ -370,25 +398,66 @@ impl DeleteTenantFlow {
)
};
if remote_mark_exists {
return Ok(acquire(tenant));
}
let tenant_id = tenant.tenant_id;
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
return Ok(acquire(tenant));
}
let remote_storage = match remote_storage {
Some(remote_storage) => remote_storage,
None => return Ok(None),
};
if remote_delete_mark_exists(conf, &tenant_id, remote_storage).await? {
Ok(acquire(tenant))
} else {
Ok(None)
}
}
pub(crate) async fn resume_from_load(
guard: DeletionGuard,
tenant: &Arc<Tenant>,
init_order: Option<&InitializationOrder>,
tenants: &'static std::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
tenant
.set_stopping(progress, true, false)
.await
.expect("cant be stopping or broken");
// Do not consume valuable resources during the load phase, continue deletion once init phase is complete.
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
if let Some(background) = background_jobs_can_start {
info!("waiting for backgound jobs barrier");
background.clone().wait().await;
info!("ready for backgound jobs barrier");
}
// Tenant may not be loadable if we fail late in cleanup_remaining_fs_traces (e g remove timelines dir)
let timelines_path = tenant.conf.timelines_path(&tenant.tenant_id);
if timelines_path.exists() {
tenant.load(init_order, None, ctx).await.context("load")?;
}
Self::background(
guard,
tenant.conf,
tenant.remote_storage.clone(),
tenants,
tenant,
)
.await
}
pub(crate) async fn resume_from_attach(
guard: DeletionGuard,
tenant: &Arc<Tenant>,
preload: Option<TenantPreload>,
tenants: &'static std::sync::RwLock<TenantsMap>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
@@ -399,7 +468,7 @@ impl DeleteTenantFlow {
.expect("cant be stopping or broken");
tenant
.attach(init_order, preload, ctx)
.attach(ctx, super::AttachMarkerMode::Expect)
.await
.context("attach")?;
@@ -537,18 +606,10 @@ impl DeleteTenantFlow {
.await
.context("cleanup_remaining_fs_traces")?;
{
let mut locked = tenants.write().unwrap();
if locked.remove(&tenant.tenant_id).is_none() {
warn!("Tenant got removed from tenants map during deletion");
};
// FIXME: we should not be modifying this from outside of mgr.rs.
// This will go away when we simplify deletion (https://github.com/neondatabase/neon/issues/5080)
crate::metrics::TENANT_MANAGER
.tenant_slots
.set(locked.len() as u64);
}
let mut locked = tenants.write().unwrap();
if locked.remove(&tenant.tenant_id).is_none() {
warn!("Tenant got removed from tenants map during deletion");
};
*guard = Self::Finished;

File diff suppressed because it is too large Load Diff

View File

@@ -406,123 +406,4 @@ mod tests {
METADATA_OLD_FORMAT_VERSION, METADATA_FORMAT_VERSION
);
}
#[test]
fn test_metadata_bincode_serde() {
let original_metadata = TimelineMetadata::new(
Lsn(0x200),
Some(Lsn(0x100)),
Some(TIMELINE_ID),
Lsn(0),
Lsn(0),
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
);
let metadata_bytes = original_metadata
.to_bytes()
.expect("Cannot create bytes array from metadata");
let metadata_bincode_be_bytes = original_metadata
.ser()
.expect("Cannot serialize the metadata");
// 8 bytes for the length of the vector
assert_eq!(metadata_bincode_be_bytes.len(), 8 + metadata_bytes.len());
let expected_bincode_bytes = {
let mut temp = vec![];
let len_bytes = metadata_bytes.len().to_be_bytes();
temp.extend_from_slice(&len_bytes);
temp.extend_from_slice(&metadata_bytes);
temp
};
assert_eq!(metadata_bincode_be_bytes, expected_bincode_bytes);
let deserialized_metadata = TimelineMetadata::des(&metadata_bincode_be_bytes).unwrap();
// Deserialized metadata has the metadata header, which is different from the serialized one.
// Reference: TimelineMetaData::to_bytes()
let expected_metadata = {
let mut temp_metadata = original_metadata;
let body_bytes = temp_metadata
.body
.ser()
.expect("Cannot serialize the metadata body");
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
temp_metadata.hdr = hdr;
temp_metadata
};
assert_eq!(deserialized_metadata, expected_metadata);
}
#[test]
fn test_metadata_bincode_serde_ensure_roundtrip() {
let original_metadata = TimelineMetadata::new(
Lsn(0x200),
Some(Lsn(0x100)),
Some(TIMELINE_ID),
Lsn(0),
Lsn(0),
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
);
let expected_bytes = vec![
/* bincode length encoding bytes */
0, 0, 0, 0, 0, 0, 2, 0, // 8 bytes for the length of the serialized vector
/* TimelineMetadataHeader */
4, 37, 101, 34, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2)
/* TimelineMetadataBodyV2 */
0, 0, 0, 0, 0, 0, 2, 0, // disk_consistent_lsn (8 bytes)
1, 0, 0, 0, 0, 0, 0, 1, 0, // prev_record_lsn (9 bytes)
1, 17, 34, 51, 68, 85, 102, 119, 136, 17, 34, 51, 68, 85, 102, 119,
136, // ancestor_timeline (17 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // ancestor_lsn (8 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // latest_gc_cutoff_lsn (8 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // initdb_lsn (8 bytes)
0, 0, 0, 15, // pg_version (4 bytes)
/* padding bytes */
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0,
];
let metadata_ser_bytes = original_metadata.ser().unwrap();
assert_eq!(metadata_ser_bytes, expected_bytes);
let expected_metadata = {
let mut temp_metadata = original_metadata;
let body_bytes = temp_metadata
.body
.ser()
.expect("Cannot serialize the metadata body");
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
temp_metadata.hdr = hdr;
temp_metadata
};
let des_metadata = TimelineMetadata::des(&metadata_ser_bytes).unwrap();
assert_eq!(des_metadata, expected_metadata);
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -168,19 +168,41 @@
//! - create `Timeline` struct and a `RemoteTimelineClient`
//! - initialize the client's upload queue with its `IndexPart`
//! - schedule uploads for layers that are only present locally.
//! - if the remote `IndexPart`'s metadata was newer than the metadata in
//! the local filesystem, write the remote metadata to the local filesystem
//! - After the above is done for each timeline, open the tenant for business by
//! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
//! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
//!
//! We keep track of the fact that a client is in `Attaching` state in a marker
//! file on the local disk. This is critical because, when we restart the pageserver,
//! we do not want to do the `List timelines` step for each tenant that has already
//! been successfully attached (for performance & cost reasons).
//! Instead, for a tenant without the attach marker file, we assume that the
//! local state is in sync or ahead of the remote state. This includes the list
//! of all of the tenant's timelines, which is particularly critical to be up-to-date:
//! if there's a timeline on the remote that the pageserver doesn't know about,
//! the GC will not consider its branch point, leading to data loss.
//! So, for a tenant with the attach marker file, we know that we do not yet have
//! persisted all the remote timeline's metadata files locally. To exclude the
//! risk above, we re-run the procedure for such tenants
//!
//! # Operating Without Remote Storage
//!
//! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
//! not created and the uploads are skipped.
//! Theoretically, it should be ok to remove and re-add remote storage configuration to
//! the pageserver config at any time, since it doesn't make a difference to
//! [`Timeline::load_layer_map`].
//! Of course, the remote timeline dir must not change while we have de-configured
//! remote storage, i.e., the pageserver must remain the owner of the given prefix
//! in remote storage.
//! But note that we don't test any of this right now.
//!
//! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
//! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
mod download;
pub(crate) mod download;
pub mod index;
mod upload;
@@ -446,10 +468,7 @@ impl RemoteTimelineClient {
//
/// Download index file
pub async fn download_index_file(
&self,
cancel: CancellationToken,
) -> Result<MaybeDeletedIndexPart, DownloadError> {
pub async fn download_index_file(&self) -> Result<MaybeDeletedIndexPart, DownloadError> {
let _unfinished_gauge_guard = self.metrics.call_begin(
&RemoteOpFileKind::Index,
&RemoteOpKind::Download,
@@ -463,7 +482,6 @@ impl RemoteTimelineClient {
&self.tenant_id,
&self.timeline_id,
self.generation,
cancel,
)
.measure_remote_op(
self.tenant_id,
@@ -721,17 +739,6 @@ impl RemoteTimelineClient {
})
.collect();
#[cfg(feature = "testing")]
for (name, gen) in &with_generations {
if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), *gen) {
if &unexpected == gen {
tracing::error!("{name} was unlinked twice with same generation");
} else {
tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
}
}
}
// after unlinking files from the upload_queue.latest_files we must always schedule an
// index_part update, because that needs to be uploaded before we can actually delete the
// files.
@@ -765,19 +772,6 @@ impl RemoteTimelineClient {
info!("scheduling deletion of layer {}{}", name, gen.get_suffix());
}
#[cfg(feature = "testing")]
for (name, gen) in &with_generations {
match upload_queue.dangling_files.remove(name) {
Some(same) if &same == gen => { /* expected */ }
Some(other) => {
tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
}
None => {
tracing::error!("{name} was unlinked but was not dangling");
}
}
}
// schedule the actual deletions
let op = UploadOp::Delete(Delete {
layers: with_generations,
@@ -803,7 +797,9 @@ impl RemoteTimelineClient {
let names = compacted_from.iter().map(|x| x.layer_desc().filename());
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
let with_generations =
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
self.launch_queued_tasks(upload_queue);
Ok(())
@@ -1456,8 +1452,6 @@ impl RemoteTimelineClient {
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::default(),
queued_operations: VecDeque::default(),
#[cfg(feature = "testing")]
dangling_files: HashMap::default(),
};
let upload_queue = std::mem::replace(
@@ -1501,6 +1495,23 @@ impl RemoteTimelineClient {
}
}
}
pub(crate) fn get_layers_metadata(
&self,
layers: Vec<LayerFileName>,
) -> anyhow::Result<Vec<Option<LayerFileMetadata>>> {
let q = self.upload_queue.lock().unwrap();
let q = match &*q {
UploadQueue::Stopped(_) | UploadQueue::Uninitialized => {
anyhow::bail!("queue is in state {}", q.as_str())
}
UploadQueue::Initialized(inner) => inner,
};
let decorated = layers.into_iter().map(|l| q.latest_files.get(&l).cloned());
Ok(decorated.collect())
}
}
pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
@@ -1541,8 +1552,15 @@ pub fn remote_index_path(
.expect("Failed to construct path")
}
pub const HEATMAP_BASENAME: &str = "heatmap";
pub fn remote_heatmap_path(tenant_id: &TenantId) -> RemotePath {
RemotePath::from_string(&format!("tenants/{tenant_id}/{HEATMAP_BASENAME}-v01"))
.expect("Failed to construct path")
}
/// Given the key of an index, parse out the generation part of the name
pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
pub(crate) fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
let file_name = match path.get_path().file_name() {
Some(f) => f,
None => {
@@ -1731,11 +1749,7 @@ mod tests {
let client = timeline.remote_client.as_ref().unwrap();
// Download back the index.json, and check that the list of files is correct
let initial_index_part = match client
.download_index_file(CancellationToken::new())
.await
.unwrap()
{
let initial_index_part = match client.download_index_file().await.unwrap() {
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
};
@@ -1824,11 +1838,7 @@ mod tests {
}
// Download back the index.json, and check that the list of files is correct
let index_part = match client
.download_index_file(CancellationToken::new())
.await
.unwrap()
{
let index_part = match client.download_index_file().await.unwrap() {
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
};
@@ -2027,7 +2037,7 @@ mod tests {
let client = test_state.build_client(get_generation);
let download_r = client
.download_index_file(CancellationToken::new())
.download_index_file()
.await
.expect("download should always succeed");
assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));

View File

@@ -18,8 +18,8 @@ use crate::config::PageServerConf;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::Generation;
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
use crate::tenant::{Generation, TENANT_DELETED_MARKER_FILE_NAME};
use remote_storage::{DownloadError, GenericRemoteStorage};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
@@ -170,43 +170,53 @@ pub fn is_temp_download_file(path: &Utf8Path) -> bool {
pub async fn list_remote_timelines(
storage: &GenericRemoteStorage,
tenant_id: TenantId,
cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
) -> anyhow::Result<HashSet<TimelineId>> {
let remote_path = remote_timelines_path(&tenant_id);
fail::fail_point!("storage-sync-list-remote-timelines", |_| {
anyhow::bail!("storage-sync-list-remote-timelines");
});
let listing = download_retry_forever(
|| storage.list(Some(&remote_path), ListingMode::WithDelimiter),
&format!("list timelines for {tenant_id}"),
cancel,
let timelines = download_retry(
|| storage.list_prefixes(Some(&remote_path)),
&format!("list prefixes for {tenant_id}"),
)
.await?;
let mut timeline_ids = HashSet::new();
let mut other_prefixes = HashSet::new();
if timelines.is_empty() {
anyhow::bail!("no timelines found on the remote storage")
}
let mut timeline_ids = HashSet::new();
for timeline_remote_storage_key in timelines {
if timeline_remote_storage_key.object_name() == Some(TENANT_DELETED_MARKER_FILE_NAME) {
// A `deleted` key within `timelines/` is a marker file, not a timeline. Ignore it.
// This code will be removed in https://github.com/neondatabase/neon/pull/5580
continue;
}
for timeline_remote_storage_key in listing.prefixes {
let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
})?;
match object_name.parse::<TimelineId>() {
Ok(t) => timeline_ids.insert(t),
Err(_) => other_prefixes.insert(object_name.to_string()),
};
let timeline_id: TimelineId = object_name
.parse()
.with_context(|| format!("parse object name into timeline id '{object_name}'"))?;
// list_prefixes is assumed to return unique names. Ensure this here.
// NB: it's safer to bail out than warn-log this because the pageserver
// needs to absolutely know about _all_ timelines that exist, so that
// GC knows all the branchpoints. If we skipped over a timeline instead,
// GC could delete a layer that's still needed by that timeline.
anyhow::ensure!(
!timeline_ids.contains(&timeline_id),
"list_prefixes contains duplicate timeline id {timeline_id}"
);
timeline_ids.insert(timeline_id);
}
for key in listing.keys {
let object_name = key
.object_name()
.ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
other_prefixes.insert(object_name.to_string());
}
Ok((timeline_ids, other_prefixes))
Ok(timeline_ids)
}
async fn do_download_index_part(
@@ -214,11 +224,10 @@ async fn do_download_index_part(
tenant_id: &TenantId,
timeline_id: &TimelineId,
index_generation: Generation,
cancel: CancellationToken,
) -> Result<IndexPart, DownloadError> {
let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
let index_part_bytes = download_retry_forever(
let index_part_bytes = download_retry(
|| async {
let mut index_part_download = storage.download(&remote_path).await?;
@@ -233,7 +242,6 @@ async fn do_download_index_part(
Ok(index_part_bytes)
},
&format!("download {remote_path:?}"),
cancel,
)
.await?;
@@ -255,28 +263,19 @@ pub(super) async fn download_index_part(
tenant_id: &TenantId,
timeline_id: &TimelineId,
my_generation: Generation,
cancel: CancellationToken,
) -> Result<IndexPart, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
if my_generation.is_none() {
// Operating without generations: just fetch the generation-less path
return do_download_index_part(storage, tenant_id, timeline_id, my_generation, cancel)
.await;
return do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
}
// Stale case: If we were intentionally attached in a stale generation, there may already be a remote
// index in our generation.
//
// This is an optimization to avoid doing the listing for the general case below.
let res = do_download_index_part(
storage,
tenant_id,
timeline_id,
my_generation,
cancel.clone(),
)
.await;
let res = do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
match res {
Ok(index_part) => {
tracing::debug!(
@@ -296,14 +295,8 @@ pub(super) async fn download_index_part(
// we want to find the most recent index from a previous generation.
//
// This is an optimization to avoid doing the listing for the general case below.
let res = do_download_index_part(
storage,
tenant_id,
timeline_id,
my_generation.previous(),
cancel.clone(),
)
.await;
let res =
do_download_index_part(storage, tenant_id, timeline_id, my_generation.previous()).await;
match res {
Ok(index_part) => {
tracing::debug!("Found index_part from previous generation");
@@ -347,14 +340,13 @@ pub(super) async fn download_index_part(
match max_previous_generation {
Some(g) => {
tracing::debug!("Found index_part in generation {g:?}");
do_download_index_part(storage, tenant_id, timeline_id, g, cancel).await
do_download_index_part(storage, tenant_id, timeline_id, g).await
}
None => {
// Migration from legacy pre-generation state: we have a generation but no prior
// attached pageservers did. Try to load from a no-generation path.
tracing::info!("No index_part.json* found");
do_download_index_part(storage, tenant_id, timeline_id, Generation::none(), cancel)
.await
do_download_index_part(storage, tenant_id, timeline_id, Generation::none()).await
}
}
}
@@ -384,23 +376,3 @@ where
)
.await
}
async fn download_retry_forever<T, O, F>(
op: O,
description: &str,
cancel: CancellationToken,
) -> Result<T, DownloadError>
where
O: FnMut() -> F,
F: Future<Output = Result<T, DownloadError>>,
{
backoff::retry(
op,
|e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
FAILED_DOWNLOAD_WARN_THRESHOLD,
u32::MAX,
description,
backoff::Cancel::new(cancel, || DownloadError::Cancelled),
)
.await
}

View File

@@ -6,6 +6,7 @@ use std::collections::HashMap;
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::bin_ser::SerializeError;
use crate::tenant::metadata::TimelineMetadata;
@@ -57,6 +58,7 @@ impl LayerFileMetadata {
///
/// This type needs to be backwards and forwards compatible. When changing the fields,
/// remember to add a test case for the changed version.
#[serde_as]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct IndexPart {
/// Debugging aid describing the version of this type.
@@ -76,6 +78,7 @@ pub struct IndexPart {
// 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata.
// It's duplicated for convenience when reading the serialized structure, but is
// private because internally we would read from metadata instead.
#[serde_as(as = "DisplayFromStr")]
disk_consistent_lsn: Lsn,
#[serde(rename = "metadata_bytes")]
@@ -95,7 +98,7 @@ impl IndexPart {
const LATEST_VERSION: usize = 4;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4];
pub const KNOWN_VERSIONS: &[usize] = &[1, 2, 3, 4];
pub const FILE_NAME: &'static str = "index_part.json";
@@ -152,7 +155,7 @@ pub struct IndexLayerMetadata {
#[serde(default = "Generation::none")]
#[serde(skip_serializing_if = "Generation::is_none")]
pub generation: Generation,
pub(super) generation: Generation,
}
impl From<LayerFileMetadata> for IndexLayerMetadata {

View File

@@ -0,0 +1,268 @@
pub mod downloader;
pub mod heatmap;
pub mod heatmap_writer;
use std::{sync::Arc, time::SystemTime};
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
};
use self::{
downloader::{downloader_task, SecondaryDetail},
heatmap_writer::heatmap_writer_task,
};
use super::{
mgr::TenantManager,
storage_layer::{AsLayerDesc, Layer},
timeline::DiskUsageEvictionInfo,
};
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use utils::{
completion::Barrier,
fs_ext,
id::{TenantId, TimelineId},
};
enum DownloadCommand {
Download(TenantId),
}
enum UploadCommand {
Upload(TenantId),
}
struct CommandRequest<T> {
payload: T,
response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
}
struct CommandResponse {
result: anyhow::Result<()>,
}
// Whereas [`Tenant`] represents an attached tenant, this type represents the work
// we do for secondary tenant locations: where we are not serving clients or
// ingesting WAL, but we are maintaining a warm cache of layer files.
//
// This type is all about the _download_ path for secondary mode. The upload path
// runs while a regular attached `Tenant` exists.
//
// This structure coordinates TenantManager and SecondaryDownloader,
// so that the downloader can indicate which tenants it is currently
// operating on, and the manager can indicate when a particular
// secondary tenant should cancel any work in flight.
#[derive(Debug)]
pub(crate) struct SecondaryTenant {
/// Cancellation token indicates to SecondaryDownloader that it should stop doing
/// any work for this tenant at the next opportunity.
pub(crate) cancel: CancellationToken,
/// Lock must be held by SecondaryDownloader at any time that it might be operating
/// on the local filesystem directory for this tenant ID.
// Ordering: the TenantManager must set the cancellation token _before_
// taking the lock. The SecondaryDownloader must always check the cancellation
// token immediately _after_ taking the lock (and at appropriate intervals
// while holding it).
pub(crate) busy: Arc<tokio::sync::Mutex<()>>,
detail: std::sync::Mutex<SecondaryDetail>,
// TODO: propagate the `warm` from LocationConf into here, and respect it when doing downloads
}
impl SecondaryTenant {
pub(crate) fn new() -> Arc<Self> {
// TODO; consider whether we really need to Arc this
Arc::new(Self {
busy: Arc::new(tokio::sync::Mutex::new(())),
// todo: shall we make this a descendent of the
// main cancellation token, or is it sufficient that
// on shutdown we walk the tenants and fire their
// individual cancellations?
cancel: CancellationToken::new(),
detail: std::sync::Mutex::default(),
})
}
pub(crate) async fn shutdown(&self) {
self.cancel.cancel();
// Wait for any secondary downloader work to complete: once we
// acquire this lock, we are guaranteed that the secondary downloader
// won't touch the local filesystem again for this instance: it is safe
// to e.g. construct a `Tenant` for the same TenantId
drop(self.busy.lock().await);
}
pub(crate) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
self.detail.lock().unwrap().get_layers_for_eviction()
}
pub(crate) async fn evict_layers(
&self,
_guard: tokio::sync::OwnedMutexGuard<()>,
conf: &PageServerConf,
tenant_id: &TenantId,
layers: Vec<(TimelineId, Layer)>,
) {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
if self.cancel.is_cancelled() {
// Eviction is a no-op if shutdown() was already called.
tracing::info!(
"Dropping {} layer evictions, secondary tenant shutting down",
layers.len()
);
return;
}
let now = SystemTime::now();
for (timeline_id, layer) in layers {
let layer_name = layer.layer_desc().filename();
let path = conf
.timeline_path(tenant_id, &timeline_id)
.join(&layer_name.file_name());
// We tolerate ENOENT, because between planning eviction and executing
// it, the secondary downloader could have seen an updated heatmap that
// resulted in a layer being deleted.
tokio::fs::remove_file(path)
.await
.or_else(fs_ext::ignore_not_found)
.expect("TODO: terminate process on local I/O errors");
// TODO: batch up updates instead of acquiring lock in inner loop
let mut detail = self.detail.lock().unwrap();
// If there is no timeline detail for what we just deleted, that indicates that
// the secondary downloader did some work (perhaps removing all)
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
timeline_detail.on_disk_layers.remove(&layer_name);
timeline_detail.evicted_at.insert(layer_name, now);
}
}
}
}
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
/// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests,
/// where we want to immediately upload/download for a particular tenant. In normal operation
/// uploads & downloads are autonomous and not driven by this interface.
pub struct SecondaryController {
upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
}
impl SecondaryController {
async fn dispatch<T>(
&self,
queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
payload: T,
) -> anyhow::Result<()> {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
queue
.send(CommandRequest {
payload,
response_tx,
})
.await
.map_err(|_| anyhow::anyhow!("Receiver shut down"))?;
let response = response_rx
.await
.map_err(|_| anyhow::anyhow!("Request dropped"))?;
response.result
}
pub async fn download_tenant(&self, tenant_id: TenantId) -> anyhow::Result<()> {
self.dispatch(&self.download_req_tx, DownloadCommand::Download(tenant_id))
.await
}
pub async fn upload_tenant(&self, tenant_id: TenantId) -> anyhow::Result<()> {
self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_id))
.await
}
}
pub fn spawn_tasks(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> SecondaryController {
let mgr_clone = tenant_manager.clone();
let storage_clone = remote_storage.clone();
let cancel_clone = cancel.clone();
let bg_jobs_clone = background_jobs_can_start.clone();
let (download_req_tx, download_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
let (upload_req_tx, upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryDownloads,
None,
None,
"secondary tenant downloads",
false,
async move {
downloader_task(
conf,
mgr_clone,
storage_clone,
download_req_rx,
bg_jobs_clone,
cancel_clone,
)
.await
},
);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryDownloads,
None,
None,
"heatmap uploads",
false,
async move {
heatmap_writer_task(
tenant_manager,
remote_storage,
upload_req_rx,
background_jobs_can_start,
cancel,
)
.await
},
);
SecondaryController {
download_req_tx,
upload_req_tx,
}
}
/// For running with remote storage disabled: a SecondaryController that is connected to nothing.
pub fn null_controller() -> SecondaryController {
let (download_req_tx, _download_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
let (upload_req_tx, _upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
SecondaryController {
upload_req_tx,
download_req_tx,
}
}

View File

@@ -0,0 +1,579 @@
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use crate::{
config::PageServerConf,
tenant::{
remote_timeline_client::index::LayerFileMetadata,
secondary::CommandResponse,
storage_layer::{Layer, LayerFileName},
timeline::{DiskUsageEvictionInfo, LocalLayerInfoForDiskUsageEviction},
},
METADATA_FILE_NAME,
};
use super::SecondaryTenant;
use crate::tenant::{
mgr::TenantManager,
remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
};
use anyhow::Context;
use chrono::format::{DelayedFormat, StrftimeItems};
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
completion::Barrier,
fs_ext,
id::{TenantId, TimelineId},
};
use super::{
heatmap::{HeatMapTenant, HeatMapTimeline},
CommandRequest, DownloadCommand,
};
/// Interval between checking if any Secondary tenants have download work to do:
/// note that this is _not_ the frequency with which we actually freshen the tenants,
/// just the frequency with which we wake up to decide whether anyone needs freshening.
///
/// Making this somewhat infrequent reduces the load on mutexes inside TenantManager
/// and SecondaryTenant for reads when checking for work to do.
const DOWNLOAD_CHECK_INTERVAL: Duration = Duration::from_millis(10000);
/// For each tenant, how long must have passed since the last freshen_tenant call before
/// calling it again. This is approximately the time by which local data is allowed
/// to fall behind remote data.
///
/// TODO: this should be an upper bound, and tenants that are uploading regularly
/// should adaptively freshen more often (e.g. a tenant writing 1 layer per second
/// should not wait a minute between freshens)
const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
#[derive(Debug, Clone)]
pub(super) struct OnDiskState {
layer: Layer,
access_time: SystemTime,
}
impl OnDiskState {
fn new(
conf: &'static PageServerConf,
tenant_id: &TenantId,
timeline_id: &TimelineId,
name: LayerFileName,
metadata: LayerFileMetadata,
access_time: SystemTime,
) -> Self {
Self {
layer: Layer::for_secondary(conf, tenant_id, timeline_id, name, metadata),
access_time,
}
}
}
#[derive(Debug, Clone, Default)]
pub(super) struct SecondaryDetailTimeline {
pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
/// We remember when layers were evicted, to prevent re-downloading them.
/// TODO: persist this, so that we don't try and re-download everything on restart.
pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
}
/// This state is written by the secondary downloader, it is opaque
/// to TenantManager
#[derive(Default, Debug)]
pub(super) struct SecondaryDetail {
freshened_at: Option<Instant>,
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}
/// Helper for logging SystemTime
fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
datetime.format("%d/%m/%Y %T")
}
impl SecondaryDetail {
pub(super) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
let mut result = Vec::new();
for (timeline_id, timeline_detail) in &self.timelines {
let layers: Vec<_> = timeline_detail
.on_disk_layers
.values()
.map(|ods| LocalLayerInfoForDiskUsageEviction {
layer: ods.layer.clone(),
last_activity_ts: ods.access_time,
})
.collect();
let max_layer_size = layers.iter().map(|l| l.layer.metadata().file_size()).max();
result.push((
*timeline_id,
DiskUsageEvictionInfo {
resident_layers: layers,
max_layer_size,
},
))
}
result
}
}
/// Keep trying to do downloads until the cancellation token is fired. Remote storage
/// errors are handled internally: any error returned by this function is an unexpected
/// internal error of some kind.
pub(super) async fn downloader_task(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let downloader = SecondaryDownloader {
conf,
tenant_manager,
remote_storage,
cancel: cancel.clone(),
};
tracing::info!("Waiting for background_jobs_can start...");
background_jobs_can_start.wait().await;
tracing::info!("background_jobs_can is ready, proceeding.");
while !cancel.is_cancelled() {
downloader.iteration().await?;
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("Heatmap writer terminating");
break;
},
_ = tokio::time::sleep(DOWNLOAD_CHECK_INTERVAL) => {},
cmd = command_queue.recv() => {
let cmd = match cmd {
Some(c) =>c,
None => {
// SecondaryController was destroyed, and this has raced with
// our CancellationToken
tracing::info!("Heatmap writer terminating");
break;
}
};
let CommandRequest{
response_tx,
payload
} = cmd;
let result = downloader.handle_command(payload).await;
if response_tx.send(CommandResponse{result}).is_err() {
// Caller went away, e.g. because an HTTP request timed out
tracing::info!("Dropping response to administrative command")
}
}
}
}
Ok(())
}
struct SecondaryDownloader {
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
}
struct TenantJob {
tenant_id: TenantId,
secondary_state: Arc<SecondaryTenant>,
// This mutex guard conveys the right to write to the tenant's local directory: it must
// be taken before doing downloads, and TenantManager must ensure it has been released
// before it considers shutdown complete for the secondary state -- [`SecondaryDownloader`]
// will thereby never be racing with [`Tenant`] for access to local files.
_guard: tokio::sync::OwnedMutexGuard<()>,
}
impl SecondaryDownloader {
async fn iteration(&self) -> anyhow::Result<()> {
// Step 1: identify some tenants that we may work on
let mut candidates: Vec<TenantJob> = Vec::new();
self.tenant_manager
.foreach_secondary_tenants(|tenant_id, secondary_state| {
let guard = match secondary_state.busy.clone().try_lock_owned() {
Ok(guard) => guard,
// If we can't lock, someone is in the process of shutting it down, or we are
// already working on it. We may ignore it when scanning for new work to do.
Err(_) => return,
};
candidates.push(TenantJob {
tenant_id: *tenant_id,
secondary_state: secondary_state.clone(),
_guard: guard,
});
});
// Step 2: prioritized selection of next batch of tenants to freshen
let now = Instant::now();
let candidates = candidates.into_iter().filter(|c| {
let detail = c.secondary_state.detail.lock().unwrap();
match detail.freshened_at {
None => true, // Not yet freshened, therefore elegible to run
Some(t) => {
let since = now.duration_since(t);
since > DOWNLOAD_FRESHEN_INTERVAL
}
}
});
// TODO: don't just cut down the list, prioritize it to freshen the stalest tenants first
// TODO: bounded parallelism
// Step 3: spawn freshen_tenant tasks
for job in candidates {
if job.secondary_state.cancel.is_cancelled() {
continue;
}
async {
if let Err(e) = self.freshen_tenant(&job).await {
tracing::info!("Failed to freshen secondary content: {e:#}")
};
// Update freshened_at even if there was an error: we don't want errored tenants to implicitly
// take priority to run again.
let mut detail = job.secondary_state.detail.lock().unwrap();
detail.freshened_at = Some(Instant::now());
}
.instrument(tracing::info_span!(
"freshen_tenant",
tenant_id = %job.tenant_id
))
.await;
}
Ok(())
}
async fn handle_command(&self, command: DownloadCommand) -> anyhow::Result<()> {
match command {
DownloadCommand::Download(req_tenant_id) => {
let mut candidates: Vec<TenantJob> = Vec::new();
self.tenant_manager
.foreach_secondary_tenants(|tenant_id, secondary_state| {
tracing::info!("foreach_secondary: {tenant_id} ({req_tenant_id})");
if tenant_id == &req_tenant_id {
let guard = match secondary_state.busy.clone().try_lock_owned() {
Ok(guard) => guard,
// If we can't lock, someone is in the process of shutting it down, or we are
// already working on it. We may ignore it when scanning for new work to do.
Err(_) => return,
};
candidates.push(TenantJob {
tenant_id: *tenant_id,
secondary_state: secondary_state.clone(),
_guard: guard,
});
}
});
let tenant_job = if candidates.len() != 1 {
anyhow::bail!("Tenant not found in secondary mode");
} else {
candidates.pop().unwrap()
};
self.freshen_tenant(&tenant_job).await
}
}
}
async fn download_heatmap(&self, tenant_id: &TenantId) -> anyhow::Result<HeatMapTenant> {
// TODO: make download conditional on ETag having changed since last download
let heatmap_path = remote_heatmap_path(tenant_id);
// TODO: wrap this download in a select! that checks self.cancel
let mut download = self.remote_storage.download(&heatmap_path).await?;
let mut heatmap_bytes = Vec::new();
let _size = tokio::io::copy(&mut download.download_stream, &mut heatmap_bytes)
.await
.with_context(|| format!("download heatmap {heatmap_path:?}"))?;
Ok(serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?)
}
async fn init_timeline_state(
&self,
tenant_id: &TenantId,
timeline_id: &TimelineId,
heatmap: &HeatMapTimeline,
) -> anyhow::Result<SecondaryDetailTimeline> {
let timeline_path = self.conf.timeline_path(tenant_id, timeline_id);
let mut detail = SecondaryDetailTimeline::default();
let mut dir = match tokio::fs::read_dir(&timeline_path).await {
Ok(d) => d,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
tracing::info!("Creating timeline directory {timeline_path}");
tokio::fs::create_dir(&timeline_path).await?;
// No entries to report: drop out.
return Ok(detail);
} else {
return Err(e.into());
}
}
};
let heatmap_metadata: HashMap<_, _> = heatmap.layers.iter().map(|l| (&l.name, l)).collect();
while let Some(dentry) = dir.next_entry().await? {
let dentry_file_name = dentry.file_name();
let file_name = dentry_file_name.to_string_lossy();
let local_meta = dentry.metadata().await?;
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
if file_name == METADATA_FILE_NAME {
continue;
}
match LayerFileName::from_str(&file_name) {
Ok(name) => {
let remote_meta = heatmap_metadata.get(&name);
match remote_meta {
Some(remote_meta) => {
// TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
if local_meta.len() != remote_meta.metadata.file_size {
// This should not happen, because we do crashsafe write-then-rename when downloading
// layers, and layers in remote storage are immutable. Remove the local file because
// we cannot trust it.
tracing::warn!("Removing local layer {name} with unexpected local size {} != {}",
local_meta.len(), remote_meta.metadata.file_size);
} else {
// We expect the access time to be initialized immediately afterwards, when
// the latest heatmap is applied to the state.
detail.on_disk_layers.insert(
name.clone(),
OnDiskState::new(
self.conf,
tenant_id,
timeline_id,
name,
LayerFileMetadata::from(&remote_meta.metadata),
remote_meta.access_time,
),
);
}
}
None => {
// FIXME: consider some optimization when transitioning from attached to secondary: maybe
// wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
// we will end up deleting any layers which were created+uploaded more recently than the heatmap.
tracing::info!(
"Removing secondary local layer {} because it's absent in heatmap",
name
);
tokio::fs::remove_file(dentry.path()).await?;
}
}
}
Err(_) => {
// Ignore it.
tracing::warn!("Unexpected file in timeline directory: {file_name}");
}
}
}
Ok(detail)
}
async fn freshen_timeline(
&self,
job: &TenantJob,
timeline: HeatMapTimeline,
) -> anyhow::Result<()> {
let timeline_path = self
.conf
.timeline_path(&job.tenant_id, &timeline.timeline_id);
// Accumulate updates to the state
let mut touched = Vec::new();
// Clone a view of what layers already exist on disk
let timeline_state = job
.secondary_state
.detail
.lock()
.unwrap()
.timelines
.get(&timeline.timeline_id)
.cloned();
let timeline_state = match timeline_state {
Some(t) => t,
None => {
// We have no existing state: need to scan local disk for layers first.
self.init_timeline_state(&job.tenant_id, &timeline.timeline_id, &timeline)
.await?
}
};
let layers_in_heatmap = timeline
.layers
.iter()
.map(|l| &l.name)
.collect::<HashSet<_>>();
let layers_on_disk = timeline_state
.on_disk_layers
.iter()
.map(|l| l.0)
.collect::<HashSet<_>>();
// Remove on-disk layers that are no longer present in heatmap
for layer in layers_on_disk.difference(&layers_in_heatmap) {
let local_path = timeline_path.join(layer.to_string());
tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
}
// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
if self.cancel.is_cancelled() {
return Ok(());
}
// Existing on-disk layers: just update their access time.
if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
if on_disk.layer.metadata() != LayerFileMetadata::from(&layer.metadata)
|| on_disk.access_time != layer.access_time
{
// We already have this layer on disk. Update its access time.
tracing::trace!(
"Access time updated for layer {}: {} -> {}",
layer.name,
strftime(&on_disk.access_time),
strftime(&layer.access_time)
);
touched.push(layer);
}
continue;
}
// Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
// recently than it was evicted.
if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
if &layer.access_time > evicted_at {
tracing::info!(
"Re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
} else {
tracing::trace!(
"Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
continue;
}
}
match download_layer_file(
self.conf,
&self.remote_storage,
job.tenant_id,
timeline.timeline_id,
&layer.name,
&LayerFileMetadata::from(&layer.metadata),
)
.await
{
Ok(downloaded_bytes) => {
if downloaded_bytes != layer.metadata.file_size {
let local_path = timeline_path.join(layer.name.to_string());
tracing::error!(
"Downloaded layer {} with unexpected size {} != {}",
layer.name,
downloaded_bytes,
layer.metadata.file_size
);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
}
touched.push(layer)
}
Err(e) => {
// No retries here: secondary downloads don't have to succeed: if they fail we just proceed and expect
// that on some future call to freshen the download will work.
// TODO: refine this behavior.
tracing::info!("Failed to download layer {}: {}", layer.name, e);
}
}
}
// Write updates to state to record layers we just downloaded or touched.
{
let mut detail = job.secondary_state.detail.lock().unwrap();
let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
for t in touched {
use std::collections::hash_map::Entry;
match timeline_detail.on_disk_layers.entry(t.name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().access_time = t.access_time;
}
Entry::Vacant(e) => {
e.insert(OnDiskState::new(
self.conf,
&job.tenant_id,
&timeline.timeline_id,
t.name,
LayerFileMetadata::from(&t.metadata),
t.access_time,
));
}
}
}
}
Ok(())
}
async fn freshen_tenant(&self, job: &TenantJob) -> anyhow::Result<()> {
// Download the tenant's heatmap
let heatmap = self.download_heatmap(&job.tenant_id).await?;
// Download the layers in the heatmap
for timeline in heatmap.timelines {
if self.cancel.is_cancelled() {
return Ok(());
}
self.freshen_timeline(job, timeline).await?;
}
Ok(())
}
}

View File

@@ -0,0 +1,57 @@
use std::time::SystemTime;
use crate::tenant::{
remote_timeline_client::index::IndexLayerMetadata, storage_layer::LayerFileName,
};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::TimelineId;
#[derive(Serialize, Deserialize)]
pub(super) struct HeatMapTenant {
pub(super) timelines: Vec<HeatMapTimeline>,
}
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapLayer {
pub(super) name: LayerFileName,
pub(super) metadata: IndexLayerMetadata,
pub(super) access_time: SystemTime,
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
}
impl HeatMapLayer {
pub(crate) fn new(
name: LayerFileName,
metadata: IndexLayerMetadata,
access_time: SystemTime,
) -> Self {
Self {
name,
metadata,
access_time,
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapTimeline {
#[serde_as(as = "DisplayFromStr")]
pub(super) timeline_id: TimelineId,
pub(super) layers: Vec<HeatMapLayer>,
}
impl HeatMapTimeline {
pub(crate) fn new(timeline_id: TimelineId, layers: Vec<HeatMapLayer>) -> Self {
Self {
timeline_id,
layers,
}
}
}

View File

@@ -0,0 +1,207 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use crate::tenant::{
mgr::TenantManager, remote_timeline_client::remote_heatmap_path, secondary::CommandResponse,
Tenant,
};
use pageserver_api::models::TenantState;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{backoff, completion::Barrier};
use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand};
const HEATMAP_UPLOAD_INTERVAL: Duration = Duration::from_millis(60000);
pub(super) async fn heatmap_writer_task(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let writer = HeatmapWriter {
tenant_manager,
remote_storage,
cancel: cancel.clone(),
};
tracing::info!("Waiting for background_jobs_can start...");
background_jobs_can_start.wait().await;
tracing::info!("background_jobs_can is ready, proceeding.");
while !cancel.is_cancelled() {
writer.iteration().await?;
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("Heatmap writer terminating");
break;
},
_ = tokio::time::sleep(HEATMAP_UPLOAD_INTERVAL) => {},
cmd = command_queue.recv() => {
let cmd = match cmd {
Some(c) =>c,
None => {
// SecondaryController was destroyed, and this has raced with
// our CancellationToken
tracing::info!("Heatmap writer terminating");
break;
}
};
let CommandRequest{
response_tx,
payload
} = cmd;
let result = writer.handle_command(payload).await;
if response_tx.send(CommandResponse{result}).is_err() {
// Caller went away, e.g. because an HTTP request timed out
tracing::info!("Dropping response to administrative command")
}
}
}
}
Ok(())
}
struct HeatmapWriter {
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
}
impl HeatmapWriter {
async fn iteration(&self) -> anyhow::Result<()> {
let tenants = self.tenant_manager.get_attached_tenants();
for tenant in tenants {
if self.cancel.is_cancelled() {
return Ok(());
}
if tenant.current_state() != TenantState::Active {
continue;
}
// TODO: add a mechanism to check whether the active layer set has
// changed since our last write
// TODO: add a minimum time between uploads
match self
.write_tenant(&tenant)
.instrument(tracing::info_span!(
"write_tenant",
tenant_id = %tenant.get_tenant_id()
))
.await
{
Ok(()) => {}
Err(e) => {
tracing::warn!(
"Failed to upload heatmap for tenant {}: {e:#}",
tenant.get_tenant_id(),
)
}
}
}
Ok(())
}
async fn handle_command(&self, command: UploadCommand) -> anyhow::Result<()> {
match command {
UploadCommand::Upload(tenant_id) => {
let tenants = self.tenant_manager.get_attached_tenants();
let map = tenants
.iter()
.map(|t| (t.get_tenant_id(), t))
.collect::<HashMap<_, _>>();
match map.get(&tenant_id) {
Some(tenant) => self.write_tenant(tenant).await,
None => {
anyhow::bail!("Tenant is not attached");
}
}
}
}
}
async fn write_tenant(&self, tenant: &Arc<Tenant>) -> anyhow::Result<()> {
let mut heatmap = HeatMapTenant {
timelines: Vec::new(),
};
let timelines = tenant.timelines.lock().unwrap().clone();
let tenant_cancel = tenant.cancel.clone();
// Ensure that Tenant::shutdown waits for any upload in flight
let _guard = {
let hook = tenant.heatmap_hook.lock().unwrap();
match hook.enter() {
Some(g) => g,
None => {
// Tenant is shutting down
tracing::info!("Skipping, tenant is shutting down");
return Ok(());
}
}
};
for (timeline_id, timeline) in timelines {
let heatmap_timeline = timeline.generate_heatmap().await;
match heatmap_timeline {
None => {
tracing::debug!(
"Skipping heatmap upload because timeline {timeline_id} is not ready"
);
return Ok(());
}
Some(heatmap_timeline) => {
heatmap.timelines.push(heatmap_timeline);
}
}
}
// Serialize the heatmap
let bytes = serde_json::to_vec(&heatmap)?;
let size = bytes.len();
let path = remote_heatmap_path(&tenant.get_tenant_id());
// Write the heatmap.
tracing::debug!("Uploading {size} byte heatmap to {path}");
if let Err(e) = backoff::retry(
|| async {
let bytes = tokio::io::BufReader::new(std::io::Cursor::new(bytes.clone()));
let bytes = Box::new(bytes);
self.remote_storage
.upload_storage_object(bytes, size, &path)
.await
},
|_| false,
3,
u32::MAX,
"Uploading heatmap",
backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")),
)
.await
{
if tenant_cancel.is_cancelled() {
return Ok(());
} else {
return Err(e);
}
}
tracing::info!("Successfully uploading {size} byte heatmap to {path}");
Ok(())
}
}

View File

@@ -29,6 +29,7 @@ use tenant_size_model::{Segment, StorageModel};
/// needs. We will convert this into a StorageModel when it's time to perform
/// the calculation.
///
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ModelInputs {
pub segments: Vec<SegmentMeta>,
@@ -36,9 +37,11 @@ pub struct ModelInputs {
}
/// A [`Segment`], with some extra information for display purposes
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SegmentMeta {
pub segment: Segment,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub timeline_id: TimelineId,
pub kind: LsnKind,
}
@@ -74,22 +77,32 @@ pub enum LsnKind {
/// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as
/// part of [`ModelInputs`] from the HTTP api, explaining the inputs.
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct TimelineInputs {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub timeline_id: TimelineId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
pub ancestor_id: Option<TimelineId>,
#[serde_as(as = "serde_with::DisplayFromStr")]
ancestor_lsn: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
last_record: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
latest_gc_cutoff: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
horizon_cutoff: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
pitr_cutoff: Lsn,
/// Cutoff point based on GC settings
#[serde_as(as = "serde_with::DisplayFromStr")]
next_gc_cutoff: Lsn,
/// Cutoff point calculated from the user-supplied 'max_retention_period'
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
retention_param_cutoff: Option<Lsn>,
}
@@ -406,12 +419,10 @@ async fn fill_logical_sizes(
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
if !matches!(error, CalculateLogicalSizeError::Cancelled) {
warn!(
timeline_id=%timeline.timeline_id,
"failed to calculate logical size at {lsn}: {error:#}"
);
}
warn!(
timeline_id=%timeline.timeline_id,
"failed to calculate logical size at {lsn}: {error:#}"
);
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {

View File

@@ -345,19 +345,14 @@ impl InMemoryLayer {
let cursor = inner.file.block_cursor();
// Sort the keys because delta layer writer expects them sorted.
//
// NOTE: this sort can take up significant time if the layer has millions of
// keys. To speed up all the comparisons we convert the key to i128 and
// keep the value as a reference.
let mut keys: Vec<_> = inner.index.iter().map(|(k, m)| (k.to_i128(), m)).collect();
keys.sort_unstable_by_key(|k| k.0);
let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
keys.sort_by_key(|k| k.0);
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
for (key, vec_map) in keys.iter() {
let key = Key::from_i128(*key);
let key = **key;
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;

View File

@@ -8,6 +8,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::SystemTime;
use tracing::Instrument;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::sync::heavier_once_cell;
@@ -47,16 +48,7 @@ pub(crate) struct Layer(Arc<LayerInner>);
impl std::fmt::Display for Layer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if matches!(self.0.generation, Generation::Broken) {
write!(f, "{}-broken", self.layer_desc().short_id())
} else {
write!(
f,
"{}{}",
self.layer_desc().short_id(),
self.0.generation.get_suffix()
)
}
write!(f, "{}", self.layer_desc().short_id())
}
}
@@ -91,7 +83,38 @@ impl Layer {
let owner = Layer(Arc::new(LayerInner::new(
conf,
timeline,
Arc::downgrade(timeline),
access_stats,
desc,
None,
metadata.generation,
)));
debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
owner
}
/// A layer which is resident locally in a secondary location: not associated with a live Timeline object.
pub(crate) fn for_secondary(
conf: &'static PageServerConf,
tenant_id: &TenantId,
timeline_id: &TimelineId,
file_name: LayerFileName,
metadata: LayerFileMetadata,
) -> Self {
let desc = PersistentLayerDesc::from_filename(
*tenant_id,
*timeline_id,
file_name,
metadata.file_size(),
);
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
let owner = Layer(Arc::new(LayerInner::new(
conf,
Weak::default(),
access_stats,
desc,
None,
@@ -125,13 +148,12 @@ impl Layer {
let inner = Arc::new(DownloadedLayer {
owner: owner.clone(),
kind: tokio::sync::OnceCell::default(),
version: 0,
});
resident = Some(inner.clone());
LayerInner::new(
conf,
timeline,
Arc::downgrade(timeline),
access_stats,
desc,
Some(inner),
@@ -164,7 +186,6 @@ impl Layer {
let inner = Arc::new(DownloadedLayer {
owner: owner.clone(),
kind: tokio::sync::OnceCell::default(),
version: 0,
});
resident = Some(inner.clone());
let access_stats = LayerAccessStats::empty_will_record_residence_event_later();
@@ -174,7 +195,7 @@ impl Layer {
);
LayerInner::new(
conf,
timeline,
Arc::downgrade(timeline),
access_stats,
desc,
Some(inner),
@@ -330,46 +351,42 @@ impl Layer {
/// read with [`Layer::get_value_reconstruct_data`].
///
/// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
#[derive(Debug)]
enum ResidentOrWantedEvicted {
Resident(Arc<DownloadedLayer>),
WantedEvicted(Weak<DownloadedLayer>, usize),
WantedEvicted(Weak<DownloadedLayer>),
}
impl ResidentOrWantedEvicted {
fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
fn get(&self) -> Option<Arc<DownloadedLayer>> {
match self {
ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
ResidentOrWantedEvicted::WantedEvicted(weak) => match weak.upgrade() {
Some(strong) => {
LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
*self = ResidentOrWantedEvicted::Resident(strong.clone());
Some((strong, true))
Some(strong)
}
None => None,
},
}
}
/// When eviction is first requested, drop down to holding a [`Weak`].
///
/// Returns `Some` if this was the first time eviction was requested. Care should be taken to
/// drop the possibly last strong reference outside of the mutex of
/// heavier_once_cell::OnceCell.
fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
match self {
/// Returns `true` if this was the first time eviction was requested.
fn downgrade(&mut self) -> &Weak<DownloadedLayer> {
let _was_first = match self {
ResidentOrWantedEvicted::Resident(strong) => {
let weak = Arc::downgrade(strong);
let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
std::mem::swap(self, &mut temp);
match temp {
ResidentOrWantedEvicted::Resident(strong) => Some(strong),
ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
}
*self = ResidentOrWantedEvicted::WantedEvicted(weak);
// returning the weak is not useful, because the drop could had already ran with
// the replacement above, and that will take care of cleaning the Option we are in
true
}
ResidentOrWantedEvicted::WantedEvicted(..) => None,
ResidentOrWantedEvicted::WantedEvicted(_) => false,
};
match self {
ResidentOrWantedEvicted::WantedEvicted(ref weak) => weak,
_ => unreachable!("just wrote wanted evicted"),
}
}
}
@@ -404,17 +421,11 @@ struct LayerInner {
/// [`LayerInner::on_downloaded_layer_drop`].
wanted_evicted: AtomicBool,
/// Version is to make sure we will only evict a specific download of a file.
///
/// Incremented for each download, stored in `DownloadedLayer::version` or
/// `ResidentOrWantedEvicted::WantedEvicted`.
/// Version is to make sure we will in fact only evict a file if no new download has been
/// started.
version: AtomicUsize,
/// Allow subscribing to when the layer actually gets evicted.
///
/// If in future we need to implement "wait until layer instances are gone and done", carrying
/// this over to the gc spawn_blocking from LayerInner::drop will do the trick, and adding a
/// method for "wait_gc" which will wait to this being closed.
status: tokio::sync::broadcast::Sender<Status>,
/// Counter for exponential backoff with the download
@@ -454,7 +465,7 @@ impl Drop for LayerInner {
return;
}
let span = tracing::info_span!(parent: None, "layer_gc", tenant_id = %self.layer_desc().tenant_id, timeline_id = %self.layer_desc().timeline_id);
let span = tracing::info_span!(parent: None, "layer_gc", tenant_id = %self.layer_desc().tenant_id, timeline_id = %self.layer_desc().timeline_id, layer = %self);
let path = std::mem::take(&mut self.path);
let file_name = self.layer_desc().filename();
@@ -517,35 +528,36 @@ impl Drop for LayerInner {
impl LayerInner {
fn new(
conf: &'static PageServerConf,
timeline: &Arc<Timeline>,
timeline: Weak<Timeline>,
access_stats: LayerAccessStats,
desc: PersistentLayerDesc,
downloaded: Option<Arc<DownloadedLayer>>,
generation: Generation,
) -> Self {
let path = conf
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
.timeline_path(&desc.tenant_id, &desc.timeline_id)
.join(desc.filename().to_string());
let (inner, version) = if let Some(inner) = downloaded {
let version = inner.version;
let resident = ResidentOrWantedEvicted::Resident(inner);
(heavier_once_cell::OnceCell::new(resident), version)
} else {
(heavier_once_cell::OnceCell::default(), 0)
};
let have_remote_client = timeline
.upgrade()
.map(|t| t.remote_client.is_some())
.unwrap_or(false);
LayerInner {
conf,
path,
desc,
timeline: Arc::downgrade(timeline),
have_remote_client: timeline.remote_client.is_some(),
timeline,
have_remote_client,
access_stats,
wanted_garbage_collected: AtomicBool::new(false),
wanted_evicted: AtomicBool::new(false),
inner,
version: AtomicUsize::new(version),
inner: if let Some(inner) = downloaded {
heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner))
} else {
heavier_once_cell::OnceCell::default()
},
version: AtomicUsize::new(0),
status: tokio::sync::broadcast::channel(1).0,
consecutive_failures: AtomicUsize::new(0),
generation,
@@ -565,8 +577,6 @@ impl LayerInner {
}
}
/// Cancellation safe, however dropping the future and calling this method again might result
/// in a new attempt to evict OR join the previously started attempt.
pub(crate) async fn evict_and_wait(
&self,
_: &RemoteTimelineClient,
@@ -577,22 +587,20 @@ impl LayerInner {
let mut rx = self.status.subscribe();
let strong = {
match self.inner.get() {
Some(mut either) => {
self.wanted_evicted.store(true, Ordering::Relaxed);
either.downgrade()
}
None => return Err(EvictionError::NotFound),
}
};
let res =
self.wanted_evicted
.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
if strong.is_some() {
// drop the DownloadedLayer outside of the holding the guard
drop(strong);
if res.is_ok() {
LAYER_IMPL_METRICS.inc_started_evictions();
}
if self.get().is_none() {
// it was not evictable in the first place
// our store to the wanted_evicted does not matter; it will be reset by next download
return Err(EvictionError::NotFound);
}
match rx.recv().await {
Ok(Status::Evicted) => Ok(()),
Ok(Status::Downloaded) => Err(EvictionError::Downloaded),
@@ -606,8 +614,7 @@ impl LayerInner {
//
// use however late (compared to the initial expressing of wanted) as the
// "outcome" now
LAYER_IMPL_METRICS.inc_broadcast_lagged();
match self.inner.get() {
match self.get() {
Some(_) => Err(EvictionError::Downloaded),
None => Ok(()),
}
@@ -615,19 +622,17 @@ impl LayerInner {
}
}
/// Cancellation safe.
#[tracing::instrument(skip_all, fields(layer=%self))]
/// Should be cancellation safe, but cancellation is troublesome together with the spawned
/// download.
async fn get_or_maybe_download(
self: &Arc<Self>,
allow_download: bool,
ctx: Option<&RequestContext>,
) -> Result<Arc<DownloadedLayer>, DownloadError> {
let mut init_permit = None;
loop {
let download = move |permit| async move {
let download = move || async move {
// disable any scheduled but not yet running eviction deletions for this
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
self.version.fetch_add(1, Ordering::Relaxed);
// no need to make the evict_and_wait wait for the actual download to complete
drop(self.status.send(Status::Downloaded));
@@ -646,11 +651,7 @@ impl LayerInner {
.await
.map_err(DownloadError::PreStatFailed)?;
let permit = if let Some(reason) = needs_download {
if let NeedsDownload::NotFile(ft) = reason {
return Err(DownloadError::NotFile(ft));
}
if let Some(reason) = needs_download {
// only reset this after we've decided we really need to download. otherwise it'd
// be impossible to mark cancelled downloads for eviction, like one could imagine
// we would like to do for prefetching which was not needed.
@@ -660,6 +661,8 @@ impl LayerInner {
return Err(DownloadError::NoRemoteStorage);
}
tracing::debug!(%reason, "downloading layer");
if let Some(ctx) = ctx {
self.check_expected_download(ctx)?;
}
@@ -670,21 +673,16 @@ impl LayerInner {
return Err(DownloadError::DownloadRequired);
}
tracing::info!(%reason, "downloading on-demand");
self.spawn_download_and_wait(timeline, permit).await?
self.spawn_download_and_wait(timeline).await?;
} else {
// the file is present locally, probably by a previous but cancelled call to
// get_or_maybe_download. alternatively we might be running without remote storage.
LAYER_IMPL_METRICS.inc_init_needed_no_download();
permit
};
}
let res = Arc::new(DownloadedLayer {
owner: Arc::downgrade(self),
kind: tokio::sync::OnceCell::default(),
version: next_version,
});
self.access_stats.record_residence_event(
@@ -692,60 +690,19 @@ impl LayerInner {
LayerResidenceEventReason::ResidenceChange,
);
let waiters = self.inner.initializer_count();
if waiters > 0 {
tracing::info!(waiters, "completing the on-demand download for other tasks");
}
Ok((ResidentOrWantedEvicted::Resident(res), permit))
Ok(ResidentOrWantedEvicted::Resident(res))
};
if let Some(init_permit) = init_permit.take() {
// use the already held initialization permit because it is impossible to hit the
// below paths anymore essentially limiting the max loop iterations to 2.
let (value, init_permit) = download(init_permit).await?;
let mut guard = self.inner.set(value, init_permit);
let (strong, _upgraded) = guard
.get_and_upgrade()
.expect("init creates strong reference, we held the init permit");
let locked = self.inner.get_or_init(download).await?;
if let Some(strong) = Self::get_or_apply_evictedness(Some(locked), &self.wanted_evicted)
{
return Ok(strong);
}
let (weak, permit) = {
let mut locked = self.inner.get_or_init(download).await?;
if let Some((strong, upgraded)) = locked.get_and_upgrade() {
if upgraded {
// when upgraded back, the Arc<DownloadedLayer> is still available, but
// previously a `evict_and_wait` was received.
self.wanted_evicted.store(false, Ordering::Relaxed);
// error out any `evict_and_wait`
drop(self.status.send(Status::Downloaded));
LAYER_IMPL_METRICS
.inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
}
return Ok(strong);
} else {
// path to here: the evict_blocking is stuck on spawn_blocking queue.
//
// reset the contents, deactivating the eviction and causing a
// EvictionCancelled::LostToDownload or EvictionCancelled::VersionCheckFailed.
locked.take_and_deinit()
}
};
// unlock first, then drop the weak, but because upgrade failed, we
// know it cannot be a problem.
assert!(
matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
"unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
);
init_permit = Some(permit);
// the situation in which we might need to retry is that our init was ready
// immediatedly, but the DownloadedLayer had been dropped BUT failed to complete
// Self::evict_blocking
LAYER_IMPL_METRICS.inc_retried_get_or_maybe_download();
}
}
@@ -757,8 +714,8 @@ impl LayerInner {
match b {
Download => Ok(()),
Warn | Error => {
tracing::info!(
"unexpectedly on-demand downloading for task kind {:?}",
tracing::warn!(
"unexpectedly on-demand downloading remote layer {self} for task kind {:?}",
ctx.task_kind()
);
crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
@@ -780,17 +737,14 @@ impl LayerInner {
async fn spawn_download_and_wait(
self: &Arc<Self>,
timeline: Arc<Timeline>,
permit: heavier_once_cell::InitPermit,
) -> Result<heavier_once_cell::InitPermit, DownloadError> {
) -> Result<(), DownloadError> {
let task_name = format!("download layer {}", self);
let (tx, rx) = tokio::sync::oneshot::channel();
// this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
// block tenant::mgr::remove_tenant_from_memory.
let this: Arc<Self> = self.clone();
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
crate::task_mgr::TaskKind::RemoteDownloadTask,
@@ -799,7 +753,6 @@ impl LayerInner {
&task_name,
false,
async move {
let client = timeline
.remote_client
.as_ref()
@@ -821,9 +774,9 @@ impl LayerInner {
}
};
if let Err(res) = tx.send((result, permit)) {
if let Err(res) = tx.send(result) {
match res {
(Ok(()), _) => {
Ok(()) => {
// our caller is cancellation safe so this is fine; if someone
// else requests the layer, they'll find it already downloaded
// or redownload.
@@ -834,7 +787,7 @@ impl LayerInner {
tracing::info!("layer file download completed after requester had cancelled");
LAYER_IMPL_METRICS.inc_download_completed_without_requester();
},
(Err(e), _) => {
Err(e) => {
// our caller is cancellation safe, but we might be racing with
// another attempt to initialize. before we have cancellation
// token support: these attempts should converge regardless of
@@ -850,7 +803,7 @@ impl LayerInner {
.in_current_span(),
);
match rx.await {
Ok((Ok(()), permit)) => {
Ok(Ok(())) => {
if let Some(reason) = self
.needs_download()
.await
@@ -861,12 +814,10 @@ impl LayerInner {
}
self.consecutive_failures.store(0, Ordering::Relaxed);
tracing::info!("on-demand download successful");
Ok(permit)
Ok(())
}
Ok((Err(e), _permit)) => {
// FIXME: this should be with the spawned task and be cancellation sensitive
Ok(Err(e)) => {
let consecutive_failures =
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
@@ -884,6 +835,33 @@ impl LayerInner {
}
}
/// Access the current state without waiting for the file to be downloaded.
///
/// Requires that we've initialized to state which is respective to the
/// actual residency state.
fn get(&self) -> Option<Arc<DownloadedLayer>> {
let locked = self.inner.get();
Self::get_or_apply_evictedness(locked, &self.wanted_evicted)
}
fn get_or_apply_evictedness(
guard: Option<heavier_once_cell::Guard<'_, ResidentOrWantedEvicted>>,
wanted_evicted: &AtomicBool,
) -> Option<Arc<DownloadedLayer>> {
if let Some(mut x) = guard {
if let Some(won) = x.get() {
// there are no guarantees that we will always get to observe a concurrent call
// to evict
if wanted_evicted.load(Ordering::Acquire) {
x.downgrade();
}
return Some(won);
}
}
None
}
async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
match tokio::fs::metadata(&self.path).await {
Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
@@ -903,7 +881,7 @@ impl LayerInner {
fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
// in future, this should include sha2-256 validation of the file.
if !m.is_file() {
Err(NeedsDownload::NotFile(m.file_type()))
Err(NeedsDownload::NotFile)
} else if m.len() != self.desc.file_size {
Err(NeedsDownload::WrongSize {
actual: m.len(),
@@ -917,9 +895,7 @@ impl LayerInner {
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.desc.filename().file_name();
// this is not accurate: we could have the file locally but there was a cancellation
// and now we are not in sync, or we are currently downloading it.
let remote = self.inner.get().is_none();
let remote = self.get().is_none();
let access_stats = self.access_stats.as_api_model(reset);
@@ -948,7 +924,7 @@ impl LayerInner {
}
/// `DownloadedLayer` is being dropped, so it calls this method.
fn on_downloaded_layer_drop(self: Arc<LayerInner>, version: usize) {
fn on_downloaded_layer_drop(self: Arc<LayerInner>) {
let gc = self.wanted_garbage_collected.load(Ordering::Acquire);
let evict = self.wanted_evicted.load(Ordering::Acquire);
let can_evict = self.have_remote_client;
@@ -956,16 +932,15 @@ impl LayerInner {
if gc {
// do nothing now, only in LayerInner::drop
} else if can_evict && evict {
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_id, timeline_id = %self.desc.timeline_id, layer=%self, %version);
let version = self.version.load(Ordering::Relaxed);
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_id, timeline_id = %self.desc.timeline_id, layer=%self);
// downgrade for queueing, in case there's a tear down already ongoing we should not
// hold it alive.
let this = Arc::downgrade(&self);
drop(self);
// NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
// drop while the `self.inner` is being locked, leading to a deadlock.
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
let _g = span.entered();
@@ -975,15 +950,19 @@ impl LayerInner {
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
return;
};
match this.evict_blocking(version) {
Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
Err(reason) => LAYER_IMPL_METRICS.inc_eviction_cancelled(reason),
}
this.evict_blocking(version);
});
}
}
fn evict_blocking(&self, only_version: usize) -> Result<(), EvictionCancelled> {
fn evict_blocking(&self, version: usize) {
match self.evict_blocking0(version) {
Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
Err(reason) => LAYER_IMPL_METRICS.inc_eviction_cancelled(reason),
}
}
fn evict_blocking0(&self, version: usize) -> Result<(), EvictionCancelled> {
// deleted or detached timeline, don't do anything.
let Some(timeline) = self.timeline.upgrade() else {
return Err(EvictionCancelled::TimelineGone);
@@ -994,34 +973,32 @@ impl LayerInner {
let _permit = {
let maybe_downloaded = self.inner.get();
let (_weak, permit) = match maybe_downloaded {
Some(mut guard) => {
if let ResidentOrWantedEvicted::WantedEvicted(_weak, version) = &*guard {
if *version == only_version {
guard.take_and_deinit()
} else {
// this was not for us; maybe there's another eviction job
// TODO: does it make any sense to stall here? unique versions do not
// matter, we only want to make sure not to evict a resident, which we
// are not doing.
return Err(EvictionCancelled::VersionCheckFailed);
}
} else {
return Err(EvictionCancelled::AlreadyReinitialized);
}
if version != self.version.load(Ordering::Relaxed) {
// downloadness-state has advanced, we might no longer be the latest eviction
// work; don't do anything.
//
// this is possible to get to by having:
//
// 1. wanted_evicted.store(true)
// 2. ResidentOrWantedEvicted::downgrade
// 3. DownloadedLayer::drop
// 4. LayerInner::get_or_maybe_download
// 5. LayerInner::evict_blocking
return Err(EvictionCancelled::VersionCheckFailed);
}
// free the DownloadedLayer allocation
match maybe_downloaded.map(|mut g| g.take_and_deinit()) {
Some((taken, permit)) => {
assert!(matches!(taken, ResidentOrWantedEvicted::WantedEvicted(_)));
permit
}
None => {
// already deinitialized, perhaps get_or_maybe_download did this and is
// currently waiting to reinitialize it
return Err(EvictionCancelled::LostToDownload);
unreachable!("we do the version checking for this exact reason")
}
};
permit
}
};
// now accesses to inner.get_or_init wait on the semaphore or the `_permit`
self.access_stats.record_residence_event(
LayerResidenceStatus::Evicted,
LayerResidenceEventReason::ResidenceChange,
@@ -1054,14 +1031,11 @@ impl LayerInner {
Ok(())
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
tracing::error!(
layer_size = %self.desc.file_size,
"failed to evict layer from disk, it was already gone (metrics will be inaccurate)"
);
tracing::info!("failed to evict file from disk, it was already gone");
Err(EvictionCancelled::FileNotFound)
}
Err(e) => {
tracing::error!("failed to evict file from disk: {e:#}");
tracing::warn!("failed to evict file from disk: {e:#}");
Err(EvictionCancelled::RemoveFailed)
}
};
@@ -1105,8 +1079,6 @@ enum DownloadError {
ContextAndConfigReallyDeniesDownloads,
#[error("downloading is really required but not allowed by this method")]
DownloadRequired,
#[error("layer path exists, but it is not a file: {0:?}")]
NotFile(std::fs::FileType),
/// Why no error here? Because it will be reported by page_service. We should had also done
/// retries already.
#[error("downloading evicted layer file failed")]
@@ -1122,7 +1094,7 @@ enum DownloadError {
#[derive(Debug, PartialEq)]
pub(crate) enum NeedsDownload {
NotFound,
NotFile(std::fs::FileType),
NotFile,
WrongSize { actual: u64, expected: u64 },
}
@@ -1130,7 +1102,7 @@ impl std::fmt::Display for NeedsDownload {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NeedsDownload::NotFound => write!(f, "file was not found"),
NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"),
NeedsDownload::NotFile => write!(f, "path is not a file"),
NeedsDownload::WrongSize { actual, expected } => {
write!(f, "file size mismatch {actual} vs. {expected}")
}
@@ -1141,10 +1113,7 @@ impl std::fmt::Display for NeedsDownload {
/// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it.
pub(crate) struct DownloadedLayer {
owner: Weak<LayerInner>,
// Use tokio OnceCell as we do not need to deinitialize this, it'll just get dropped with the
// DownloadedLayer
kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
version: usize,
}
impl std::fmt::Debug for DownloadedLayer {
@@ -1152,7 +1121,6 @@ impl std::fmt::Debug for DownloadedLayer {
f.debug_struct("DownloadedLayer")
// owner omitted because it is always "Weak"
.field("kind", &self.kind)
.field("version", &self.version)
.finish()
}
}
@@ -1160,7 +1128,7 @@ impl std::fmt::Debug for DownloadedLayer {
impl Drop for DownloadedLayer {
fn drop(&mut self) {
if let Some(owner) = self.owner.upgrade() {
owner.on_downloaded_layer_drop(self.version);
owner.on_downloaded_layer_drop();
} else {
// no need to do anything, we are shutting down
}
@@ -1186,6 +1154,7 @@ impl DownloadedLayer {
"these are the same, just avoiding the upgrade"
);
// there is nothing async here, but it should be async
let res = if owner.desc.is_delta {
let summary = Some(delta_layer::Summary::expected(
owner.desc.tenant_id,
@@ -1284,8 +1253,6 @@ impl std::fmt::Debug for ResidentLayer {
impl ResidentLayer {
/// Release the eviction guard, converting back into a plain [`Layer`].
///
/// You can access the [`Layer`] also by using `as_ref`.
pub(crate) fn drop_eviction_guard(self) -> Layer {
self.into()
}
@@ -1341,7 +1308,7 @@ impl AsRef<Layer> for ResidentLayer {
}
}
/// Drop the eviction guard.
/// Allow slimming down if we don't want the `2*usize` with eviction candidates?
impl From<ResidentLayer> for Layer {
fn from(value: ResidentLayer) -> Self {
value.owner
@@ -1511,13 +1478,6 @@ impl LayerImplMetrics {
.unwrap()
.inc();
}
fn inc_broadcast_lagged(&self) {
self.rare_counters
.get_metric_with_label_values(&["broadcast_lagged"])
.unwrap()
.inc();
}
}
enum EvictionCancelled {
@@ -1526,11 +1486,6 @@ enum EvictionCancelled {
VersionCheckFailed,
FileNotFound,
RemoveFailed,
AlreadyReinitialized,
/// Not evicted because of a pending reinitialization
LostToDownload,
/// After eviction, there was a new layer access which cancelled the eviction.
UpgradedBackOnAccess,
}
impl EvictionCancelled {
@@ -1541,9 +1496,6 @@ impl EvictionCancelled {
EvictionCancelled::VersionCheckFailed => "version_check_fail",
EvictionCancelled::FileNotFound => "file_not_found",
EvictionCancelled::RemoveFailed => "remove_failed",
EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
EvictionCancelled::LostToDownload => "lost_to_download",
EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
}
}
}

View File

@@ -12,7 +12,7 @@ use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{backoff, completion};
use utils::completion;
static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
once_cell::sync::Lazy::new(|| {
@@ -139,10 +139,7 @@ pub fn start_background_loops(
/// Compaction task's main loop
///
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
const MAX_BACKOFF_SECS: f64 = 300.0;
// How many errors we have seen consequtively
let mut error_run_count = 0;
let wait_duration = Duration::from_secs(2);
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
@@ -162,11 +159,8 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
// loop, #3501. There are also additional "allowed_errors" in tests.
if first {
first = false;
if random_init_delay(period, &cancel).await.is_err() {
break;
}
if first && random_init_delay(period, &cancel).await.is_err() {
break;
}
let started_at = Instant::now();
@@ -179,24 +173,23 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
} else {
// Run compaction
if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
error!(
"Compaction failed {error_run_count} times, retrying in {:?}: {e:?}",
wait_duration
);
Duration::from_secs_f64(wait_duration)
error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
wait_duration
} else {
error_run_count = 0;
period
}
};
warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);
if !first {
// The first iteration is typically much slower, because all tenants compete for the
// compaction sempahore to run, and because of concurrent startup work like initializing
// logical sizes. To avoid routinely spamming warnings, we suppress this log on first iteration.
warn_when_period_overrun(
started_at.elapsed(),
period,
BackgroundLoopKind::Compaction,
);
}
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
@@ -205,6 +198,8 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
{
break;
}
first = false;
}
}
.await;
@@ -215,10 +210,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
/// GC task's main loop
///
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
const MAX_BACKOFF_SECS: f64 = 300.0;
// How many errors we have seen consequtively
let mut error_run_count = 0;
let wait_duration = Duration::from_secs(2);
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
// GC might require downloading, to find the cutoff LSN that corresponds to the
@@ -239,11 +231,8 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
let period = tenant.get_gc_period();
if first {
first = false;
if random_init_delay(period, &cancel).await.is_err() {
break;
}
if first && random_init_delay(period, &cancel).await.is_err() {
break;
}
let started_at = Instant::now();
@@ -260,24 +249,19 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
.await;
if let Err(e) = res {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
error!(
"Gc failed {error_run_count} times, retrying in {:?}: {e:?}",
wait_duration
);
Duration::from_secs_f64(wait_duration)
error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
wait_duration
} else {
error_run_count = 0;
period
}
};
warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);
if !first {
// The first iteration is typically much slower, because all tenants compete for the
// compaction sempahore to run, and because of concurrent startup work like initializing
// logical sizes. To avoid routinely spamming warnings, we suppress this log on first iteration.
warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);
}
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
@@ -286,6 +270,8 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
{
break;
}
first = false;
}
}
.await;
@@ -361,7 +347,7 @@ pub(crate) fn warn_when_period_overrun(
// humantime does no significant digits clamping whereas Duration's debug is a bit more
// intelligent. however it makes sense to keep the "configuration format" for period, even
// though there's no way to output the actual config value.
info!(
warn!(
?elapsed,
period = %humantime::format_duration(period),
?task,

View File

@@ -23,7 +23,7 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{id::TenantTimelineId, sync::gate::Gate};
use utils::id::TenantTimelineId;
use std::cmp::{max, min, Ordering};
use std::collections::{BinaryHeap, HashMap, HashSet};
@@ -88,8 +88,9 @@ use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -310,13 +311,6 @@ pub struct Timeline {
/// Load or creation time information about the disk_consistent_lsn and when the loading
/// happened. Used for consumption metrics.
pub(crate) loaded_at: (Lsn, SystemTime),
/// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
pub(crate) gate: Gate,
/// Cancellation token scoped to this timeline: anything doing long-running work relating
/// to the timeline should drop out when this token fires.
pub(crate) cancel: CancellationToken,
}
pub struct WalReceiverInfo {
@@ -793,11 +787,7 @@ impl Timeline {
// as an empty timeline. Also in unit tests, when we use the timeline
// as a simple key-value store, ignoring the datadir layout. Log the
// error but continue.
//
// Suppress error when it's due to cancellation
if !self.cancel.is_cancelled() {
error!("could not compact, repartitioning keyspace failed: {err:?}");
}
error!("could not compact, repartitioning keyspace failed: {err:?}");
}
};
@@ -895,12 +885,7 @@ impl Timeline {
pub async fn shutdown(self: &Arc<Self>, freeze_and_flush: bool) {
debug_assert_current_span_has_tenant_and_timeline_id();
// Signal any subscribers to our cancellation token to drop out
tracing::debug!("Cancelling CancellationToken");
self.cancel.cancel();
// prevent writes to the InMemoryLayer
tracing::debug!("Waiting for WalReceiverManager...");
task_mgr::shutdown_tasks(
Some(TaskKind::WalReceiverManager),
Some(self.tenant_id),
@@ -936,16 +921,6 @@ impl Timeline {
warn!("failed to await for frozen and flushed uploads: {e:#}");
}
}
// Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
// while doing so.
self.last_record_lsn.shutdown();
tracing::debug!("Waiting for tasks...");
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(self.timeline_id)).await;
// Finally wait until any gate-holders are complete
self.gate.close().await;
}
pub fn set_state(&self, new_state: TimelineState) {
@@ -1074,11 +1049,6 @@ impl Timeline {
/// Like [`evict_layer_batch`](Self::evict_layer_batch), but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let _gate = self
.gate
.enter()
.map_err(|_| anyhow::anyhow!("Shutting down"))?;
let Some(local_layer) = self.find_layer(layer_file_name).await else {
return Ok(None);
};
@@ -1094,8 +1064,9 @@ impl Timeline {
.as_ref()
.ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
let cancel = CancellationToken::new();
let results = self
.evict_layer_batch(remote_client, &[local_layer])
.evict_layer_batch(remote_client, &[local_layer], &cancel)
.await?;
assert_eq!(results.len(), 1);
let result: Option<Result<(), EvictionError>> = results.into_iter().next().unwrap();
@@ -1110,18 +1081,15 @@ impl Timeline {
pub(crate) async fn evict_layers(
&self,
layers_to_evict: &[Layer],
cancel: &CancellationToken,
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
let _gate = self
.gate
.enter()
.map_err(|_| anyhow::anyhow!("Shutting down"))?;
let remote_client = self
.remote_client
.as_ref()
.context("timeline must have RemoteTimelineClient")?;
self.evict_layer_batch(remote_client, layers_to_evict).await
self.evict_layer_batch(remote_client, layers_to_evict, cancel)
.await
}
/// Evict multiple layers at once, continuing through errors.
@@ -1142,6 +1110,7 @@ impl Timeline {
&self,
remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Layer],
cancel: &CancellationToken,
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
// ensure that the layers have finished uploading
// (don't hold the layer_removal_cs while we do it, we're not removing anything yet)
@@ -1189,7 +1158,7 @@ impl Timeline {
};
tokio::select! {
_ = self.cancel.cancelled() => {},
_ = cancel.cancelled() => {},
_ = join => {}
}
@@ -1299,7 +1268,6 @@ impl Timeline {
initial_logical_size_can_start: Option<completion::Barrier>,
initial_logical_size_attempt: Option<completion::Completion>,
state: TimelineState,
cancel: CancellationToken,
) -> Arc<Self> {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let (state, _) = watch::channel(state);
@@ -1400,8 +1368,6 @@ impl Timeline {
initial_logical_size_can_start,
initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
cancel,
gate: Gate::new(format!("Timeline<{tenant_id}/{timeline_id}>")),
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -1894,17 +1860,19 @@ impl Timeline {
"aborted because task_mgr shutdown requested".to_string()
};
tokio::select! {
res = &mut calculation => { res }
reason = timeline_state_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
calculation.await
}
reason = taskmgr_shutdown_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
calculation.await
loop {
tokio::select! {
res = &mut calculation => { return res }
reason = timeline_state_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
return calculation.await;
}
reason = taskmgr_shutdown_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
return calculation.await;
}
}
}
}
@@ -2002,6 +1970,55 @@ impl Timeline {
None
}
/// The timeline heatmap is a hint to secondary locations from the primary location,
/// indicating which layers are currently on-disk on the primary.
///
/// None is returned if the Timeline is in a state where uploading a heatmap
/// doesn't make sense, such as shutting down or initializing. The caller
/// should treat this as a cue to simply skip doing any heatmap uploading
/// for this timeline.
pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
let eviction_info = self.get_local_layers_for_disk_usage_eviction().await;
let remote_client = match &self.remote_client {
Some(c) => c,
None => return None,
};
let layer_file_names = eviction_info
.resident_layers
.iter()
.map(|l| l.layer.layer_desc().filename())
.collect::<Vec<_>>();
let decorated = match remote_client.get_layers_metadata(layer_file_names) {
Ok(d) => d,
Err(_) => {
// Getting metadata only fails on Timeline in bad state.
return None;
}
};
let heatmap_layers = std::iter::zip(
eviction_info.resident_layers.into_iter(),
decorated.into_iter(),
)
.filter_map(|(layer, remote_info)| {
remote_info.map(|remote_info| {
HeatMapLayer::new(
layer.layer.layer_desc().filename(),
IndexLayerMetadata::from(remote_info),
layer.last_activity_ts,
)
})
});
Some(HeatMapTimeline::new(
self.timeline_id,
heatmap_layers.collect(),
))
}
}
type TraversalId = String;
@@ -2065,10 +2082,6 @@ impl Timeline {
let mut cont_lsn = Lsn(request_lsn.0 + 1);
'outer: loop {
if self.cancel.is_cancelled() {
return Err(PageReconstructError::Cancelled);
}
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
@@ -2975,10 +2988,13 @@ struct CompactLevel0Phase1StatsBuilder {
new_deltas_size: Option<u64>,
}
#[serde_as]
#[derive(serde::Serialize)]
struct CompactLevel0Phase1Stats {
version: u64,
#[serde_as(as = "serde_with::DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "serde_with::DisplayFromStr")]
timeline_id: TimelineId,
read_lock_acquisition_micros: RecordedDuration,
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
@@ -4405,10 +4421,25 @@ mod tests {
.expect("should had been resident")
.drop_eviction_guard();
let cancel = tokio_util::sync::CancellationToken::new();
let batch = [layer];
let first = async { timeline.evict_layer_batch(&rc, &batch).await.unwrap() };
let second = async { timeline.evict_layer_batch(&rc, &batch).await.unwrap() };
let first = {
let cancel = cancel.child_token();
async {
let cancel = cancel;
timeline
.evict_layer_batch(&rc, &batch, &cancel)
.await
.unwrap()
}
};
let second = async {
timeline
.evict_layer_batch(&rc, &batch, &cancel)
.await
.unwrap()
};
let (first, second) = tokio::join!(first, second);

View File

@@ -17,7 +17,6 @@ use crate::{
deletion_queue::DeletionQueueClient,
task_mgr::{self, TaskKind},
tenant::{
debug_assert_current_span_has_tenant_and_timeline_id,
metadata::TimelineMetadata,
remote_timeline_client::{
self, PersistIndexPartWithDeletedFlagError, RemoteTimelineClient,
@@ -31,11 +30,6 @@ use super::{Timeline, TimelineResources};
/// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
debug_assert_current_span_has_tenant_and_timeline_id();
// Notify any timeline work to drop out of loops/requests
tracing::debug!("Cancelling CancellationToken");
timeline.cancel.cancel();
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
let maybe_started_walreceiver = { timeline.walreceiver.lock().unwrap().take() };
@@ -44,14 +38,6 @@ async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
}
debug!("wal receiver shutdown confirmed");
// Shut down the layer flush task before the remote client, as one depends on the other
task_mgr::shutdown_tasks(
Some(TaskKind::LayerFlushTask),
Some(timeline.tenant_id),
Some(timeline.timeline_id),
)
.await;
// Prevent new uploads from starting.
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.stop();
@@ -80,11 +66,6 @@ async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
"failpoint: timeline-delete-before-index-deleted-at"
))?
});
tracing::debug!("Waiting for gate...");
timeline.gate.close().await;
tracing::debug!("Shutdown complete");
Ok(())
}
@@ -313,7 +294,6 @@ async fn cleanup_remaining_timeline_fs_traces(
// Remove delete mark
tokio::fs::remove_file(conf.timeline_delete_mark_file_path(tenant_id, timeline_id))
.await
.or_else(fs_ext::ignore_not_found)
.context("remove delete mark")
}

View File

@@ -277,7 +277,10 @@ impl Timeline {
Some(c) => c,
};
let results = match self.evict_layer_batch(remote_client, &candidates).await {
let results = match self
.evict_layer_batch(remote_client, &candidates, cancel)
.await
{
Err(pre_err) => {
stats.errors += candidates.len();
error!("could not do any evictions: {pre_err:#}");

View File

@@ -426,7 +426,7 @@ impl ConnectionManagerState {
timeline,
new_sk.wal_source_connconf,
events_sender,
cancellation.clone(),
cancellation,
connect_timeout,
ctx,
node_id,
@@ -447,14 +447,7 @@ impl ConnectionManagerState {
}
WalReceiverError::Other(e) => {
// give out an error to have task_mgr give it a really verbose logging
if cancellation.is_cancelled() {
// Ideally we would learn about this via some path other than Other, but
// that requires refactoring all the intermediate layers of ingest code
// that only emit anyhow::Error
Ok(())
} else {
Err(e).context("walreceiver connection handling failure")
}
Err(e).context("walreceiver connection handling failure")
}
}
}

View File

@@ -122,7 +122,7 @@ pub(super) async fn handle_walreceiver_connection(
// Connect to the database in replication mode.
info!("connecting to {wal_source_connconf:?}");
let (replication_client, connection) = {
let (mut replication_client, connection) = {
let mut config = wal_source_connconf.to_tokio_postgres_config();
config.application_name("pageserver");
config.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
@@ -205,7 +205,7 @@ pub(super) async fn handle_walreceiver_connection(
gauge.dec();
}
let identify = identify_system(&replication_client).await?;
let identify = identify_system(&mut replication_client).await?;
info!("{identify:?}");
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
@@ -444,7 +444,7 @@ struct IdentifySystem {
struct IdentifyError;
/// Run the postgres `IDENTIFY_SYSTEM` command
async fn identify_system(client: &Client) -> anyhow::Result<IdentifySystem> {
async fn identify_system(client: &mut Client) -> anyhow::Result<IdentifySystem> {
let query_str = "IDENTIFY_SYSTEM";
let response = client.simple_query(query_str).await?;
@@ -459,7 +459,7 @@ async fn identify_system(client: &Client) -> anyhow::Result<IdentifySystem> {
// extract the row contents into an IdentifySystem struct.
// written as a closure so I can use ? for Option here.
if let Some(SimpleQueryMessage::Row(first_row)) = response.first() {
if let Some(SimpleQueryMessage::Row(first_row)) = response.get(0) {
Ok(IdentifySystem {
systemid: get_parse(first_row, 0)?,
timeline: get_parse(first_row, 1)?,

View File

@@ -80,14 +80,6 @@ pub(crate) struct UploadQueueInitialized {
/// tasks to finish. For example, metadata upload cannot be performed before all
/// preceding layer file uploads have completed.
pub(crate) queued_operations: VecDeque<UploadOp>,
/// Files which have been unlinked but not yet had scheduled a deletion for. Only kept around
/// for error logging.
///
/// Putting this behind a testing feature to catch problems in tests, but assuming we could have a
/// bug causing leaks, then it's better to not leave this enabled for production builds.
#[cfg(feature = "testing")]
pub(crate) dangling_files: HashMap<LayerFileName, Generation>,
}
impl UploadQueueInitialized {
@@ -144,8 +136,6 @@ impl UploadQueue {
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
};
*self = UploadQueue::Initialized(state);
@@ -191,8 +181,6 @@ impl UploadQueue {
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
};
*self = UploadQueue::Initialized(state);

Some files were not shown because too many files have changed in this diff Show More