Compare commits

..

2 Commits

Author SHA1 Message Date
Heikki Linnakangas
feead26e04 incorporate some github comments in the doc 2025-02-28 13:09:22 +02:00
Heikki Linnakangas
87a9afbc64 RFC: Rewrite Postgres <-> Pageserver communication
This is not ready, I'm still collecting an organizing my own
thoughts. I will update when it's ready for review.

That said, feel free to leave comments already if you wish.
2025-02-13 02:39:39 +02:00
67 changed files with 448 additions and 1524 deletions

View File

@@ -263,9 +263,8 @@ jobs:
echo "json=$(jq --compact-output '.' /tmp/benchmark_durations.json)" >> $GITHUB_OUTPUT
benchmarks:
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `deploy` in PRs
if: github.ref_name == 'main' || (contains(github.event.pull_request.labels.*.name, 'run-benchmarks') && !failure() && !cancelled())
needs: [ check-permissions, build-build-tools-image, get-benchmarks-durations, deploy ]
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
needs: [ check-permissions, build-and-test-locally, build-build-tools-image, get-benchmarks-durations ]
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write

View File

@@ -88,7 +88,7 @@ jobs:
BUILD_AND_TEST_RUN_ID=${TAG}
while true; do
gh run --repo ${GITHUB_REPOSITORY} view ${BUILD_AND_TEST_RUN_ID} --json jobs --jq '[.jobs[] | select((.name | startswith("push-neon-image-dev")) or (.name | startswith("push-compute-image-dev"))) | {"name": .name, "conclusion": .conclusion, "url": .url}]' > jobs.json
if [ $(jq '[.[] | select(.conclusion == "success")] | length' jobs.json) -eq 2 ]; then
if [ $(jq '[.[] | select(.conclusion == "success")]' jobs.json) -eq 2 ]; then
break
fi
jq -c '.[]' jobs.json | while read -r job; do

81
Cargo.lock generated
View File

@@ -1293,7 +1293,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"jsonwebtoken",
"regex",
"remote_storage",
"serde",
@@ -1321,7 +1320,6 @@ dependencies = [
"flate2",
"futures",
"http 1.1.0",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
"notify",
@@ -2398,9 +2396,9 @@ checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-timer"
version = "3.0.3"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
@@ -2503,27 +2501,6 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "governor"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "842dc78579ce01e6a1576ad896edc92fca002dd60c9c3746b7fc2bec6fb429d0"
dependencies = [
"cfg-if",
"dashmap 6.1.0",
"futures-sink",
"futures-timer",
"futures-util",
"no-std-compat",
"nonzero_ext",
"parking_lot 0.12.1",
"portable-atomic",
"quanta",
"rand 0.8.5",
"smallvec",
"spinning_top",
]
[[package]]
name = "group"
version = "0.12.1"
@@ -3723,12 +3700,6 @@ dependencies = [
"memoffset 0.9.0",
]
[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nom"
version = "7.1.3"
@@ -3739,12 +3710,6 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nonzero_ext"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "notify"
version = "8.0.0"
@@ -4603,12 +4568,6 @@ dependencies = [
"never-say-never",
]
[[package]]
name = "portable-atomic"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
[[package]]
name = "postgres"
version = "0.19.7"
@@ -5075,21 +5034,6 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "quanta"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quick-xml"
version = "0.26.0"
@@ -5220,15 +5164,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "raw-cpuid"
version = "11.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6928fa44c097620b706542d428957635951bade7143269085389d42c8a4927e"
dependencies = [
"bitflags 2.8.0",
]
[[package]]
name = "rayon"
version = "1.7.0"
@@ -6437,15 +6372,6 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "spinning_top"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.6.0"
@@ -6521,7 +6447,6 @@ dependencies = [
"diesel_migrations",
"fail",
"futures",
"governor",
"hex",
"http-utils",
"humantime",
@@ -6539,8 +6464,6 @@ dependencies = [
"routerify",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"safekeeper_api",
"safekeeper_client",
"scoped-futures",
"scopeguard",
"serde",

View File

@@ -50,14 +50,6 @@ RUN set -e \
&& rm -rf pg_install/build \
&& tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz .
# Prepare cargo-chef recipe
FROM $REPOSITORY/$IMAGE:$TAG AS plan
WORKDIR /home/nonroot
COPY --chown=nonroot . .
RUN cargo chef prepare --recipe-path recipe.json
# Build neon binaries
FROM $REPOSITORY/$IMAGE:$TAG AS build
WORKDIR /home/nonroot
@@ -71,15 +63,9 @@ COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_i
COPY --from=pg-build /home/nonroot/pg_install/v17/include/postgresql/server pg_install/v17/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib
COPY --from=pg-build /home/nonroot/pg_install/v17/lib pg_install/v17/lib
COPY --from=plan /home/nonroot/recipe.json recipe.json
ARG ADDITIONAL_RUSTFLAGS=""
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo chef cook --locked --release --recipe-path recipe.json
COPY --chown=nonroot . .
ARG ADDITIONAL_RUSTFLAGS
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
--bin pg_sni_router \

View File

@@ -300,7 +300,6 @@ ARG CARGO_HAKARI_VERSION=0.9.33
ARG CARGO_DENY_VERSION=0.16.2
ARG CARGO_HACK_VERSION=0.6.33
ARG CARGO_NEXTEST_VERSION=0.9.85
ARG CARGO_CHEF_VERSION=0.1.71
ARG CARGO_DIESEL_CLI_VERSION=2.2.6
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
chmod +x rustup-init && \
@@ -315,7 +314,6 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
cargo install cargo-hack --version ${CARGO_HACK_VERSION} && \
cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} && \
cargo install cargo-chef --locked --version ${CARGO_CHEF_VERSION} && \
cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} \
--features postgres-bundled --no-default-features && \
rm -rf /home/nonroot/.cargo/registry && \

View File

@@ -1750,7 +1750,7 @@ COPY --from=pg_graphql-src /ext-src/ /ext-src/
COPY --from=hypopg-src /ext-src/ /ext-src/
COPY --from=pg_hashids-src /ext-src/ /ext-src/
COPY --from=rum-src /ext-src/ /ext-src/
COPY --from=pgtap-src /ext-src/ /ext-src/
#COPY --from=pgtap-src /ext-src/ /ext-src/
COPY --from=ip4r-src /ext-src/ /ext-src/
COPY --from=prefix-src /ext-src/ /ext-src/
COPY --from=hll-src /ext-src/ /ext-src/

View File

@@ -24,7 +24,6 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
jsonwebtoken.workspace = true
metrics.workspace = true
nix.workspace = true
notify.workspace = true

View File

@@ -55,7 +55,7 @@ use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info, warn};
use url::Url;
use compute_api::responses::{ComputeCtlConfig, ComputeStatus};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeSpec;
use compute_tools::compute::{
@@ -281,7 +281,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
info!("got spec from cli argument {}", spec_json);
return Ok(CliSpecParams {
spec: Some(serde_json::from_str(spec_json)?),
compute_ctl_config: ComputeCtlConfig::default(),
live_config_allowed: false,
});
}
@@ -291,7 +290,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
let file = File::open(Path::new(spec_path))?;
return Ok(CliSpecParams {
spec: Some(serde_json::from_reader(file)?),
compute_ctl_config: ComputeCtlConfig::default(),
live_config_allowed: true,
});
}
@@ -301,9 +299,8 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
};
match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
Ok(resp) => Ok(CliSpecParams {
spec: resp.0,
compute_ctl_config: resp.1,
Ok(spec) => Ok(CliSpecParams {
spec,
live_config_allowed: true,
}),
Err(e) => {
@@ -320,8 +317,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
struct CliSpecParams {
/// If a spec was provided via CLI or file, the [`ComputeSpec`]
spec: Option<ComputeSpec>,
#[allow(dead_code)]
compute_ctl_config: ComputeCtlConfig,
live_config_allowed: bool,
}
@@ -331,7 +326,6 @@ fn wait_spec(
CliSpecParams {
spec,
live_config_allowed,
compute_ctl_config: _,
}: CliSpecParams,
) -> Result<Arc<ComputeNode>> {
let mut new_state = ComputeState::new();

View File

@@ -11,9 +11,7 @@ use crate::migration::MigrationRunner;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
use compute_api::responses::{
ComputeCtlConfig, ControlPlaneComputeStatus, ControlPlaneSpecResponse,
};
use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
use compute_api::spec::ComputeSpec;
// Do control plane request and return response if any. In case of error it
@@ -75,13 +73,14 @@ fn do_control_plane_request(
pub fn get_spec_from_control_plane(
base_uri: &str,
compute_id: &str,
) -> Result<(Option<ComputeSpec>, ComputeCtlConfig)> {
) -> Result<Option<ComputeSpec>> {
let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
Ok(v) => v,
Err(_) => "".to_string(),
};
let mut attempt = 1;
let mut spec: Result<Option<ComputeSpec>> = Ok(None);
info!("getting spec from control plane: {}", cp_uri);
@@ -91,7 +90,7 @@ pub fn get_spec_from_control_plane(
// - no spec for compute yet (Empty state) -> return Ok(None)
// - got spec -> return Ok(Some(spec))
while attempt < 4 {
let result = match do_control_plane_request(&cp_uri, &jwt) {
spec = match do_control_plane_request(&cp_uri, &jwt) {
Ok(spec_resp) => {
CPLANE_REQUESTS_TOTAL
.with_label_values(&[
@@ -100,10 +99,10 @@ pub fn get_spec_from_control_plane(
])
.inc();
match spec_resp.status {
ControlPlaneComputeStatus::Empty => Ok((None, spec_resp.compute_ctl_config)),
ControlPlaneComputeStatus::Empty => Ok(None),
ControlPlaneComputeStatus::Attached => {
if let Some(spec) = spec_resp.spec {
Ok((Some(spec), spec_resp.compute_ctl_config))
Ok(Some(spec))
} else {
bail!("compute is attached, but spec is empty")
}
@@ -122,10 +121,10 @@ pub fn get_spec_from_control_plane(
}
};
if let Err(e) = &result {
if let Err(e) = &spec {
error!("attempt {} to get spec failed with: {}", attempt, e);
} else {
return result;
return spec;
}
attempt += 1;
@@ -133,9 +132,7 @@ pub fn get_spec_from_control_plane(
}
// All attempts failed, return error.
Err(anyhow::anyhow!(
"Exhausted all attempts to retrieve the spec from the control plane"
))
spec
}
/// Check `pg_hba.conf` and update if needed to allow external connections.

View File

@@ -48,8 +48,6 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::Database;
use compute_api::spec::PgIdent;
use compute_api::spec::RemoteExtSpec;
@@ -882,13 +880,10 @@ impl Endpoint {
self.external_http_address.port()
))
.header(CONTENT_TYPE.as_str(), "application/json")
.body(
serde_json::to_string(&ConfigurationRequest {
spec,
compute_ctl_config: ComputeCtlConfig::default(),
})
.unwrap(),
)
.body(format!(
"{{\"spec\":{}}}",
serde_json::to_string_pretty(&spec)?
))
.send()
.await?;

View File

@@ -838,10 +838,7 @@ impl StorageController {
self.dispatch(
Method::PUT,
format!("control/v1/tenant/{tenant_shard_id}/migrate"),
Some(TenantShardMigrateRequest {
node_id,
migration_config: None,
}),
Some(TenantShardMigrateRequest { node_id }),
)
.await
}

View File

@@ -609,10 +609,7 @@ async fn main() -> anyhow::Result<()> {
tenant_shard_id,
node,
} => {
let req = TenantShardMigrateRequest {
node_id: node,
migration_config: None,
};
let req = TenantShardMigrateRequest { node_id: node };
storcon_client
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
@@ -626,10 +623,7 @@ async fn main() -> anyhow::Result<()> {
tenant_shard_id,
node,
} => {
let req = TenantShardMigrateRequest {
node_id: node,
migration_config: None,
};
let req = TenantShardMigrateRequest { node_id: node };
storcon_client
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
@@ -1088,10 +1082,7 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
Method::PUT,
format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
Some(TenantShardMigrateRequest {
node_id: mv.to,
migration_config: None,
}),
Some(TenantShardMigrateRequest { node_id: mv.to }),
)
.await
.map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))

View File

@@ -71,7 +71,7 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
cat ../compute/patches/contrib_pg${pg_version}.patch | docker exec -i $TEST_CONTAINER_NAME bash -c "(cd /postgres && patch -p1)"
# We are running tests now
rm -f testout.txt testout_contrib.txt
docker exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pg_tiktoken-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src \
docker exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pgtap-src,pg_tiktoken-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src \
$TEST_CONTAINER_NAME /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
docker exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
$TEST_CONTAINER_NAME /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0

View File

@@ -1,15 +0,0 @@
diff --git a/test/schedule/create.sql b/test/schedule/create.sql
index ba355ed..7e250f5 100644
--- a/test/schedule/create.sql
+++ b/test/schedule/create.sql
@@ -1,3 +1,2 @@
\unset ECHO
\i test/psql.sql
-CREATE EXTENSION pgtap;
diff --git a/test/schedule/main.sch b/test/schedule/main.sch
index a8a5fbc..0463fc4 100644
--- a/test/schedule/main.sch
+++ b/test/schedule/main.sch
@@ -1,2 +1 @@
-test: build
test: create

View File

@@ -1,6 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --max-connections=86 --schedule test/schedule/main.sch --schedule test/build/run.sch --dbname contrib_regression --use-existing

View File

@@ -41,8 +41,7 @@ EXTENSIONS='[
{"extname": "roaringbitmap", "extdir": "pg_roaringbitmap-src"},
{"extname": "semver", "extdir": "pg_semver-src"},
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
{"extname": "pgjwt", "extdir": "pgjwt-src"},
{"extname": "pgtap", "extdir": "pgtap-src"}
{"extname": "pgjwt", "extdir": "pgjwt-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d

View File

@@ -0,0 +1,203 @@
# Compute <-> Pageserver Communicator Rewrite
## Summary
## Motivation
- prefetching logic is complicated
- handling async libpq connections in C code are difficult and error-prone
- only few people are comfortable working on the code
- new AIO (maybe) coming up in Postgres v18
- cannot process prefetch replies until another I/O function is called. Makes it impossible to accurately measure when a reply was received
- every backend opens a separate connection to pageservers -> lots of connections, first query in backend is slow
- desire for better protocol, not libpq-based
- By writing the "communicator" as a separate rust module, it can be reused in
tests, outside PostgreSQL.
## Non Goals (if relevant)
- We will keep LFC unmodified for now. It might be a good idea to rewrite it
too, but it's out of scope here.
- We should consider a similar rewrite for the walproposer - safekeeper
communication, but it's out of scope for this RFC
## Impacted components (e.g. pageserver, safekeeper, console, etc)
- Most changes are to the neon extension in compute.
- Pageserver, to implement the new protocol.
## Proposed implementation
- we will use the new implementation with all PostgreSQL versions.
- we will have a feature flag to switch between old and new communicator. Once we're
comfortable with the new communicator, remove old code and protocol.
- What about relation size cache? Out of scope? Or move it to the communicator process,
and have smgrnblocks() requests always through communicator process?
### Communicator process
There is one communicator process in the Postgres server. It's a background
worker process. It handles all communication with the pageservers.
The communicator process is written in a mix of C and Rust. Mostly in Rust, but
some C code is needed to interface with the Postgres facilities. For example:
- logging
- error handling (in a very limited form, we don't want to ereport() on most errors)
- expose a shared memory area for IPC
- signal other processes
We will write unsafe rust or C glue code for those facilities, which allow us to
write the rest of the communicator in safe rust.
The Rust parts of the communicator process can use multiple threads and
tokio. The glue code is written taken that into account, making it safe.
pqrx is a rust crate for writing Postgres extensions in Rust. We will _not_ use
that. It's a fine crate, good for most extensions, but I don't think we need
most of the facilities that it provides. Our wrappers are more low-level than
what most extensions need. We don't expose SQL functions or types from this
extension for example.
### Communicator <-> backend interface
The backends and the communicator process communicate via shared memory. Each
backend has a fixed number of "request slots", forming a ring. When a backend
wants to perform an I/O, it writes the request details like blk # and LSN, to
the next available slot. The request also includes a pointer or buffer ID where
the resulting page should be written. The backend then wakes up the
communicator, with a signal/futex/latch or something, telling the communicator
that it has work to do.
The communicator picks up the request from the backend's ring, and performs
it. It writes result page to the address requested by the backend (most likely a
shared buffer that the backend is holding a lock on), marks the request as
completed, and wakes up the backend.
In this design, because each backend has its own small ring, a backend doesn't
need to do any locking to manipulate the request slots. Similarly, after a
request has been submitted, the communicator has temporary ownership of the
request slot, and doesn't need to do locking on it.
This design is somewhat similar to how the upcoming AIO patches in PostgreSQL
will work. That should make it easy to adapt to new PostgreSQL versions.
In the above example, I assumed a GetPage request, but everything applies to
other other request types like "smgrnblocks" too.
Q: How we are going to handle backend termination while it is waiting for response? Should we allow it or wait request completion? If PS is down, it can take quite long time during which user will not be able to interrupt the query.
### Prefetching
A backend can also issue a "blind" prefetch request. When a communicator
processes a blind prefetch request, it starts the GetPage request and writes the
result to a local buffer within the communicator process. But it could also
decide to do nothing, or to schedule the request with a lower priority. It
doesn't give any result back to the requesting backend, hence it's "blind".
Later, when the backend - or a different backend - requests the page that was
prefetched and the prefetch was performed and completed, the communicator
process can satisfy the request quickly from the private buffer.
In this design, the "prefetch" slots are shared by all backends. If one backend
issues a prefetch request but never consumes it, but another backend reads the
same page, the prefetch can be used to satisfy the request. (In our current
implementaiton, it would be wasted, and the same GetPage is performed
twice. https://github.com/neondatabase/neon/pull/10442 helps with that, but
doesn't fully fix the problem)
### PostgreSQL versions < 17
- In 16 and below, prefetching calls are made without holding the buffer pinned.
Backends will perform "blind" prefetch requests for smgrprefetch().
### PostgreSQL version 17
In version 17, when prefetching is requested, the pages are already pinned in
the buffer manager. We possibly could write the page directly to the shared
buffer, but there's a risk that the backend stops the scan and releases the pins
without ever performing the real I/Os. Because of that, backends will perform
blind prefetch requests like in v16; we can't easily take advantage of the
pinned buffer.
### PostgreSQL version 18, if the AIO patches are committed
With the AIO patches, prefetching is no longer performed with posix_fadvise
calls. The backends will start the prefetch I/Os "for real", into the locked
shared buffer. On completion of an AIO, the process that processes the
completion will have to call a small callback routine that releases the buffer
lock and wakes up any processes waiting on the I/O. It'll require some care to
execute that safely from the communicator process.
### Compute <-> Pageserver Protocol
As part of the project, we will change the protocol. Desires for new protocol:
- Use protobuf or something else more standard. Maybe gRPC. So that we can use
standard tools like Wireshark to easily analyze the traffic.
- Batching. Have capability to request more than one page in one request.
In principle, changing the protocol is an independent change from the new
communicator process. But it makes sense to do at the same time:
- Switching to Rust in the communicator process makes it possible to use
existing libraries
- Using a library might help with managing the pool of pageserver connnection,
so we want need to implement that ourselves
### Reliability, failure modes and corner cases (if relevant)
### Interaction/Sequence diagram (if relevant)
### Scalability (if relevant)
- Could the single communicator process become a bottleneck? In the new v18 AIO
system, the process needs to execute all the I/O completion callbacks. They're
very short, but I still wonder if a single process can handle it.
- Goal is to sustain ~2.5GB/s bandwidth (typical EC2 NIC).
- John: I'd presume a single process to be capable of that if it's just passing buffers around. If we needed more than one in future it would probably be quite a small lift to make that happen (but I bet we never do). (I don't fully know what's involved in the execute all the I/O completion callbacks part though)
### Security implications (if relevant)
- We currently use libpq authentication with a JWT token. We can continue to use
the token for authentication in the new protocol.
### Unresolved questions (if relevant)
## Alternative implementation (if relevant)
I think UDP might also be a good fit for the protocol. No overhead of
establishing or holding a connection. No head-of-line blocking; prefetch
requests can be processed with lower priority. We would control our own
destiny. But it has its own set of challenges: congestion control,
authentication & encryption.
## Pros/cons of proposed approaches (if relevant)
## Definition of Done (if relevant)
New communicator has replaced the old code, deployed in production, old protocol
support is removed.
Implentation phases:
- Implement new protocol in pageserver. In first prototype, maybe just
wrap/convert the existing message types into HTTP+protobuf, to keep it simple.
- Implement the C/Rust wrappers needed to launch the communicator as a background
worker process, with access to shard memory.
- Implement a simple request / response interface in shared memory between the
backends and the communicator.
- Implement a minimalistic communicator: hold one connection to
pageserver/shard. No prefetching. Process one request at a time
- Improve the communicator: multiple threads, multiple connections, prefetching

View File

@@ -7,7 +7,6 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
chrono.workspace = true
jsonwebtoken.workspace = true
serde.workspace = true
serde_json.workspace = true
regex.workspace = true

View File

@@ -1,20 +1,18 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use crate::{
privilege::Privilege,
responses::ComputeCtlConfig,
spec::{ComputeSpec, ExtVersion, PgIdent},
};
use serde::{Deserialize, Serialize};
use serde::Deserialize;
/// Request of the /configure API
///
/// We now pass only `spec` in the configuration request, but later we can
/// extend it and something like `restart: bool` or something else. So put
/// `spec` into a struct initially to be more flexible in the future.
#[derive(Debug, Deserialize, Serialize)]
#[derive(Deserialize, Debug)]
pub struct ConfigurationRequest {
pub spec: ComputeSpec,
pub compute_ctl_config: ComputeCtlConfig,
}
#[derive(Deserialize, Debug)]

View File

@@ -3,7 +3,6 @@
use std::fmt::Display;
use chrono::{DateTime, Utc};
use jsonwebtoken::jwk::JwkSet;
use serde::{Deserialize, Serialize, Serializer};
use crate::{
@@ -136,27 +135,13 @@ pub struct CatalogObjects {
pub databases: Vec<Database>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct ComputeCtlConfig {
pub jwks: JwkSet,
}
impl Default for ComputeCtlConfig {
fn default() -> Self {
Self {
jwks: JwkSet {
keys: Vec::default(),
},
}
}
}
/// Response of the `/computes/{compute_id}/spec` control-plane API.
/// This is not actually a compute API response, so consider moving
/// to a different place.
#[derive(Deserialize, Debug)]
pub struct ControlPlaneSpecResponse {
pub spec: Option<ComputeSpec>,
pub status: ControlPlaneComputeStatus,
pub compute_ctl_config: ComputeCtlConfig,
}
#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)]

View File

@@ -182,18 +182,6 @@ pub struct TenantDescribeResponseShard {
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateRequest {
pub node_id: NodeId,
#[serde(default)]
pub migration_config: Option<MigrationConfig>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MigrationConfig {
#[serde(default)]
#[serde(with = "humantime_serde")]
pub secondary_warmup_timeout: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub secondary_download_request_timeout: Option<Duration>,
}
#[derive(Serialize, Clone, Debug)]

View File

@@ -1136,24 +1136,7 @@ pub struct TimelineInfo {
pub ancestor_lsn: Option<Lsn>,
pub last_record_lsn: Lsn,
pub prev_record_lsn: Option<Lsn>,
/// Legacy field for compat with control plane. Synonym of `min_readable_lsn`.
/// TODO: remove once control plane no longer reads it.
pub latest_gc_cutoff_lsn: Lsn,
/// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
/// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
/// as it is easier to reason about.
pub applied_gc_cutoff_lsn: Lsn,
/// The upper bound of data which is either already GC'ed, or elegible to be GC'ed at any time based on PITR interval.
/// This LSN represents the "end of history" for this timeline, and callers should use it to figure out the oldest
/// LSN at which it is legal to create a branch or ephemeral endpoint.
///
/// Note that holders of valid LSN leases may be able to create branches and read pages earlier
/// than this LSN, but new leases may not be taken out earlier than this LSN.
pub min_readable_lsn: Lsn,
pub disk_consistent_lsn: Lsn,
/// The LSN that we have succesfully uploaded to remote storage

View File

@@ -42,8 +42,8 @@ use utils::lsn::Lsn;
pub enum BasebackupError {
#[error("basebackup pageserver error {0:#}")]
Server(#[from] anyhow::Error),
#[error("basebackup client error {0:#} when {1}")]
Client(#[source] io::Error, &'static str),
#[error("basebackup client error {0:#}")]
Client(#[source] io::Error),
}
/// Create basebackup with non-rel data in it.
@@ -234,7 +234,7 @@ where
self.ar
.append(&header, self.buf.as_slice())
.await
.map_err(|e| BasebackupError::Client(e, "flush"))?;
.map_err(BasebackupError::Client)?;
self.total_blocks += nblocks;
debug!("Added to basebackup slru {} relsize {}", segname, nblocks);
@@ -273,9 +273,9 @@ where
for dir in subdirs.iter() {
let header = new_tar_header_dir(dir)?;
self.ar
.append(&header, io::empty())
.append(&header, &mut io::empty())
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball"))?;
.context("could not add directory to basebackup tarball")?;
}
// Send config files.
@@ -286,13 +286,13 @@ where
self.ar
.append(&header, data)
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball,pg_hba.conf"))?;
.context("could not add config file to basebackup tarball")?;
} else {
let header = new_tar_header(filepath, 0)?;
self.ar
.append(&header, io::empty())
.append(&header, &mut io::empty())
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball,add_config_file"))?;
.context("could not add config file to basebackup tarball")?;
}
}
if !lazy_slru_download {
@@ -406,7 +406,7 @@ where
self.ar
.append(&header, &*content)
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball,add_aux_file"))?;
.context("could not add aux file to basebackup tarball")?;
}
if min_restart_lsn != Lsn::MAX {
@@ -419,7 +419,7 @@ where
self.ar
.append(&header, &data[..])
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball,restart.lsn"))?;
.context("could not add restart.lsn file to basebackup tarball")?;
}
for xid in self
.timeline
@@ -451,9 +451,9 @@ where
let crc32 = crc32c::crc32c(&content);
content.extend_from_slice(&crc32.to_le_bytes());
let header = new_tar_header("pg_logical/replorigin_checkpoint", content.len() as u64)?;
self.ar.append(&header, &*content).await.map_err(|e| {
BasebackupError::Client(e, "send_tarball,pg_logical/replorigin_checkpoint")
})?;
self.ar.append(&header, &*content).await.context(
"could not add pg_logical/replorigin_checkpoint file to basebackup tarball",
)?;
}
fail_point!("basebackup-before-control-file", |_| {
@@ -464,10 +464,7 @@ where
// Generate pg_control and bootstrap WAL segment.
self.add_pgcontrol_file().await?;
self.ar
.finish()
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball,finish"))?;
self.ar.finish().await.map_err(BasebackupError::Client)?;
debug!("all tarred up!");
Ok(())
}
@@ -485,9 +482,9 @@ where
let file_name = dst.to_segfile_name(0);
let header = new_tar_header(&file_name, 0)?;
self.ar
.append(&header, io::empty())
.append(&header, &mut io::empty())
.await
.map_err(|e| BasebackupError::Client(e, "add_rel,empty"))?;
.map_err(BasebackupError::Client)?;
return Ok(());
}
@@ -518,7 +515,7 @@ where
self.ar
.append(&header, segment_data.as_slice())
.await
.map_err(|e| BasebackupError::Client(e, "add_rel,segment"))?;
.map_err(BasebackupError::Client)?;
seg += 1;
startblk = endblk;
@@ -569,7 +566,7 @@ where
self.ar
.append(&header, pg_version_str.as_bytes())
.await
.map_err(|e| BasebackupError::Client(e, "add_dbdir,PG_VERSION"))?;
.map_err(BasebackupError::Client)?;
info!("timeline.pg_version {}", self.timeline.pg_version);
@@ -579,7 +576,7 @@ where
self.ar
.append(&header, &img[..])
.await
.map_err(|e| BasebackupError::Client(e, "add_dbdir,global/pg_filenode.map"))?;
.map_err(BasebackupError::Client)?;
} else {
warn!("global/pg_filenode.map is missing");
}
@@ -615,9 +612,9 @@ where
let path = format!("base/{}", dbnode);
let header = new_tar_header_dir(&path)?;
self.ar
.append(&header, io::empty())
.append(&header, &mut io::empty())
.await
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base"))?;
.map_err(BasebackupError::Client)?;
if let Some(img) = relmap_img {
let dst_path = format!("base/{}/PG_VERSION", dbnode);
@@ -630,14 +627,14 @@ where
self.ar
.append(&header, pg_version_str.as_bytes())
.await
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base/PG_VERSION"))?;
.map_err(BasebackupError::Client)?;
let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
let header = new_tar_header(&relmap_path, img.len() as u64)?;
self.ar
.append(&header, &img[..])
.await
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base/pg_filenode.map"))?;
.map_err(BasebackupError::Client)?;
}
};
Ok(())
@@ -666,7 +663,7 @@ where
self.ar
.append(&header, &buf[..])
.await
.map_err(|e| BasebackupError::Client(e, "add_twophase_file"))?;
.map_err(BasebackupError::Client)?;
Ok(())
}
@@ -696,7 +693,7 @@ where
zenith_signal.as_bytes(),
)
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?;
.map_err(BasebackupError::Client)?;
let checkpoint_bytes = self
.timeline
@@ -721,7 +718,7 @@ where
self.ar
.append(&header, &pg_control_bytes[..])
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,pg_control"))?;
.map_err(BasebackupError::Client)?;
//send wal segment
let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
@@ -745,7 +742,7 @@ where
self.ar
.append(&header, &wal_seg[..])
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,wal_segment"))?;
.map_err(BasebackupError::Client)?;
Ok(())
}
}

View File

@@ -1080,10 +1080,7 @@ components:
type: integer
state:
type: string
min_readable_lsn:
type: string
format: hex
applied_gc_cutoff_lsn:
latest_gc_cutoff_lsn:
type: string
format: hex

View File

@@ -482,11 +482,6 @@ async fn build_timeline_info_common(
let (pitr_history_size, within_ancestor_pitr) = timeline.get_pitr_history_stats();
let min_readable_lsn = std::cmp::max(
timeline.get_gc_cutoff_lsn(),
*timeline.get_applied_gc_cutoff_lsn(),
);
let info = TimelineInfo {
tenant_id: timeline.tenant_shard_id,
timeline_id: timeline.timeline_id,
@@ -498,12 +493,7 @@ async fn build_timeline_info_common(
initdb_lsn,
last_record_lsn,
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
// Externally, expose the lowest LSN that can be used to create a branch as the "GC cutoff", although internally
// we distinguish between the "planned" GC cutoff (PITR point) and the "latest" GC cutoff (where we
// actually trimmed data to), which can pass each other when PITR is changed.
latest_gc_cutoff_lsn: min_readable_lsn,
min_readable_lsn,
applied_gc_cutoff_lsn: *timeline.get_applied_gc_cutoff_lsn(),
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
current_logical_size_is_accurate: match current_logical_size.accuracy() {
tenant::timeline::logical_size::Accuracy::Approximate => false,

View File

@@ -914,7 +914,7 @@ impl PageServerHandler {
&shard,
req.hdr.request_lsn,
req.hdr.not_modified_since,
&shard.get_applied_gc_cutoff_lsn(),
&shard.get_latest_gc_cutoff_lsn(),
ctx,
)
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
@@ -1810,7 +1810,7 @@ impl PageServerHandler {
req: &PagestreamExistsRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.hdr.request_lsn,
@@ -1837,7 +1837,7 @@ impl PageServerHandler {
req: &PagestreamNblocksRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.hdr.request_lsn,
@@ -1864,7 +1864,7 @@ impl PageServerHandler {
req: &PagestreamDbSizeRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.hdr.request_lsn,
@@ -1954,7 +1954,7 @@ impl PageServerHandler {
req: &PagestreamGetSlruSegmentRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.hdr.request_lsn,
@@ -2050,8 +2050,7 @@ impl PageServerHandler {
{
fn map_basebackup_error(err: BasebackupError) -> QueryError {
match err {
// TODO: passthrough the error site to the final error message?
BasebackupError::Client(e, _) => QueryError::Disconnected(ConnectionError::Io(e)),
BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
BasebackupError::Server(e) => QueryError::Other(e),
}
}
@@ -2072,7 +2071,7 @@ impl PageServerHandler {
//return Err(QueryError::NotFound("timeline is archived".into()))
}
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {
// Backup was requested at a particular LSN. Wait for it to arrive.
info!("waiting for {}", lsn);
@@ -2152,12 +2151,10 @@ impl PageServerHandler {
.await
.map_err(map_basebackup_error)?;
}
writer.flush().await.map_err(|e| {
map_basebackup_error(BasebackupError::Client(
e,
"handle_basebackup_request,flush",
))
})?;
writer
.flush()
.await
.map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
}
pgb.write_message_noflush(&BeMessage::CopyDone)

View File

@@ -611,7 +611,7 @@ impl Timeline {
) -> Result<LsnForTimestamp, PageReconstructError> {
pausable_failpoint!("find-lsn-for-timestamp-pausable");
let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
let gc_cutoff_planned = {
let gc_info = self.gc_info.read().unwrap();
gc_info.min_cutoff()

View File

@@ -40,8 +40,6 @@ use remote_timeline_client::manifest::{
use remote_timeline_client::UploadQueueNotReadyError;
use remote_timeline_client::FAILED_REMOTE_OP_RETRIES;
use remote_timeline_client::FAILED_UPLOAD_WARN_THRESHOLD;
use secondary::heatmap::HeatMapTenant;
use secondary::heatmap::HeatMapTimeline;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
@@ -57,7 +55,6 @@ use timeline::offload::OffloadError;
use timeline::CompactFlags;
use timeline::CompactOptions;
use timeline::CompactionError;
use timeline::PreviousHeatmap;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
@@ -265,7 +262,6 @@ struct TimelinePreload {
timeline_id: TimelineId,
client: RemoteTimelineClient,
index_part: Result<MaybeDeletedIndexPart, DownloadError>,
previous_heatmap: Option<PreviousHeatmap>,
}
pub(crate) struct TenantPreload {
@@ -1132,7 +1128,6 @@ impl Tenant {
resources: TimelineResources,
mut index_part: IndexPart,
metadata: TimelineMetadata,
previous_heatmap: Option<PreviousHeatmap>,
ancestor: Option<Arc<Timeline>>,
cause: LoadTimelineCause,
ctx: &RequestContext,
@@ -1163,7 +1158,6 @@ impl Tenant {
let timeline = self.create_timeline_struct(
timeline_id,
&metadata,
previous_heatmap,
ancestor.clone(),
resources,
CreateTimelineCause::Load,
@@ -1563,18 +1557,8 @@ impl Tenant {
}
}
// TODO(vlad): Could go to S3 if the secondary is freezing cold and hasn't even
// pulled the first heatmap. Not entirely necessary since the storage controller
// will kick the secondary in any case and cause a download.
let maybe_heatmap_at = self.read_on_disk_heatmap().await;
let timelines = self
.load_timelines_metadata(
remote_timeline_ids,
remote_storage,
maybe_heatmap_at,
cancel,
)
.load_timelines_metadata(remote_timeline_ids, remote_storage, cancel)
.await?;
Ok(TenantPreload {
@@ -1587,26 +1571,6 @@ impl Tenant {
})
}
async fn read_on_disk_heatmap(&self) -> Option<(HeatMapTenant, std::time::Instant)> {
let on_disk_heatmap_path = self.conf.tenant_heatmap_path(&self.tenant_shard_id);
match tokio::fs::read_to_string(on_disk_heatmap_path).await {
Ok(heatmap) => match serde_json::from_str::<HeatMapTenant>(&heatmap) {
Ok(heatmap) => Some((heatmap, std::time::Instant::now())),
Err(err) => {
error!("Failed to deserialize old heatmap: {err}");
None
}
},
Err(err) => match err.kind() {
std::io::ErrorKind::NotFound => None,
_ => {
error!("Unexpected IO error reading old heatmap: {err}");
None
}
},
}
}
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
@@ -1694,10 +1658,7 @@ impl Tenant {
match index_part {
MaybeDeletedIndexPart::IndexPart(index_part) => {
timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
remote_index_and_client.insert(
timeline_id,
(index_part, preload.client, preload.previous_heatmap),
);
remote_index_and_client.insert(timeline_id, (index_part, preload.client));
}
MaybeDeletedIndexPart::Deleted(index_part) => {
info!(
@@ -1716,7 +1677,7 @@ impl Tenant {
// layer file.
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
for (timeline_id, remote_metadata) in sorted_timelines {
let (index_part, remote_client, previous_heatmap) = remote_index_and_client
let (index_part, remote_client) = remote_index_and_client
.remove(&timeline_id)
.expect("just put it in above");
@@ -1736,7 +1697,6 @@ impl Tenant {
timeline_id,
index_part,
remote_metadata,
previous_heatmap,
self.get_timeline_resources_for(remote_client),
LoadTimelineCause::Attach,
ctx,
@@ -1886,13 +1846,11 @@ impl Tenant {
}
#[instrument(skip_all, fields(timeline_id=%timeline_id))]
#[allow(clippy::too_many_arguments)]
async fn load_remote_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
index_part: IndexPart,
remote_metadata: TimelineMetadata,
previous_heatmap: Option<PreviousHeatmap>,
resources: TimelineResources,
cause: LoadTimelineCause,
ctx: &RequestContext,
@@ -1922,7 +1880,6 @@ impl Tenant {
resources,
index_part,
remote_metadata,
previous_heatmap,
ancestor,
cause,
ctx,
@@ -1934,29 +1891,14 @@ impl Tenant {
self: &Arc<Tenant>,
timeline_ids: HashSet<TimelineId>,
remote_storage: &GenericRemoteStorage,
heatmap: Option<(HeatMapTenant, std::time::Instant)>,
cancel: CancellationToken,
) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
let mut timeline_heatmaps = heatmap.map(|h| (h.0.into_timelines_index(), h.1));
let mut part_downloads = JoinSet::new();
for timeline_id in timeline_ids {
let cancel_clone = cancel.clone();
let previous_timeline_heatmap = timeline_heatmaps.as_mut().and_then(|hs| {
hs.0.remove(&timeline_id).map(|h| PreviousHeatmap::Active {
heatmap: h,
read_at: hs.1,
})
});
part_downloads.spawn(
self.load_timeline_metadata(
timeline_id,
remote_storage.clone(),
previous_timeline_heatmap,
cancel_clone,
)
.instrument(info_span!("download_index_part", %timeline_id)),
self.load_timeline_metadata(timeline_id, remote_storage.clone(), cancel_clone)
.instrument(info_span!("download_index_part", %timeline_id)),
);
}
@@ -2004,7 +1946,6 @@ impl Tenant {
self: &Arc<Tenant>,
timeline_id: TimelineId,
remote_storage: GenericRemoteStorage,
previous_heatmap: Option<PreviousHeatmap>,
cancel: CancellationToken,
) -> impl Future<Output = TimelinePreload> {
let client = self.build_timeline_client(timeline_id, remote_storage);
@@ -2020,7 +1961,6 @@ impl Tenant {
client,
timeline_id,
index_part,
previous_heatmap,
}
}
}
@@ -2132,12 +2072,7 @@ impl Tenant {
})?;
let timeline_preload = self
.load_timeline_metadata(
timeline_id,
self.remote_storage.clone(),
None,
cancel.clone(),
)
.load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel.clone())
.await;
let index_part = match timeline_preload.index_part {
@@ -2171,7 +2106,6 @@ impl Tenant {
timeline_id,
index_part,
remote_metadata,
None,
timeline_resources,
LoadTimelineCause::Unoffload,
&ctx,
@@ -2887,7 +2821,7 @@ impl Tenant {
};
let metadata = index_part.metadata.clone();
self
.load_remote_timeline(timeline_id, index_part, metadata, None, resources, LoadTimelineCause::ImportPgdata{
.load_remote_timeline(timeline_id, index_part, metadata, resources, LoadTimelineCause::ImportPgdata{
create_guard: timeline_create_guard, activate, }, &ctx)
.await?
.ready_to_activate()
@@ -4096,7 +4030,6 @@ impl Tenant {
&self,
new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata,
previous_heatmap: Option<PreviousHeatmap>,
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
cause: CreateTimelineCause,
@@ -4120,7 +4053,6 @@ impl Tenant {
self.conf,
Arc::clone(&self.tenant_conf),
new_metadata,
previous_heatmap,
ancestor,
new_timeline_id,
self.tenant_shard_id,
@@ -4763,24 +4695,24 @@ impl Tenant {
// We check it against both the planned GC cutoff stored in 'gc_info',
// and the 'latest_gc_cutoff' of the last GC that was performed. The
// planned GC cutoff in 'gc_info' is normally larger than
// 'applied_gc_cutoff_lsn', but beware of corner cases like if you just
// 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
// changed the GC settings for the tenant to make the PITR window
// larger, but some of the data was already removed by an earlier GC
// iteration.
// check against last actual 'latest_gc_cutoff' first
let applied_gc_cutoff_lsn = src_timeline.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
{
let gc_info = src_timeline.gc_info.read().unwrap();
let planned_cutoff = gc_info.min_cutoff();
if gc_info.lsn_covered_by_lease(start_lsn) {
tracing::info!("skipping comparison of {start_lsn} with gc cutoff {} and planned gc cutoff {planned_cutoff} due to lsn lease", *applied_gc_cutoff_lsn);
tracing::info!("skipping comparison of {start_lsn} with gc cutoff {} and planned gc cutoff {planned_cutoff} due to lsn lease", *latest_gc_cutoff_lsn);
} else {
src_timeline
.check_lsn_is_in_scope(start_lsn, &applied_gc_cutoff_lsn)
.check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
.context(format!(
"invalid branch start lsn: less than latest GC cutoff {}",
*applied_gc_cutoff_lsn,
*latest_gc_cutoff_lsn,
))
.map_err(CreateTimelineError::AncestorLsn)?;
@@ -4819,7 +4751,7 @@ impl Tenant {
dst_prev,
Some(src_id),
start_lsn,
*src_timeline.applied_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
*src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
src_timeline.initdb_lsn,
src_timeline.pg_version,
);
@@ -5192,7 +5124,6 @@ impl Tenant {
.create_timeline_struct(
new_timeline_id,
new_metadata,
None,
ancestor,
resources,
CreateTimelineCause::Load,
@@ -6199,8 +6130,8 @@ mod tests {
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
let applied_gc_cutoff_lsn = tline.get_applied_gc_cutoff_lsn();
assert!(*applied_gc_cutoff_lsn > Lsn(0x25));
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
match tline.get(*TEST_KEY, Lsn(0x25)) {
Ok(_) => panic!("request for page should have failed"),
Err(err) => assert!(err.to_string().contains("not found at")),
@@ -8496,7 +8427,7 @@ mod tests {
.await?;
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
@@ -8604,7 +8535,7 @@ mod tests {
// increase GC horizon and compact again
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x40))
.wait()
@@ -8772,8 +8703,8 @@ mod tests {
// Force set disk consistent lsn so we can get the cutoff at `end_lsn`.
info!(
"applied_gc_cutoff_lsn: {}",
*timeline.get_applied_gc_cutoff_lsn()
"latest_gc_cutoff_lsn: {}",
*timeline.get_latest_gc_cutoff_lsn()
);
timeline.force_set_disk_consistent_lsn(end_lsn);
@@ -8799,7 +8730,7 @@ mod tests {
// Make lease on a already GC-ed LSN.
// 0/80 does not have a valid lease + is below latest_gc_cutoff
assert!(Lsn(0x80) < *timeline.get_applied_gc_cutoff_lsn());
assert!(Lsn(0x80) < *timeline.get_latest_gc_cutoff_lsn());
timeline
.init_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx)
.expect_err("lease request on GC-ed LSN should fail");
@@ -8990,7 +8921,7 @@ mod tests {
};
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
@@ -9077,7 +9008,7 @@ mod tests {
// increase GC horizon and compact again
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x40))
.wait()
@@ -9530,7 +9461,7 @@ mod tests {
.await?;
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
@@ -9677,7 +9608,7 @@ mod tests {
// increase GC horizon and compact again
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x38))
.wait()
@@ -9778,7 +9709,7 @@ mod tests {
.await?;
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
@@ -10029,7 +9960,7 @@ mod tests {
{
parent_tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x10))
.wait()
@@ -10049,7 +9980,7 @@ mod tests {
{
branch_tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x50))
.wait()
@@ -10405,7 +10336,7 @@ mod tests {
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
@@ -10790,7 +10721,7 @@ mod tests {
.await?;
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
@@ -11041,7 +10972,7 @@ mod tests {
.await?;
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()

View File

@@ -130,10 +130,7 @@ struct TimelineMetadataBodyV2 {
prev_record_lsn: Option<Lsn>,
ancestor_timeline: Option<TimelineId>,
ancestor_lsn: Lsn,
// The LSN at which GC was last executed. Synonym of [`Timeline::applied_gc_cutoff_lsn`].
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
}

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, time::SystemTime};
use std::time::SystemTime;
use crate::tenant::{remote_timeline_client::index::LayerFileMetadata, storage_layer::LayerName};
@@ -8,7 +8,7 @@ use serde_with::{serde_as, DisplayFromStr, TimestampSeconds};
use utils::{generation::Generation, id::TimelineId};
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapTenant {
pub(super) struct HeatMapTenant {
/// Generation of the attached location that uploaded the heatmap: this is not required
/// for correctness, but acts as a hint to secondary locations in order to detect thrashing
/// in the unlikely event that two attached locations are both uploading conflicting heatmaps.
@@ -25,17 +25,8 @@ pub(crate) struct HeatMapTenant {
pub(super) upload_period_ms: Option<u128>,
}
impl HeatMapTenant {
pub(crate) fn into_timelines_index(self) -> HashMap<TimelineId, HeatMapTimeline> {
self.timelines
.into_iter()
.map(|htl| (htl.timeline_id, htl))
.collect()
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapTimeline {
#[serde_as(as = "DisplayFromStr")]
pub(crate) timeline_id: TimelineId,
@@ -44,13 +35,13 @@ pub(crate) struct HeatMapTimeline {
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapLayer {
pub(crate) name: LayerName,
pub(crate) metadata: LayerFileMetadata,
#[serde_as(as = "TimestampSeconds<i64>")]
pub(crate) access_time: SystemTime,
pub(super) access_time: SystemTime,
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
}

View File

@@ -394,7 +394,7 @@ pub(super) async fn gather_inputs(
ancestor_lsn,
last_record: last_record_lsn,
// this is not used above, because it might not have updated recently enough
latest_gc_cutoff: *timeline.get_applied_gc_cutoff_lsn(),
latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(),
next_pitr_cutoff,
retention_param_cutoff,
lease_points,

View File

@@ -136,22 +136,6 @@ pub(crate) fn local_layer_path(
}
}
pub(crate) enum LastEviction {
Never,
At(std::time::Instant),
Evicting,
}
impl LastEviction {
pub(crate) fn happened_after(&self, timepoint: std::time::Instant) -> bool {
match self {
LastEviction::Never => false,
LastEviction::At(evicted_at) => evicted_at > &timepoint,
LastEviction::Evicting => true,
}
}
}
impl Layer {
/// Creates a layer value for a file we know to not be resident.
pub(crate) fn for_evicted(
@@ -421,17 +405,6 @@ impl Layer {
self.0.metadata()
}
pub(crate) fn last_evicted_at(&self) -> LastEviction {
match self.0.last_evicted_at.try_lock() {
Ok(lock) => match *lock {
None => LastEviction::Never,
Some(at) => LastEviction::At(at),
},
Err(std::sync::TryLockError::WouldBlock) => LastEviction::Evicting,
Err(std::sync::TryLockError::Poisoned(p)) => panic!("Lock poisoned: {p}"),
}
}
pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
self.0
.timeline
@@ -683,9 +656,7 @@ struct LayerInner {
/// When the Layer was last evicted but has not been downloaded since.
///
/// This is used for skipping evicted layers from the previous heatmap (see
/// `[Timeline::generate_heatmap]`) and for updating metrics
/// (see [`LayerImplMetrics::redownload_after`]).
/// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
#[cfg(test)]

View File

@@ -150,15 +150,16 @@ use super::{
config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized,
MaybeOffloaded,
};
use super::{
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, HeatMapTimeline,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{
remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError,
storage_layer::ReadableLayer,
};
use super::{secondary::heatmap::HeatMapLayer, GcError};
use super::{
secondary::heatmap::{HeatMapLayer, HeatMapTimeline},
GcError,
};
#[cfg(test)]
use pageserver_api::value::Value;
@@ -351,11 +352,8 @@ pub struct Timeline {
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
// The LSN at which we have executed GC: whereas [`Self::gc_info`] records the LSN at which
// we _intend_ to GC (i.e. the PITR cutoff), this LSN records where we actually last did it.
// Because PITR interval is mutable, it's possible for this LSN to be earlier or later than
// the planned GC cutoff.
pub applied_gc_cutoff_lsn: Rcu<Lsn>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
pub(crate) gc_compaction_layer_update_lock: tokio::sync::RwLock<()>,
@@ -464,16 +462,6 @@ pub struct Timeline {
/// If Some, collects GetPage metadata for an ongoing PageTrace.
pub(crate) page_trace: ArcSwapOption<Sender<PageTraceEvent>>,
previous_heatmap: ArcSwapOption<PreviousHeatmap>,
}
pub(crate) enum PreviousHeatmap {
Active {
heatmap: HeatMapTimeline,
read_at: std::time::Instant,
},
Obsolete,
}
pub type TimelineDeleteProgress = Arc<tokio::sync::Mutex<DeleteTimelineFlow>>;
@@ -1089,15 +1077,9 @@ impl Timeline {
(history, gc_info.within_ancestor_pitr)
}
/// Read timeline's GC cutoff: this is the LSN at which GC has started to happen
pub(crate) fn get_applied_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
self.applied_gc_cutoff_lsn.read()
}
/// Read timeline's planned GC cutoff: this is the logical end of history that users
/// are allowed to read (based on configured PITR), even if physically we have more history.
pub(crate) fn get_gc_cutoff_lsn(&self) -> Lsn {
self.gc_info.read().unwrap().cutoffs.time
/// Lock and get timeline's GC cutoff
pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
self.latest_gc_cutoff_lsn.read()
}
/// Look up given page version.
@@ -1605,7 +1587,7 @@ impl Timeline {
};
if init || validate {
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = self.get_latest_gc_cutoff_lsn();
if lsn < *latest_gc_cutoff_lsn {
bail!("tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn);
}
@@ -2577,7 +2559,6 @@ impl Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
metadata: &TimelineMetadata,
previous_heatmap: Option<PreviousHeatmap>,
ancestor: Option<Arc<Timeline>>,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
@@ -2678,7 +2659,7 @@ impl Timeline {
LastImageLayerCreationStatus::default(),
)),
applied_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(),
current_logical_size: if disk_consistent_lsn.is_valid() {
@@ -2740,8 +2721,6 @@ impl Timeline {
create_idempotency,
page_trace: Default::default(),
previous_heatmap: ArcSwapOption::from_pointee(previous_heatmap),
};
result.repartition_threshold =
@@ -3480,52 +3459,12 @@ impl Timeline {
let guard = self.layers.read().await;
// Firstly, if there's any heatmap left over from when this location
// was a secondary, take that into account. Keep layers that are:
// * present in the layer map
// * visible
// * non-resident
// * not evicted since we read the heatmap
//
// Without this, a new cold, attached location would clobber the previous
// heatamp.
let previous_heatmap = self.previous_heatmap.load();
let visible_non_resident = match previous_heatmap.as_deref() {
Some(PreviousHeatmap::Active { heatmap, read_at }) => {
Some(heatmap.layers.iter().filter_map(|hl| {
let desc: PersistentLayerDesc = hl.name.clone().into();
let layer = guard.try_get_from_key(&desc.key())?;
if layer.visibility() == LayerVisibilityHint::Covered {
return None;
}
if layer.is_likely_resident() {
return None;
}
if layer.last_evicted_at().happened_after(*read_at) {
return None;
}
Some((desc, hl.metadata.clone(), hl.access_time))
}))
}
Some(PreviousHeatmap::Obsolete) => None,
None => None,
};
// Secondly, all currently visible, resident layers are included.
let resident = guard.likely_resident_layers().filter_map(|layer| {
match layer.visibility() {
LayerVisibilityHint::Visible => {
// Layer is visible to one or more read LSNs: elegible for inclusion in layer map
let last_activity_ts = layer.latest_activity();
Some((
layer.layer_desc().clone(),
layer.metadata(),
last_activity_ts,
))
Some((layer.layer_desc(), layer.metadata(), last_activity_ts))
}
LayerVisibilityHint::Covered => {
// Layer is resident but unlikely to be read: not elegible for inclusion in heatmap.
@@ -3534,18 +3473,7 @@ impl Timeline {
}
});
let mut layers = match visible_non_resident {
Some(non_resident) => {
let mut non_resident = non_resident.peekable();
if non_resident.peek().is_none() {
self.previous_heatmap
.store(Some(PreviousHeatmap::Obsolete.into()));
}
non_resident.chain(resident).collect::<Vec<_>>()
}
None => resident.collect::<Vec<_>>(),
};
let mut layers = resident.collect::<Vec<_>>();
// Sort layers in order of which to download first. For a large set of layers to download, we
// want to prioritize those layers which are most likely to still be in the resident many minutes
@@ -3734,7 +3662,7 @@ impl Timeline {
// the timeline, then it will remove layers that are required for fulfilling
// the current get request (read-path cannot "look back" and notice the new
// image layer).
let _gc_cutoff_holder = timeline.get_applied_gc_cutoff_lsn();
let _gc_cutoff_holder = timeline.get_latest_gc_cutoff_lsn();
// See `compaction::compact_with_gc` for why we need this.
let _guard = timeline.gc_compaction_layer_update_lock.read().await;
@@ -4421,7 +4349,7 @@ impl Timeline {
let update = crate::tenant::metadata::MetadataUpdate::new(
disk_consistent_lsn,
ondisk_prev_record_lsn,
*self.applied_gc_cutoff_lsn.read(),
*self.latest_gc_cutoff_lsn.read(),
);
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
@@ -5649,7 +5577,7 @@ impl Timeline {
// PITR interval is set & we didn't look up a timestamp successfully. Conservatively assume PITR
// cannot advance beyond what was already GC'd, and respect space-based retention
GcCutoffs {
time: *self.get_applied_gc_cutoff_lsn(),
time: *self.get_latest_gc_cutoff_lsn(),
space: space_cutoff,
}
}
@@ -5770,7 +5698,7 @@ impl Timeline {
let mut result: GcResult = GcResult::default();
// Nothing to GC. Return early.
let latest_gc_cutoff = *self.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
if latest_gc_cutoff >= new_gc_cutoff {
info!(
"Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
@@ -5784,7 +5712,7 @@ impl Timeline {
//
// The GC cutoff should only ever move forwards.
let waitlist = {
let write_guard = self.applied_gc_cutoff_lsn.lock_for_write();
let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
if *write_guard > new_gc_cutoff {
return Err(GcError::BadLsn {
why: format!(
@@ -6724,32 +6652,18 @@ fn is_send() {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use pageserver_api::key::Key;
use pageserver_api::value::Value;
use tracing::Instrument;
use utils::{id::TimelineId, lsn::Lsn};
use crate::tenant::{
harness::{test_img, TenantHarness},
layer_map::LayerMap,
storage_layer::{Layer, LayerName, LayerVisibilityHint},
storage_layer::{Layer, LayerName},
timeline::{DeltaLayerTestDesc, EvictionError},
PreviousHeatmap, Timeline,
Timeline,
};
use super::HeatMapTimeline;
fn assert_heatmaps_have_same_layers(lhs: &HeatMapTimeline, rhs: &HeatMapTimeline) {
assert_eq!(lhs.layers.len(), rhs.layers.len());
let lhs_rhs = lhs.layers.iter().zip(rhs.layers.iter());
for (l, r) in lhs_rhs {
assert_eq!(l.name, r.name);
assert_eq!(l.metadata, r.metadata);
}
}
#[tokio::test]
async fn test_heatmap_generation() {
let harness = TenantHarness::create("heatmap_generation").await.unwrap();
@@ -6823,7 +6737,7 @@ mod tests {
assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name());
let mut last_lsn = Lsn::MAX;
for layer in &heatmap.layers {
for layer in heatmap.layers {
// Covered layer should be omitted
assert!(layer.name != covered_delta.layer_name());
@@ -6838,144 +6752,6 @@ mod tests {
last_lsn = layer_lsn;
}
}
// Evict all the layers and stash the old heatmap in the timeline.
// This simulates a migration to a cold secondary location.
let guard = timeline.layers.read().await;
let mut all_layers = Vec::new();
let forever = std::time::Duration::from_secs(120);
for layer in guard.likely_resident_layers() {
all_layers.push(layer.clone());
layer.evict_and_wait(forever).await.unwrap();
}
drop(guard);
timeline
.previous_heatmap
.store(Some(Arc::new(PreviousHeatmap::Active {
heatmap: heatmap.clone(),
read_at: std::time::Instant::now(),
})));
// Generate a new heatmap and assert that it contains the same layers as the old one.
let post_migration_heatmap = timeline.generate_heatmap().await.unwrap();
assert_heatmaps_have_same_layers(&heatmap, &post_migration_heatmap);
// Download each layer one by one. Generate the heatmap at each step and check
// that it's stable.
for layer in all_layers {
if layer.visibility() == LayerVisibilityHint::Covered {
continue;
}
eprintln!("Downloading {layer} and re-generating heatmap");
let _resident = layer
.download_and_keep_resident()
.instrument(tracing::info_span!(
parent: None,
"download_layer",
tenant_id = %timeline.tenant_shard_id.tenant_id,
shard_id = %timeline.tenant_shard_id.shard_slug(),
timeline_id = %timeline.timeline_id
))
.await
.unwrap();
let post_download_heatmap = timeline.generate_heatmap().await.unwrap();
assert_heatmaps_have_same_layers(&heatmap, &post_download_heatmap);
}
// Everything from the post-migration heatmap is now resident.
// Check that we drop it from memory.
assert!(matches!(
timeline.previous_heatmap.load().as_deref(),
Some(PreviousHeatmap::Obsolete)
));
}
#[tokio::test]
async fn test_previous_heatmap_obsoletion() {
let harness = TenantHarness::create("heatmap_previous_heatmap_obsoletion")
.await
.unwrap();
let l0_delta = DeltaLayerTestDesc::new(
Lsn(0x20)..Lsn(0x30),
Key::from_hex("000000000000000000000000000000000000").unwrap()
..Key::from_hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF").unwrap(),
vec![(
Key::from_hex("720000000033333333444444445500000000").unwrap(),
Lsn(0x25),
Value::Image(test_img("foo")),
)],
);
let image_layer = (
Lsn(0x40),
vec![(
Key::from_hex("620000000033333333444444445500000000").unwrap(),
test_img("bar"),
)],
);
let delta_layers = vec![l0_delta];
let image_layers = vec![image_layer];
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline_with_layers(
TimelineId::generate(),
Lsn(0x10),
14,
&ctx,
delta_layers,
image_layers,
Lsn(0x100),
)
.await
.unwrap();
// Layer visibility is an input to heatmap generation, so refresh it first
timeline.update_layer_visibility().await.unwrap();
let heatmap = timeline
.generate_heatmap()
.await
.expect("Infallible while timeline is not shut down");
// Both layers should be in the heatmap
assert!(!heatmap.layers.is_empty());
// Now simulate a migration.
timeline
.previous_heatmap
.store(Some(Arc::new(PreviousHeatmap::Active {
heatmap: heatmap.clone(),
read_at: std::time::Instant::now(),
})));
// Evict all the layers in the previous heatmap
let guard = timeline.layers.read().await;
let forever = std::time::Duration::from_secs(120);
for layer in guard.likely_resident_layers() {
layer.evict_and_wait(forever).await.unwrap();
}
drop(guard);
// Generate a new heatmap and check that the previous heatmap
// has been marked obsolete.
let post_eviction_heatmap = timeline
.generate_heatmap()
.await
.expect("Infallible while timeline is not shut down");
assert!(post_eviction_heatmap.layers.is_empty());
assert!(matches!(
timeline.previous_heatmap.load().as_deref(),
Some(PreviousHeatmap::Obsolete)
));
}
#[tokio::test]

View File

@@ -852,7 +852,7 @@ impl Timeline {
//
// Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
// are rewriting layers.
let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
tracing::info!(
"latest_gc_cutoff: {}, pitr cutoff {}",
@@ -2202,7 +2202,7 @@ impl Timeline {
// TODO: ensure the child branches will not use anything below the watermark, or consider
// them when computing the watermark.
gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
gc_cutoff_lsn.min(*self.get_latest_gc_cutoff_lsn())
}
/// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.

View File

@@ -294,7 +294,6 @@ impl DeleteTimelineFlow {
timeline_id,
local_metadata,
None, // Ancestor is not needed for deletion.
None, // Previous heatmap is not needed for deletion
tenant.get_timeline_resources_for(remote_client),
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.

View File

@@ -496,8 +496,7 @@ pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
/// bad storage or bad configuration, and we can't fix that from inside
/// a running process.
pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
let backtrace = std::backtrace::Backtrace::force_capture();
tracing::error!("Fatal I/O error: {e}: {context})\n{backtrace}");
tracing::error!("Fatal I/O error: {e}: {context})");
std::process::abort();
}
@@ -948,18 +947,13 @@ impl VirtualFileInner {
where
Buf: tokio_epoll_uring::IoBufMut + Send,
{
let file_guard = match self
.lock_file()
.await
.maybe_fatal_err("lock_file inside VirtualFileInner::read_at")
{
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
Err(e) => return (buf, Err(e)),
};
observe_duration!(StorageIoOperation::Read, {
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
if let Ok(size) = res {
STORAGE_IO_SIZE
.with_label_values(&[

View File

@@ -14,7 +14,7 @@
#include "utils/guc.h"
#include "extension_server.h"
#include "extension_server.h"
#include "neon_utils.h"
static int extension_server_port = 0;
@@ -45,7 +45,7 @@ neon_download_extension_file_http(const char *filename, bool is_library)
handle = alloc_curl_handle();
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 60L /* seconds */ );
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 3L /* seconds */ );
}
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",

View File

@@ -3765,7 +3765,7 @@ neon_dbsize(Oid dbNode)
* neon_truncate() -- Truncate relation to specified number of blocks.
*/
static void
neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, BlockNumber nblocks)
neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
{
XLogRecPtr lsn;
@@ -3780,7 +3780,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdtruncate(reln, forknum, old_blocks, nblocks);
mdtruncate(reln, forknum, nblocks);
return;
default:
@@ -3818,7 +3818,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
mdtruncate(reln, forknum, old_blocks, nblocks);
mdtruncate(reln, forknum, nblocks);
#endif
}

View File

@@ -96,7 +96,7 @@ static void inmem_writeback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber nblocks);
static BlockNumber inmem_nblocks(SMgrRelation reln, ForkNumber forknum);
static void inmem_truncate(SMgrRelation reln, ForkNumber forknum,
BlockNumber old_blocks, BlockNumber nblocks);
BlockNumber nblocks);
static void inmem_immedsync(SMgrRelation reln, ForkNumber forknum);
#if PG_MAJORVERSION_NUM >= 17
static void inmem_registersync(SMgrRelation reln, ForkNumber forknum);
@@ -345,7 +345,7 @@ inmem_nblocks(SMgrRelation reln, ForkNumber forknum)
* inmem_truncate() -- Truncate relation to specified number of blocks.
*/
static void
inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, BlockNumber nblocks)
inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
{
}

View File

@@ -501,7 +501,7 @@ impl Session {
_guard: Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::HDel),
.guard(RedisMsgKind::HSet),
};
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {

View File

@@ -5,7 +5,7 @@
use http_utils::error::HttpErrorBody;
use reqwest::{IntoUrl, Method, StatusCode};
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
use std::error::Error as _;
use utils::{
id::{NodeId, TenantId, TimelineId},
@@ -32,9 +32,6 @@ pub enum Error {
/// Status is not ok; parsed error in body as `HttpErrorBody`.
#[error("safekeeper API: {1}")]
ApiError(StatusCode, String),
#[error("Cancelled")]
Cancelled,
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -127,10 +124,9 @@ impl Client {
self.get(&uri).await
}
pub async fn utilization(&self) -> Result<SafekeeperUtilization> {
pub async fn utilization(&self) -> Result<reqwest::Response> {
let uri = format!("{}/v1/utilization/", self.mgmt_api_endpoint);
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
self.get(&uri).await
}
async fn post<B: serde::Serialize, U: IntoUrl>(

View File

@@ -626,7 +626,7 @@ pub fn make_router(
failpoints_handler(r, cancel).await
})
})
.get("/v1/utilization", |r| request_span(r, utilization_handler))
.get("/v1/uzilization", |r| request_span(r, utilization_handler))
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
})

View File

@@ -310,12 +310,9 @@ impl WalBackupTask {
retry_attempt = 0;
}
Err(e) => {
// We might have managed to upload some segment even though
// some later in the range failed, so log backup_lsn
// separately.
error!(
"failed while offloading range {}-{}, backup_lsn {}: {:?}",
backup_lsn, commit_lsn, backup_lsn, e
"failed while offloading range {}-{}: {:?}",
backup_lsn, commit_lsn, e
);
retry_attempt = retry_attempt.saturating_add(1);
@@ -341,13 +338,6 @@ async fn backup_lsn_range(
let start_lsn = *backup_lsn;
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
info!(
"offloading segnos {:?} of range [{}-{})",
segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
start_lsn,
end_lsn,
);
// Pool of concurrent upload tasks. We use `FuturesOrdered` to
// preserve order of uploads, and update `backup_lsn` only after
// all previous uploads are finished.
@@ -384,10 +374,10 @@ async fn backup_lsn_range(
}
info!(
"offloaded segnos {:?} of range [{}-{})",
"offloaded segnos {:?} up to {}, previous backup_lsn {}",
segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
start_lsn,
end_lsn,
start_lsn,
);
Ok(())
}

View File

@@ -26,15 +26,12 @@ humantime.workspace = true
itertools.workspace = true
lasso.workspace = true
once_cell.workspace = true
governor = {version = "0.8.0"}
pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_connection.workspace = true
rand.workspace = true
reqwest = { workspace = true, features = ["stream"] }
routerify.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
rustls-native-certs.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -1,10 +1,6 @@
use futures::{stream::FuturesUnordered, StreamExt};
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_client::mgmt_api;
use std::{
collections::HashMap,
fmt::Debug,
future::Future,
sync::Arc,
time::{Duration, Instant},
};
@@ -13,15 +9,15 @@ use tokio_util::sync::CancellationToken;
use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization};
use thiserror::Error;
use utils::{id::NodeId, logging::SecretString};
use utils::id::NodeId;
use crate::{node::Node, safekeeper::Safekeeper};
use crate::node::Node;
struct HeartbeaterTask<Server, State> {
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
struct HeartbeaterTask {
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
cancel: CancellationToken,
state: HashMap<NodeId, State>,
state: HashMap<NodeId, PageserverState>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
@@ -40,17 +36,8 @@ pub(crate) enum PageserverState {
Offline,
}
#[derive(Debug, Clone)]
pub(crate) enum SafekeeperState {
Available {
last_seen_at: Instant,
utilization: SafekeeperUtilization,
},
Offline,
}
#[derive(Debug)]
pub(crate) struct AvailablityDeltas<State>(pub Vec<(NodeId, State)>);
pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>);
#[derive(Debug, Error)]
pub(crate) enum HeartbeaterError {
@@ -58,28 +45,23 @@ pub(crate) enum HeartbeaterError {
Cancel,
}
struct HeartbeatRequest<Server, State> {
servers: Arc<HashMap<NodeId, Server>>,
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas<State>, HeartbeaterError>>,
struct HeartbeatRequest {
pageservers: Arc<HashMap<NodeId, Node>>,
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas, HeartbeaterError>>,
}
pub(crate) struct Heartbeater<Server, State> {
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest<Server, State>>,
pub(crate) struct Heartbeater {
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest>,
}
#[allow(private_bounds)]
impl<Server: Send + Sync + 'static, State: Debug + Send + 'static> Heartbeater<Server, State>
where
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
{
impl Heartbeater {
pub(crate) fn new(
jwt_token: Option<String>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
) -> Self {
let (sender, receiver) =
tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
let mut heartbeater = HeartbeaterTask::new(
receiver,
jwt_token,
@@ -94,12 +76,12 @@ where
pub(crate) async fn heartbeat(
&self,
servers: Arc<HashMap<NodeId, Server>>,
) -> Result<AvailablityDeltas<State>, HeartbeaterError> {
pageservers: Arc<HashMap<NodeId, Node>>,
) -> Result<AvailablityDeltas, HeartbeaterError> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.sender
.send(HeartbeatRequest {
servers,
pageservers,
reply: sender,
})
.map_err(|_| HeartbeaterError::Cancel)?;
@@ -111,12 +93,9 @@ where
}
}
impl<Server, State: Debug> HeartbeaterTask<Server, State>
where
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
{
impl HeartbeaterTask {
fn new(
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
jwt_token: Option<String>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
@@ -131,13 +110,14 @@ where
jwt_token,
}
}
async fn run(&mut self) {
loop {
tokio::select! {
request = self.receiver.recv() => {
match request {
Some(req) => {
let res = self.heartbeat(req.servers).await;
let res = self.heartbeat(req.pageservers).await;
req.reply.send(res).unwrap();
},
None => { return; }
@@ -147,20 +127,11 @@ where
}
}
}
}
pub(crate) trait HeartBeat<Server, State> {
fn heartbeat(
&mut self,
pageservers: Arc<HashMap<NodeId, Server>>,
) -> impl Future<Output = Result<AvailablityDeltas<State>, HeartbeaterError>> + Send;
}
impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState> {
async fn heartbeat(
&mut self,
pageservers: Arc<HashMap<NodeId, Node>>,
) -> Result<AvailablityDeltas<PageserverState>, HeartbeaterError> {
) -> Result<AvailablityDeltas, HeartbeaterError> {
let mut new_state = HashMap::new();
let mut heartbeat_futs = FuturesUnordered::new();
@@ -301,121 +272,3 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
Ok(AvailablityDeltas(deltas))
}
}
impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, SafekeeperState> {
async fn heartbeat(
&mut self,
safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
) -> Result<AvailablityDeltas<SafekeeperState>, HeartbeaterError> {
let mut new_state = HashMap::new();
let mut heartbeat_futs = FuturesUnordered::new();
for (node_id, sk) in &*safekeepers {
heartbeat_futs.push({
let jwt_token = self
.jwt_token
.as_ref()
.map(|t| SecretString::from(t.to_owned()));
let cancel = self.cancel.clone();
async move {
let response = sk
.with_client_retries(
|client| async move { client.get_utilization().await },
&jwt_token,
3,
3,
Duration::from_secs(1),
&cancel,
)
.await;
let status = match response {
Ok(utilization) => SafekeeperState::Available {
last_seen_at: Instant::now(),
utilization,
},
Err(mgmt_api::Error::Cancelled) => {
// This indicates cancellation of the request.
// We ignore the node in this case.
return None;
}
Err(_) => SafekeeperState::Offline,
};
Some((*node_id, status))
}
});
loop {
let maybe_status = tokio::select! {
next = heartbeat_futs.next() => {
match next {
Some(result) => result,
None => { break; }
}
},
_ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
};
if let Some((node_id, status)) = maybe_status {
new_state.insert(node_id, status);
}
}
}
let mut offline = 0;
for state in new_state.values() {
match state {
SafekeeperState::Offline { .. } => offline += 1,
SafekeeperState::Available { .. } => {}
}
}
tracing::info!(
"Heartbeat round complete for {} safekeepers, {} offline",
new_state.len(),
offline
);
let mut deltas = Vec::new();
let now = Instant::now();
for (node_id, sk_state) in new_state.iter_mut() {
use std::collections::hash_map::Entry::*;
let entry = self.state.entry(*node_id);
let mut needs_update = false;
match entry {
Occupied(ref occ) => match (occ.get(), &sk_state) {
(SafekeeperState::Offline, SafekeeperState::Offline) => {}
(SafekeeperState::Available { last_seen_at, .. }, SafekeeperState::Offline) => {
if now - *last_seen_at >= self.max_offline_interval {
deltas.push((*node_id, sk_state.clone()));
needs_update = true;
}
}
_ => {
deltas.push((*node_id, sk_state.clone()));
needs_update = true;
}
},
Vacant(_) => {
// This is a new node. Don't generate a delta for it.
deltas.push((*node_id, sk_state.clone()));
}
}
match entry {
Occupied(mut occ) if needs_update => {
(*occ.get_mut()) = sk_state.clone();
}
Vacant(vac) => {
vac.insert(sk_state.clone());
}
_ => {}
}
}
Ok(AvailablityDeltas(deltas))
}
}

View File

@@ -8,7 +8,6 @@ use crate::reconciler::ReconcileError;
use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT};
use anyhow::Context;
use futures::Future;
use governor::{Quota, RateLimiter};
use http_utils::{
endpoint::{self, auth_middleware, check_permission_with, request_span},
error::ApiError,
@@ -33,7 +32,6 @@ use pageserver_api::models::{
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::{mgmt_api, BlockUnblock};
use std::num::NonZero;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -518,14 +516,6 @@ async fn handle_tenant_timeline_block_unblock_gc(
json_response(StatusCode::OK, ())
}
static PASSTHROUGH_RATE_LIMITER: std::sync::OnceLock<
RateLimiter<
TenantId,
governor::state::keyed::DefaultKeyedStateStore<TenantId>,
governor::clock::DefaultClock,
>,
> = std::sync::OnceLock::new();
async fn handle_tenant_timeline_passthrough(
service: Arc<Service>,
req: Request<Body>,
@@ -547,19 +537,6 @@ async fn handle_tenant_timeline_passthrough(
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
// Proxied requests are expected to be rare on a per-tenant basis: these are things
// like inspecting a timeline's details or doing an LSN<->timestamp mapping. Not anything
// that has high throughput.
let limiter = PASSTHROUGH_RATE_LIMITER.get_or_init(|| {
RateLimiter::new(
Quota::per_second(NonZero::new(10).unwrap()),
governor::state::keyed::DefaultKeyedStateStore::new(),
governor::clock::DefaultClock::default(),
)
});
limiter.until_key_ready(&tenant_id).await;
// Find the node that holds shard zero
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;

View File

@@ -17,8 +17,6 @@ mod pageserver_client;
mod peer_client;
pub mod persistence;
mod reconciler;
mod safekeeper;
mod safekeeper_client;
mod scheduler;
mod schema;
pub mod service;

View File

@@ -80,11 +80,6 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Count of HTTP requests to the safekeeper that resulted in an error,
/// broken down by the safekeeper node id, request name and method
pub(crate) storage_controller_safekeeper_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Latency of HTTP requests to the pageserver, broken down by pageserver
/// node id, request name and method. This include both successful and unsuccessful
/// requests.
@@ -92,13 +87,6 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Latency of HTTP requests to the safekeeper, broken down by safekeeper
/// node id, request name and method. This include both successful and unsuccessful
/// requests.
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
pub(crate) storage_controller_safekeeper_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
/// broken down by the pageserver node id, request name and method
pub(crate) storage_controller_passthrough_request_error:

View File

@@ -1185,6 +1185,23 @@ impl Persistence {
Ok(safekeepers)
}
pub(crate) async fn safekeeper_get(
&self,
id: i64,
) -> Result<SafekeeperPersistence, DatabaseError> {
use crate::schema::safekeepers::dsl::{id as id_column, safekeepers};
self.with_conn(move |conn| {
Box::pin(async move {
Ok(safekeepers
.filter(id_column.eq(&id))
.select(SafekeeperPersistence::as_select())
.get_result(conn)
.await?)
})
})
.await
}
pub(crate) async fn safekeeper_upsert(
&self,
record: SafekeeperUpsert,
@@ -1537,21 +1554,6 @@ pub(crate) struct SafekeeperPersistence {
}
impl SafekeeperPersistence {
pub(crate) fn from_upsert(
upsert: SafekeeperUpsert,
scheduling_policy: SkSchedulingPolicy,
) -> Self {
crate::persistence::SafekeeperPersistence {
id: upsert.id,
region_id: upsert.region_id,
version: upsert.version,
host: upsert.host,
port: upsert.port,
http_port: upsert.http_port,
availability_zone_id: upsert.availability_zone_id,
scheduling_policy: String::from(scheduling_policy),
}
}
pub(crate) fn as_describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
let scheduling_policy =
SkSchedulingPolicy::from_str(&self.scheduling_policy).map_err(|e| {

View File

@@ -1,7 +1,7 @@
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::{compute_hook, service};
use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
};
@@ -162,22 +162,6 @@ impl ReconcilerConfig {
}
}
impl From<&MigrationConfig> for ReconcilerConfig {
fn from(value: &MigrationConfig) -> Self {
let mut builder = ReconcilerConfigBuilder::new();
if let Some(timeout) = value.secondary_warmup_timeout {
builder = builder.secondary_warmup_timeout(timeout)
}
if let Some(timeout) = value.secondary_download_request_timeout {
builder = builder.secondary_download_request_timeout(timeout)
}
builder.build()
}
}
/// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
pub(crate) struct ReconcileUnits {
_sem_units: tokio::sync::OwnedSemaphorePermit,

View File

@@ -1,139 +0,0 @@
use std::{str::FromStr, time::Duration};
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
use reqwest::StatusCode;
use safekeeper_client::mgmt_api;
use tokio_util::sync::CancellationToken;
use utils::{backoff, id::NodeId, logging::SecretString};
use crate::{
heartbeater::SafekeeperState,
persistence::{DatabaseError, SafekeeperPersistence},
safekeeper_client::SafekeeperClient,
};
#[derive(Clone)]
pub struct Safekeeper {
pub(crate) skp: SafekeeperPersistence,
cancel: CancellationToken,
listen_http_addr: String,
listen_http_port: u16,
id: NodeId,
availability: SafekeeperState,
}
impl Safekeeper {
pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self {
Self {
cancel,
listen_http_addr: skp.host.clone(),
listen_http_port: skp.http_port as u16,
id: NodeId(skp.id as u64),
skp,
availability: SafekeeperState::Offline,
}
}
pub(crate) fn base_url(&self) -> String {
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
}
pub(crate) fn get_id(&self) -> NodeId {
self.id
}
pub(crate) fn describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
self.skp.as_describe_response()
}
pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
self.availability = availability;
}
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
jwt: &Option<SecretString>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
cancel: &CancellationToken,
) -> mgmt_api::Result<T>
where
O: FnMut(SafekeeperClient) -> F,
F: std::future::Future<Output = mgmt_api::Result<T>>,
{
fn is_fatal(e: &mgmt_api::Error) -> bool {
use mgmt_api::Error::*;
match e {
ReceiveBody(_) | ReceiveErrorBody(_) => false,
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
}
}
backoff::retry(
|| {
let http_client = reqwest::ClientBuilder::new()
.timeout(timeout)
.build()
.expect("Failed to construct HTTP client");
let client = SafekeeperClient::from_client(
self.get_id(),
http_client,
self.base_url(),
jwt.clone(),
);
let node_cancel_fut = self.cancel.cancelled();
let op_fut = op(client);
async {
tokio::select! {
r = op_fut=> {r},
_ = node_cancel_fut => {
Err(mgmt_api::Error::Cancelled)
}}
}
},
is_fatal,
warn_threshold,
max_retries,
&format!(
"Call to node {} ({}:{}) management API",
self.id, self.listen_http_addr, self.listen_http_port
),
cancel,
)
.await
.unwrap_or(Err(mgmt_api::Error::Cancelled))
}
pub(crate) fn update_from_record(&mut self, record: crate::persistence::SafekeeperUpsert) {
let crate::persistence::SafekeeperUpsert {
active: _,
availability_zone_id: _,
host,
http_port,
id,
port: _,
region_id: _,
version: _,
} = record.clone();
if id != self.id.0 as i64 {
// The way the function is called ensures this. If we regress on that, it's a bug.
panic!(
"id can't be changed via update_from_record function: {id} != {}",
self.id.0
);
}
self.skp = crate::persistence::SafekeeperPersistence::from_upsert(
record,
SkSchedulingPolicy::from_str(&self.skp.scheduling_policy).unwrap(),
);
self.listen_http_port = http_port as u16;
self.listen_http_addr = host;
}
}

View File

@@ -1,105 +0,0 @@
use crate::metrics::PageserverRequestLabelGroup;
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
use safekeeper_client::mgmt_api::{Client, Result};
use utils::{
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
};
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
///
/// Analogous to [`crate::pageserver_client::PageserverClient`].
#[derive(Debug, Clone)]
pub(crate) struct SafekeeperClient {
inner: Client,
node_id_label: String,
}
macro_rules! measured_request {
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
let labels = PageserverRequestLabelGroup {
pageserver_id: $node_id,
path: $name,
method: $method,
};
let latency = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_request_latency;
let _timer_guard = latency.start_timer(labels.clone());
let res = $invoke;
if res.is_err() {
let error_counters = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_pageserver_request_error;
error_counters.inc(labels)
}
res
}};
}
impl SafekeeperClient {
#[allow(dead_code)]
pub(crate) fn new(
node_id: NodeId,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
pub(crate) fn from_client(
node_id: NodeId,
raw_client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
#[allow(dead_code)]
pub(crate) async fn create_timeline(
&self,
req: &TimelineCreateRequest,
) -> Result<TimelineStatus> {
measured_request!(
"create_timeline",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.create_timeline(req).await
)
}
#[allow(dead_code)]
pub(crate) async fn delete_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<TimelineStatus> {
measured_request!(
"delete_timeline",
crate::metrics::Method::Delete,
&self.node_id_label,
self.inner.delete_timeline(tenant_id, timeline_id).await
)
}
pub(crate) async fn get_utilization(&self) -> Result<SafekeeperUtilization> {
measured_request!(
"utilization",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner.utilization().await
)
}
}

View File

@@ -2,7 +2,6 @@ pub mod chaos_injector;
mod context_iterator;
use hyper::Uri;
use safekeeper_api::models::SafekeeperUtilization;
use std::{
borrow::Cow,
cmp::Ordering,
@@ -21,7 +20,6 @@ use crate::{
},
compute_hook::{self, NotifyError},
drain_utils::{self, TenantShardDrain, TenantShardIterator},
heartbeater::SafekeeperState,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
leadership::Leadership,
metrics,
@@ -31,7 +29,6 @@ use crate::{
ShardGenerationState, TenantFilter,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
safekeeper::Safekeeper,
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
@@ -209,8 +206,6 @@ struct ServiceState {
nodes: Arc<HashMap<NodeId, Node>>,
safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
scheduler: Scheduler,
/// Ongoing background operation on the cluster if any is running.
@@ -277,7 +272,6 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
impl ServiceState {
fn new(
nodes: HashMap<NodeId, Node>,
safekeepers: HashMap<NodeId, Safekeeper>,
tenants: BTreeMap<TenantShardId, TenantShard>,
scheduler: Scheduler,
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
@@ -289,7 +283,6 @@ impl ServiceState {
leadership_status: initial_leadership_status,
tenants,
nodes: Arc::new(nodes),
safekeepers: Arc::new(safekeepers),
scheduler,
ongoing_operation: None,
delayed_reconcile_rx,
@@ -306,23 +299,6 @@ impl ServiceState {
(&mut self.nodes, &mut self.tenants, &mut self.scheduler)
}
#[allow(clippy::type_complexity)]
fn parts_mut_sk(
&mut self,
) -> (
&mut Arc<HashMap<NodeId, Node>>,
&mut Arc<HashMap<NodeId, Safekeeper>>,
&mut BTreeMap<TenantShardId, TenantShard>,
&mut Scheduler,
) {
(
&mut self.nodes,
&mut self.safekeepers,
&mut self.tenants,
&mut self.scheduler,
)
}
fn get_leadership_status(&self) -> LeadershipStatus {
self.leadership_status
}
@@ -421,8 +397,7 @@ pub struct Service {
compute_hook: Arc<ComputeHook>,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
heartbeater_ps: Heartbeater<Node, PageserverState>,
heartbeater_sk: Heartbeater<Safekeeper, SafekeeperState>,
heartbeater: Heartbeater,
// Channel for background cleanup from failed operations that require cleanup, such as shard split
abort_tx: tokio::sync::mpsc::UnboundedSender<TenantShardSplitAbort>,
@@ -632,8 +607,7 @@ impl Service {
let locked = self.inner.read().unwrap();
locked.nodes.clone()
};
let (mut nodes_online, mut sks_online) =
self.initial_heartbeat_round(all_nodes.keys()).await;
let mut nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await;
// List of tenants for which we will attempt to notify compute of their location at startup
let mut compute_notifications = Vec::new();
@@ -642,7 +616,7 @@ impl Service {
tracing::info!("Populating tenant shards' states from initial pageserver scan...");
let shard_count = {
let mut locked = self.inner.write().unwrap();
let (nodes, safekeepers, tenants, scheduler) = locked.parts_mut_sk();
let (nodes, tenants, scheduler) = locked.parts_mut();
// Mark nodes online if they responded to us: nodes are offline by default after a restart.
let mut new_nodes = (**nodes).clone();
@@ -654,17 +628,6 @@ impl Service {
}
*nodes = Arc::new(new_nodes);
let mut new_sks = (**safekeepers).clone();
for (node_id, node) in new_sks.iter_mut() {
if let Some((utilization, last_seen_at)) = sks_online.remove(node_id) {
node.set_availability(SafekeeperState::Available {
utilization,
last_seen_at,
});
}
}
*safekeepers = Arc::new(new_sks);
for (tenant_shard_id, observed_state) in observed.0 {
let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
for node_id in observed_state.locations.keys() {
@@ -773,10 +736,7 @@ impl Service {
async fn initial_heartbeat_round<'a>(
&self,
node_ids: impl Iterator<Item = &'a NodeId>,
) -> (
HashMap<NodeId, PageserverUtilization>,
HashMap<NodeId, (SafekeeperUtilization, Instant)>,
) {
) -> HashMap<NodeId, PageserverUtilization> {
assert!(!self.startup_complete.is_ready());
let all_nodes = {
@@ -796,20 +756,14 @@ impl Service {
}
}
let all_sks = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
};
tracing::info!("Sending initial heartbeats...");
let res_ps = self
.heartbeater_ps
let res = self
.heartbeater
.heartbeat(Arc::new(nodes_to_heartbeat))
.await;
let res_sk = self.heartbeater_sk.heartbeat(all_sks).await;
let mut online_nodes = HashMap::new();
if let Ok(deltas) = res_ps {
if let Ok(deltas) = res {
for (node_id, status) in deltas.0 {
match status {
PageserverState::Available { utilization, .. } => {
@@ -823,22 +777,7 @@ impl Service {
}
}
let mut online_sks = HashMap::new();
if let Ok(deltas) = res_sk {
for (node_id, status) in deltas.0 {
match status {
SafekeeperState::Available {
utilization,
last_seen_at,
} => {
online_sks.insert(node_id, (utilization, last_seen_at));
}
SafekeeperState::Offline => {}
}
}
}
(online_nodes, online_sks)
online_nodes
}
/// Used during [`Self::startup_reconcile`]: issue GETs to all nodes concurrently, with a deadline.
@@ -1045,14 +984,8 @@ impl Service {
locked.nodes.clone()
};
let safekeepers = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
};
let res_ps = self.heartbeater_ps.heartbeat(nodes).await;
let res_sk = self.heartbeater_sk.heartbeat(safekeepers).await;
if let Ok(deltas) = res_ps {
let res = self.heartbeater.heartbeat(nodes).await;
if let Ok(deltas) = res {
let mut to_handle = Vec::default();
for (node_id, state) in deltas.0 {
@@ -1153,18 +1086,6 @@ impl Service {
}
}
}
if let Ok(deltas) = res_sk {
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
for (id, state) in deltas.0 {
let Some(sk) = safekeepers.get_mut(&id) else {
tracing::info!("Couldn't update safekeeper safekeeper state for id {id} from heartbeat={state:?}");
continue;
};
sk.set_availability(state);
}
locked.safekeepers = Arc::new(safekeepers);
}
}
}
@@ -1390,17 +1311,6 @@ impl Service {
.storage_controller_pageserver_nodes
.set(nodes.len() as i64);
tracing::info!("Loading safekeepers from database...");
let safekeepers = persistence
.list_safekeepers()
.await?
.into_iter()
.map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new()))
.collect::<Vec<_>>();
let safekeepers: HashMap<NodeId, Safekeeper> =
safekeepers.into_iter().map(|n| (n.get_id(), n)).collect();
tracing::info!("Loaded {} safekeepers from database.", safekeepers.len());
tracing::info!("Loading shards from database...");
let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?;
tracing::info!(
@@ -1527,14 +1437,7 @@ impl Service {
let cancel = CancellationToken::new();
let reconcilers_cancel = cancel.child_token();
let heartbeater_ps = Heartbeater::new(
config.jwt_token.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
);
let heartbeater_sk = Heartbeater::new(
let heartbeater = Heartbeater::new(
config.jwt_token.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
@@ -1550,7 +1453,6 @@ impl Service {
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
nodes,
safekeepers,
tenants,
scheduler,
delayed_reconcile_rx,
@@ -1560,8 +1462,7 @@ impl Service {
persistence,
compute_hook: Arc::new(ComputeHook::new(config.clone())),
result_tx,
heartbeater_ps,
heartbeater_sk,
heartbeater,
reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
config.reconciler_concurrency,
)),
@@ -5213,12 +5114,7 @@ impl Service {
shard.sequence = shard.sequence.next();
}
let reconciler_config = match migrate_req.migration_config {
Some(cfg) => (&cfg).into(),
None => ReconcilerConfig::default(),
};
self.maybe_configured_reconcile_shard(shard, nodes, reconciler_config)
self.maybe_reconcile_shard(shard, nodes)
};
if let Some(waiter) = waiter {
@@ -7765,54 +7661,29 @@ impl Service {
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
let locked = self.inner.read().unwrap();
let mut list = locked
.safekeepers
.iter()
.map(|sk| sk.1.describe_response())
.collect::<Result<Vec<_>, _>>()?;
list.sort_by_key(|v| v.id);
Ok(list)
self.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|v| v.as_describe_response())
.collect::<Result<Vec<_>, _>>()
}
pub(crate) async fn get_safekeeper(
&self,
id: i64,
) -> Result<SafekeeperDescribeResponse, DatabaseError> {
let locked = self.inner.read().unwrap();
let sk = locked
.safekeepers
.get(&NodeId(id as u64))
.ok_or(diesel::result::Error::NotFound)?;
sk.describe_response()
self.persistence
.safekeeper_get(id)
.await
.and_then(|v| v.as_describe_response())
}
pub(crate) async fn upsert_safekeeper(
&self,
record: crate::persistence::SafekeeperUpsert,
) -> Result<(), DatabaseError> {
let node_id = NodeId(record.id as u64);
self.persistence.safekeeper_upsert(record.clone()).await?;
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
match safekeepers.entry(node_id) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().update_from_record(record);
}
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(Safekeeper::from_persistence(
crate::persistence::SafekeeperPersistence::from_upsert(
record,
SkSchedulingPolicy::Pause,
),
CancellationToken::new(),
));
}
}
locked.safekeepers = Arc::new(safekeepers);
}
Ok(())
self.persistence.safekeeper_upsert(record).await
}
pub(crate) async fn set_safekeeper_scheduling_policy(
@@ -7822,20 +7693,7 @@ impl Service {
) -> Result<(), DatabaseError> {
self.persistence
.set_safekeeper_scheduling_policy(id, scheduling_policy)
.await?;
let node_id = NodeId(id as u64);
// After the change has been persisted successfully, update the in-memory state
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
let sk = safekeepers
.get_mut(&node_id)
.ok_or(DatabaseError::Logical("Not found".to_string()))?;
sk.skp.scheduling_policy = String::from(scheduling_policy);
locked.safekeepers = Arc::new(safekeepers);
}
Ok(())
.await
}
pub(crate) async fn update_shards_preferred_azs(

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
import abc
import asyncio
import concurrent.futures
import dataclasses
import filecmp
import json
import os
@@ -1676,12 +1675,6 @@ class StorageControllerLeadershipStatus(StrEnum):
CANDIDATE = "candidate"
@dataclass
class StorageControllerMigrationConfig:
secondary_warmup_timeout: str | None
secondary_download_request_timeout: str | None
class NeonStorageController(MetricsGetter, LogUtils):
def __init__(self, env: NeonEnv, port: int, auth_enabled: bool):
self.env = env
@@ -2075,20 +2068,11 @@ class NeonStorageController(MetricsGetter, LogUtils):
shards: list[TenantShardId] = body["new_shards"]
return shards
def tenant_shard_migrate(
self,
tenant_shard_id: TenantShardId,
dest_ps_id: int,
config: StorageControllerMigrationConfig | None = None,
):
payload = {"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id}
if config is not None:
payload["migration_config"] = dataclasses.asdict(config)
def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int):
self.request(
"PUT",
f"{self.api}/control/v1/tenant/{tenant_shard_id}/migrate",
json=payload,
json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id},
headers=self.headers(TokenScope.ADMIN),
)
log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}")
@@ -4988,13 +4972,8 @@ def check_restored_datadir_content(
restored_files = list_files_to_compare(restored_dir_path)
# pg_notify files are always ignored
pgdata_files = [f for f in pgdata_files if not f.startswith("pg_notify")]
restored_files = [f for f in restored_files if not f.startswith("pg_notify")]
# pg_xact and pg_multixact files are optional in basebackup: depending on our configuration they
# may be omitted and loaded on demand.
if pgdata_files != restored_files:
# filter pg_xact and multixact files which are downloaded on demand
pgdata_files = [
f
for f in pgdata_files

View File

@@ -231,14 +231,14 @@ def test_pgdata_import_smoke(
shard_zero_http = shard_zero_ps.http_client()
shard_zero_timeline_info = shard_zero_http.timeline_detail(shard_zero["shard_id"], timeline_id)
initdb_lsn = Lsn(shard_zero_timeline_info["initdb_lsn"])
min_readable_lsn = Lsn(shard_zero_timeline_info["min_readable_lsn"])
latest_gc_cutoff_lsn = Lsn(shard_zero_timeline_info["latest_gc_cutoff_lsn"])
last_record_lsn = Lsn(shard_zero_timeline_info["last_record_lsn"])
disk_consistent_lsn = Lsn(shard_zero_timeline_info["disk_consistent_lsn"])
_remote_consistent_lsn = Lsn(shard_zero_timeline_info["remote_consistent_lsn"])
remote_consistent_lsn_visible = Lsn(shard_zero_timeline_info["remote_consistent_lsn_visible"])
# assert remote_consistent_lsn_visible == remote_consistent_lsn TODO: this fails initially and after restart, presumably because `UploadQueue::clean.1` is still `None`
assert remote_consistent_lsn_visible == disk_consistent_lsn
assert initdb_lsn == min_readable_lsn
assert initdb_lsn == latest_gc_cutoff_lsn
assert disk_consistent_lsn == initdb_lsn + 8
assert last_record_lsn == disk_consistent_lsn
# TODO: assert these values are the same everywhere

View File

@@ -10,18 +10,14 @@ from typing import TYPE_CHECKING
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
NeonPageserver,
StorageControllerMigrationConfig,
)
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
from fixtures.pageserver.common_types import parse_layer_file_name
from fixtures.pageserver.utils import (
assert_prefix_empty,
wait_for_upload_queue_empty,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage, s3_storage
from fixtures.utils import run_only_on_default_postgres, skip_in_debug_build, wait_until
from fixtures.utils import skip_in_debug_build, wait_until
from fixtures.workload import Workload
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@@ -893,93 +889,3 @@ def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controll
assert progress_3["heatmap_mtime"] is not None
assert progress_3["layers_total"] == progress_3["layers_downloaded"]
assert progress_3["bytes_total"] == progress_3["bytes_downloaded"]
@skip_in_debug_build("only run with release build")
@run_only_on_default_postgres("PG version is not interesting here")
def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_configs()
env.start()
assert isinstance(env.pageserver_remote_storage, S3Storage) # Satisfy linter
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.create_tenant(tenant_id, timeline_id, conf=TENANT_CONF, placement_policy='{"Attached":1}')
env.storage_controller.reconcile_until_idle()
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
ps_attached = env.get_pageserver(attached_to_id)
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
# Generate a bunch of small layers (we will apply a slowdown failpoint that works on a per-layer basis)
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(128, upload=True)
workload.write_rows(128, upload=True)
workload.write_rows(128, upload=True)
workload.write_rows(128, upload=True)
workload.stop()
# Expect lots of layers
assert len(ps_attached.list_layers(tenant_id, timeline_id)) > 10
# Simulate large data by making layer downloads artifically slow
for ps in env.pageservers:
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")])
# Upload a heatmap, so that secondaries have something to download
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
heatmap_before_migration = env.pageserver_remote_storage.heatmap_content(tenant_id)
# This has no chance to succeed: we have lots of layers and each one takes at least 1000ms.
# However, it pulls the heatmap, which will be important later.
http_client = env.storage_controller.pageserver_api()
(status, progress) = http_client.tenant_secondary_download(tenant_id, wait_ms=4000)
assert status == 202
assert progress["heatmap_mtime"] is not None
assert progress["layers_downloaded"] > 0
assert progress["bytes_downloaded"] > 0
assert progress["layers_total"] > progress["layers_downloaded"]
assert progress["bytes_total"] > progress["bytes_downloaded"]
env.storage_controller.allowed_errors.extend(
[
".*Timed out.*downloading layers.*",
]
)
# Use a custom configuration that gives up earlier than usual.
# We can't hydrate everything anyway because of the failpoints.
config = StorageControllerMigrationConfig(
secondary_warmup_timeout="5s", secondary_download_request_timeout="2s"
)
env.storage_controller.tenant_shard_migrate(
TenantShardId(tenant_id, shard_number=0, shard_count=0), ps_secondary.id, config
)
env.storage_controller.reconcile_until_idle()
assert env.storage_controller.locate(tenant_id)[0]["node_id"] == ps_secondary.id
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
heatmap_after_migration = env.pageserver_remote_storage.heatmap_content(tenant_id)
assert len(heatmap_before_migration["timelines"][0]["layers"]) > 0
# The new layer map should contain all the layers in the pre-migration one
# and a new in memory layer
assert len(heatmap_before_migration["timelines"][0]["layers"]) + 1 == len(
heatmap_after_migration["timelines"][0]["layers"]
)
log.info(
f'Heatmap size after cold migration is {len(heatmap_after_migration["timelines"][0]["layers"])}'
)
# TODO: Once we have an endpoint for rescuing the cold location, exercise it here.

View File

@@ -261,7 +261,7 @@ def test_isolation(
pg_bin.run(pg_isolation_regress_command, env=env_vars, cwd=runpath)
# This fails with a mismatch on `pg_multixact/offsets/0000`
post_checks(env, test_output_dir, DBNAME, endpoint)
# post_checks(env, test_output_dir, DBNAME, endpoint)
# Run extra Neon-specific pg_regress-based tests. The tests and their

View File

@@ -287,7 +287,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
offset=offset,
)
# Do some update so we can increment gc_cutoff
# Do some update so we can increment latest_gc_cutoff
generate_updates_on_main(env, ep_main, i, end=100)
# Wait for the existing lease to expire.

View File

@@ -1821,7 +1821,7 @@ def test_sharding_gc(
# TODO: remove when https://github.com/neondatabase/neon/issues/10720 is fixed
ps.allowed_errors.extend(
[
".*could not find data for key.*",
".*could not find data for key 020000000000000000000000000000000000.*",
".*could not ingest record.*",
]
)

View File

@@ -3189,17 +3189,15 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
assert len(target.get_safekeepers()) == 0
sk_0 = env.safekeepers[0]
body = {
"active": True,
"id": fake_id,
"created_at": "2023-10-25T09:11:25Z",
"updated_at": "2024-08-28T11:32:43Z",
"region_id": "aws-us-east-2",
"host": "localhost",
"port": sk_0.port.pg,
"http_port": sk_0.port.http,
"host": "safekeeper-333.us-east-2.aws.neon.build",
"port": 6401,
"http_port": 7676,
"version": 5957,
"availability_zone_id": "us-east-2b",
}
@@ -3245,13 +3243,6 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
# Ensure idempotency
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
def storcon_heartbeat():
assert env.storage_controller.log_contains(
"Heartbeat round complete for 1 safekeepers, 0 offline"
)
wait_until(storcon_heartbeat)
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
compared = [dict(a), dict(b)]

View File

@@ -318,7 +318,7 @@ def test_scrubber_physical_gc_ancestors(neon_env_builder: NeonEnvBuilder, shard_
# TODO: remove when https://github.com/neondatabase/neon/issues/10720 is fixed
ps.allowed_errors.extend(
[
".*could not find data for key.*",
".*could not find data for key 020000000000000000000000000000000000.*",
".*could not ingest record.*",
]
)

View File

@@ -566,14 +566,10 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder):
assert_prefix_empty(neon_env_builder.safekeepers_remote_storage, prefix)
# This test is flaky, probably because PUTs of local fs storage are not atomic.
# Let's keep both remote storage kinds for a while to see if this is the case.
# https://github.com/neondatabase/neon/issues/10761
@pytest.mark.parametrize("remote_storage_kind", [s3_storage(), RemoteStorageKind.LOCAL_FS])
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant

16
vendor/revisions.json vendored
View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.3",
"4d3a722312b496ff7378156caa6d41c2e70c30e4"
"17.2",
"4c45d78ad587e4bcb4a5a7ef6931b88c6a3d575d"
],
"v16": [
"16.7",
"999cf81b101ead40e597d5cd729458d8200f4537"
"16.6",
"13cf5d06c98a8e9b0590ce6cdfd193a08d0a7792"
],
"v15": [
"15.11",
"80ed91ce255c765d25be0bb4a02c942fe6311fbf"
"15.10",
"355a7c69d3f907f3612eb406cc7b9c2f55d59b59"
],
"v14": [
"14.16",
"62a86dfc91e0c35a72f2ea5e99e6969b830c0c26"
"14.15",
"c0aedfd3cac447510a2db843b561f0c52901b679"
]
}