mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 07:00:38 +00:00
Compare commits
2 Commits
jcsp/storc
...
rewrite-co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
feead26e04 | ||
|
|
87a9afbc64 |
5
.github/workflows/build_and_test.yml
vendored
5
.github/workflows/build_and_test.yml
vendored
@@ -263,9 +263,8 @@ jobs:
|
||||
echo "json=$(jq --compact-output '.' /tmp/benchmark_durations.json)" >> $GITHUB_OUTPUT
|
||||
|
||||
benchmarks:
|
||||
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `deploy` in PRs
|
||||
if: github.ref_name == 'main' || (contains(github.event.pull_request.labels.*.name, 'run-benchmarks') && !failure() && !cancelled())
|
||||
needs: [ check-permissions, build-build-tools-image, get-benchmarks-durations, deploy ]
|
||||
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
|
||||
needs: [ check-permissions, build-and-test-locally, build-build-tools-image, get-benchmarks-durations ]
|
||||
permissions:
|
||||
id-token: write # aws-actions/configure-aws-credentials
|
||||
statuses: write
|
||||
|
||||
2
.github/workflows/trigger-e2e-tests.yml
vendored
2
.github/workflows/trigger-e2e-tests.yml
vendored
@@ -88,7 +88,7 @@ jobs:
|
||||
BUILD_AND_TEST_RUN_ID=${TAG}
|
||||
while true; do
|
||||
gh run --repo ${GITHUB_REPOSITORY} view ${BUILD_AND_TEST_RUN_ID} --json jobs --jq '[.jobs[] | select((.name | startswith("push-neon-image-dev")) or (.name | startswith("push-compute-image-dev"))) | {"name": .name, "conclusion": .conclusion, "url": .url}]' > jobs.json
|
||||
if [ $(jq '[.[] | select(.conclusion == "success")] | length' jobs.json) -eq 2 ]; then
|
||||
if [ $(jq '[.[] | select(.conclusion == "success")]' jobs.json) -eq 2 ]; then
|
||||
break
|
||||
fi
|
||||
jq -c '.[]' jobs.json | while read -r job; do
|
||||
|
||||
81
Cargo.lock
generated
81
Cargo.lock
generated
@@ -1293,7 +1293,6 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"jsonwebtoken",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"serde",
|
||||
@@ -1321,7 +1320,6 @@ dependencies = [
|
||||
"flate2",
|
||||
"futures",
|
||||
"http 1.1.0",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"notify",
|
||||
@@ -2398,9 +2396,9 @@ checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
|
||||
|
||||
[[package]]
|
||||
name = "futures-timer"
|
||||
version = "3.0.3"
|
||||
version = "3.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
|
||||
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
|
||||
|
||||
[[package]]
|
||||
name = "futures-util"
|
||||
@@ -2503,27 +2501,6 @@ version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
|
||||
[[package]]
|
||||
name = "governor"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "842dc78579ce01e6a1576ad896edc92fca002dd60c9c3746b7fc2bec6fb429d0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"dashmap 6.1.0",
|
||||
"futures-sink",
|
||||
"futures-timer",
|
||||
"futures-util",
|
||||
"no-std-compat",
|
||||
"nonzero_ext",
|
||||
"parking_lot 0.12.1",
|
||||
"portable-atomic",
|
||||
"quanta",
|
||||
"rand 0.8.5",
|
||||
"smallvec",
|
||||
"spinning_top",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "group"
|
||||
version = "0.12.1"
|
||||
@@ -3723,12 +3700,6 @@ dependencies = [
|
||||
"memoffset 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "no-std-compat"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "7.1.3"
|
||||
@@ -3739,12 +3710,6 @@ dependencies = [
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nonzero_ext"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
|
||||
|
||||
[[package]]
|
||||
name = "notify"
|
||||
version = "8.0.0"
|
||||
@@ -4603,12 +4568,6 @@ dependencies = [
|
||||
"never-say-never",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "portable-atomic"
|
||||
version = "1.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
|
||||
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.7"
|
||||
@@ -5075,21 +5034,6 @@ dependencies = [
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quanta"
|
||||
version = "0.12.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"raw-cpuid",
|
||||
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||
"web-sys",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.26.0"
|
||||
@@ -5220,15 +5164,6 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "raw-cpuid"
|
||||
version = "11.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c6928fa44c097620b706542d428957635951bade7143269085389d42c8a4927e"
|
||||
dependencies = [
|
||||
"bitflags 2.8.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.7.0"
|
||||
@@ -6437,15 +6372,6 @@ version = "0.9.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
|
||||
|
||||
[[package]]
|
||||
name = "spinning_top"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spki"
|
||||
version = "0.6.0"
|
||||
@@ -6521,7 +6447,6 @@ dependencies = [
|
||||
"diesel_migrations",
|
||||
"fail",
|
||||
"futures",
|
||||
"governor",
|
||||
"hex",
|
||||
"http-utils",
|
||||
"humantime",
|
||||
@@ -6539,8 +6464,6 @@ dependencies = [
|
||||
"routerify",
|
||||
"rustls 0.23.18",
|
||||
"rustls-native-certs 0.8.0",
|
||||
"safekeeper_api",
|
||||
"safekeeper_client",
|
||||
"scoped-futures",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
|
||||
16
Dockerfile
16
Dockerfile
@@ -50,14 +50,6 @@ RUN set -e \
|
||||
&& rm -rf pg_install/build \
|
||||
&& tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz .
|
||||
|
||||
# Prepare cargo-chef recipe
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS plan
|
||||
WORKDIR /home/nonroot
|
||||
|
||||
COPY --chown=nonroot . .
|
||||
|
||||
RUN cargo chef prepare --recipe-path recipe.json
|
||||
|
||||
# Build neon binaries
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS build
|
||||
WORKDIR /home/nonroot
|
||||
@@ -71,15 +63,9 @@ COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_i
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v17/include/postgresql/server pg_install/v17/include/postgresql/server
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v17/lib pg_install/v17/lib
|
||||
COPY --from=plan /home/nonroot/recipe.json recipe.json
|
||||
|
||||
ARG ADDITIONAL_RUSTFLAGS=""
|
||||
|
||||
RUN set -e \
|
||||
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo chef cook --locked --release --recipe-path recipe.json
|
||||
|
||||
COPY --chown=nonroot . .
|
||||
|
||||
ARG ADDITIONAL_RUSTFLAGS
|
||||
RUN set -e \
|
||||
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
|
||||
--bin pg_sni_router \
|
||||
|
||||
@@ -300,7 +300,6 @@ ARG CARGO_HAKARI_VERSION=0.9.33
|
||||
ARG CARGO_DENY_VERSION=0.16.2
|
||||
ARG CARGO_HACK_VERSION=0.6.33
|
||||
ARG CARGO_NEXTEST_VERSION=0.9.85
|
||||
ARG CARGO_CHEF_VERSION=0.1.71
|
||||
ARG CARGO_DIESEL_CLI_VERSION=2.2.6
|
||||
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
|
||||
chmod +x rustup-init && \
|
||||
@@ -315,7 +314,6 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
|
||||
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
|
||||
cargo install cargo-hack --version ${CARGO_HACK_VERSION} && \
|
||||
cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} && \
|
||||
cargo install cargo-chef --locked --version ${CARGO_CHEF_VERSION} && \
|
||||
cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} \
|
||||
--features postgres-bundled --no-default-features && \
|
||||
rm -rf /home/nonroot/.cargo/registry && \
|
||||
|
||||
@@ -1750,7 +1750,7 @@ COPY --from=pg_graphql-src /ext-src/ /ext-src/
|
||||
COPY --from=hypopg-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_hashids-src /ext-src/ /ext-src/
|
||||
COPY --from=rum-src /ext-src/ /ext-src/
|
||||
COPY --from=pgtap-src /ext-src/ /ext-src/
|
||||
#COPY --from=pgtap-src /ext-src/ /ext-src/
|
||||
COPY --from=ip4r-src /ext-src/ /ext-src/
|
||||
COPY --from=prefix-src /ext-src/ /ext-src/
|
||||
COPY --from=hll-src /ext-src/ /ext-src/
|
||||
|
||||
@@ -24,7 +24,6 @@ fail.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
notify.workspace = true
|
||||
|
||||
@@ -55,7 +55,7 @@ use signal_hook::{consts::SIGINT, iterator::Signals};
|
||||
use tracing::{error, info, warn};
|
||||
use url::Url;
|
||||
|
||||
use compute_api::responses::{ComputeCtlConfig, ComputeStatus};
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use compute_api::spec::ComputeSpec;
|
||||
|
||||
use compute_tools::compute::{
|
||||
@@ -281,7 +281,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
info!("got spec from cli argument {}", spec_json);
|
||||
return Ok(CliSpecParams {
|
||||
spec: Some(serde_json::from_str(spec_json)?),
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
live_config_allowed: false,
|
||||
});
|
||||
}
|
||||
@@ -291,7 +290,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
let file = File::open(Path::new(spec_path))?;
|
||||
return Ok(CliSpecParams {
|
||||
spec: Some(serde_json::from_reader(file)?),
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
live_config_allowed: true,
|
||||
});
|
||||
}
|
||||
@@ -301,9 +299,8 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
};
|
||||
|
||||
match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
|
||||
Ok(resp) => Ok(CliSpecParams {
|
||||
spec: resp.0,
|
||||
compute_ctl_config: resp.1,
|
||||
Ok(spec) => Ok(CliSpecParams {
|
||||
spec,
|
||||
live_config_allowed: true,
|
||||
}),
|
||||
Err(e) => {
|
||||
@@ -320,8 +317,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
struct CliSpecParams {
|
||||
/// If a spec was provided via CLI or file, the [`ComputeSpec`]
|
||||
spec: Option<ComputeSpec>,
|
||||
#[allow(dead_code)]
|
||||
compute_ctl_config: ComputeCtlConfig,
|
||||
live_config_allowed: bool,
|
||||
}
|
||||
|
||||
@@ -331,7 +326,6 @@ fn wait_spec(
|
||||
CliSpecParams {
|
||||
spec,
|
||||
live_config_allowed,
|
||||
compute_ctl_config: _,
|
||||
}: CliSpecParams,
|
||||
) -> Result<Arc<ComputeNode>> {
|
||||
let mut new_state = ComputeState::new();
|
||||
|
||||
@@ -11,9 +11,7 @@ use crate::migration::MigrationRunner;
|
||||
use crate::params::PG_HBA_ALL_MD5;
|
||||
use crate::pg_helpers::*;
|
||||
|
||||
use compute_api::responses::{
|
||||
ComputeCtlConfig, ControlPlaneComputeStatus, ControlPlaneSpecResponse,
|
||||
};
|
||||
use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
|
||||
use compute_api::spec::ComputeSpec;
|
||||
|
||||
// Do control plane request and return response if any. In case of error it
|
||||
@@ -75,13 +73,14 @@ fn do_control_plane_request(
|
||||
pub fn get_spec_from_control_plane(
|
||||
base_uri: &str,
|
||||
compute_id: &str,
|
||||
) -> Result<(Option<ComputeSpec>, ComputeCtlConfig)> {
|
||||
) -> Result<Option<ComputeSpec>> {
|
||||
let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
|
||||
let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
|
||||
Ok(v) => v,
|
||||
Err(_) => "".to_string(),
|
||||
};
|
||||
let mut attempt = 1;
|
||||
let mut spec: Result<Option<ComputeSpec>> = Ok(None);
|
||||
|
||||
info!("getting spec from control plane: {}", cp_uri);
|
||||
|
||||
@@ -91,7 +90,7 @@ pub fn get_spec_from_control_plane(
|
||||
// - no spec for compute yet (Empty state) -> return Ok(None)
|
||||
// - got spec -> return Ok(Some(spec))
|
||||
while attempt < 4 {
|
||||
let result = match do_control_plane_request(&cp_uri, &jwt) {
|
||||
spec = match do_control_plane_request(&cp_uri, &jwt) {
|
||||
Ok(spec_resp) => {
|
||||
CPLANE_REQUESTS_TOTAL
|
||||
.with_label_values(&[
|
||||
@@ -100,10 +99,10 @@ pub fn get_spec_from_control_plane(
|
||||
])
|
||||
.inc();
|
||||
match spec_resp.status {
|
||||
ControlPlaneComputeStatus::Empty => Ok((None, spec_resp.compute_ctl_config)),
|
||||
ControlPlaneComputeStatus::Empty => Ok(None),
|
||||
ControlPlaneComputeStatus::Attached => {
|
||||
if let Some(spec) = spec_resp.spec {
|
||||
Ok((Some(spec), spec_resp.compute_ctl_config))
|
||||
Ok(Some(spec))
|
||||
} else {
|
||||
bail!("compute is attached, but spec is empty")
|
||||
}
|
||||
@@ -122,10 +121,10 @@ pub fn get_spec_from_control_plane(
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = &result {
|
||||
if let Err(e) = &spec {
|
||||
error!("attempt {} to get spec failed with: {}", attempt, e);
|
||||
} else {
|
||||
return result;
|
||||
return spec;
|
||||
}
|
||||
|
||||
attempt += 1;
|
||||
@@ -133,9 +132,7 @@ pub fn get_spec_from_control_plane(
|
||||
}
|
||||
|
||||
// All attempts failed, return error.
|
||||
Err(anyhow::anyhow!(
|
||||
"Exhausted all attempts to retrieve the spec from the control plane"
|
||||
))
|
||||
spec
|
||||
}
|
||||
|
||||
/// Check `pg_hba.conf` and update if needed to allow external connections.
|
||||
|
||||
@@ -48,8 +48,6 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use compute_api::requests::ConfigurationRequest;
|
||||
use compute_api::responses::ComputeCtlConfig;
|
||||
use compute_api::spec::Database;
|
||||
use compute_api::spec::PgIdent;
|
||||
use compute_api::spec::RemoteExtSpec;
|
||||
@@ -882,13 +880,10 @@ impl Endpoint {
|
||||
self.external_http_address.port()
|
||||
))
|
||||
.header(CONTENT_TYPE.as_str(), "application/json")
|
||||
.body(
|
||||
serde_json::to_string(&ConfigurationRequest {
|
||||
spec,
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
.body(format!(
|
||||
"{{\"spec\":{}}}",
|
||||
serde_json::to_string_pretty(&spec)?
|
||||
))
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -838,10 +838,7 @@ impl StorageController {
|
||||
self.dispatch(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{tenant_shard_id}/migrate"),
|
||||
Some(TenantShardMigrateRequest {
|
||||
node_id,
|
||||
migration_config: None,
|
||||
}),
|
||||
Some(TenantShardMigrateRequest { node_id }),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -609,10 +609,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
tenant_shard_id,
|
||||
node,
|
||||
} => {
|
||||
let req = TenantShardMigrateRequest {
|
||||
node_id: node,
|
||||
migration_config: None,
|
||||
};
|
||||
let req = TenantShardMigrateRequest { node_id: node };
|
||||
|
||||
storcon_client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
@@ -626,10 +623,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
tenant_shard_id,
|
||||
node,
|
||||
} => {
|
||||
let req = TenantShardMigrateRequest {
|
||||
node_id: node,
|
||||
migration_config: None,
|
||||
};
|
||||
let req = TenantShardMigrateRequest { node_id: node };
|
||||
|
||||
storcon_client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
@@ -1088,10 +1082,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
|
||||
Some(TenantShardMigrateRequest {
|
||||
node_id: mv.to,
|
||||
migration_config: None,
|
||||
}),
|
||||
Some(TenantShardMigrateRequest { node_id: mv.to }),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))
|
||||
|
||||
@@ -71,7 +71,7 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
|
||||
cat ../compute/patches/contrib_pg${pg_version}.patch | docker exec -i $TEST_CONTAINER_NAME bash -c "(cd /postgres && patch -p1)"
|
||||
# We are running tests now
|
||||
rm -f testout.txt testout_contrib.txt
|
||||
docker exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pg_tiktoken-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src \
|
||||
docker exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pgtap-src,pg_tiktoken-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src \
|
||||
$TEST_CONTAINER_NAME /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
|
||||
docker exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
|
||||
$TEST_CONTAINER_NAME /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
diff --git a/test/schedule/create.sql b/test/schedule/create.sql
|
||||
index ba355ed..7e250f5 100644
|
||||
--- a/test/schedule/create.sql
|
||||
+++ b/test/schedule/create.sql
|
||||
@@ -1,3 +1,2 @@
|
||||
\unset ECHO
|
||||
\i test/psql.sql
|
||||
-CREATE EXTENSION pgtap;
|
||||
diff --git a/test/schedule/main.sch b/test/schedule/main.sch
|
||||
index a8a5fbc..0463fc4 100644
|
||||
--- a/test/schedule/main.sch
|
||||
+++ b/test/schedule/main.sch
|
||||
@@ -1,2 +1 @@
|
||||
-test: build
|
||||
test: create
|
||||
@@ -1,6 +0,0 @@
|
||||
#!/bin/sh
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
patch -p1 <test-upgrade.patch
|
||||
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
|
||||
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --max-connections=86 --schedule test/schedule/main.sch --schedule test/build/run.sch --dbname contrib_regression --use-existing
|
||||
@@ -41,8 +41,7 @@ EXTENSIONS='[
|
||||
{"extname": "roaringbitmap", "extdir": "pg_roaringbitmap-src"},
|
||||
{"extname": "semver", "extdir": "pg_semver-src"},
|
||||
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
|
||||
{"extname": "pgjwt", "extdir": "pgjwt-src"},
|
||||
{"extname": "pgtap", "extdir": "pgtap-src"}
|
||||
{"extname": "pgjwt", "extdir": "pgjwt-src"}
|
||||
]'
|
||||
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
|
||||
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d
|
||||
|
||||
203
docs/rfcs/042-compute-pageserver-communicator.md
Normal file
203
docs/rfcs/042-compute-pageserver-communicator.md
Normal file
@@ -0,0 +1,203 @@
|
||||
# Compute <-> Pageserver Communicator Rewrite
|
||||
|
||||
## Summary
|
||||
|
||||
## Motivation
|
||||
|
||||
- prefetching logic is complicated
|
||||
- handling async libpq connections in C code are difficult and error-prone
|
||||
- only few people are comfortable working on the code
|
||||
- new AIO (maybe) coming up in Postgres v18
|
||||
- cannot process prefetch replies until another I/O function is called. Makes it impossible to accurately measure when a reply was received
|
||||
- every backend opens a separate connection to pageservers -> lots of connections, first query in backend is slow
|
||||
|
||||
- desire for better protocol, not libpq-based
|
||||
|
||||
- By writing the "communicator" as a separate rust module, it can be reused in
|
||||
tests, outside PostgreSQL.
|
||||
|
||||
## Non Goals (if relevant)
|
||||
|
||||
- We will keep LFC unmodified for now. It might be a good idea to rewrite it
|
||||
too, but it's out of scope here.
|
||||
|
||||
- We should consider a similar rewrite for the walproposer - safekeeper
|
||||
communication, but it's out of scope for this RFC
|
||||
|
||||
## Impacted components (e.g. pageserver, safekeeper, console, etc)
|
||||
|
||||
- Most changes are to the neon extension in compute.
|
||||
|
||||
- Pageserver, to implement the new protocol.
|
||||
|
||||
## Proposed implementation
|
||||
|
||||
- we will use the new implementation with all PostgreSQL versions.
|
||||
- we will have a feature flag to switch between old and new communicator. Once we're
|
||||
comfortable with the new communicator, remove old code and protocol.
|
||||
|
||||
- What about relation size cache? Out of scope? Or move it to the communicator process,
|
||||
and have smgrnblocks() requests always through communicator process?
|
||||
|
||||
### Communicator process
|
||||
|
||||
There is one communicator process in the Postgres server. It's a background
|
||||
worker process. It handles all communication with the pageservers.
|
||||
|
||||
The communicator process is written in a mix of C and Rust. Mostly in Rust, but
|
||||
some C code is needed to interface with the Postgres facilities. For example:
|
||||
- logging
|
||||
- error handling (in a very limited form, we don't want to ereport() on most errors)
|
||||
- expose a shared memory area for IPC
|
||||
- signal other processes
|
||||
|
||||
We will write unsafe rust or C glue code for those facilities, which allow us to
|
||||
write the rest of the communicator in safe rust.
|
||||
|
||||
The Rust parts of the communicator process can use multiple threads and
|
||||
tokio. The glue code is written taken that into account, making it safe.
|
||||
|
||||
pqrx is a rust crate for writing Postgres extensions in Rust. We will _not_ use
|
||||
that. It's a fine crate, good for most extensions, but I don't think we need
|
||||
most of the facilities that it provides. Our wrappers are more low-level than
|
||||
what most extensions need. We don't expose SQL functions or types from this
|
||||
extension for example.
|
||||
|
||||
### Communicator <-> backend interface
|
||||
|
||||
The backends and the communicator process communicate via shared memory. Each
|
||||
backend has a fixed number of "request slots", forming a ring. When a backend
|
||||
wants to perform an I/O, it writes the request details like blk # and LSN, to
|
||||
the next available slot. The request also includes a pointer or buffer ID where
|
||||
the resulting page should be written. The backend then wakes up the
|
||||
communicator, with a signal/futex/latch or something, telling the communicator
|
||||
that it has work to do.
|
||||
|
||||
The communicator picks up the request from the backend's ring, and performs
|
||||
it. It writes result page to the address requested by the backend (most likely a
|
||||
shared buffer that the backend is holding a lock on), marks the request as
|
||||
completed, and wakes up the backend.
|
||||
|
||||
In this design, because each backend has its own small ring, a backend doesn't
|
||||
need to do any locking to manipulate the request slots. Similarly, after a
|
||||
request has been submitted, the communicator has temporary ownership of the
|
||||
request slot, and doesn't need to do locking on it.
|
||||
|
||||
This design is somewhat similar to how the upcoming AIO patches in PostgreSQL
|
||||
will work. That should make it easy to adapt to new PostgreSQL versions.
|
||||
|
||||
In the above example, I assumed a GetPage request, but everything applies to
|
||||
other other request types like "smgrnblocks" too.
|
||||
|
||||
Q: How we are going to handle backend termination while it is waiting for response? Should we allow it or wait request completion? If PS is down, it can take quite long time during which user will not be able to interrupt the query.
|
||||
|
||||
### Prefetching
|
||||
|
||||
A backend can also issue a "blind" prefetch request. When a communicator
|
||||
processes a blind prefetch request, it starts the GetPage request and writes the
|
||||
result to a local buffer within the communicator process. But it could also
|
||||
decide to do nothing, or to schedule the request with a lower priority. It
|
||||
doesn't give any result back to the requesting backend, hence it's "blind".
|
||||
Later, when the backend - or a different backend - requests the page that was
|
||||
prefetched and the prefetch was performed and completed, the communicator
|
||||
process can satisfy the request quickly from the private buffer.
|
||||
|
||||
In this design, the "prefetch" slots are shared by all backends. If one backend
|
||||
issues a prefetch request but never consumes it, but another backend reads the
|
||||
same page, the prefetch can be used to satisfy the request. (In our current
|
||||
implementaiton, it would be wasted, and the same GetPage is performed
|
||||
twice. https://github.com/neondatabase/neon/pull/10442 helps with that, but
|
||||
doesn't fully fix the problem)
|
||||
|
||||
### PostgreSQL versions < 17
|
||||
|
||||
- In 16 and below, prefetching calls are made without holding the buffer pinned.
|
||||
Backends will perform "blind" prefetch requests for smgrprefetch().
|
||||
|
||||
### PostgreSQL version 17
|
||||
|
||||
In version 17, when prefetching is requested, the pages are already pinned in
|
||||
the buffer manager. We possibly could write the page directly to the shared
|
||||
buffer, but there's a risk that the backend stops the scan and releases the pins
|
||||
without ever performing the real I/Os. Because of that, backends will perform
|
||||
blind prefetch requests like in v16; we can't easily take advantage of the
|
||||
pinned buffer.
|
||||
|
||||
### PostgreSQL version 18, if the AIO patches are committed
|
||||
|
||||
With the AIO patches, prefetching is no longer performed with posix_fadvise
|
||||
calls. The backends will start the prefetch I/Os "for real", into the locked
|
||||
shared buffer. On completion of an AIO, the process that processes the
|
||||
completion will have to call a small callback routine that releases the buffer
|
||||
lock and wakes up any processes waiting on the I/O. It'll require some care to
|
||||
execute that safely from the communicator process.
|
||||
|
||||
### Compute <-> Pageserver Protocol
|
||||
|
||||
As part of the project, we will change the protocol. Desires for new protocol:
|
||||
|
||||
- Use protobuf or something else more standard. Maybe gRPC. So that we can use
|
||||
standard tools like Wireshark to easily analyze the traffic.
|
||||
|
||||
- Batching. Have capability to request more than one page in one request.
|
||||
|
||||
In principle, changing the protocol is an independent change from the new
|
||||
communicator process. But it makes sense to do at the same time:
|
||||
|
||||
- Switching to Rust in the communicator process makes it possible to use
|
||||
existing libraries
|
||||
|
||||
- Using a library might help with managing the pool of pageserver connnection,
|
||||
so we want need to implement that ourselves
|
||||
|
||||
### Reliability, failure modes and corner cases (if relevant)
|
||||
|
||||
### Interaction/Sequence diagram (if relevant)
|
||||
|
||||
### Scalability (if relevant)
|
||||
|
||||
- Could the single communicator process become a bottleneck? In the new v18 AIO
|
||||
system, the process needs to execute all the I/O completion callbacks. They're
|
||||
very short, but I still wonder if a single process can handle it.
|
||||
|
||||
- Goal is to sustain ~2.5GB/s bandwidth (typical EC2 NIC).
|
||||
- John: I'd presume a single process to be capable of that if it's just passing buffers around. If we needed more than one in future it would probably be quite a small lift to make that happen (but I bet we never do). (I don't fully know what's involved in the execute all the I/O completion callbacks part though)
|
||||
|
||||
|
||||
### Security implications (if relevant)
|
||||
|
||||
- We currently use libpq authentication with a JWT token. We can continue to use
|
||||
the token for authentication in the new protocol.
|
||||
|
||||
### Unresolved questions (if relevant)
|
||||
|
||||
## Alternative implementation (if relevant)
|
||||
|
||||
I think UDP might also be a good fit for the protocol. No overhead of
|
||||
establishing or holding a connection. No head-of-line blocking; prefetch
|
||||
requests can be processed with lower priority. We would control our own
|
||||
destiny. But it has its own set of challenges: congestion control,
|
||||
authentication & encryption.
|
||||
|
||||
## Pros/cons of proposed approaches (if relevant)
|
||||
|
||||
## Definition of Done (if relevant)
|
||||
|
||||
New communicator has replaced the old code, deployed in production, old protocol
|
||||
support is removed.
|
||||
|
||||
Implentation phases:
|
||||
|
||||
- Implement new protocol in pageserver. In first prototype, maybe just
|
||||
wrap/convert the existing message types into HTTP+protobuf, to keep it simple.
|
||||
|
||||
- Implement the C/Rust wrappers needed to launch the communicator as a background
|
||||
worker process, with access to shard memory.
|
||||
|
||||
- Implement a simple request / response interface in shared memory between the
|
||||
backends and the communicator.
|
||||
|
||||
- Implement a minimalistic communicator: hold one connection to
|
||||
pageserver/shard. No prefetching. Process one request at a time
|
||||
|
||||
- Improve the communicator: multiple threads, multiple connections, prefetching
|
||||
@@ -7,7 +7,6 @@ license.workspace = true
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
chrono.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
regex.workspace = true
|
||||
|
||||
@@ -1,20 +1,18 @@
|
||||
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
|
||||
use crate::{
|
||||
privilege::Privilege,
|
||||
responses::ComputeCtlConfig,
|
||||
spec::{ComputeSpec, ExtVersion, PgIdent},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::Deserialize;
|
||||
|
||||
/// Request of the /configure API
|
||||
///
|
||||
/// We now pass only `spec` in the configuration request, but later we can
|
||||
/// extend it and something like `restart: bool` or something else. So put
|
||||
/// `spec` into a struct initially to be more flexible in the future.
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ConfigurationRequest {
|
||||
pub spec: ComputeSpec,
|
||||
pub compute_ctl_config: ComputeCtlConfig,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
use std::fmt::Display;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use jsonwebtoken::jwk::JwkSet;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
|
||||
use crate::{
|
||||
@@ -136,27 +135,13 @@ pub struct CatalogObjects {
|
||||
pub databases: Vec<Database>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct ComputeCtlConfig {
|
||||
pub jwks: JwkSet,
|
||||
}
|
||||
|
||||
impl Default for ComputeCtlConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
jwks: JwkSet {
|
||||
keys: Vec::default(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Response of the `/computes/{compute_id}/spec` control-plane API.
|
||||
/// This is not actually a compute API response, so consider moving
|
||||
/// to a different place.
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ControlPlaneSpecResponse {
|
||||
pub spec: Option<ComputeSpec>,
|
||||
pub status: ControlPlaneComputeStatus,
|
||||
pub compute_ctl_config: ComputeCtlConfig,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
|
||||
|
||||
@@ -182,18 +182,6 @@ pub struct TenantDescribeResponseShard {
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantShardMigrateRequest {
|
||||
pub node_id: NodeId,
|
||||
#[serde(default)]
|
||||
pub migration_config: Option<MigrationConfig>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct MigrationConfig {
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub secondary_warmup_timeout: Option<Duration>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub secondary_download_request_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone, Debug)]
|
||||
|
||||
@@ -1136,24 +1136,7 @@ pub struct TimelineInfo {
|
||||
pub ancestor_lsn: Option<Lsn>,
|
||||
pub last_record_lsn: Lsn,
|
||||
pub prev_record_lsn: Option<Lsn>,
|
||||
|
||||
/// Legacy field for compat with control plane. Synonym of `min_readable_lsn`.
|
||||
/// TODO: remove once control plane no longer reads it.
|
||||
pub latest_gc_cutoff_lsn: Lsn,
|
||||
|
||||
/// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
|
||||
/// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
|
||||
/// as it is easier to reason about.
|
||||
pub applied_gc_cutoff_lsn: Lsn,
|
||||
|
||||
/// The upper bound of data which is either already GC'ed, or elegible to be GC'ed at any time based on PITR interval.
|
||||
/// This LSN represents the "end of history" for this timeline, and callers should use it to figure out the oldest
|
||||
/// LSN at which it is legal to create a branch or ephemeral endpoint.
|
||||
///
|
||||
/// Note that holders of valid LSN leases may be able to create branches and read pages earlier
|
||||
/// than this LSN, but new leases may not be taken out earlier than this LSN.
|
||||
pub min_readable_lsn: Lsn,
|
||||
|
||||
pub disk_consistent_lsn: Lsn,
|
||||
|
||||
/// The LSN that we have succesfully uploaded to remote storage
|
||||
|
||||
@@ -42,8 +42,8 @@ use utils::lsn::Lsn;
|
||||
pub enum BasebackupError {
|
||||
#[error("basebackup pageserver error {0:#}")]
|
||||
Server(#[from] anyhow::Error),
|
||||
#[error("basebackup client error {0:#} when {1}")]
|
||||
Client(#[source] io::Error, &'static str),
|
||||
#[error("basebackup client error {0:#}")]
|
||||
Client(#[source] io::Error),
|
||||
}
|
||||
|
||||
/// Create basebackup with non-rel data in it.
|
||||
@@ -234,7 +234,7 @@ where
|
||||
self.ar
|
||||
.append(&header, self.buf.as_slice())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "flush"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
self.total_blocks += nblocks;
|
||||
debug!("Added to basebackup slru {} relsize {}", segname, nblocks);
|
||||
@@ -273,9 +273,9 @@ where
|
||||
for dir in subdirs.iter() {
|
||||
let header = new_tar_header_dir(dir)?;
|
||||
self.ar
|
||||
.append(&header, io::empty())
|
||||
.append(&header, &mut io::empty())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball"))?;
|
||||
.context("could not add directory to basebackup tarball")?;
|
||||
}
|
||||
|
||||
// Send config files.
|
||||
@@ -286,13 +286,13 @@ where
|
||||
self.ar
|
||||
.append(&header, data)
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball,pg_hba.conf"))?;
|
||||
.context("could not add config file to basebackup tarball")?;
|
||||
} else {
|
||||
let header = new_tar_header(filepath, 0)?;
|
||||
self.ar
|
||||
.append(&header, io::empty())
|
||||
.append(&header, &mut io::empty())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball,add_config_file"))?;
|
||||
.context("could not add config file to basebackup tarball")?;
|
||||
}
|
||||
}
|
||||
if !lazy_slru_download {
|
||||
@@ -406,7 +406,7 @@ where
|
||||
self.ar
|
||||
.append(&header, &*content)
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball,add_aux_file"))?;
|
||||
.context("could not add aux file to basebackup tarball")?;
|
||||
}
|
||||
|
||||
if min_restart_lsn != Lsn::MAX {
|
||||
@@ -419,7 +419,7 @@ where
|
||||
self.ar
|
||||
.append(&header, &data[..])
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball,restart.lsn"))?;
|
||||
.context("could not add restart.lsn file to basebackup tarball")?;
|
||||
}
|
||||
for xid in self
|
||||
.timeline
|
||||
@@ -451,9 +451,9 @@ where
|
||||
let crc32 = crc32c::crc32c(&content);
|
||||
content.extend_from_slice(&crc32.to_le_bytes());
|
||||
let header = new_tar_header("pg_logical/replorigin_checkpoint", content.len() as u64)?;
|
||||
self.ar.append(&header, &*content).await.map_err(|e| {
|
||||
BasebackupError::Client(e, "send_tarball,pg_logical/replorigin_checkpoint")
|
||||
})?;
|
||||
self.ar.append(&header, &*content).await.context(
|
||||
"could not add pg_logical/replorigin_checkpoint file to basebackup tarball",
|
||||
)?;
|
||||
}
|
||||
|
||||
fail_point!("basebackup-before-control-file", |_| {
|
||||
@@ -464,10 +464,7 @@ where
|
||||
|
||||
// Generate pg_control and bootstrap WAL segment.
|
||||
self.add_pgcontrol_file().await?;
|
||||
self.ar
|
||||
.finish()
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball,finish"))?;
|
||||
self.ar.finish().await.map_err(BasebackupError::Client)?;
|
||||
debug!("all tarred up!");
|
||||
Ok(())
|
||||
}
|
||||
@@ -485,9 +482,9 @@ where
|
||||
let file_name = dst.to_segfile_name(0);
|
||||
let header = new_tar_header(&file_name, 0)?;
|
||||
self.ar
|
||||
.append(&header, io::empty())
|
||||
.append(&header, &mut io::empty())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_rel,empty"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -518,7 +515,7 @@ where
|
||||
self.ar
|
||||
.append(&header, segment_data.as_slice())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_rel,segment"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
seg += 1;
|
||||
startblk = endblk;
|
||||
@@ -569,7 +566,7 @@ where
|
||||
self.ar
|
||||
.append(&header, pg_version_str.as_bytes())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_dbdir,PG_VERSION"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
info!("timeline.pg_version {}", self.timeline.pg_version);
|
||||
|
||||
@@ -579,7 +576,7 @@ where
|
||||
self.ar
|
||||
.append(&header, &img[..])
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_dbdir,global/pg_filenode.map"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
} else {
|
||||
warn!("global/pg_filenode.map is missing");
|
||||
}
|
||||
@@ -615,9 +612,9 @@ where
|
||||
let path = format!("base/{}", dbnode);
|
||||
let header = new_tar_header_dir(&path)?;
|
||||
self.ar
|
||||
.append(&header, io::empty())
|
||||
.append(&header, &mut io::empty())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
if let Some(img) = relmap_img {
|
||||
let dst_path = format!("base/{}/PG_VERSION", dbnode);
|
||||
@@ -630,14 +627,14 @@ where
|
||||
self.ar
|
||||
.append(&header, pg_version_str.as_bytes())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base/PG_VERSION"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
|
||||
let header = new_tar_header(&relmap_path, img.len() as u64)?;
|
||||
self.ar
|
||||
.append(&header, &img[..])
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base/pg_filenode.map"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
@@ -666,7 +663,7 @@ where
|
||||
self.ar
|
||||
.append(&header, &buf[..])
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_twophase_file"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -696,7 +693,7 @@ where
|
||||
zenith_signal.as_bytes(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
let checkpoint_bytes = self
|
||||
.timeline
|
||||
@@ -721,7 +718,7 @@ where
|
||||
self.ar
|
||||
.append(&header, &pg_control_bytes[..])
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,pg_control"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
//send wal segment
|
||||
let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
|
||||
@@ -745,7 +742,7 @@ where
|
||||
self.ar
|
||||
.append(&header, &wal_seg[..])
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,wal_segment"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1080,10 +1080,7 @@ components:
|
||||
type: integer
|
||||
state:
|
||||
type: string
|
||||
min_readable_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
applied_gc_cutoff_lsn:
|
||||
latest_gc_cutoff_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
|
||||
@@ -482,11 +482,6 @@ async fn build_timeline_info_common(
|
||||
|
||||
let (pitr_history_size, within_ancestor_pitr) = timeline.get_pitr_history_stats();
|
||||
|
||||
let min_readable_lsn = std::cmp::max(
|
||||
timeline.get_gc_cutoff_lsn(),
|
||||
*timeline.get_applied_gc_cutoff_lsn(),
|
||||
);
|
||||
|
||||
let info = TimelineInfo {
|
||||
tenant_id: timeline.tenant_shard_id,
|
||||
timeline_id: timeline.timeline_id,
|
||||
@@ -498,12 +493,7 @@ async fn build_timeline_info_common(
|
||||
initdb_lsn,
|
||||
last_record_lsn,
|
||||
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
|
||||
// Externally, expose the lowest LSN that can be used to create a branch as the "GC cutoff", although internally
|
||||
// we distinguish between the "planned" GC cutoff (PITR point) and the "latest" GC cutoff (where we
|
||||
// actually trimmed data to), which can pass each other when PITR is changed.
|
||||
latest_gc_cutoff_lsn: min_readable_lsn,
|
||||
min_readable_lsn,
|
||||
applied_gc_cutoff_lsn: *timeline.get_applied_gc_cutoff_lsn(),
|
||||
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
|
||||
current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
|
||||
current_logical_size_is_accurate: match current_logical_size.accuracy() {
|
||||
tenant::timeline::logical_size::Accuracy::Approximate => false,
|
||||
|
||||
@@ -914,7 +914,7 @@ impl PageServerHandler {
|
||||
&shard,
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&shard.get_applied_gc_cutoff_lsn(),
|
||||
&shard.get_latest_gc_cutoff_lsn(),
|
||||
ctx,
|
||||
)
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
@@ -1810,7 +1810,7 @@ impl PageServerHandler {
|
||||
req: &PagestreamExistsRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.hdr.request_lsn,
|
||||
@@ -1837,7 +1837,7 @@ impl PageServerHandler {
|
||||
req: &PagestreamNblocksRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.hdr.request_lsn,
|
||||
@@ -1864,7 +1864,7 @@ impl PageServerHandler {
|
||||
req: &PagestreamDbSizeRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.hdr.request_lsn,
|
||||
@@ -1954,7 +1954,7 @@ impl PageServerHandler {
|
||||
req: &PagestreamGetSlruSegmentRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.hdr.request_lsn,
|
||||
@@ -2050,8 +2050,7 @@ impl PageServerHandler {
|
||||
{
|
||||
fn map_basebackup_error(err: BasebackupError) -> QueryError {
|
||||
match err {
|
||||
// TODO: passthrough the error site to the final error message?
|
||||
BasebackupError::Client(e, _) => QueryError::Disconnected(ConnectionError::Io(e)),
|
||||
BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
|
||||
BasebackupError::Server(e) => QueryError::Other(e),
|
||||
}
|
||||
}
|
||||
@@ -2072,7 +2071,7 @@ impl PageServerHandler {
|
||||
//return Err(QueryError::NotFound("timeline is archived".into()))
|
||||
}
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
if let Some(lsn) = lsn {
|
||||
// Backup was requested at a particular LSN. Wait for it to arrive.
|
||||
info!("waiting for {}", lsn);
|
||||
@@ -2152,12 +2151,10 @@ impl PageServerHandler {
|
||||
.await
|
||||
.map_err(map_basebackup_error)?;
|
||||
}
|
||||
writer.flush().await.map_err(|e| {
|
||||
map_basebackup_error(BasebackupError::Client(
|
||||
e,
|
||||
"handle_basebackup_request,flush",
|
||||
))
|
||||
})?;
|
||||
writer
|
||||
.flush()
|
||||
.await
|
||||
.map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
|
||||
}
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyDone)
|
||||
|
||||
@@ -611,7 +611,7 @@ impl Timeline {
|
||||
) -> Result<LsnForTimestamp, PageReconstructError> {
|
||||
pausable_failpoint!("find-lsn-for-timestamp-pausable");
|
||||
|
||||
let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
|
||||
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
|
||||
let gc_cutoff_planned = {
|
||||
let gc_info = self.gc_info.read().unwrap();
|
||||
gc_info.min_cutoff()
|
||||
|
||||
@@ -40,8 +40,6 @@ use remote_timeline_client::manifest::{
|
||||
use remote_timeline_client::UploadQueueNotReadyError;
|
||||
use remote_timeline_client::FAILED_REMOTE_OP_RETRIES;
|
||||
use remote_timeline_client::FAILED_UPLOAD_WARN_THRESHOLD;
|
||||
use secondary::heatmap::HeatMapTenant;
|
||||
use secondary::heatmap::HeatMapTimeline;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
@@ -57,7 +55,6 @@ use timeline::offload::OffloadError;
|
||||
use timeline::CompactFlags;
|
||||
use timeline::CompactOptions;
|
||||
use timeline::CompactionError;
|
||||
use timeline::PreviousHeatmap;
|
||||
use timeline::ShutdownMode;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::watch;
|
||||
@@ -265,7 +262,6 @@ struct TimelinePreload {
|
||||
timeline_id: TimelineId,
|
||||
client: RemoteTimelineClient,
|
||||
index_part: Result<MaybeDeletedIndexPart, DownloadError>,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
}
|
||||
|
||||
pub(crate) struct TenantPreload {
|
||||
@@ -1132,7 +1128,6 @@ impl Tenant {
|
||||
resources: TimelineResources,
|
||||
mut index_part: IndexPart,
|
||||
metadata: TimelineMetadata,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
cause: LoadTimelineCause,
|
||||
ctx: &RequestContext,
|
||||
@@ -1163,7 +1158,6 @@ impl Tenant {
|
||||
let timeline = self.create_timeline_struct(
|
||||
timeline_id,
|
||||
&metadata,
|
||||
previous_heatmap,
|
||||
ancestor.clone(),
|
||||
resources,
|
||||
CreateTimelineCause::Load,
|
||||
@@ -1563,18 +1557,8 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(vlad): Could go to S3 if the secondary is freezing cold and hasn't even
|
||||
// pulled the first heatmap. Not entirely necessary since the storage controller
|
||||
// will kick the secondary in any case and cause a download.
|
||||
let maybe_heatmap_at = self.read_on_disk_heatmap().await;
|
||||
|
||||
let timelines = self
|
||||
.load_timelines_metadata(
|
||||
remote_timeline_ids,
|
||||
remote_storage,
|
||||
maybe_heatmap_at,
|
||||
cancel,
|
||||
)
|
||||
.load_timelines_metadata(remote_timeline_ids, remote_storage, cancel)
|
||||
.await?;
|
||||
|
||||
Ok(TenantPreload {
|
||||
@@ -1587,26 +1571,6 @@ impl Tenant {
|
||||
})
|
||||
}
|
||||
|
||||
async fn read_on_disk_heatmap(&self) -> Option<(HeatMapTenant, std::time::Instant)> {
|
||||
let on_disk_heatmap_path = self.conf.tenant_heatmap_path(&self.tenant_shard_id);
|
||||
match tokio::fs::read_to_string(on_disk_heatmap_path).await {
|
||||
Ok(heatmap) => match serde_json::from_str::<HeatMapTenant>(&heatmap) {
|
||||
Ok(heatmap) => Some((heatmap, std::time::Instant::now())),
|
||||
Err(err) => {
|
||||
error!("Failed to deserialize old heatmap: {err}");
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(err) => match err.kind() {
|
||||
std::io::ErrorKind::NotFound => None,
|
||||
_ => {
|
||||
error!("Unexpected IO error reading old heatmap: {err}");
|
||||
None
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Background task that downloads all data for a tenant and brings it to Active state.
|
||||
///
|
||||
@@ -1694,10 +1658,7 @@ impl Tenant {
|
||||
match index_part {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => {
|
||||
timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
|
||||
remote_index_and_client.insert(
|
||||
timeline_id,
|
||||
(index_part, preload.client, preload.previous_heatmap),
|
||||
);
|
||||
remote_index_and_client.insert(timeline_id, (index_part, preload.client));
|
||||
}
|
||||
MaybeDeletedIndexPart::Deleted(index_part) => {
|
||||
info!(
|
||||
@@ -1716,7 +1677,7 @@ impl Tenant {
|
||||
// layer file.
|
||||
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
|
||||
for (timeline_id, remote_metadata) in sorted_timelines {
|
||||
let (index_part, remote_client, previous_heatmap) = remote_index_and_client
|
||||
let (index_part, remote_client) = remote_index_and_client
|
||||
.remove(&timeline_id)
|
||||
.expect("just put it in above");
|
||||
|
||||
@@ -1736,7 +1697,6 @@ impl Tenant {
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
previous_heatmap,
|
||||
self.get_timeline_resources_for(remote_client),
|
||||
LoadTimelineCause::Attach,
|
||||
ctx,
|
||||
@@ -1886,13 +1846,11 @@ impl Tenant {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(timeline_id=%timeline_id))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn load_remote_timeline(
|
||||
self: &Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
index_part: IndexPart,
|
||||
remote_metadata: TimelineMetadata,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
resources: TimelineResources,
|
||||
cause: LoadTimelineCause,
|
||||
ctx: &RequestContext,
|
||||
@@ -1922,7 +1880,6 @@ impl Tenant {
|
||||
resources,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
previous_heatmap,
|
||||
ancestor,
|
||||
cause,
|
||||
ctx,
|
||||
@@ -1934,29 +1891,14 @@ impl Tenant {
|
||||
self: &Arc<Tenant>,
|
||||
timeline_ids: HashSet<TimelineId>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
heatmap: Option<(HeatMapTenant, std::time::Instant)>,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
|
||||
let mut timeline_heatmaps = heatmap.map(|h| (h.0.into_timelines_index(), h.1));
|
||||
|
||||
let mut part_downloads = JoinSet::new();
|
||||
for timeline_id in timeline_ids {
|
||||
let cancel_clone = cancel.clone();
|
||||
|
||||
let previous_timeline_heatmap = timeline_heatmaps.as_mut().and_then(|hs| {
|
||||
hs.0.remove(&timeline_id).map(|h| PreviousHeatmap::Active {
|
||||
heatmap: h,
|
||||
read_at: hs.1,
|
||||
})
|
||||
});
|
||||
part_downloads.spawn(
|
||||
self.load_timeline_metadata(
|
||||
timeline_id,
|
||||
remote_storage.clone(),
|
||||
previous_timeline_heatmap,
|
||||
cancel_clone,
|
||||
)
|
||||
.instrument(info_span!("download_index_part", %timeline_id)),
|
||||
self.load_timeline_metadata(timeline_id, remote_storage.clone(), cancel_clone)
|
||||
.instrument(info_span!("download_index_part", %timeline_id)),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2004,7 +1946,6 @@ impl Tenant {
|
||||
self: &Arc<Tenant>,
|
||||
timeline_id: TimelineId,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
cancel: CancellationToken,
|
||||
) -> impl Future<Output = TimelinePreload> {
|
||||
let client = self.build_timeline_client(timeline_id, remote_storage);
|
||||
@@ -2020,7 +1961,6 @@ impl Tenant {
|
||||
client,
|
||||
timeline_id,
|
||||
index_part,
|
||||
previous_heatmap,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2132,12 +2072,7 @@ impl Tenant {
|
||||
})?;
|
||||
|
||||
let timeline_preload = self
|
||||
.load_timeline_metadata(
|
||||
timeline_id,
|
||||
self.remote_storage.clone(),
|
||||
None,
|
||||
cancel.clone(),
|
||||
)
|
||||
.load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel.clone())
|
||||
.await;
|
||||
|
||||
let index_part = match timeline_preload.index_part {
|
||||
@@ -2171,7 +2106,6 @@ impl Tenant {
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
None,
|
||||
timeline_resources,
|
||||
LoadTimelineCause::Unoffload,
|
||||
&ctx,
|
||||
@@ -2887,7 +2821,7 @@ impl Tenant {
|
||||
};
|
||||
let metadata = index_part.metadata.clone();
|
||||
self
|
||||
.load_remote_timeline(timeline_id, index_part, metadata, None, resources, LoadTimelineCause::ImportPgdata{
|
||||
.load_remote_timeline(timeline_id, index_part, metadata, resources, LoadTimelineCause::ImportPgdata{
|
||||
create_guard: timeline_create_guard, activate, }, &ctx)
|
||||
.await?
|
||||
.ready_to_activate()
|
||||
@@ -4096,7 +4030,6 @@ impl Tenant {
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
new_metadata: &TimelineMetadata,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
resources: TimelineResources,
|
||||
cause: CreateTimelineCause,
|
||||
@@ -4120,7 +4053,6 @@ impl Tenant {
|
||||
self.conf,
|
||||
Arc::clone(&self.tenant_conf),
|
||||
new_metadata,
|
||||
previous_heatmap,
|
||||
ancestor,
|
||||
new_timeline_id,
|
||||
self.tenant_shard_id,
|
||||
@@ -4763,24 +4695,24 @@ impl Tenant {
|
||||
// We check it against both the planned GC cutoff stored in 'gc_info',
|
||||
// and the 'latest_gc_cutoff' of the last GC that was performed. The
|
||||
// planned GC cutoff in 'gc_info' is normally larger than
|
||||
// 'applied_gc_cutoff_lsn', but beware of corner cases like if you just
|
||||
// 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
|
||||
// changed the GC settings for the tenant to make the PITR window
|
||||
// larger, but some of the data was already removed by an earlier GC
|
||||
// iteration.
|
||||
|
||||
// check against last actual 'latest_gc_cutoff' first
|
||||
let applied_gc_cutoff_lsn = src_timeline.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
|
||||
{
|
||||
let gc_info = src_timeline.gc_info.read().unwrap();
|
||||
let planned_cutoff = gc_info.min_cutoff();
|
||||
if gc_info.lsn_covered_by_lease(start_lsn) {
|
||||
tracing::info!("skipping comparison of {start_lsn} with gc cutoff {} and planned gc cutoff {planned_cutoff} due to lsn lease", *applied_gc_cutoff_lsn);
|
||||
tracing::info!("skipping comparison of {start_lsn} with gc cutoff {} and planned gc cutoff {planned_cutoff} due to lsn lease", *latest_gc_cutoff_lsn);
|
||||
} else {
|
||||
src_timeline
|
||||
.check_lsn_is_in_scope(start_lsn, &applied_gc_cutoff_lsn)
|
||||
.check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
|
||||
.context(format!(
|
||||
"invalid branch start lsn: less than latest GC cutoff {}",
|
||||
*applied_gc_cutoff_lsn,
|
||||
*latest_gc_cutoff_lsn,
|
||||
))
|
||||
.map_err(CreateTimelineError::AncestorLsn)?;
|
||||
|
||||
@@ -4819,7 +4751,7 @@ impl Tenant {
|
||||
dst_prev,
|
||||
Some(src_id),
|
||||
start_lsn,
|
||||
*src_timeline.applied_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
|
||||
*src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
|
||||
src_timeline.initdb_lsn,
|
||||
src_timeline.pg_version,
|
||||
);
|
||||
@@ -5192,7 +5124,6 @@ impl Tenant {
|
||||
.create_timeline_struct(
|
||||
new_timeline_id,
|
||||
new_metadata,
|
||||
None,
|
||||
ancestor,
|
||||
resources,
|
||||
CreateTimelineCause::Load,
|
||||
@@ -6199,8 +6130,8 @@ mod tests {
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
|
||||
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
|
||||
let applied_gc_cutoff_lsn = tline.get_applied_gc_cutoff_lsn();
|
||||
assert!(*applied_gc_cutoff_lsn > Lsn(0x25));
|
||||
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
|
||||
assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
|
||||
match tline.get(*TEST_KEY, Lsn(0x25)) {
|
||||
Ok(_) => panic!("request for page should have failed"),
|
||||
Err(err) => assert!(err.to_string().contains("not found at")),
|
||||
@@ -8496,7 +8427,7 @@ mod tests {
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
@@ -8604,7 +8535,7 @@ mod tests {
|
||||
// increase GC horizon and compact again
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x40))
|
||||
.wait()
|
||||
@@ -8772,8 +8703,8 @@ mod tests {
|
||||
|
||||
// Force set disk consistent lsn so we can get the cutoff at `end_lsn`.
|
||||
info!(
|
||||
"applied_gc_cutoff_lsn: {}",
|
||||
*timeline.get_applied_gc_cutoff_lsn()
|
||||
"latest_gc_cutoff_lsn: {}",
|
||||
*timeline.get_latest_gc_cutoff_lsn()
|
||||
);
|
||||
timeline.force_set_disk_consistent_lsn(end_lsn);
|
||||
|
||||
@@ -8799,7 +8730,7 @@ mod tests {
|
||||
|
||||
// Make lease on a already GC-ed LSN.
|
||||
// 0/80 does not have a valid lease + is below latest_gc_cutoff
|
||||
assert!(Lsn(0x80) < *timeline.get_applied_gc_cutoff_lsn());
|
||||
assert!(Lsn(0x80) < *timeline.get_latest_gc_cutoff_lsn());
|
||||
timeline
|
||||
.init_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx)
|
||||
.expect_err("lease request on GC-ed LSN should fail");
|
||||
@@ -8990,7 +8921,7 @@ mod tests {
|
||||
};
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
@@ -9077,7 +9008,7 @@ mod tests {
|
||||
// increase GC horizon and compact again
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x40))
|
||||
.wait()
|
||||
@@ -9530,7 +9461,7 @@ mod tests {
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
@@ -9677,7 +9608,7 @@ mod tests {
|
||||
// increase GC horizon and compact again
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x38))
|
||||
.wait()
|
||||
@@ -9778,7 +9709,7 @@ mod tests {
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
@@ -10029,7 +9960,7 @@ mod tests {
|
||||
|
||||
{
|
||||
parent_tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x10))
|
||||
.wait()
|
||||
@@ -10049,7 +9980,7 @@ mod tests {
|
||||
|
||||
{
|
||||
branch_tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x50))
|
||||
.wait()
|
||||
@@ -10405,7 +10336,7 @@ mod tests {
|
||||
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
@@ -10790,7 +10721,7 @@ mod tests {
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
@@ -11041,7 +10972,7 @@ mod tests {
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
|
||||
@@ -130,10 +130,7 @@ struct TimelineMetadataBodyV2 {
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
ancestor_timeline: Option<TimelineId>,
|
||||
ancestor_lsn: Lsn,
|
||||
|
||||
// The LSN at which GC was last executed. Synonym of [`Timeline::applied_gc_cutoff_lsn`].
|
||||
latest_gc_cutoff_lsn: Lsn,
|
||||
|
||||
initdb_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{collections::HashMap, time::SystemTime};
|
||||
use std::time::SystemTime;
|
||||
|
||||
use crate::tenant::{remote_timeline_client::index::LayerFileMetadata, storage_layer::LayerName};
|
||||
|
||||
@@ -8,7 +8,7 @@ use serde_with::{serde_as, DisplayFromStr, TimestampSeconds};
|
||||
use utils::{generation::Generation, id::TimelineId};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub(crate) struct HeatMapTenant {
|
||||
pub(super) struct HeatMapTenant {
|
||||
/// Generation of the attached location that uploaded the heatmap: this is not required
|
||||
/// for correctness, but acts as a hint to secondary locations in order to detect thrashing
|
||||
/// in the unlikely event that two attached locations are both uploading conflicting heatmaps.
|
||||
@@ -25,17 +25,8 @@ pub(crate) struct HeatMapTenant {
|
||||
pub(super) upload_period_ms: Option<u128>,
|
||||
}
|
||||
|
||||
impl HeatMapTenant {
|
||||
pub(crate) fn into_timelines_index(self) -> HashMap<TimelineId, HeatMapTimeline> {
|
||||
self.timelines
|
||||
.into_iter()
|
||||
.map(|htl| (htl.timeline_id, htl))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub(crate) struct HeatMapTimeline {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub(crate) timeline_id: TimelineId,
|
||||
@@ -44,13 +35,13 @@ pub(crate) struct HeatMapTimeline {
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub(crate) struct HeatMapLayer {
|
||||
pub(crate) name: LayerName,
|
||||
pub(crate) metadata: LayerFileMetadata,
|
||||
|
||||
#[serde_as(as = "TimestampSeconds<i64>")]
|
||||
pub(crate) access_time: SystemTime,
|
||||
pub(super) access_time: SystemTime,
|
||||
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
|
||||
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
|
||||
}
|
||||
|
||||
@@ -394,7 +394,7 @@ pub(super) async fn gather_inputs(
|
||||
ancestor_lsn,
|
||||
last_record: last_record_lsn,
|
||||
// this is not used above, because it might not have updated recently enough
|
||||
latest_gc_cutoff: *timeline.get_applied_gc_cutoff_lsn(),
|
||||
latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(),
|
||||
next_pitr_cutoff,
|
||||
retention_param_cutoff,
|
||||
lease_points,
|
||||
|
||||
@@ -136,22 +136,6 @@ pub(crate) fn local_layer_path(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum LastEviction {
|
||||
Never,
|
||||
At(std::time::Instant),
|
||||
Evicting,
|
||||
}
|
||||
|
||||
impl LastEviction {
|
||||
pub(crate) fn happened_after(&self, timepoint: std::time::Instant) -> bool {
|
||||
match self {
|
||||
LastEviction::Never => false,
|
||||
LastEviction::At(evicted_at) => evicted_at > &timepoint,
|
||||
LastEviction::Evicting => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer {
|
||||
/// Creates a layer value for a file we know to not be resident.
|
||||
pub(crate) fn for_evicted(
|
||||
@@ -421,17 +405,6 @@ impl Layer {
|
||||
self.0.metadata()
|
||||
}
|
||||
|
||||
pub(crate) fn last_evicted_at(&self) -> LastEviction {
|
||||
match self.0.last_evicted_at.try_lock() {
|
||||
Ok(lock) => match *lock {
|
||||
None => LastEviction::Never,
|
||||
Some(at) => LastEviction::At(at),
|
||||
},
|
||||
Err(std::sync::TryLockError::WouldBlock) => LastEviction::Evicting,
|
||||
Err(std::sync::TryLockError::Poisoned(p)) => panic!("Lock poisoned: {p}"),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
|
||||
self.0
|
||||
.timeline
|
||||
@@ -683,9 +656,7 @@ struct LayerInner {
|
||||
|
||||
/// When the Layer was last evicted but has not been downloaded since.
|
||||
///
|
||||
/// This is used for skipping evicted layers from the previous heatmap (see
|
||||
/// `[Timeline::generate_heatmap]`) and for updating metrics
|
||||
/// (see [`LayerImplMetrics::redownload_after`]).
|
||||
/// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
|
||||
last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -150,15 +150,16 @@ use super::{
|
||||
config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized,
|
||||
MaybeOffloaded,
|
||||
};
|
||||
use super::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, HeatMapTimeline,
|
||||
};
|
||||
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
||||
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
|
||||
use super::{
|
||||
remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError,
|
||||
storage_layer::ReadableLayer,
|
||||
};
|
||||
use super::{secondary::heatmap::HeatMapLayer, GcError};
|
||||
use super::{
|
||||
secondary::heatmap::{HeatMapLayer, HeatMapTimeline},
|
||||
GcError,
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
use pageserver_api::value::Value;
|
||||
@@ -351,11 +352,8 @@ pub struct Timeline {
|
||||
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
|
||||
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
|
||||
|
||||
// The LSN at which we have executed GC: whereas [`Self::gc_info`] records the LSN at which
|
||||
// we _intend_ to GC (i.e. the PITR cutoff), this LSN records where we actually last did it.
|
||||
// Because PITR interval is mutable, it's possible for this LSN to be earlier or later than
|
||||
// the planned GC cutoff.
|
||||
pub applied_gc_cutoff_lsn: Rcu<Lsn>,
|
||||
// Needed to ensure that we can't create a branch at a point that was already garbage collected
|
||||
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
|
||||
|
||||
pub(crate) gc_compaction_layer_update_lock: tokio::sync::RwLock<()>,
|
||||
|
||||
@@ -464,16 +462,6 @@ pub struct Timeline {
|
||||
|
||||
/// If Some, collects GetPage metadata for an ongoing PageTrace.
|
||||
pub(crate) page_trace: ArcSwapOption<Sender<PageTraceEvent>>,
|
||||
|
||||
previous_heatmap: ArcSwapOption<PreviousHeatmap>,
|
||||
}
|
||||
|
||||
pub(crate) enum PreviousHeatmap {
|
||||
Active {
|
||||
heatmap: HeatMapTimeline,
|
||||
read_at: std::time::Instant,
|
||||
},
|
||||
Obsolete,
|
||||
}
|
||||
|
||||
pub type TimelineDeleteProgress = Arc<tokio::sync::Mutex<DeleteTimelineFlow>>;
|
||||
@@ -1089,15 +1077,9 @@ impl Timeline {
|
||||
(history, gc_info.within_ancestor_pitr)
|
||||
}
|
||||
|
||||
/// Read timeline's GC cutoff: this is the LSN at which GC has started to happen
|
||||
pub(crate) fn get_applied_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
|
||||
self.applied_gc_cutoff_lsn.read()
|
||||
}
|
||||
|
||||
/// Read timeline's planned GC cutoff: this is the logical end of history that users
|
||||
/// are allowed to read (based on configured PITR), even if physically we have more history.
|
||||
pub(crate) fn get_gc_cutoff_lsn(&self) -> Lsn {
|
||||
self.gc_info.read().unwrap().cutoffs.time
|
||||
/// Lock and get timeline's GC cutoff
|
||||
pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
|
||||
self.latest_gc_cutoff_lsn.read()
|
||||
}
|
||||
|
||||
/// Look up given page version.
|
||||
@@ -1605,7 +1587,7 @@ impl Timeline {
|
||||
};
|
||||
|
||||
if init || validate {
|
||||
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = self.get_latest_gc_cutoff_lsn();
|
||||
if lsn < *latest_gc_cutoff_lsn {
|
||||
bail!("tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn);
|
||||
}
|
||||
@@ -2577,7 +2559,6 @@ impl Timeline {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
|
||||
metadata: &TimelineMetadata,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -2678,7 +2659,7 @@ impl Timeline {
|
||||
LastImageLayerCreationStatus::default(),
|
||||
)),
|
||||
|
||||
applied_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
|
||||
latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
|
||||
initdb_lsn: metadata.initdb_lsn(),
|
||||
|
||||
current_logical_size: if disk_consistent_lsn.is_valid() {
|
||||
@@ -2740,8 +2721,6 @@ impl Timeline {
|
||||
create_idempotency,
|
||||
|
||||
page_trace: Default::default(),
|
||||
|
||||
previous_heatmap: ArcSwapOption::from_pointee(previous_heatmap),
|
||||
};
|
||||
|
||||
result.repartition_threshold =
|
||||
@@ -3480,52 +3459,12 @@ impl Timeline {
|
||||
|
||||
let guard = self.layers.read().await;
|
||||
|
||||
// Firstly, if there's any heatmap left over from when this location
|
||||
// was a secondary, take that into account. Keep layers that are:
|
||||
// * present in the layer map
|
||||
// * visible
|
||||
// * non-resident
|
||||
// * not evicted since we read the heatmap
|
||||
//
|
||||
// Without this, a new cold, attached location would clobber the previous
|
||||
// heatamp.
|
||||
let previous_heatmap = self.previous_heatmap.load();
|
||||
let visible_non_resident = match previous_heatmap.as_deref() {
|
||||
Some(PreviousHeatmap::Active { heatmap, read_at }) => {
|
||||
Some(heatmap.layers.iter().filter_map(|hl| {
|
||||
let desc: PersistentLayerDesc = hl.name.clone().into();
|
||||
let layer = guard.try_get_from_key(&desc.key())?;
|
||||
|
||||
if layer.visibility() == LayerVisibilityHint::Covered {
|
||||
return None;
|
||||
}
|
||||
|
||||
if layer.is_likely_resident() {
|
||||
return None;
|
||||
}
|
||||
|
||||
if layer.last_evicted_at().happened_after(*read_at) {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some((desc, hl.metadata.clone(), hl.access_time))
|
||||
}))
|
||||
}
|
||||
Some(PreviousHeatmap::Obsolete) => None,
|
||||
None => None,
|
||||
};
|
||||
|
||||
// Secondly, all currently visible, resident layers are included.
|
||||
let resident = guard.likely_resident_layers().filter_map(|layer| {
|
||||
match layer.visibility() {
|
||||
LayerVisibilityHint::Visible => {
|
||||
// Layer is visible to one or more read LSNs: elegible for inclusion in layer map
|
||||
let last_activity_ts = layer.latest_activity();
|
||||
Some((
|
||||
layer.layer_desc().clone(),
|
||||
layer.metadata(),
|
||||
last_activity_ts,
|
||||
))
|
||||
Some((layer.layer_desc(), layer.metadata(), last_activity_ts))
|
||||
}
|
||||
LayerVisibilityHint::Covered => {
|
||||
// Layer is resident but unlikely to be read: not elegible for inclusion in heatmap.
|
||||
@@ -3534,18 +3473,7 @@ impl Timeline {
|
||||
}
|
||||
});
|
||||
|
||||
let mut layers = match visible_non_resident {
|
||||
Some(non_resident) => {
|
||||
let mut non_resident = non_resident.peekable();
|
||||
if non_resident.peek().is_none() {
|
||||
self.previous_heatmap
|
||||
.store(Some(PreviousHeatmap::Obsolete.into()));
|
||||
}
|
||||
|
||||
non_resident.chain(resident).collect::<Vec<_>>()
|
||||
}
|
||||
None => resident.collect::<Vec<_>>(),
|
||||
};
|
||||
let mut layers = resident.collect::<Vec<_>>();
|
||||
|
||||
// Sort layers in order of which to download first. For a large set of layers to download, we
|
||||
// want to prioritize those layers which are most likely to still be in the resident many minutes
|
||||
@@ -3734,7 +3662,7 @@ impl Timeline {
|
||||
// the timeline, then it will remove layers that are required for fulfilling
|
||||
// the current get request (read-path cannot "look back" and notice the new
|
||||
// image layer).
|
||||
let _gc_cutoff_holder = timeline.get_applied_gc_cutoff_lsn();
|
||||
let _gc_cutoff_holder = timeline.get_latest_gc_cutoff_lsn();
|
||||
|
||||
// See `compaction::compact_with_gc` for why we need this.
|
||||
let _guard = timeline.gc_compaction_layer_update_lock.read().await;
|
||||
@@ -4421,7 +4349,7 @@ impl Timeline {
|
||||
let update = crate::tenant::metadata::MetadataUpdate::new(
|
||||
disk_consistent_lsn,
|
||||
ondisk_prev_record_lsn,
|
||||
*self.applied_gc_cutoff_lsn.read(),
|
||||
*self.latest_gc_cutoff_lsn.read(),
|
||||
);
|
||||
|
||||
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
|
||||
@@ -5649,7 +5577,7 @@ impl Timeline {
|
||||
// PITR interval is set & we didn't look up a timestamp successfully. Conservatively assume PITR
|
||||
// cannot advance beyond what was already GC'd, and respect space-based retention
|
||||
GcCutoffs {
|
||||
time: *self.get_applied_gc_cutoff_lsn(),
|
||||
time: *self.get_latest_gc_cutoff_lsn(),
|
||||
space: space_cutoff,
|
||||
}
|
||||
}
|
||||
@@ -5770,7 +5698,7 @@ impl Timeline {
|
||||
let mut result: GcResult = GcResult::default();
|
||||
|
||||
// Nothing to GC. Return early.
|
||||
let latest_gc_cutoff = *self.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
|
||||
if latest_gc_cutoff >= new_gc_cutoff {
|
||||
info!(
|
||||
"Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
|
||||
@@ -5784,7 +5712,7 @@ impl Timeline {
|
||||
//
|
||||
// The GC cutoff should only ever move forwards.
|
||||
let waitlist = {
|
||||
let write_guard = self.applied_gc_cutoff_lsn.lock_for_write();
|
||||
let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
|
||||
if *write_guard > new_gc_cutoff {
|
||||
return Err(GcError::BadLsn {
|
||||
why: format!(
|
||||
@@ -6724,32 +6652,18 @@ fn is_send() {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::value::Value;
|
||||
use tracing::Instrument;
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
|
||||
use crate::tenant::{
|
||||
harness::{test_img, TenantHarness},
|
||||
layer_map::LayerMap,
|
||||
storage_layer::{Layer, LayerName, LayerVisibilityHint},
|
||||
storage_layer::{Layer, LayerName},
|
||||
timeline::{DeltaLayerTestDesc, EvictionError},
|
||||
PreviousHeatmap, Timeline,
|
||||
Timeline,
|
||||
};
|
||||
|
||||
use super::HeatMapTimeline;
|
||||
|
||||
fn assert_heatmaps_have_same_layers(lhs: &HeatMapTimeline, rhs: &HeatMapTimeline) {
|
||||
assert_eq!(lhs.layers.len(), rhs.layers.len());
|
||||
let lhs_rhs = lhs.layers.iter().zip(rhs.layers.iter());
|
||||
for (l, r) in lhs_rhs {
|
||||
assert_eq!(l.name, r.name);
|
||||
assert_eq!(l.metadata, r.metadata);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_heatmap_generation() {
|
||||
let harness = TenantHarness::create("heatmap_generation").await.unwrap();
|
||||
@@ -6823,7 +6737,7 @@ mod tests {
|
||||
assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name());
|
||||
|
||||
let mut last_lsn = Lsn::MAX;
|
||||
for layer in &heatmap.layers {
|
||||
for layer in heatmap.layers {
|
||||
// Covered layer should be omitted
|
||||
assert!(layer.name != covered_delta.layer_name());
|
||||
|
||||
@@ -6838,144 +6752,6 @@ mod tests {
|
||||
last_lsn = layer_lsn;
|
||||
}
|
||||
}
|
||||
|
||||
// Evict all the layers and stash the old heatmap in the timeline.
|
||||
// This simulates a migration to a cold secondary location.
|
||||
|
||||
let guard = timeline.layers.read().await;
|
||||
let mut all_layers = Vec::new();
|
||||
let forever = std::time::Duration::from_secs(120);
|
||||
for layer in guard.likely_resident_layers() {
|
||||
all_layers.push(layer.clone());
|
||||
layer.evict_and_wait(forever).await.unwrap();
|
||||
}
|
||||
drop(guard);
|
||||
|
||||
timeline
|
||||
.previous_heatmap
|
||||
.store(Some(Arc::new(PreviousHeatmap::Active {
|
||||
heatmap: heatmap.clone(),
|
||||
read_at: std::time::Instant::now(),
|
||||
})));
|
||||
|
||||
// Generate a new heatmap and assert that it contains the same layers as the old one.
|
||||
let post_migration_heatmap = timeline.generate_heatmap().await.unwrap();
|
||||
assert_heatmaps_have_same_layers(&heatmap, &post_migration_heatmap);
|
||||
|
||||
// Download each layer one by one. Generate the heatmap at each step and check
|
||||
// that it's stable.
|
||||
for layer in all_layers {
|
||||
if layer.visibility() == LayerVisibilityHint::Covered {
|
||||
continue;
|
||||
}
|
||||
|
||||
eprintln!("Downloading {layer} and re-generating heatmap");
|
||||
|
||||
let _resident = layer
|
||||
.download_and_keep_resident()
|
||||
.instrument(tracing::info_span!(
|
||||
parent: None,
|
||||
"download_layer",
|
||||
tenant_id = %timeline.tenant_shard_id.tenant_id,
|
||||
shard_id = %timeline.tenant_shard_id.shard_slug(),
|
||||
timeline_id = %timeline.timeline_id
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let post_download_heatmap = timeline.generate_heatmap().await.unwrap();
|
||||
assert_heatmaps_have_same_layers(&heatmap, &post_download_heatmap);
|
||||
}
|
||||
|
||||
// Everything from the post-migration heatmap is now resident.
|
||||
// Check that we drop it from memory.
|
||||
assert!(matches!(
|
||||
timeline.previous_heatmap.load().as_deref(),
|
||||
Some(PreviousHeatmap::Obsolete)
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_previous_heatmap_obsoletion() {
|
||||
let harness = TenantHarness::create("heatmap_previous_heatmap_obsoletion")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let l0_delta = DeltaLayerTestDesc::new(
|
||||
Lsn(0x20)..Lsn(0x30),
|
||||
Key::from_hex("000000000000000000000000000000000000").unwrap()
|
||||
..Key::from_hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF").unwrap(),
|
||||
vec![(
|
||||
Key::from_hex("720000000033333333444444445500000000").unwrap(),
|
||||
Lsn(0x25),
|
||||
Value::Image(test_img("foo")),
|
||||
)],
|
||||
);
|
||||
|
||||
let image_layer = (
|
||||
Lsn(0x40),
|
||||
vec![(
|
||||
Key::from_hex("620000000033333333444444445500000000").unwrap(),
|
||||
test_img("bar"),
|
||||
)],
|
||||
);
|
||||
|
||||
let delta_layers = vec![l0_delta];
|
||||
let image_layers = vec![image_layer];
|
||||
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TimelineId::generate(),
|
||||
Lsn(0x10),
|
||||
14,
|
||||
&ctx,
|
||||
delta_layers,
|
||||
image_layers,
|
||||
Lsn(0x100),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Layer visibility is an input to heatmap generation, so refresh it first
|
||||
timeline.update_layer_visibility().await.unwrap();
|
||||
|
||||
let heatmap = timeline
|
||||
.generate_heatmap()
|
||||
.await
|
||||
.expect("Infallible while timeline is not shut down");
|
||||
|
||||
// Both layers should be in the heatmap
|
||||
assert!(!heatmap.layers.is_empty());
|
||||
|
||||
// Now simulate a migration.
|
||||
timeline
|
||||
.previous_heatmap
|
||||
.store(Some(Arc::new(PreviousHeatmap::Active {
|
||||
heatmap: heatmap.clone(),
|
||||
read_at: std::time::Instant::now(),
|
||||
})));
|
||||
|
||||
// Evict all the layers in the previous heatmap
|
||||
let guard = timeline.layers.read().await;
|
||||
let forever = std::time::Duration::from_secs(120);
|
||||
for layer in guard.likely_resident_layers() {
|
||||
layer.evict_and_wait(forever).await.unwrap();
|
||||
}
|
||||
drop(guard);
|
||||
|
||||
// Generate a new heatmap and check that the previous heatmap
|
||||
// has been marked obsolete.
|
||||
let post_eviction_heatmap = timeline
|
||||
.generate_heatmap()
|
||||
.await
|
||||
.expect("Infallible while timeline is not shut down");
|
||||
|
||||
assert!(post_eviction_heatmap.layers.is_empty());
|
||||
assert!(matches!(
|
||||
timeline.previous_heatmap.load().as_deref(),
|
||||
Some(PreviousHeatmap::Obsolete)
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -852,7 +852,7 @@ impl Timeline {
|
||||
//
|
||||
// Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
|
||||
// are rewriting layers.
|
||||
let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
|
||||
|
||||
tracing::info!(
|
||||
"latest_gc_cutoff: {}, pitr cutoff {}",
|
||||
@@ -2202,7 +2202,7 @@ impl Timeline {
|
||||
|
||||
// TODO: ensure the child branches will not use anything below the watermark, or consider
|
||||
// them when computing the watermark.
|
||||
gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
|
||||
gc_cutoff_lsn.min(*self.get_latest_gc_cutoff_lsn())
|
||||
}
|
||||
|
||||
/// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
|
||||
|
||||
@@ -294,7 +294,6 @@ impl DeleteTimelineFlow {
|
||||
timeline_id,
|
||||
local_metadata,
|
||||
None, // Ancestor is not needed for deletion.
|
||||
None, // Previous heatmap is not needed for deletion
|
||||
tenant.get_timeline_resources_for(remote_client),
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
// Thus we need to skip the validation here.
|
||||
|
||||
@@ -496,8 +496,7 @@ pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
|
||||
/// bad storage or bad configuration, and we can't fix that from inside
|
||||
/// a running process.
|
||||
pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
|
||||
let backtrace = std::backtrace::Backtrace::force_capture();
|
||||
tracing::error!("Fatal I/O error: {e}: {context})\n{backtrace}");
|
||||
tracing::error!("Fatal I/O error: {e}: {context})");
|
||||
std::process::abort();
|
||||
}
|
||||
|
||||
@@ -948,18 +947,13 @@ impl VirtualFileInner {
|
||||
where
|
||||
Buf: tokio_epoll_uring::IoBufMut + Send,
|
||||
{
|
||||
let file_guard = match self
|
||||
.lock_file()
|
||||
.await
|
||||
.maybe_fatal_err("lock_file inside VirtualFileInner::read_at")
|
||||
{
|
||||
let file_guard = match self.lock_file().await {
|
||||
Ok(file_guard) => file_guard,
|
||||
Err(e) => return (buf, Err(e)),
|
||||
};
|
||||
|
||||
observe_duration!(StorageIoOperation::Read, {
|
||||
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
|
||||
let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
|
||||
if let Ok(size) = res {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&[
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
#include "utils/guc.h"
|
||||
|
||||
#include "extension_server.h"
|
||||
#include "extension_server.h"
|
||||
#include "neon_utils.h"
|
||||
|
||||
static int extension_server_port = 0;
|
||||
@@ -45,7 +45,7 @@ neon_download_extension_file_http(const char *filename, bool is_library)
|
||||
handle = alloc_curl_handle();
|
||||
|
||||
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
|
||||
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 60L /* seconds */ );
|
||||
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 3L /* seconds */ );
|
||||
}
|
||||
|
||||
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
|
||||
|
||||
@@ -3765,7 +3765,7 @@ neon_dbsize(Oid dbNode)
|
||||
* neon_truncate() -- Truncate relation to specified number of blocks.
|
||||
*/
|
||||
static void
|
||||
neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, BlockNumber nblocks)
|
||||
neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
|
||||
{
|
||||
XLogRecPtr lsn;
|
||||
|
||||
@@ -3780,7 +3780,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
case RELPERSISTENCE_UNLOGGED:
|
||||
mdtruncate(reln, forknum, old_blocks, nblocks);
|
||||
mdtruncate(reln, forknum, nblocks);
|
||||
return;
|
||||
|
||||
default:
|
||||
@@ -3818,7 +3818,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
mdtruncate(reln, forknum, old_blocks, nblocks);
|
||||
mdtruncate(reln, forknum, nblocks);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@@ -96,7 +96,7 @@ static void inmem_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum, BlockNumber nblocks);
|
||||
static BlockNumber inmem_nblocks(SMgrRelation reln, ForkNumber forknum);
|
||||
static void inmem_truncate(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber old_blocks, BlockNumber nblocks);
|
||||
BlockNumber nblocks);
|
||||
static void inmem_immedsync(SMgrRelation reln, ForkNumber forknum);
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
static void inmem_registersync(SMgrRelation reln, ForkNumber forknum);
|
||||
@@ -345,7 +345,7 @@ inmem_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
* inmem_truncate() -- Truncate relation to specified number of blocks.
|
||||
*/
|
||||
static void
|
||||
inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, BlockNumber nblocks)
|
||||
inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
@@ -501,7 +501,7 @@ impl Session {
|
||||
_guard: Metrics::get()
|
||||
.proxy
|
||||
.cancel_channel_size
|
||||
.guard(RedisMsgKind::HDel),
|
||||
.guard(RedisMsgKind::HSet),
|
||||
};
|
||||
|
||||
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
|
||||
use http_utils::error::HttpErrorBody;
|
||||
use reqwest::{IntoUrl, Method, StatusCode};
|
||||
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
|
||||
use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
|
||||
use std::error::Error as _;
|
||||
use utils::{
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
@@ -32,9 +32,6 @@ pub enum Error {
|
||||
/// Status is not ok; parsed error in body as `HttpErrorBody`.
|
||||
#[error("safekeeper API: {1}")]
|
||||
ApiError(StatusCode, String),
|
||||
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -127,10 +124,9 @@ impl Client {
|
||||
self.get(&uri).await
|
||||
}
|
||||
|
||||
pub async fn utilization(&self) -> Result<SafekeeperUtilization> {
|
||||
pub async fn utilization(&self) -> Result<reqwest::Response> {
|
||||
let uri = format!("{}/v1/utilization/", self.mgmt_api_endpoint);
|
||||
let resp = self.get(&uri).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
self.get(&uri).await
|
||||
}
|
||||
|
||||
async fn post<B: serde::Serialize, U: IntoUrl>(
|
||||
|
||||
@@ -626,7 +626,7 @@ pub fn make_router(
|
||||
failpoints_handler(r, cancel).await
|
||||
})
|
||||
})
|
||||
.get("/v1/utilization", |r| request_span(r, utilization_handler))
|
||||
.get("/v1/uzilization", |r| request_span(r, utilization_handler))
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
request_span(r, tenant_delete_handler)
|
||||
})
|
||||
|
||||
@@ -310,12 +310,9 @@ impl WalBackupTask {
|
||||
retry_attempt = 0;
|
||||
}
|
||||
Err(e) => {
|
||||
// We might have managed to upload some segment even though
|
||||
// some later in the range failed, so log backup_lsn
|
||||
// separately.
|
||||
error!(
|
||||
"failed while offloading range {}-{}, backup_lsn {}: {:?}",
|
||||
backup_lsn, commit_lsn, backup_lsn, e
|
||||
"failed while offloading range {}-{}: {:?}",
|
||||
backup_lsn, commit_lsn, e
|
||||
);
|
||||
|
||||
retry_attempt = retry_attempt.saturating_add(1);
|
||||
@@ -341,13 +338,6 @@ async fn backup_lsn_range(
|
||||
let start_lsn = *backup_lsn;
|
||||
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
|
||||
|
||||
info!(
|
||||
"offloading segnos {:?} of range [{}-{})",
|
||||
segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
|
||||
start_lsn,
|
||||
end_lsn,
|
||||
);
|
||||
|
||||
// Pool of concurrent upload tasks. We use `FuturesOrdered` to
|
||||
// preserve order of uploads, and update `backup_lsn` only after
|
||||
// all previous uploads are finished.
|
||||
@@ -384,10 +374,10 @@ async fn backup_lsn_range(
|
||||
}
|
||||
|
||||
info!(
|
||||
"offloaded segnos {:?} of range [{}-{})",
|
||||
"offloaded segnos {:?} up to {}, previous backup_lsn {}",
|
||||
segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
|
||||
start_lsn,
|
||||
end_lsn,
|
||||
start_lsn,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -26,15 +26,12 @@ humantime.workspace = true
|
||||
itertools.workspace = true
|
||||
lasso.workspace = true
|
||||
once_cell.workspace = true
|
||||
governor = {version = "0.8.0"}
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest = { workspace = true, features = ["stream"] }
|
||||
routerify.workspace = true
|
||||
safekeeper_api.workspace = true
|
||||
safekeeper_client.workspace = true
|
||||
rustls-native-certs.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -1,10 +1,6 @@
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use safekeeper_api::models::SafekeeperUtilization;
|
||||
use safekeeper_client::mgmt_api;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Debug,
|
||||
future::Future,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
@@ -13,15 +9,15 @@ use tokio_util::sync::CancellationToken;
|
||||
use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization};
|
||||
|
||||
use thiserror::Error;
|
||||
use utils::{id::NodeId, logging::SecretString};
|
||||
use utils::id::NodeId;
|
||||
|
||||
use crate::{node::Node, safekeeper::Safekeeper};
|
||||
use crate::node::Node;
|
||||
|
||||
struct HeartbeaterTask<Server, State> {
|
||||
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
|
||||
struct HeartbeaterTask {
|
||||
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
|
||||
cancel: CancellationToken,
|
||||
|
||||
state: HashMap<NodeId, State>,
|
||||
state: HashMap<NodeId, PageserverState>,
|
||||
|
||||
max_offline_interval: Duration,
|
||||
max_warming_up_interval: Duration,
|
||||
@@ -40,17 +36,8 @@ pub(crate) enum PageserverState {
|
||||
Offline,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) enum SafekeeperState {
|
||||
Available {
|
||||
last_seen_at: Instant,
|
||||
utilization: SafekeeperUtilization,
|
||||
},
|
||||
Offline,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct AvailablityDeltas<State>(pub Vec<(NodeId, State)>);
|
||||
pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>);
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum HeartbeaterError {
|
||||
@@ -58,28 +45,23 @@ pub(crate) enum HeartbeaterError {
|
||||
Cancel,
|
||||
}
|
||||
|
||||
struct HeartbeatRequest<Server, State> {
|
||||
servers: Arc<HashMap<NodeId, Server>>,
|
||||
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas<State>, HeartbeaterError>>,
|
||||
struct HeartbeatRequest {
|
||||
pageservers: Arc<HashMap<NodeId, Node>>,
|
||||
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas, HeartbeaterError>>,
|
||||
}
|
||||
|
||||
pub(crate) struct Heartbeater<Server, State> {
|
||||
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest<Server, State>>,
|
||||
pub(crate) struct Heartbeater {
|
||||
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest>,
|
||||
}
|
||||
|
||||
#[allow(private_bounds)]
|
||||
impl<Server: Send + Sync + 'static, State: Debug + Send + 'static> Heartbeater<Server, State>
|
||||
where
|
||||
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
|
||||
{
|
||||
impl Heartbeater {
|
||||
pub(crate) fn new(
|
||||
jwt_token: Option<String>,
|
||||
max_offline_interval: Duration,
|
||||
max_warming_up_interval: Duration,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
let (sender, receiver) =
|
||||
tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
|
||||
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
|
||||
let mut heartbeater = HeartbeaterTask::new(
|
||||
receiver,
|
||||
jwt_token,
|
||||
@@ -94,12 +76,12 @@ where
|
||||
|
||||
pub(crate) async fn heartbeat(
|
||||
&self,
|
||||
servers: Arc<HashMap<NodeId, Server>>,
|
||||
) -> Result<AvailablityDeltas<State>, HeartbeaterError> {
|
||||
pageservers: Arc<HashMap<NodeId, Node>>,
|
||||
) -> Result<AvailablityDeltas, HeartbeaterError> {
|
||||
let (sender, receiver) = tokio::sync::oneshot::channel();
|
||||
self.sender
|
||||
.send(HeartbeatRequest {
|
||||
servers,
|
||||
pageservers,
|
||||
reply: sender,
|
||||
})
|
||||
.map_err(|_| HeartbeaterError::Cancel)?;
|
||||
@@ -111,12 +93,9 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<Server, State: Debug> HeartbeaterTask<Server, State>
|
||||
where
|
||||
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
|
||||
{
|
||||
impl HeartbeaterTask {
|
||||
fn new(
|
||||
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
|
||||
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
|
||||
jwt_token: Option<String>,
|
||||
max_offline_interval: Duration,
|
||||
max_warming_up_interval: Duration,
|
||||
@@ -131,13 +110,14 @@ where
|
||||
jwt_token,
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(&mut self) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
request = self.receiver.recv() => {
|
||||
match request {
|
||||
Some(req) => {
|
||||
let res = self.heartbeat(req.servers).await;
|
||||
let res = self.heartbeat(req.pageservers).await;
|
||||
req.reply.send(res).unwrap();
|
||||
},
|
||||
None => { return; }
|
||||
@@ -147,20 +127,11 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait HeartBeat<Server, State> {
|
||||
fn heartbeat(
|
||||
&mut self,
|
||||
pageservers: Arc<HashMap<NodeId, Server>>,
|
||||
) -> impl Future<Output = Result<AvailablityDeltas<State>, HeartbeaterError>> + Send;
|
||||
}
|
||||
|
||||
impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState> {
|
||||
async fn heartbeat(
|
||||
&mut self,
|
||||
pageservers: Arc<HashMap<NodeId, Node>>,
|
||||
) -> Result<AvailablityDeltas<PageserverState>, HeartbeaterError> {
|
||||
) -> Result<AvailablityDeltas, HeartbeaterError> {
|
||||
let mut new_state = HashMap::new();
|
||||
|
||||
let mut heartbeat_futs = FuturesUnordered::new();
|
||||
@@ -301,121 +272,3 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
|
||||
Ok(AvailablityDeltas(deltas))
|
||||
}
|
||||
}
|
||||
|
||||
impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, SafekeeperState> {
|
||||
async fn heartbeat(
|
||||
&mut self,
|
||||
safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
|
||||
) -> Result<AvailablityDeltas<SafekeeperState>, HeartbeaterError> {
|
||||
let mut new_state = HashMap::new();
|
||||
|
||||
let mut heartbeat_futs = FuturesUnordered::new();
|
||||
for (node_id, sk) in &*safekeepers {
|
||||
heartbeat_futs.push({
|
||||
let jwt_token = self
|
||||
.jwt_token
|
||||
.as_ref()
|
||||
.map(|t| SecretString::from(t.to_owned()));
|
||||
let cancel = self.cancel.clone();
|
||||
|
||||
async move {
|
||||
let response = sk
|
||||
.with_client_retries(
|
||||
|client| async move { client.get_utilization().await },
|
||||
&jwt_token,
|
||||
3,
|
||||
3,
|
||||
Duration::from_secs(1),
|
||||
&cancel,
|
||||
)
|
||||
.await;
|
||||
|
||||
let status = match response {
|
||||
Ok(utilization) => SafekeeperState::Available {
|
||||
last_seen_at: Instant::now(),
|
||||
utilization,
|
||||
},
|
||||
Err(mgmt_api::Error::Cancelled) => {
|
||||
// This indicates cancellation of the request.
|
||||
// We ignore the node in this case.
|
||||
return None;
|
||||
}
|
||||
Err(_) => SafekeeperState::Offline,
|
||||
};
|
||||
|
||||
Some((*node_id, status))
|
||||
}
|
||||
});
|
||||
|
||||
loop {
|
||||
let maybe_status = tokio::select! {
|
||||
next = heartbeat_futs.next() => {
|
||||
match next {
|
||||
Some(result) => result,
|
||||
None => { break; }
|
||||
}
|
||||
},
|
||||
_ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
|
||||
};
|
||||
|
||||
if let Some((node_id, status)) = maybe_status {
|
||||
new_state.insert(node_id, status);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut offline = 0;
|
||||
for state in new_state.values() {
|
||||
match state {
|
||||
SafekeeperState::Offline { .. } => offline += 1,
|
||||
SafekeeperState::Available { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"Heartbeat round complete for {} safekeepers, {} offline",
|
||||
new_state.len(),
|
||||
offline
|
||||
);
|
||||
|
||||
let mut deltas = Vec::new();
|
||||
let now = Instant::now();
|
||||
for (node_id, sk_state) in new_state.iter_mut() {
|
||||
use std::collections::hash_map::Entry::*;
|
||||
let entry = self.state.entry(*node_id);
|
||||
|
||||
let mut needs_update = false;
|
||||
match entry {
|
||||
Occupied(ref occ) => match (occ.get(), &sk_state) {
|
||||
(SafekeeperState::Offline, SafekeeperState::Offline) => {}
|
||||
(SafekeeperState::Available { last_seen_at, .. }, SafekeeperState::Offline) => {
|
||||
if now - *last_seen_at >= self.max_offline_interval {
|
||||
deltas.push((*node_id, sk_state.clone()));
|
||||
needs_update = true;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
deltas.push((*node_id, sk_state.clone()));
|
||||
needs_update = true;
|
||||
}
|
||||
},
|
||||
Vacant(_) => {
|
||||
// This is a new node. Don't generate a delta for it.
|
||||
deltas.push((*node_id, sk_state.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
match entry {
|
||||
Occupied(mut occ) if needs_update => {
|
||||
(*occ.get_mut()) = sk_state.clone();
|
||||
}
|
||||
Vacant(vac) => {
|
||||
vac.insert(sk_state.clone());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(AvailablityDeltas(deltas))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ use crate::reconciler::ReconcileError;
|
||||
use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT};
|
||||
use anyhow::Context;
|
||||
use futures::Future;
|
||||
use governor::{Quota, RateLimiter};
|
||||
use http_utils::{
|
||||
endpoint::{self, auth_middleware, check_permission_with, request_span},
|
||||
error::ApiError,
|
||||
@@ -33,7 +32,6 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::{mgmt_api, BlockUnblock};
|
||||
use std::num::NonZero;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -518,14 +516,6 @@ async fn handle_tenant_timeline_block_unblock_gc(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
static PASSTHROUGH_RATE_LIMITER: std::sync::OnceLock<
|
||||
RateLimiter<
|
||||
TenantId,
|
||||
governor::state::keyed::DefaultKeyedStateStore<TenantId>,
|
||||
governor::clock::DefaultClock,
|
||||
>,
|
||||
> = std::sync::OnceLock::new();
|
||||
|
||||
async fn handle_tenant_timeline_passthrough(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -547,19 +537,6 @@ async fn handle_tenant_timeline_passthrough(
|
||||
|
||||
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
|
||||
|
||||
// Proxied requests are expected to be rare on a per-tenant basis: these are things
|
||||
// like inspecting a timeline's details or doing an LSN<->timestamp mapping. Not anything
|
||||
// that has high throughput.
|
||||
let limiter = PASSTHROUGH_RATE_LIMITER.get_or_init(|| {
|
||||
RateLimiter::new(
|
||||
Quota::per_second(NonZero::new(10).unwrap()),
|
||||
governor::state::keyed::DefaultKeyedStateStore::new(),
|
||||
governor::clock::DefaultClock::default(),
|
||||
)
|
||||
});
|
||||
|
||||
limiter.until_key_ready(&tenant_id).await;
|
||||
|
||||
// Find the node that holds shard zero
|
||||
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
|
||||
|
||||
|
||||
@@ -17,8 +17,6 @@ mod pageserver_client;
|
||||
mod peer_client;
|
||||
pub mod persistence;
|
||||
mod reconciler;
|
||||
mod safekeeper;
|
||||
mod safekeeper_client;
|
||||
mod scheduler;
|
||||
mod schema;
|
||||
pub mod service;
|
||||
|
||||
@@ -80,11 +80,6 @@ pub(crate) struct StorageControllerMetricGroup {
|
||||
pub(crate) storage_controller_pageserver_request_error:
|
||||
measured::CounterVec<PageserverRequestLabelGroupSet>,
|
||||
|
||||
/// Count of HTTP requests to the safekeeper that resulted in an error,
|
||||
/// broken down by the safekeeper node id, request name and method
|
||||
pub(crate) storage_controller_safekeeper_request_error:
|
||||
measured::CounterVec<PageserverRequestLabelGroupSet>,
|
||||
|
||||
/// Latency of HTTP requests to the pageserver, broken down by pageserver
|
||||
/// node id, request name and method. This include both successful and unsuccessful
|
||||
/// requests.
|
||||
@@ -92,13 +87,6 @@ pub(crate) struct StorageControllerMetricGroup {
|
||||
pub(crate) storage_controller_pageserver_request_latency:
|
||||
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
|
||||
|
||||
/// Latency of HTTP requests to the safekeeper, broken down by safekeeper
|
||||
/// node id, request name and method. This include both successful and unsuccessful
|
||||
/// requests.
|
||||
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
|
||||
pub(crate) storage_controller_safekeeper_request_latency:
|
||||
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
|
||||
|
||||
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
|
||||
/// broken down by the pageserver node id, request name and method
|
||||
pub(crate) storage_controller_passthrough_request_error:
|
||||
|
||||
@@ -1185,6 +1185,23 @@ impl Persistence {
|
||||
Ok(safekeepers)
|
||||
}
|
||||
|
||||
pub(crate) async fn safekeeper_get(
|
||||
&self,
|
||||
id: i64,
|
||||
) -> Result<SafekeeperPersistence, DatabaseError> {
|
||||
use crate::schema::safekeepers::dsl::{id as id_column, safekeepers};
|
||||
self.with_conn(move |conn| {
|
||||
Box::pin(async move {
|
||||
Ok(safekeepers
|
||||
.filter(id_column.eq(&id))
|
||||
.select(SafekeeperPersistence::as_select())
|
||||
.get_result(conn)
|
||||
.await?)
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn safekeeper_upsert(
|
||||
&self,
|
||||
record: SafekeeperUpsert,
|
||||
@@ -1537,21 +1554,6 @@ pub(crate) struct SafekeeperPersistence {
|
||||
}
|
||||
|
||||
impl SafekeeperPersistence {
|
||||
pub(crate) fn from_upsert(
|
||||
upsert: SafekeeperUpsert,
|
||||
scheduling_policy: SkSchedulingPolicy,
|
||||
) -> Self {
|
||||
crate::persistence::SafekeeperPersistence {
|
||||
id: upsert.id,
|
||||
region_id: upsert.region_id,
|
||||
version: upsert.version,
|
||||
host: upsert.host,
|
||||
port: upsert.port,
|
||||
http_port: upsert.http_port,
|
||||
availability_zone_id: upsert.availability_zone_id,
|
||||
scheduling_policy: String::from(scheduling_policy),
|
||||
}
|
||||
}
|
||||
pub(crate) fn as_describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
|
||||
let scheduling_policy =
|
||||
SkSchedulingPolicy::from_str(&self.scheduling_policy).map_err(|e| {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::pageserver_client::PageserverClient;
|
||||
use crate::persistence::Persistence;
|
||||
use crate::{compute_hook, service};
|
||||
use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
|
||||
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
|
||||
};
|
||||
@@ -162,22 +162,6 @@ impl ReconcilerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&MigrationConfig> for ReconcilerConfig {
|
||||
fn from(value: &MigrationConfig) -> Self {
|
||||
let mut builder = ReconcilerConfigBuilder::new();
|
||||
|
||||
if let Some(timeout) = value.secondary_warmup_timeout {
|
||||
builder = builder.secondary_warmup_timeout(timeout)
|
||||
}
|
||||
|
||||
if let Some(timeout) = value.secondary_download_request_timeout {
|
||||
builder = builder.secondary_download_request_timeout(timeout)
|
||||
}
|
||||
|
||||
builder.build()
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
|
||||
pub(crate) struct ReconcileUnits {
|
||||
_sem_units: tokio::sync::OwnedSemaphorePermit,
|
||||
|
||||
@@ -1,139 +0,0 @@
|
||||
use std::{str::FromStr, time::Duration};
|
||||
|
||||
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
|
||||
use reqwest::StatusCode;
|
||||
use safekeeper_client::mgmt_api;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{backoff, id::NodeId, logging::SecretString};
|
||||
|
||||
use crate::{
|
||||
heartbeater::SafekeeperState,
|
||||
persistence::{DatabaseError, SafekeeperPersistence},
|
||||
safekeeper_client::SafekeeperClient,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Safekeeper {
|
||||
pub(crate) skp: SafekeeperPersistence,
|
||||
cancel: CancellationToken,
|
||||
listen_http_addr: String,
|
||||
listen_http_port: u16,
|
||||
id: NodeId,
|
||||
availability: SafekeeperState,
|
||||
}
|
||||
|
||||
impl Safekeeper {
|
||||
pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self {
|
||||
Self {
|
||||
cancel,
|
||||
listen_http_addr: skp.host.clone(),
|
||||
listen_http_port: skp.http_port as u16,
|
||||
id: NodeId(skp.id as u64),
|
||||
skp,
|
||||
availability: SafekeeperState::Offline,
|
||||
}
|
||||
}
|
||||
pub(crate) fn base_url(&self) -> String {
|
||||
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
|
||||
}
|
||||
|
||||
pub(crate) fn get_id(&self) -> NodeId {
|
||||
self.id
|
||||
}
|
||||
pub(crate) fn describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
|
||||
self.skp.as_describe_response()
|
||||
}
|
||||
pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
|
||||
self.availability = availability;
|
||||
}
|
||||
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
|
||||
pub(crate) async fn with_client_retries<T, O, F>(
|
||||
&self,
|
||||
mut op: O,
|
||||
jwt: &Option<SecretString>,
|
||||
warn_threshold: u32,
|
||||
max_retries: u32,
|
||||
timeout: Duration,
|
||||
cancel: &CancellationToken,
|
||||
) -> mgmt_api::Result<T>
|
||||
where
|
||||
O: FnMut(SafekeeperClient) -> F,
|
||||
F: std::future::Future<Output = mgmt_api::Result<T>>,
|
||||
{
|
||||
fn is_fatal(e: &mgmt_api::Error) -> bool {
|
||||
use mgmt_api::Error::*;
|
||||
match e {
|
||||
ReceiveBody(_) | ReceiveErrorBody(_) => false,
|
||||
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
|
||||
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
|
||||
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
|
||||
ApiError(_, _) => true,
|
||||
Cancelled => true,
|
||||
}
|
||||
}
|
||||
|
||||
backoff::retry(
|
||||
|| {
|
||||
let http_client = reqwest::ClientBuilder::new()
|
||||
.timeout(timeout)
|
||||
.build()
|
||||
.expect("Failed to construct HTTP client");
|
||||
|
||||
let client = SafekeeperClient::from_client(
|
||||
self.get_id(),
|
||||
http_client,
|
||||
self.base_url(),
|
||||
jwt.clone(),
|
||||
);
|
||||
|
||||
let node_cancel_fut = self.cancel.cancelled();
|
||||
|
||||
let op_fut = op(client);
|
||||
|
||||
async {
|
||||
tokio::select! {
|
||||
r = op_fut=> {r},
|
||||
_ = node_cancel_fut => {
|
||||
Err(mgmt_api::Error::Cancelled)
|
||||
}}
|
||||
}
|
||||
},
|
||||
is_fatal,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
&format!(
|
||||
"Call to node {} ({}:{}) management API",
|
||||
self.id, self.listen_http_addr, self.listen_http_port
|
||||
),
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
.unwrap_or(Err(mgmt_api::Error::Cancelled))
|
||||
}
|
||||
|
||||
pub(crate) fn update_from_record(&mut self, record: crate::persistence::SafekeeperUpsert) {
|
||||
let crate::persistence::SafekeeperUpsert {
|
||||
active: _,
|
||||
availability_zone_id: _,
|
||||
host,
|
||||
http_port,
|
||||
id,
|
||||
port: _,
|
||||
region_id: _,
|
||||
version: _,
|
||||
} = record.clone();
|
||||
if id != self.id.0 as i64 {
|
||||
// The way the function is called ensures this. If we regress on that, it's a bug.
|
||||
panic!(
|
||||
"id can't be changed via update_from_record function: {id} != {}",
|
||||
self.id.0
|
||||
);
|
||||
}
|
||||
self.skp = crate::persistence::SafekeeperPersistence::from_upsert(
|
||||
record,
|
||||
SkSchedulingPolicy::from_str(&self.skp.scheduling_policy).unwrap(),
|
||||
);
|
||||
self.listen_http_port = http_port as u16;
|
||||
self.listen_http_addr = host;
|
||||
}
|
||||
}
|
||||
@@ -1,105 +0,0 @@
|
||||
use crate::metrics::PageserverRequestLabelGroup;
|
||||
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
|
||||
use safekeeper_client::mgmt_api::{Client, Result};
|
||||
use utils::{
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
logging::SecretString,
|
||||
};
|
||||
|
||||
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
|
||||
/// controller to collect metrics in a non-intrusive manner.
|
||||
///
|
||||
/// Analogous to [`crate::pageserver_client::PageserverClient`].
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct SafekeeperClient {
|
||||
inner: Client,
|
||||
node_id_label: String,
|
||||
}
|
||||
|
||||
macro_rules! measured_request {
|
||||
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
|
||||
let labels = PageserverRequestLabelGroup {
|
||||
pageserver_id: $node_id,
|
||||
path: $name,
|
||||
method: $method,
|
||||
};
|
||||
|
||||
let latency = &crate::metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_safekeeper_request_latency;
|
||||
let _timer_guard = latency.start_timer(labels.clone());
|
||||
|
||||
let res = $invoke;
|
||||
|
||||
if res.is_err() {
|
||||
let error_counters = &crate::metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_pageserver_request_error;
|
||||
error_counters.inc(labels)
|
||||
}
|
||||
|
||||
res
|
||||
}};
|
||||
}
|
||||
|
||||
impl SafekeeperClient {
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn new(
|
||||
node_id: NodeId,
|
||||
mgmt_api_endpoint: String,
|
||||
jwt: Option<SecretString>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
|
||||
node_id_label: node_id.0.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn from_client(
|
||||
node_id: NodeId,
|
||||
raw_client: reqwest::Client,
|
||||
mgmt_api_endpoint: String,
|
||||
jwt: Option<SecretString>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
|
||||
node_id_label: node_id.0.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn create_timeline(
|
||||
&self,
|
||||
req: &TimelineCreateRequest,
|
||||
) -> Result<TimelineStatus> {
|
||||
measured_request!(
|
||||
"create_timeline",
|
||||
crate::metrics::Method::Post,
|
||||
&self.node_id_label,
|
||||
self.inner.create_timeline(req).await
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn delete_timeline(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<TimelineStatus> {
|
||||
measured_request!(
|
||||
"delete_timeline",
|
||||
crate::metrics::Method::Delete,
|
||||
&self.node_id_label,
|
||||
self.inner.delete_timeline(tenant_id, timeline_id).await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_utilization(&self) -> Result<SafekeeperUtilization> {
|
||||
measured_request!(
|
||||
"utilization",
|
||||
crate::metrics::Method::Get,
|
||||
&self.node_id_label,
|
||||
self.inner.utilization().await
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,6 @@ pub mod chaos_injector;
|
||||
mod context_iterator;
|
||||
|
||||
use hyper::Uri;
|
||||
use safekeeper_api::models::SafekeeperUtilization;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
cmp::Ordering,
|
||||
@@ -21,7 +20,6 @@ use crate::{
|
||||
},
|
||||
compute_hook::{self, NotifyError},
|
||||
drain_utils::{self, TenantShardDrain, TenantShardIterator},
|
||||
heartbeater::SafekeeperState,
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
|
||||
leadership::Leadership,
|
||||
metrics,
|
||||
@@ -31,7 +29,6 @@ use crate::{
|
||||
ShardGenerationState, TenantFilter,
|
||||
},
|
||||
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
|
||||
safekeeper::Safekeeper,
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
|
||||
@@ -209,8 +206,6 @@ struct ServiceState {
|
||||
|
||||
nodes: Arc<HashMap<NodeId, Node>>,
|
||||
|
||||
safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
|
||||
|
||||
scheduler: Scheduler,
|
||||
|
||||
/// Ongoing background operation on the cluster if any is running.
|
||||
@@ -277,7 +272,6 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
|
||||
impl ServiceState {
|
||||
fn new(
|
||||
nodes: HashMap<NodeId, Node>,
|
||||
safekeepers: HashMap<NodeId, Safekeeper>,
|
||||
tenants: BTreeMap<TenantShardId, TenantShard>,
|
||||
scheduler: Scheduler,
|
||||
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
|
||||
@@ -289,7 +283,6 @@ impl ServiceState {
|
||||
leadership_status: initial_leadership_status,
|
||||
tenants,
|
||||
nodes: Arc::new(nodes),
|
||||
safekeepers: Arc::new(safekeepers),
|
||||
scheduler,
|
||||
ongoing_operation: None,
|
||||
delayed_reconcile_rx,
|
||||
@@ -306,23 +299,6 @@ impl ServiceState {
|
||||
(&mut self.nodes, &mut self.tenants, &mut self.scheduler)
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn parts_mut_sk(
|
||||
&mut self,
|
||||
) -> (
|
||||
&mut Arc<HashMap<NodeId, Node>>,
|
||||
&mut Arc<HashMap<NodeId, Safekeeper>>,
|
||||
&mut BTreeMap<TenantShardId, TenantShard>,
|
||||
&mut Scheduler,
|
||||
) {
|
||||
(
|
||||
&mut self.nodes,
|
||||
&mut self.safekeepers,
|
||||
&mut self.tenants,
|
||||
&mut self.scheduler,
|
||||
)
|
||||
}
|
||||
|
||||
fn get_leadership_status(&self) -> LeadershipStatus {
|
||||
self.leadership_status
|
||||
}
|
||||
@@ -421,8 +397,7 @@ pub struct Service {
|
||||
compute_hook: Arc<ComputeHook>,
|
||||
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
|
||||
|
||||
heartbeater_ps: Heartbeater<Node, PageserverState>,
|
||||
heartbeater_sk: Heartbeater<Safekeeper, SafekeeperState>,
|
||||
heartbeater: Heartbeater,
|
||||
|
||||
// Channel for background cleanup from failed operations that require cleanup, such as shard split
|
||||
abort_tx: tokio::sync::mpsc::UnboundedSender<TenantShardSplitAbort>,
|
||||
@@ -632,8 +607,7 @@ impl Service {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.nodes.clone()
|
||||
};
|
||||
let (mut nodes_online, mut sks_online) =
|
||||
self.initial_heartbeat_round(all_nodes.keys()).await;
|
||||
let mut nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await;
|
||||
|
||||
// List of tenants for which we will attempt to notify compute of their location at startup
|
||||
let mut compute_notifications = Vec::new();
|
||||
@@ -642,7 +616,7 @@ impl Service {
|
||||
tracing::info!("Populating tenant shards' states from initial pageserver scan...");
|
||||
let shard_count = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, safekeepers, tenants, scheduler) = locked.parts_mut_sk();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
// Mark nodes online if they responded to us: nodes are offline by default after a restart.
|
||||
let mut new_nodes = (**nodes).clone();
|
||||
@@ -654,17 +628,6 @@ impl Service {
|
||||
}
|
||||
*nodes = Arc::new(new_nodes);
|
||||
|
||||
let mut new_sks = (**safekeepers).clone();
|
||||
for (node_id, node) in new_sks.iter_mut() {
|
||||
if let Some((utilization, last_seen_at)) = sks_online.remove(node_id) {
|
||||
node.set_availability(SafekeeperState::Available {
|
||||
utilization,
|
||||
last_seen_at,
|
||||
});
|
||||
}
|
||||
}
|
||||
*safekeepers = Arc::new(new_sks);
|
||||
|
||||
for (tenant_shard_id, observed_state) in observed.0 {
|
||||
let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
|
||||
for node_id in observed_state.locations.keys() {
|
||||
@@ -773,10 +736,7 @@ impl Service {
|
||||
async fn initial_heartbeat_round<'a>(
|
||||
&self,
|
||||
node_ids: impl Iterator<Item = &'a NodeId>,
|
||||
) -> (
|
||||
HashMap<NodeId, PageserverUtilization>,
|
||||
HashMap<NodeId, (SafekeeperUtilization, Instant)>,
|
||||
) {
|
||||
) -> HashMap<NodeId, PageserverUtilization> {
|
||||
assert!(!self.startup_complete.is_ready());
|
||||
|
||||
let all_nodes = {
|
||||
@@ -796,20 +756,14 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
let all_sks = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.safekeepers.clone()
|
||||
};
|
||||
|
||||
tracing::info!("Sending initial heartbeats...");
|
||||
let res_ps = self
|
||||
.heartbeater_ps
|
||||
let res = self
|
||||
.heartbeater
|
||||
.heartbeat(Arc::new(nodes_to_heartbeat))
|
||||
.await;
|
||||
let res_sk = self.heartbeater_sk.heartbeat(all_sks).await;
|
||||
|
||||
let mut online_nodes = HashMap::new();
|
||||
if let Ok(deltas) = res_ps {
|
||||
if let Ok(deltas) = res {
|
||||
for (node_id, status) in deltas.0 {
|
||||
match status {
|
||||
PageserverState::Available { utilization, .. } => {
|
||||
@@ -823,22 +777,7 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
let mut online_sks = HashMap::new();
|
||||
if let Ok(deltas) = res_sk {
|
||||
for (node_id, status) in deltas.0 {
|
||||
match status {
|
||||
SafekeeperState::Available {
|
||||
utilization,
|
||||
last_seen_at,
|
||||
} => {
|
||||
online_sks.insert(node_id, (utilization, last_seen_at));
|
||||
}
|
||||
SafekeeperState::Offline => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(online_nodes, online_sks)
|
||||
online_nodes
|
||||
}
|
||||
|
||||
/// Used during [`Self::startup_reconcile`]: issue GETs to all nodes concurrently, with a deadline.
|
||||
@@ -1045,14 +984,8 @@ impl Service {
|
||||
locked.nodes.clone()
|
||||
};
|
||||
|
||||
let safekeepers = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.safekeepers.clone()
|
||||
};
|
||||
|
||||
let res_ps = self.heartbeater_ps.heartbeat(nodes).await;
|
||||
let res_sk = self.heartbeater_sk.heartbeat(safekeepers).await;
|
||||
if let Ok(deltas) = res_ps {
|
||||
let res = self.heartbeater.heartbeat(nodes).await;
|
||||
if let Ok(deltas) = res {
|
||||
let mut to_handle = Vec::default();
|
||||
|
||||
for (node_id, state) in deltas.0 {
|
||||
@@ -1153,18 +1086,6 @@ impl Service {
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Ok(deltas) = res_sk {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let mut safekeepers = (*locked.safekeepers).clone();
|
||||
for (id, state) in deltas.0 {
|
||||
let Some(sk) = safekeepers.get_mut(&id) else {
|
||||
tracing::info!("Couldn't update safekeeper safekeeper state for id {id} from heartbeat={state:?}");
|
||||
continue;
|
||||
};
|
||||
sk.set_availability(state);
|
||||
}
|
||||
locked.safekeepers = Arc::new(safekeepers);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1390,17 +1311,6 @@ impl Service {
|
||||
.storage_controller_pageserver_nodes
|
||||
.set(nodes.len() as i64);
|
||||
|
||||
tracing::info!("Loading safekeepers from database...");
|
||||
let safekeepers = persistence
|
||||
.list_safekeepers()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new()))
|
||||
.collect::<Vec<_>>();
|
||||
let safekeepers: HashMap<NodeId, Safekeeper> =
|
||||
safekeepers.into_iter().map(|n| (n.get_id(), n)).collect();
|
||||
tracing::info!("Loaded {} safekeepers from database.", safekeepers.len());
|
||||
|
||||
tracing::info!("Loading shards from database...");
|
||||
let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?;
|
||||
tracing::info!(
|
||||
@@ -1527,14 +1437,7 @@ impl Service {
|
||||
let cancel = CancellationToken::new();
|
||||
let reconcilers_cancel = cancel.child_token();
|
||||
|
||||
let heartbeater_ps = Heartbeater::new(
|
||||
config.jwt_token.clone(),
|
||||
config.max_offline_interval,
|
||||
config.max_warming_up_interval,
|
||||
cancel.clone(),
|
||||
);
|
||||
|
||||
let heartbeater_sk = Heartbeater::new(
|
||||
let heartbeater = Heartbeater::new(
|
||||
config.jwt_token.clone(),
|
||||
config.max_offline_interval,
|
||||
config.max_warming_up_interval,
|
||||
@@ -1550,7 +1453,6 @@ impl Service {
|
||||
let this = Arc::new(Self {
|
||||
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
|
||||
nodes,
|
||||
safekeepers,
|
||||
tenants,
|
||||
scheduler,
|
||||
delayed_reconcile_rx,
|
||||
@@ -1560,8 +1462,7 @@ impl Service {
|
||||
persistence,
|
||||
compute_hook: Arc::new(ComputeHook::new(config.clone())),
|
||||
result_tx,
|
||||
heartbeater_ps,
|
||||
heartbeater_sk,
|
||||
heartbeater,
|
||||
reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
|
||||
config.reconciler_concurrency,
|
||||
)),
|
||||
@@ -5213,12 +5114,7 @@ impl Service {
|
||||
shard.sequence = shard.sequence.next();
|
||||
}
|
||||
|
||||
let reconciler_config = match migrate_req.migration_config {
|
||||
Some(cfg) => (&cfg).into(),
|
||||
None => ReconcilerConfig::default(),
|
||||
};
|
||||
|
||||
self.maybe_configured_reconcile_shard(shard, nodes, reconciler_config)
|
||||
self.maybe_reconcile_shard(shard, nodes)
|
||||
};
|
||||
|
||||
if let Some(waiter) = waiter {
|
||||
@@ -7765,54 +7661,29 @@ impl Service {
|
||||
pub(crate) async fn safekeepers_list(
|
||||
&self,
|
||||
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut list = locked
|
||||
.safekeepers
|
||||
.iter()
|
||||
.map(|sk| sk.1.describe_response())
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
list.sort_by_key(|v| v.id);
|
||||
Ok(list)
|
||||
self.persistence
|
||||
.list_safekeepers()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|v| v.as_describe_response())
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
}
|
||||
|
||||
pub(crate) async fn get_safekeeper(
|
||||
&self,
|
||||
id: i64,
|
||||
) -> Result<SafekeeperDescribeResponse, DatabaseError> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let sk = locked
|
||||
.safekeepers
|
||||
.get(&NodeId(id as u64))
|
||||
.ok_or(diesel::result::Error::NotFound)?;
|
||||
sk.describe_response()
|
||||
self.persistence
|
||||
.safekeeper_get(id)
|
||||
.await
|
||||
.and_then(|v| v.as_describe_response())
|
||||
}
|
||||
|
||||
pub(crate) async fn upsert_safekeeper(
|
||||
&self,
|
||||
record: crate::persistence::SafekeeperUpsert,
|
||||
) -> Result<(), DatabaseError> {
|
||||
let node_id = NodeId(record.id as u64);
|
||||
self.persistence.safekeeper_upsert(record.clone()).await?;
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let mut safekeepers = (*locked.safekeepers).clone();
|
||||
match safekeepers.entry(node_id) {
|
||||
std::collections::hash_map::Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().update_from_record(record);
|
||||
}
|
||||
std::collections::hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert(Safekeeper::from_persistence(
|
||||
crate::persistence::SafekeeperPersistence::from_upsert(
|
||||
record,
|
||||
SkSchedulingPolicy::Pause,
|
||||
),
|
||||
CancellationToken::new(),
|
||||
));
|
||||
}
|
||||
}
|
||||
locked.safekeepers = Arc::new(safekeepers);
|
||||
}
|
||||
Ok(())
|
||||
self.persistence.safekeeper_upsert(record).await
|
||||
}
|
||||
|
||||
pub(crate) async fn set_safekeeper_scheduling_policy(
|
||||
@@ -7822,20 +7693,7 @@ impl Service {
|
||||
) -> Result<(), DatabaseError> {
|
||||
self.persistence
|
||||
.set_safekeeper_scheduling_policy(id, scheduling_policy)
|
||||
.await?;
|
||||
let node_id = NodeId(id as u64);
|
||||
// After the change has been persisted successfully, update the in-memory state
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let mut safekeepers = (*locked.safekeepers).clone();
|
||||
let sk = safekeepers
|
||||
.get_mut(&node_id)
|
||||
.ok_or(DatabaseError::Logical("Not found".to_string()))?;
|
||||
sk.skp.scheduling_policy = String::from(scheduling_policy);
|
||||
|
||||
locked.safekeepers = Arc::new(safekeepers);
|
||||
}
|
||||
Ok(())
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn update_shards_preferred_azs(
|
||||
|
||||
@@ -3,7 +3,6 @@ from __future__ import annotations
|
||||
import abc
|
||||
import asyncio
|
||||
import concurrent.futures
|
||||
import dataclasses
|
||||
import filecmp
|
||||
import json
|
||||
import os
|
||||
@@ -1676,12 +1675,6 @@ class StorageControllerLeadershipStatus(StrEnum):
|
||||
CANDIDATE = "candidate"
|
||||
|
||||
|
||||
@dataclass
|
||||
class StorageControllerMigrationConfig:
|
||||
secondary_warmup_timeout: str | None
|
||||
secondary_download_request_timeout: str | None
|
||||
|
||||
|
||||
class NeonStorageController(MetricsGetter, LogUtils):
|
||||
def __init__(self, env: NeonEnv, port: int, auth_enabled: bool):
|
||||
self.env = env
|
||||
@@ -2075,20 +2068,11 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
shards: list[TenantShardId] = body["new_shards"]
|
||||
return shards
|
||||
|
||||
def tenant_shard_migrate(
|
||||
self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
dest_ps_id: int,
|
||||
config: StorageControllerMigrationConfig | None = None,
|
||||
):
|
||||
payload = {"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id}
|
||||
if config is not None:
|
||||
payload["migration_config"] = dataclasses.asdict(config)
|
||||
|
||||
def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int):
|
||||
self.request(
|
||||
"PUT",
|
||||
f"{self.api}/control/v1/tenant/{tenant_shard_id}/migrate",
|
||||
json=payload,
|
||||
json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id},
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}")
|
||||
@@ -4988,13 +4972,8 @@ def check_restored_datadir_content(
|
||||
|
||||
restored_files = list_files_to_compare(restored_dir_path)
|
||||
|
||||
# pg_notify files are always ignored
|
||||
pgdata_files = [f for f in pgdata_files if not f.startswith("pg_notify")]
|
||||
restored_files = [f for f in restored_files if not f.startswith("pg_notify")]
|
||||
|
||||
# pg_xact and pg_multixact files are optional in basebackup: depending on our configuration they
|
||||
# may be omitted and loaded on demand.
|
||||
if pgdata_files != restored_files:
|
||||
# filter pg_xact and multixact files which are downloaded on demand
|
||||
pgdata_files = [
|
||||
f
|
||||
for f in pgdata_files
|
||||
|
||||
@@ -231,14 +231,14 @@ def test_pgdata_import_smoke(
|
||||
shard_zero_http = shard_zero_ps.http_client()
|
||||
shard_zero_timeline_info = shard_zero_http.timeline_detail(shard_zero["shard_id"], timeline_id)
|
||||
initdb_lsn = Lsn(shard_zero_timeline_info["initdb_lsn"])
|
||||
min_readable_lsn = Lsn(shard_zero_timeline_info["min_readable_lsn"])
|
||||
latest_gc_cutoff_lsn = Lsn(shard_zero_timeline_info["latest_gc_cutoff_lsn"])
|
||||
last_record_lsn = Lsn(shard_zero_timeline_info["last_record_lsn"])
|
||||
disk_consistent_lsn = Lsn(shard_zero_timeline_info["disk_consistent_lsn"])
|
||||
_remote_consistent_lsn = Lsn(shard_zero_timeline_info["remote_consistent_lsn"])
|
||||
remote_consistent_lsn_visible = Lsn(shard_zero_timeline_info["remote_consistent_lsn_visible"])
|
||||
# assert remote_consistent_lsn_visible == remote_consistent_lsn TODO: this fails initially and after restart, presumably because `UploadQueue::clean.1` is still `None`
|
||||
assert remote_consistent_lsn_visible == disk_consistent_lsn
|
||||
assert initdb_lsn == min_readable_lsn
|
||||
assert initdb_lsn == latest_gc_cutoff_lsn
|
||||
assert disk_consistent_lsn == initdb_lsn + 8
|
||||
assert last_record_lsn == disk_consistent_lsn
|
||||
# TODO: assert these values are the same everywhere
|
||||
|
||||
@@ -10,18 +10,14 @@ from typing import TYPE_CHECKING
|
||||
import pytest
|
||||
from fixtures.common_types import TenantId, TenantShardId, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
StorageControllerMigrationConfig,
|
||||
)
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
|
||||
from fixtures.pageserver.common_types import parse_layer_file_name
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_prefix_empty,
|
||||
wait_for_upload_queue_empty,
|
||||
)
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage, s3_storage
|
||||
from fixtures.utils import run_only_on_default_postgres, skip_in_debug_build, wait_until
|
||||
from fixtures.utils import skip_in_debug_build, wait_until
|
||||
from fixtures.workload import Workload
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
@@ -893,93 +889,3 @@ def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controll
|
||||
assert progress_3["heatmap_mtime"] is not None
|
||||
assert progress_3["layers_total"] == progress_3["layers_downloaded"]
|
||||
assert progress_3["bytes_total"] == progress_3["bytes_downloaded"]
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@run_only_on_default_postgres("PG version is not interesting here")
|
||||
def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
assert isinstance(env.pageserver_remote_storage, S3Storage) # Satisfy linter
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
env.create_tenant(tenant_id, timeline_id, conf=TENANT_CONF, placement_policy='{"Attached":1}')
|
||||
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
|
||||
ps_attached = env.get_pageserver(attached_to_id)
|
||||
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
|
||||
|
||||
# Generate a bunch of small layers (we will apply a slowdown failpoint that works on a per-layer basis)
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.stop()
|
||||
|
||||
# Expect lots of layers
|
||||
assert len(ps_attached.list_layers(tenant_id, timeline_id)) > 10
|
||||
|
||||
# Simulate large data by making layer downloads artifically slow
|
||||
for ps in env.pageservers:
|
||||
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")])
|
||||
|
||||
# Upload a heatmap, so that secondaries have something to download
|
||||
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
|
||||
heatmap_before_migration = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
|
||||
# This has no chance to succeed: we have lots of layers and each one takes at least 1000ms.
|
||||
# However, it pulls the heatmap, which will be important later.
|
||||
http_client = env.storage_controller.pageserver_api()
|
||||
(status, progress) = http_client.tenant_secondary_download(tenant_id, wait_ms=4000)
|
||||
assert status == 202
|
||||
assert progress["heatmap_mtime"] is not None
|
||||
assert progress["layers_downloaded"] > 0
|
||||
assert progress["bytes_downloaded"] > 0
|
||||
assert progress["layers_total"] > progress["layers_downloaded"]
|
||||
assert progress["bytes_total"] > progress["bytes_downloaded"]
|
||||
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*Timed out.*downloading layers.*",
|
||||
]
|
||||
)
|
||||
|
||||
# Use a custom configuration that gives up earlier than usual.
|
||||
# We can't hydrate everything anyway because of the failpoints.
|
||||
config = StorageControllerMigrationConfig(
|
||||
secondary_warmup_timeout="5s", secondary_download_request_timeout="2s"
|
||||
)
|
||||
env.storage_controller.tenant_shard_migrate(
|
||||
TenantShardId(tenant_id, shard_number=0, shard_count=0), ps_secondary.id, config
|
||||
)
|
||||
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
assert env.storage_controller.locate(tenant_id)[0]["node_id"] == ps_secondary.id
|
||||
|
||||
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
|
||||
heatmap_after_migration = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
|
||||
assert len(heatmap_before_migration["timelines"][0]["layers"]) > 0
|
||||
|
||||
# The new layer map should contain all the layers in the pre-migration one
|
||||
# and a new in memory layer
|
||||
assert len(heatmap_before_migration["timelines"][0]["layers"]) + 1 == len(
|
||||
heatmap_after_migration["timelines"][0]["layers"]
|
||||
)
|
||||
|
||||
log.info(
|
||||
f'Heatmap size after cold migration is {len(heatmap_after_migration["timelines"][0]["layers"])}'
|
||||
)
|
||||
|
||||
# TODO: Once we have an endpoint for rescuing the cold location, exercise it here.
|
||||
|
||||
@@ -261,7 +261,7 @@ def test_isolation(
|
||||
pg_bin.run(pg_isolation_regress_command, env=env_vars, cwd=runpath)
|
||||
|
||||
# This fails with a mismatch on `pg_multixact/offsets/0000`
|
||||
post_checks(env, test_output_dir, DBNAME, endpoint)
|
||||
# post_checks(env, test_output_dir, DBNAME, endpoint)
|
||||
|
||||
|
||||
# Run extra Neon-specific pg_regress-based tests. The tests and their
|
||||
|
||||
@@ -287,7 +287,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
offset=offset,
|
||||
)
|
||||
|
||||
# Do some update so we can increment gc_cutoff
|
||||
# Do some update so we can increment latest_gc_cutoff
|
||||
generate_updates_on_main(env, ep_main, i, end=100)
|
||||
|
||||
# Wait for the existing lease to expire.
|
||||
|
||||
@@ -1821,7 +1821,7 @@ def test_sharding_gc(
|
||||
# TODO: remove when https://github.com/neondatabase/neon/issues/10720 is fixed
|
||||
ps.allowed_errors.extend(
|
||||
[
|
||||
".*could not find data for key.*",
|
||||
".*could not find data for key 020000000000000000000000000000000000.*",
|
||||
".*could not ingest record.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -3189,17 +3189,15 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
assert len(target.get_safekeepers()) == 0
|
||||
|
||||
sk_0 = env.safekeepers[0]
|
||||
|
||||
body = {
|
||||
"active": True,
|
||||
"id": fake_id,
|
||||
"created_at": "2023-10-25T09:11:25Z",
|
||||
"updated_at": "2024-08-28T11:32:43Z",
|
||||
"region_id": "aws-us-east-2",
|
||||
"host": "localhost",
|
||||
"port": sk_0.port.pg,
|
||||
"http_port": sk_0.port.http,
|
||||
"host": "safekeeper-333.us-east-2.aws.neon.build",
|
||||
"port": 6401,
|
||||
"http_port": 7676,
|
||||
"version": 5957,
|
||||
"availability_zone_id": "us-east-2b",
|
||||
}
|
||||
@@ -3245,13 +3243,6 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
|
||||
# Ensure idempotency
|
||||
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
|
||||
|
||||
def storcon_heartbeat():
|
||||
assert env.storage_controller.log_contains(
|
||||
"Heartbeat round complete for 1 safekeepers, 0 offline"
|
||||
)
|
||||
|
||||
wait_until(storcon_heartbeat)
|
||||
|
||||
|
||||
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
|
||||
compared = [dict(a), dict(b)]
|
||||
|
||||
@@ -318,7 +318,7 @@ def test_scrubber_physical_gc_ancestors(neon_env_builder: NeonEnvBuilder, shard_
|
||||
# TODO: remove when https://github.com/neondatabase/neon/issues/10720 is fixed
|
||||
ps.allowed_errors.extend(
|
||||
[
|
||||
".*could not find data for key.*",
|
||||
".*could not find data for key 020000000000000000000000000000000000.*",
|
||||
".*could not ingest record.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -566,14 +566,10 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder):
|
||||
assert_prefix_empty(neon_env_builder.safekeepers_remote_storage, prefix)
|
||||
|
||||
|
||||
# This test is flaky, probably because PUTs of local fs storage are not atomic.
|
||||
# Let's keep both remote storage kinds for a while to see if this is the case.
|
||||
# https://github.com/neondatabase/neon/issues/10761
|
||||
@pytest.mark.parametrize("remote_storage_kind", [s3_storage(), RemoteStorageKind.LOCAL_FS])
|
||||
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
|
||||
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
|
||||
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
|
||||
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 62a86dfc91...c0aedfd3ca
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 80ed91ce25...355a7c69d3
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 999cf81b10...13cf5d06c9
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 4d3a722312...4c45d78ad5
16
vendor/revisions.json
vendored
16
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.3",
|
||||
"4d3a722312b496ff7378156caa6d41c2e70c30e4"
|
||||
"17.2",
|
||||
"4c45d78ad587e4bcb4a5a7ef6931b88c6a3d575d"
|
||||
],
|
||||
"v16": [
|
||||
"16.7",
|
||||
"999cf81b101ead40e597d5cd729458d8200f4537"
|
||||
"16.6",
|
||||
"13cf5d06c98a8e9b0590ce6cdfd193a08d0a7792"
|
||||
],
|
||||
"v15": [
|
||||
"15.11",
|
||||
"80ed91ce255c765d25be0bb4a02c942fe6311fbf"
|
||||
"15.10",
|
||||
"355a7c69d3f907f3612eb406cc7b9c2f55d59b59"
|
||||
],
|
||||
"v14": [
|
||||
"14.16",
|
||||
"62a86dfc91e0c35a72f2ea5e99e6969b830c0c26"
|
||||
"14.15",
|
||||
"c0aedfd3cac447510a2db843b561f0c52901b679"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user