mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 19:40:39 +00:00
Compare commits
1 Commits
skyzh/imag
...
ci-run/pr-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c93ac95d25 |
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -892,7 +892,7 @@ jobs:
|
||||
docker-hub-password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
|
||||
push-neon-image-prod:
|
||||
if: github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
|
||||
if: github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
|
||||
needs: [ generate-image-maps, neon-image, test-images ]
|
||||
uses: ./.github/workflows/_push-to-container-registry.yml
|
||||
with:
|
||||
@@ -909,7 +909,7 @@ jobs:
|
||||
docker-hub-password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
|
||||
push-compute-image-prod:
|
||||
if: github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
|
||||
if: github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
|
||||
needs: [ generate-image-maps, vm-compute-node-image, test-images ]
|
||||
uses: ./.github/workflows/_push-to-container-registry.yml
|
||||
with:
|
||||
|
||||
2
.github/workflows/trigger-e2e-tests.yml
vendored
2
.github/workflows/trigger-e2e-tests.yml
vendored
@@ -88,7 +88,7 @@ jobs:
|
||||
BUILD_AND_TEST_RUN_ID=${TAG}
|
||||
while true; do
|
||||
gh run --repo ${GITHUB_REPOSITORY} view ${BUILD_AND_TEST_RUN_ID} --json jobs --jq '[.jobs[] | select((.name | startswith("push-neon-image-dev")) or (.name | startswith("push-compute-image-dev"))) | {"name": .name, "conclusion": .conclusion, "url": .url}]' > jobs.json
|
||||
if [ $(jq '[.[] | select(.conclusion == "success")] | length' jobs.json) -eq 2 ]; then
|
||||
if [ $(jq '[.[] | select(.conclusion == "success")]' jobs.json) -eq 2 ]; then
|
||||
break
|
||||
fi
|
||||
jq -c '.[]' jobs.json | while read -r job; do
|
||||
|
||||
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -1293,7 +1293,6 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"jsonwebtoken",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"serde",
|
||||
@@ -1321,7 +1320,6 @@ dependencies = [
|
||||
"flate2",
|
||||
"futures",
|
||||
"http 1.1.0",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"notify",
|
||||
@@ -6466,8 +6464,6 @@ dependencies = [
|
||||
"routerify",
|
||||
"rustls 0.23.18",
|
||||
"rustls-native-certs 0.8.0",
|
||||
"safekeeper_api",
|
||||
"safekeeper_client",
|
||||
"scoped-futures",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
|
||||
16
Dockerfile
16
Dockerfile
@@ -50,14 +50,6 @@ RUN set -e \
|
||||
&& rm -rf pg_install/build \
|
||||
&& tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz .
|
||||
|
||||
# Prepare cargo-chef recipe
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS plan
|
||||
WORKDIR /home/nonroot
|
||||
|
||||
COPY --chown=nonroot . .
|
||||
|
||||
RUN cargo chef prepare --recipe-path recipe.json
|
||||
|
||||
# Build neon binaries
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS build
|
||||
WORKDIR /home/nonroot
|
||||
@@ -71,15 +63,9 @@ COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_i
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v17/include/postgresql/server pg_install/v17/include/postgresql/server
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v17/lib pg_install/v17/lib
|
||||
COPY --from=plan /home/nonroot/recipe.json recipe.json
|
||||
|
||||
ARG ADDITIONAL_RUSTFLAGS=""
|
||||
|
||||
RUN set -e \
|
||||
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo chef cook --locked --release --recipe-path recipe.json
|
||||
|
||||
COPY --chown=nonroot . .
|
||||
|
||||
ARG ADDITIONAL_RUSTFLAGS
|
||||
RUN set -e \
|
||||
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
|
||||
--bin pg_sni_router \
|
||||
|
||||
@@ -300,7 +300,6 @@ ARG CARGO_HAKARI_VERSION=0.9.33
|
||||
ARG CARGO_DENY_VERSION=0.16.2
|
||||
ARG CARGO_HACK_VERSION=0.6.33
|
||||
ARG CARGO_NEXTEST_VERSION=0.9.85
|
||||
ARG CARGO_CHEF_VERSION=0.1.71
|
||||
ARG CARGO_DIESEL_CLI_VERSION=2.2.6
|
||||
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
|
||||
chmod +x rustup-init && \
|
||||
@@ -315,7 +314,6 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
|
||||
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
|
||||
cargo install cargo-hack --version ${CARGO_HACK_VERSION} && \
|
||||
cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} && \
|
||||
cargo install cargo-chef --locked --version ${CARGO_CHEF_VERSION} && \
|
||||
cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} \
|
||||
--features postgres-bundled --no-default-features && \
|
||||
rm -rf /home/nonroot/.cargo/registry && \
|
||||
|
||||
@@ -1750,7 +1750,7 @@ COPY --from=pg_graphql-src /ext-src/ /ext-src/
|
||||
COPY --from=hypopg-src /ext-src/ /ext-src/
|
||||
COPY --from=pg_hashids-src /ext-src/ /ext-src/
|
||||
COPY --from=rum-src /ext-src/ /ext-src/
|
||||
COPY --from=pgtap-src /ext-src/ /ext-src/
|
||||
#COPY --from=pgtap-src /ext-src/ /ext-src/
|
||||
COPY --from=ip4r-src /ext-src/ /ext-src/
|
||||
COPY --from=prefix-src /ext-src/ /ext-src/
|
||||
COPY --from=hll-src /ext-src/ /ext-src/
|
||||
|
||||
@@ -24,7 +24,6 @@ fail.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
notify.workspace = true
|
||||
|
||||
@@ -55,7 +55,7 @@ use signal_hook::{consts::SIGINT, iterator::Signals};
|
||||
use tracing::{error, info, warn};
|
||||
use url::Url;
|
||||
|
||||
use compute_api::responses::{ComputeCtlConfig, ComputeStatus};
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use compute_api::spec::ComputeSpec;
|
||||
|
||||
use compute_tools::compute::{
|
||||
@@ -281,7 +281,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
info!("got spec from cli argument {}", spec_json);
|
||||
return Ok(CliSpecParams {
|
||||
spec: Some(serde_json::from_str(spec_json)?),
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
live_config_allowed: false,
|
||||
});
|
||||
}
|
||||
@@ -291,7 +290,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
let file = File::open(Path::new(spec_path))?;
|
||||
return Ok(CliSpecParams {
|
||||
spec: Some(serde_json::from_reader(file)?),
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
live_config_allowed: true,
|
||||
});
|
||||
}
|
||||
@@ -301,9 +299,8 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
};
|
||||
|
||||
match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
|
||||
Ok(resp) => Ok(CliSpecParams {
|
||||
spec: resp.0,
|
||||
compute_ctl_config: resp.1,
|
||||
Ok(spec) => Ok(CliSpecParams {
|
||||
spec,
|
||||
live_config_allowed: true,
|
||||
}),
|
||||
Err(e) => {
|
||||
@@ -320,8 +317,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
struct CliSpecParams {
|
||||
/// If a spec was provided via CLI or file, the [`ComputeSpec`]
|
||||
spec: Option<ComputeSpec>,
|
||||
#[allow(dead_code)]
|
||||
compute_ctl_config: ComputeCtlConfig,
|
||||
live_config_allowed: bool,
|
||||
}
|
||||
|
||||
@@ -331,7 +326,6 @@ fn wait_spec(
|
||||
CliSpecParams {
|
||||
spec,
|
||||
live_config_allowed,
|
||||
compute_ctl_config: _,
|
||||
}: CliSpecParams,
|
||||
) -> Result<Arc<ComputeNode>> {
|
||||
let mut new_state = ComputeState::new();
|
||||
|
||||
@@ -11,9 +11,7 @@ use crate::migration::MigrationRunner;
|
||||
use crate::params::PG_HBA_ALL_MD5;
|
||||
use crate::pg_helpers::*;
|
||||
|
||||
use compute_api::responses::{
|
||||
ComputeCtlConfig, ControlPlaneComputeStatus, ControlPlaneSpecResponse,
|
||||
};
|
||||
use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
|
||||
use compute_api::spec::ComputeSpec;
|
||||
|
||||
// Do control plane request and return response if any. In case of error it
|
||||
@@ -75,13 +73,14 @@ fn do_control_plane_request(
|
||||
pub fn get_spec_from_control_plane(
|
||||
base_uri: &str,
|
||||
compute_id: &str,
|
||||
) -> Result<(Option<ComputeSpec>, ComputeCtlConfig)> {
|
||||
) -> Result<Option<ComputeSpec>> {
|
||||
let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
|
||||
let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
|
||||
Ok(v) => v,
|
||||
Err(_) => "".to_string(),
|
||||
};
|
||||
let mut attempt = 1;
|
||||
let mut spec: Result<Option<ComputeSpec>> = Ok(None);
|
||||
|
||||
info!("getting spec from control plane: {}", cp_uri);
|
||||
|
||||
@@ -91,7 +90,7 @@ pub fn get_spec_from_control_plane(
|
||||
// - no spec for compute yet (Empty state) -> return Ok(None)
|
||||
// - got spec -> return Ok(Some(spec))
|
||||
while attempt < 4 {
|
||||
let result = match do_control_plane_request(&cp_uri, &jwt) {
|
||||
spec = match do_control_plane_request(&cp_uri, &jwt) {
|
||||
Ok(spec_resp) => {
|
||||
CPLANE_REQUESTS_TOTAL
|
||||
.with_label_values(&[
|
||||
@@ -100,10 +99,10 @@ pub fn get_spec_from_control_plane(
|
||||
])
|
||||
.inc();
|
||||
match spec_resp.status {
|
||||
ControlPlaneComputeStatus::Empty => Ok((None, spec_resp.compute_ctl_config)),
|
||||
ControlPlaneComputeStatus::Empty => Ok(None),
|
||||
ControlPlaneComputeStatus::Attached => {
|
||||
if let Some(spec) = spec_resp.spec {
|
||||
Ok((Some(spec), spec_resp.compute_ctl_config))
|
||||
Ok(Some(spec))
|
||||
} else {
|
||||
bail!("compute is attached, but spec is empty")
|
||||
}
|
||||
@@ -122,10 +121,10 @@ pub fn get_spec_from_control_plane(
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = &result {
|
||||
if let Err(e) = &spec {
|
||||
error!("attempt {} to get spec failed with: {}", attempt, e);
|
||||
} else {
|
||||
return result;
|
||||
return spec;
|
||||
}
|
||||
|
||||
attempt += 1;
|
||||
@@ -133,9 +132,7 @@ pub fn get_spec_from_control_plane(
|
||||
}
|
||||
|
||||
// All attempts failed, return error.
|
||||
Err(anyhow::anyhow!(
|
||||
"Exhausted all attempts to retrieve the spec from the control plane"
|
||||
))
|
||||
spec
|
||||
}
|
||||
|
||||
/// Check `pg_hba.conf` and update if needed to allow external connections.
|
||||
|
||||
@@ -48,8 +48,6 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use compute_api::requests::ConfigurationRequest;
|
||||
use compute_api::responses::ComputeCtlConfig;
|
||||
use compute_api::spec::Database;
|
||||
use compute_api::spec::PgIdent;
|
||||
use compute_api::spec::RemoteExtSpec;
|
||||
@@ -882,13 +880,10 @@ impl Endpoint {
|
||||
self.external_http_address.port()
|
||||
))
|
||||
.header(CONTENT_TYPE.as_str(), "application/json")
|
||||
.body(
|
||||
serde_json::to_string(&ConfigurationRequest {
|
||||
spec,
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
.body(format!(
|
||||
"{{\"spec\":{}}}",
|
||||
serde_json::to_string_pretty(&spec)?
|
||||
))
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -838,10 +838,7 @@ impl StorageController {
|
||||
self.dispatch(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{tenant_shard_id}/migrate"),
|
||||
Some(TenantShardMigrateRequest {
|
||||
node_id,
|
||||
migration_config: None,
|
||||
}),
|
||||
Some(TenantShardMigrateRequest { node_id }),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -609,10 +609,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
tenant_shard_id,
|
||||
node,
|
||||
} => {
|
||||
let req = TenantShardMigrateRequest {
|
||||
node_id: node,
|
||||
migration_config: None,
|
||||
};
|
||||
let req = TenantShardMigrateRequest { node_id: node };
|
||||
|
||||
storcon_client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
@@ -626,10 +623,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
tenant_shard_id,
|
||||
node,
|
||||
} => {
|
||||
let req = TenantShardMigrateRequest {
|
||||
node_id: node,
|
||||
migration_config: None,
|
||||
};
|
||||
let req = TenantShardMigrateRequest { node_id: node };
|
||||
|
||||
storcon_client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
@@ -1088,10 +1082,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
|
||||
Some(TenantShardMigrateRequest {
|
||||
node_id: mv.to,
|
||||
migration_config: None,
|
||||
}),
|
||||
Some(TenantShardMigrateRequest { node_id: mv.to }),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))
|
||||
|
||||
@@ -71,7 +71,7 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
|
||||
cat ../compute/patches/contrib_pg${pg_version}.patch | docker exec -i $TEST_CONTAINER_NAME bash -c "(cd /postgres && patch -p1)"
|
||||
# We are running tests now
|
||||
rm -f testout.txt testout_contrib.txt
|
||||
docker exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pg_tiktoken-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src \
|
||||
docker exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pgtap-src,pg_tiktoken-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src \
|
||||
$TEST_CONTAINER_NAME /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
|
||||
docker exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
|
||||
$TEST_CONTAINER_NAME /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
diff --git a/test/schedule/create.sql b/test/schedule/create.sql
|
||||
index ba355ed..7e250f5 100644
|
||||
--- a/test/schedule/create.sql
|
||||
+++ b/test/schedule/create.sql
|
||||
@@ -1,3 +1,2 @@
|
||||
\unset ECHO
|
||||
\i test/psql.sql
|
||||
-CREATE EXTENSION pgtap;
|
||||
diff --git a/test/schedule/main.sch b/test/schedule/main.sch
|
||||
index a8a5fbc..0463fc4 100644
|
||||
--- a/test/schedule/main.sch
|
||||
+++ b/test/schedule/main.sch
|
||||
@@ -1,2 +1 @@
|
||||
-test: build
|
||||
test: create
|
||||
@@ -1,6 +0,0 @@
|
||||
#!/bin/sh
|
||||
set -ex
|
||||
cd "$(dirname ${0})"
|
||||
patch -p1 <test-upgrade.patch
|
||||
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
|
||||
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --max-connections=86 --schedule test/schedule/main.sch --schedule test/build/run.sch --dbname contrib_regression --use-existing
|
||||
@@ -41,8 +41,7 @@ EXTENSIONS='[
|
||||
{"extname": "roaringbitmap", "extdir": "pg_roaringbitmap-src"},
|
||||
{"extname": "semver", "extdir": "pg_semver-src"},
|
||||
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
|
||||
{"extname": "pgjwt", "extdir": "pgjwt-src"},
|
||||
{"extname": "pgtap", "extdir": "pgtap-src"}
|
||||
{"extname": "pgjwt", "extdir": "pgjwt-src"}
|
||||
]'
|
||||
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
|
||||
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d
|
||||
|
||||
@@ -7,7 +7,6 @@ license.workspace = true
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
chrono.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
regex.workspace = true
|
||||
|
||||
@@ -1,20 +1,18 @@
|
||||
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
|
||||
use crate::{
|
||||
privilege::Privilege,
|
||||
responses::ComputeCtlConfig,
|
||||
spec::{ComputeSpec, ExtVersion, PgIdent},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::Deserialize;
|
||||
|
||||
/// Request of the /configure API
|
||||
///
|
||||
/// We now pass only `spec` in the configuration request, but later we can
|
||||
/// extend it and something like `restart: bool` or something else. So put
|
||||
/// `spec` into a struct initially to be more flexible in the future.
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ConfigurationRequest {
|
||||
pub spec: ComputeSpec,
|
||||
pub compute_ctl_config: ComputeCtlConfig,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
use std::fmt::Display;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use jsonwebtoken::jwk::JwkSet;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
|
||||
use crate::{
|
||||
@@ -136,27 +135,13 @@ pub struct CatalogObjects {
|
||||
pub databases: Vec<Database>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct ComputeCtlConfig {
|
||||
pub jwks: JwkSet,
|
||||
}
|
||||
|
||||
impl Default for ComputeCtlConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
jwks: JwkSet {
|
||||
keys: Vec::default(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Response of the `/computes/{compute_id}/spec` control-plane API.
|
||||
/// This is not actually a compute API response, so consider moving
|
||||
/// to a different place.
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ControlPlaneSpecResponse {
|
||||
pub spec: Option<ComputeSpec>,
|
||||
pub status: ControlPlaneComputeStatus,
|
||||
pub compute_ctl_config: ComputeCtlConfig,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
|
||||
|
||||
@@ -182,18 +182,6 @@ pub struct TenantDescribeResponseShard {
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantShardMigrateRequest {
|
||||
pub node_id: NodeId,
|
||||
#[serde(default)]
|
||||
pub migration_config: Option<MigrationConfig>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct MigrationConfig {
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub secondary_warmup_timeout: Option<Duration>,
|
||||
#[serde(default)]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub secondary_download_request_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone, Debug)]
|
||||
|
||||
@@ -27,6 +27,55 @@ impl std::fmt::Display for KeySpace {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct KeySpaceIter<'a> {
|
||||
ranges: &'a [Range<Key>],
|
||||
current_range: usize,
|
||||
current_key: Option<Key>,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for KeySpaceIter<'a> {
|
||||
// (current_key, range_end)
|
||||
type Item = (Key, Key);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
// if we've gone through all ranges, stop iteration
|
||||
if self.current_range >= self.ranges.len() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let range = &self.ranges[self.current_range];
|
||||
|
||||
// if current_key is None, initialize it with range.start
|
||||
if self.current_key.is_none() {
|
||||
self.current_key = Some(range.start);
|
||||
}
|
||||
|
||||
if let Some(current_key) = self.current_key {
|
||||
// check if the current_key is still within the range
|
||||
if current_key < range.end {
|
||||
// get the next key
|
||||
let next_key = current_key.next();
|
||||
// move current_key forward
|
||||
self.current_key = Some(next_key);
|
||||
// return the range from current_key to the current range_end
|
||||
return Some((current_key, range.end));
|
||||
}
|
||||
}
|
||||
|
||||
// move to the next range
|
||||
self.current_range += 1;
|
||||
|
||||
// if there are more ranges, initialize the next key from the start of the next range
|
||||
if self.current_range < self.ranges.len() {
|
||||
self.current_key = Some(self.ranges[self.current_range].start);
|
||||
self.next() // Recurse to continue iterating with the new range
|
||||
} else {
|
||||
// no more ranges, end iteration
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper type for sparse keyspaces.
|
||||
#[derive(Clone, Debug, Default, PartialEq, Eq)]
|
||||
pub struct SparseKeySpace(pub KeySpace);
|
||||
@@ -435,6 +484,14 @@ impl KeySpace {
|
||||
pub fn contains(&self, key: &Key) -> bool {
|
||||
self.overlaps(&(*key..key.next()))
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> KeySpaceIter {
|
||||
KeySpaceIter {
|
||||
ranges: &self.ranges,
|
||||
current_range: 0,
|
||||
current_key: self.ranges.first().map(|r| r.start),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
@@ -619,6 +676,92 @@ mod tests {
|
||||
use super::*;
|
||||
use std::fmt::Write;
|
||||
|
||||
fn key(field1: u8, field2: u32, field6: u32) -> Key {
|
||||
Key {
|
||||
field1,
|
||||
field2,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_iter_single_range() {
|
||||
let key_range = Range {
|
||||
start: key(1, 0, 0),
|
||||
end: key(1, 0, 5),
|
||||
};
|
||||
let keyspace = KeySpace {
|
||||
ranges: vec![key_range],
|
||||
};
|
||||
let collected_ranges: Vec<_> = keyspace.iter().collect();
|
||||
|
||||
assert_eq!(
|
||||
collected_ranges,
|
||||
vec![
|
||||
(key(1, 0, 0), key(1, 0, 5)),
|
||||
(key(1, 0, 1), key(1, 0, 5)),
|
||||
(key(1, 0, 2), key(1, 0, 5)),
|
||||
(key(1, 0, 3), key(1, 0, 5)),
|
||||
(key(1, 0, 4), key(1, 0, 5)), // Stops at field6 = 5
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_iter_multiple_ranges() {
|
||||
let ranges = vec![
|
||||
Range {
|
||||
start: key(1, 0, 0),
|
||||
end: key(1, 0, 3),
|
||||
},
|
||||
Range {
|
||||
start: key(2, 0, 0),
|
||||
end: key(2, 0, 2),
|
||||
},
|
||||
];
|
||||
|
||||
let keyspace = KeySpace { ranges };
|
||||
let collected_ranges: Vec<_> = keyspace.iter().collect();
|
||||
|
||||
assert_eq!(
|
||||
collected_ranges,
|
||||
vec![
|
||||
(key(1, 0, 0), key(1, 0, 3)),
|
||||
(key(1, 0, 1), key(1, 0, 3)),
|
||||
(key(1, 0, 2), key(1, 0, 3)), // End of first range
|
||||
(key(2, 0, 0), key(2, 0, 2)),
|
||||
(key(2, 0, 1), key(2, 0, 2)), // End of second range
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_iter_empty_keyspace() {
|
||||
let keyspace = KeySpace { ranges: vec![] };
|
||||
let mut iter = keyspace.iter();
|
||||
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_iter_range_with_single_key() {
|
||||
let key_range = Range {
|
||||
start: key(1, 42, 0),
|
||||
end: key(1, 42, 1), // Only one step
|
||||
};
|
||||
|
||||
let keyspace = KeySpace {
|
||||
ranges: vec![key_range],
|
||||
};
|
||||
|
||||
let mut iter = keyspace.iter();
|
||||
assert_eq!(iter.next(), Some((key(1, 42, 0), key(1, 42, 1))));
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
|
||||
// Helper function to create a key range.
|
||||
//
|
||||
// Make the tests below less verbose.
|
||||
|
||||
@@ -1136,24 +1136,7 @@ pub struct TimelineInfo {
|
||||
pub ancestor_lsn: Option<Lsn>,
|
||||
pub last_record_lsn: Lsn,
|
||||
pub prev_record_lsn: Option<Lsn>,
|
||||
|
||||
/// Legacy field for compat with control plane. Synonym of `min_readable_lsn`.
|
||||
/// TODO: remove once control plane no longer reads it.
|
||||
pub latest_gc_cutoff_lsn: Lsn,
|
||||
|
||||
/// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
|
||||
/// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
|
||||
/// as it is easier to reason about.
|
||||
pub applied_gc_cutoff_lsn: Lsn,
|
||||
|
||||
/// The upper bound of data which is either already GC'ed, or elegible to be GC'ed at any time based on PITR interval.
|
||||
/// This LSN represents the "end of history" for this timeline, and callers should use it to figure out the oldest
|
||||
/// LSN at which it is legal to create a branch or ephemeral endpoint.
|
||||
///
|
||||
/// Note that holders of valid LSN leases may be able to create branches and read pages earlier
|
||||
/// than this LSN, but new leases may not be taken out earlier than this LSN.
|
||||
pub min_readable_lsn: Lsn,
|
||||
|
||||
pub disk_consistent_lsn: Lsn,
|
||||
|
||||
/// The LSN that we have succesfully uploaded to remote storage
|
||||
|
||||
@@ -42,8 +42,8 @@ use utils::lsn::Lsn;
|
||||
pub enum BasebackupError {
|
||||
#[error("basebackup pageserver error {0:#}")]
|
||||
Server(#[from] anyhow::Error),
|
||||
#[error("basebackup client error {0:#} when {1}")]
|
||||
Client(#[source] io::Error, &'static str),
|
||||
#[error("basebackup client error {0:#}")]
|
||||
Client(#[source] io::Error),
|
||||
}
|
||||
|
||||
/// Create basebackup with non-rel data in it.
|
||||
@@ -234,7 +234,7 @@ where
|
||||
self.ar
|
||||
.append(&header, self.buf.as_slice())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "flush"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
self.total_blocks += nblocks;
|
||||
debug!("Added to basebackup slru {} relsize {}", segname, nblocks);
|
||||
@@ -273,9 +273,9 @@ where
|
||||
for dir in subdirs.iter() {
|
||||
let header = new_tar_header_dir(dir)?;
|
||||
self.ar
|
||||
.append(&header, io::empty())
|
||||
.append(&header, &mut io::empty())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball"))?;
|
||||
.context("could not add directory to basebackup tarball")?;
|
||||
}
|
||||
|
||||
// Send config files.
|
||||
@@ -286,13 +286,13 @@ where
|
||||
self.ar
|
||||
.append(&header, data)
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball,pg_hba.conf"))?;
|
||||
.context("could not add config file to basebackup tarball")?;
|
||||
} else {
|
||||
let header = new_tar_header(filepath, 0)?;
|
||||
self.ar
|
||||
.append(&header, io::empty())
|
||||
.append(&header, &mut io::empty())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball,add_config_file"))?;
|
||||
.context("could not add config file to basebackup tarball")?;
|
||||
}
|
||||
}
|
||||
if !lazy_slru_download {
|
||||
@@ -406,7 +406,7 @@ where
|
||||
self.ar
|
||||
.append(&header, &*content)
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball,add_aux_file"))?;
|
||||
.context("could not add aux file to basebackup tarball")?;
|
||||
}
|
||||
|
||||
if min_restart_lsn != Lsn::MAX {
|
||||
@@ -419,7 +419,7 @@ where
|
||||
self.ar
|
||||
.append(&header, &data[..])
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball,restart.lsn"))?;
|
||||
.context("could not add restart.lsn file to basebackup tarball")?;
|
||||
}
|
||||
for xid in self
|
||||
.timeline
|
||||
@@ -451,9 +451,9 @@ where
|
||||
let crc32 = crc32c::crc32c(&content);
|
||||
content.extend_from_slice(&crc32.to_le_bytes());
|
||||
let header = new_tar_header("pg_logical/replorigin_checkpoint", content.len() as u64)?;
|
||||
self.ar.append(&header, &*content).await.map_err(|e| {
|
||||
BasebackupError::Client(e, "send_tarball,pg_logical/replorigin_checkpoint")
|
||||
})?;
|
||||
self.ar.append(&header, &*content).await.context(
|
||||
"could not add pg_logical/replorigin_checkpoint file to basebackup tarball",
|
||||
)?;
|
||||
}
|
||||
|
||||
fail_point!("basebackup-before-control-file", |_| {
|
||||
@@ -464,10 +464,7 @@ where
|
||||
|
||||
// Generate pg_control and bootstrap WAL segment.
|
||||
self.add_pgcontrol_file().await?;
|
||||
self.ar
|
||||
.finish()
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "send_tarball,finish"))?;
|
||||
self.ar.finish().await.map_err(BasebackupError::Client)?;
|
||||
debug!("all tarred up!");
|
||||
Ok(())
|
||||
}
|
||||
@@ -485,9 +482,9 @@ where
|
||||
let file_name = dst.to_segfile_name(0);
|
||||
let header = new_tar_header(&file_name, 0)?;
|
||||
self.ar
|
||||
.append(&header, io::empty())
|
||||
.append(&header, &mut io::empty())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_rel,empty"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -518,7 +515,7 @@ where
|
||||
self.ar
|
||||
.append(&header, segment_data.as_slice())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_rel,segment"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
seg += 1;
|
||||
startblk = endblk;
|
||||
@@ -569,7 +566,7 @@ where
|
||||
self.ar
|
||||
.append(&header, pg_version_str.as_bytes())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_dbdir,PG_VERSION"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
info!("timeline.pg_version {}", self.timeline.pg_version);
|
||||
|
||||
@@ -579,7 +576,7 @@ where
|
||||
self.ar
|
||||
.append(&header, &img[..])
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_dbdir,global/pg_filenode.map"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
} else {
|
||||
warn!("global/pg_filenode.map is missing");
|
||||
}
|
||||
@@ -615,9 +612,9 @@ where
|
||||
let path = format!("base/{}", dbnode);
|
||||
let header = new_tar_header_dir(&path)?;
|
||||
self.ar
|
||||
.append(&header, io::empty())
|
||||
.append(&header, &mut io::empty())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
if let Some(img) = relmap_img {
|
||||
let dst_path = format!("base/{}/PG_VERSION", dbnode);
|
||||
@@ -630,14 +627,14 @@ where
|
||||
self.ar
|
||||
.append(&header, pg_version_str.as_bytes())
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base/PG_VERSION"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
|
||||
let header = new_tar_header(&relmap_path, img.len() as u64)?;
|
||||
self.ar
|
||||
.append(&header, &img[..])
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base/pg_filenode.map"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
@@ -666,7 +663,7 @@ where
|
||||
self.ar
|
||||
.append(&header, &buf[..])
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_twophase_file"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -696,7 +693,7 @@ where
|
||||
zenith_signal.as_bytes(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
let checkpoint_bytes = self
|
||||
.timeline
|
||||
@@ -721,7 +718,7 @@ where
|
||||
self.ar
|
||||
.append(&header, &pg_control_bytes[..])
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,pg_control"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
|
||||
//send wal segment
|
||||
let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
|
||||
@@ -745,7 +742,7 @@ where
|
||||
self.ar
|
||||
.append(&header, &wal_seg[..])
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,wal_segment"))?;
|
||||
.map_err(BasebackupError::Client)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1080,10 +1080,7 @@ components:
|
||||
type: integer
|
||||
state:
|
||||
type: string
|
||||
min_readable_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
applied_gc_cutoff_lsn:
|
||||
latest_gc_cutoff_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
|
||||
@@ -482,11 +482,6 @@ async fn build_timeline_info_common(
|
||||
|
||||
let (pitr_history_size, within_ancestor_pitr) = timeline.get_pitr_history_stats();
|
||||
|
||||
let min_readable_lsn = std::cmp::max(
|
||||
timeline.get_gc_cutoff_lsn(),
|
||||
*timeline.get_applied_gc_cutoff_lsn(),
|
||||
);
|
||||
|
||||
let info = TimelineInfo {
|
||||
tenant_id: timeline.tenant_shard_id,
|
||||
timeline_id: timeline.timeline_id,
|
||||
@@ -498,12 +493,7 @@ async fn build_timeline_info_common(
|
||||
initdb_lsn,
|
||||
last_record_lsn,
|
||||
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
|
||||
// Externally, expose the lowest LSN that can be used to create a branch as the "GC cutoff", although internally
|
||||
// we distinguish between the "planned" GC cutoff (PITR point) and the "latest" GC cutoff (where we
|
||||
// actually trimmed data to), which can pass each other when PITR is changed.
|
||||
latest_gc_cutoff_lsn: min_readable_lsn,
|
||||
min_readable_lsn,
|
||||
applied_gc_cutoff_lsn: *timeline.get_applied_gc_cutoff_lsn(),
|
||||
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
|
||||
current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
|
||||
current_logical_size_is_accurate: match current_logical_size.accuracy() {
|
||||
tenant::timeline::logical_size::Accuracy::Approximate => false,
|
||||
|
||||
@@ -914,7 +914,7 @@ impl PageServerHandler {
|
||||
&shard,
|
||||
req.hdr.request_lsn,
|
||||
req.hdr.not_modified_since,
|
||||
&shard.get_applied_gc_cutoff_lsn(),
|
||||
&shard.get_latest_gc_cutoff_lsn(),
|
||||
ctx,
|
||||
)
|
||||
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
|
||||
@@ -1810,7 +1810,7 @@ impl PageServerHandler {
|
||||
req: &PagestreamExistsRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.hdr.request_lsn,
|
||||
@@ -1837,7 +1837,7 @@ impl PageServerHandler {
|
||||
req: &PagestreamNblocksRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.hdr.request_lsn,
|
||||
@@ -1864,7 +1864,7 @@ impl PageServerHandler {
|
||||
req: &PagestreamDbSizeRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.hdr.request_lsn,
|
||||
@@ -1954,7 +1954,7 @@ impl PageServerHandler {
|
||||
req: &PagestreamGetSlruSegmentRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
req.hdr.request_lsn,
|
||||
@@ -2050,8 +2050,7 @@ impl PageServerHandler {
|
||||
{
|
||||
fn map_basebackup_error(err: BasebackupError) -> QueryError {
|
||||
match err {
|
||||
// TODO: passthrough the error site to the final error message?
|
||||
BasebackupError::Client(e, _) => QueryError::Disconnected(ConnectionError::Io(e)),
|
||||
BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
|
||||
BasebackupError::Server(e) => QueryError::Other(e),
|
||||
}
|
||||
}
|
||||
@@ -2072,7 +2071,7 @@ impl PageServerHandler {
|
||||
//return Err(QueryError::NotFound("timeline is archived".into()))
|
||||
}
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
if let Some(lsn) = lsn {
|
||||
// Backup was requested at a particular LSN. Wait for it to arrive.
|
||||
info!("waiting for {}", lsn);
|
||||
@@ -2152,12 +2151,10 @@ impl PageServerHandler {
|
||||
.await
|
||||
.map_err(map_basebackup_error)?;
|
||||
}
|
||||
writer.flush().await.map_err(|e| {
|
||||
map_basebackup_error(BasebackupError::Client(
|
||||
e,
|
||||
"handle_basebackup_request,flush",
|
||||
))
|
||||
})?;
|
||||
writer
|
||||
.flush()
|
||||
.await
|
||||
.map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
|
||||
}
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyDone)
|
||||
|
||||
@@ -611,7 +611,7 @@ impl Timeline {
|
||||
) -> Result<LsnForTimestamp, PageReconstructError> {
|
||||
pausable_failpoint!("find-lsn-for-timestamp-pausable");
|
||||
|
||||
let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
|
||||
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
|
||||
let gc_cutoff_planned = {
|
||||
let gc_info = self.gc_info.read().unwrap();
|
||||
gc_info.min_cutoff()
|
||||
|
||||
@@ -40,8 +40,6 @@ use remote_timeline_client::manifest::{
|
||||
use remote_timeline_client::UploadQueueNotReadyError;
|
||||
use remote_timeline_client::FAILED_REMOTE_OP_RETRIES;
|
||||
use remote_timeline_client::FAILED_UPLOAD_WARN_THRESHOLD;
|
||||
use secondary::heatmap::HeatMapTenant;
|
||||
use secondary::heatmap::HeatMapTimeline;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
@@ -57,7 +55,6 @@ use timeline::offload::OffloadError;
|
||||
use timeline::CompactFlags;
|
||||
use timeline::CompactOptions;
|
||||
use timeline::CompactionError;
|
||||
use timeline::PreviousHeatmap;
|
||||
use timeline::ShutdownMode;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::watch;
|
||||
@@ -265,7 +262,6 @@ struct TimelinePreload {
|
||||
timeline_id: TimelineId,
|
||||
client: RemoteTimelineClient,
|
||||
index_part: Result<MaybeDeletedIndexPart, DownloadError>,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
}
|
||||
|
||||
pub(crate) struct TenantPreload {
|
||||
@@ -283,53 +279,6 @@ pub(crate) enum SpawnMode {
|
||||
Lazy,
|
||||
}
|
||||
|
||||
/// A notifier that can be used to trigger compaction shared by all timelines of a tenant.
|
||||
///
|
||||
/// It is used to notify the compaction loop that there is work to be done. It is also used
|
||||
/// to balance the load of compaction between timelines. If there are pending L0 compaction in
|
||||
/// one of the timeline, it could preempt long-running compaction jobs (e.g., image compaction)
|
||||
/// on other timelines.
|
||||
pub struct CompactionNotifier {
|
||||
notify: Notify,
|
||||
l0_count: std::sync::Mutex<HashMap<TimelineId, usize>>,
|
||||
}
|
||||
|
||||
impl CompactionNotifier {
|
||||
#[allow(clippy::new_without_default)]
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
notify: Notify::new(),
|
||||
l0_count: std::sync::Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn notify_one(&self) {
|
||||
self.notify.notify_one();
|
||||
}
|
||||
|
||||
pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
|
||||
self.notify.notified()
|
||||
}
|
||||
|
||||
pub fn on_l0_update(&self, timeline_id: TimelineId, l0_count: usize) {
|
||||
let mut guard = self.l0_count.lock().unwrap();
|
||||
guard.insert(timeline_id, l0_count);
|
||||
}
|
||||
|
||||
pub fn on_shutdown(&self, timeline_id: TimelineId) {
|
||||
let mut guard = self.l0_count.lock().unwrap();
|
||||
guard.remove(&timeline_id);
|
||||
}
|
||||
|
||||
pub fn get_max_l0_count(&self) -> Option<(usize, TimelineId)> {
|
||||
let guard = self.l0_count.lock().unwrap();
|
||||
guard
|
||||
.iter()
|
||||
.max_by_key(|(_, count)| *count)
|
||||
.map(|(timeline_id, count)| (*count, *timeline_id))
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Tenant consists of multiple timelines. Keep them in a hash table.
|
||||
///
|
||||
@@ -405,7 +354,7 @@ pub struct Tenant {
|
||||
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
|
||||
|
||||
/// Signals the tenant compaction loop that there is L0 compaction work to be done.
|
||||
pub(crate) l0_compaction_trigger: Arc<CompactionNotifier>,
|
||||
pub(crate) l0_compaction_trigger: Arc<Notify>,
|
||||
|
||||
/// Scheduled gc-compaction tasks.
|
||||
scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,
|
||||
@@ -1179,7 +1128,6 @@ impl Tenant {
|
||||
resources: TimelineResources,
|
||||
mut index_part: IndexPart,
|
||||
metadata: TimelineMetadata,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
cause: LoadTimelineCause,
|
||||
ctx: &RequestContext,
|
||||
@@ -1210,7 +1158,6 @@ impl Tenant {
|
||||
let timeline = self.create_timeline_struct(
|
||||
timeline_id,
|
||||
&metadata,
|
||||
previous_heatmap,
|
||||
ancestor.clone(),
|
||||
resources,
|
||||
CreateTimelineCause::Load,
|
||||
@@ -1610,18 +1557,8 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(vlad): Could go to S3 if the secondary is freezing cold and hasn't even
|
||||
// pulled the first heatmap. Not entirely necessary since the storage controller
|
||||
// will kick the secondary in any case and cause a download.
|
||||
let maybe_heatmap_at = self.read_on_disk_heatmap().await;
|
||||
|
||||
let timelines = self
|
||||
.load_timelines_metadata(
|
||||
remote_timeline_ids,
|
||||
remote_storage,
|
||||
maybe_heatmap_at,
|
||||
cancel,
|
||||
)
|
||||
.load_timelines_metadata(remote_timeline_ids, remote_storage, cancel)
|
||||
.await?;
|
||||
|
||||
Ok(TenantPreload {
|
||||
@@ -1634,26 +1571,6 @@ impl Tenant {
|
||||
})
|
||||
}
|
||||
|
||||
async fn read_on_disk_heatmap(&self) -> Option<(HeatMapTenant, std::time::Instant)> {
|
||||
let on_disk_heatmap_path = self.conf.tenant_heatmap_path(&self.tenant_shard_id);
|
||||
match tokio::fs::read_to_string(on_disk_heatmap_path).await {
|
||||
Ok(heatmap) => match serde_json::from_str::<HeatMapTenant>(&heatmap) {
|
||||
Ok(heatmap) => Some((heatmap, std::time::Instant::now())),
|
||||
Err(err) => {
|
||||
error!("Failed to deserialize old heatmap: {err}");
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(err) => match err.kind() {
|
||||
std::io::ErrorKind::NotFound => None,
|
||||
_ => {
|
||||
error!("Unexpected IO error reading old heatmap: {err}");
|
||||
None
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Background task that downloads all data for a tenant and brings it to Active state.
|
||||
///
|
||||
@@ -1741,10 +1658,7 @@ impl Tenant {
|
||||
match index_part {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => {
|
||||
timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
|
||||
remote_index_and_client.insert(
|
||||
timeline_id,
|
||||
(index_part, preload.client, preload.previous_heatmap),
|
||||
);
|
||||
remote_index_and_client.insert(timeline_id, (index_part, preload.client));
|
||||
}
|
||||
MaybeDeletedIndexPart::Deleted(index_part) => {
|
||||
info!(
|
||||
@@ -1763,7 +1677,7 @@ impl Tenant {
|
||||
// layer file.
|
||||
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
|
||||
for (timeline_id, remote_metadata) in sorted_timelines {
|
||||
let (index_part, remote_client, previous_heatmap) = remote_index_and_client
|
||||
let (index_part, remote_client) = remote_index_and_client
|
||||
.remove(&timeline_id)
|
||||
.expect("just put it in above");
|
||||
|
||||
@@ -1783,7 +1697,6 @@ impl Tenant {
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
previous_heatmap,
|
||||
self.get_timeline_resources_for(remote_client),
|
||||
LoadTimelineCause::Attach,
|
||||
ctx,
|
||||
@@ -1933,13 +1846,11 @@ impl Tenant {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(timeline_id=%timeline_id))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn load_remote_timeline(
|
||||
self: &Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
index_part: IndexPart,
|
||||
remote_metadata: TimelineMetadata,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
resources: TimelineResources,
|
||||
cause: LoadTimelineCause,
|
||||
ctx: &RequestContext,
|
||||
@@ -1969,7 +1880,6 @@ impl Tenant {
|
||||
resources,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
previous_heatmap,
|
||||
ancestor,
|
||||
cause,
|
||||
ctx,
|
||||
@@ -1981,29 +1891,14 @@ impl Tenant {
|
||||
self: &Arc<Tenant>,
|
||||
timeline_ids: HashSet<TimelineId>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
heatmap: Option<(HeatMapTenant, std::time::Instant)>,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
|
||||
let mut timeline_heatmaps = heatmap.map(|h| (h.0.into_timelines_index(), h.1));
|
||||
|
||||
let mut part_downloads = JoinSet::new();
|
||||
for timeline_id in timeline_ids {
|
||||
let cancel_clone = cancel.clone();
|
||||
|
||||
let previous_timeline_heatmap = timeline_heatmaps.as_mut().and_then(|hs| {
|
||||
hs.0.remove(&timeline_id).map(|h| PreviousHeatmap::Active {
|
||||
heatmap: h,
|
||||
read_at: hs.1,
|
||||
})
|
||||
});
|
||||
part_downloads.spawn(
|
||||
self.load_timeline_metadata(
|
||||
timeline_id,
|
||||
remote_storage.clone(),
|
||||
previous_timeline_heatmap,
|
||||
cancel_clone,
|
||||
)
|
||||
.instrument(info_span!("download_index_part", %timeline_id)),
|
||||
self.load_timeline_metadata(timeline_id, remote_storage.clone(), cancel_clone)
|
||||
.instrument(info_span!("download_index_part", %timeline_id)),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2051,7 +1946,6 @@ impl Tenant {
|
||||
self: &Arc<Tenant>,
|
||||
timeline_id: TimelineId,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
cancel: CancellationToken,
|
||||
) -> impl Future<Output = TimelinePreload> {
|
||||
let client = self.build_timeline_client(timeline_id, remote_storage);
|
||||
@@ -2067,7 +1961,6 @@ impl Tenant {
|
||||
client,
|
||||
timeline_id,
|
||||
index_part,
|
||||
previous_heatmap,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2179,12 +2072,7 @@ impl Tenant {
|
||||
})?;
|
||||
|
||||
let timeline_preload = self
|
||||
.load_timeline_metadata(
|
||||
timeline_id,
|
||||
self.remote_storage.clone(),
|
||||
None,
|
||||
cancel.clone(),
|
||||
)
|
||||
.load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel.clone())
|
||||
.await;
|
||||
|
||||
let index_part = match timeline_preload.index_part {
|
||||
@@ -2218,7 +2106,6 @@ impl Tenant {
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
None,
|
||||
timeline_resources,
|
||||
LoadTimelineCause::Unoffload,
|
||||
&ctx,
|
||||
@@ -2934,7 +2821,7 @@ impl Tenant {
|
||||
};
|
||||
let metadata = index_part.metadata.clone();
|
||||
self
|
||||
.load_remote_timeline(timeline_id, index_part, metadata, None, resources, LoadTimelineCause::ImportPgdata{
|
||||
.load_remote_timeline(timeline_id, index_part, metadata, resources, LoadTimelineCause::ImportPgdata{
|
||||
create_guard: timeline_create_guard, activate, }, &ctx)
|
||||
.await?
|
||||
.ready_to_activate()
|
||||
@@ -4143,7 +4030,6 @@ impl Tenant {
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
new_metadata: &TimelineMetadata,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
resources: TimelineResources,
|
||||
cause: CreateTimelineCause,
|
||||
@@ -4167,7 +4053,6 @@ impl Tenant {
|
||||
self.conf,
|
||||
Arc::clone(&self.tenant_conf),
|
||||
new_metadata,
|
||||
previous_heatmap,
|
||||
ancestor,
|
||||
new_timeline_id,
|
||||
self.tenant_shard_id,
|
||||
@@ -4289,7 +4174,7 @@ impl Tenant {
|
||||
// use an extremely long backoff.
|
||||
Some(Duration::from_secs(3600 * 24)),
|
||||
)),
|
||||
l0_compaction_trigger: Arc::new(CompactionNotifier::new()),
|
||||
l0_compaction_trigger: Arc::new(Notify::new()),
|
||||
scheduled_compaction_tasks: Mutex::new(Default::default()),
|
||||
activate_now_sem: tokio::sync::Semaphore::new(0),
|
||||
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
|
||||
@@ -4810,24 +4695,24 @@ impl Tenant {
|
||||
// We check it against both the planned GC cutoff stored in 'gc_info',
|
||||
// and the 'latest_gc_cutoff' of the last GC that was performed. The
|
||||
// planned GC cutoff in 'gc_info' is normally larger than
|
||||
// 'applied_gc_cutoff_lsn', but beware of corner cases like if you just
|
||||
// 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
|
||||
// changed the GC settings for the tenant to make the PITR window
|
||||
// larger, but some of the data was already removed by an earlier GC
|
||||
// iteration.
|
||||
|
||||
// check against last actual 'latest_gc_cutoff' first
|
||||
let applied_gc_cutoff_lsn = src_timeline.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
|
||||
{
|
||||
let gc_info = src_timeline.gc_info.read().unwrap();
|
||||
let planned_cutoff = gc_info.min_cutoff();
|
||||
if gc_info.lsn_covered_by_lease(start_lsn) {
|
||||
tracing::info!("skipping comparison of {start_lsn} with gc cutoff {} and planned gc cutoff {planned_cutoff} due to lsn lease", *applied_gc_cutoff_lsn);
|
||||
tracing::info!("skipping comparison of {start_lsn} with gc cutoff {} and planned gc cutoff {planned_cutoff} due to lsn lease", *latest_gc_cutoff_lsn);
|
||||
} else {
|
||||
src_timeline
|
||||
.check_lsn_is_in_scope(start_lsn, &applied_gc_cutoff_lsn)
|
||||
.check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
|
||||
.context(format!(
|
||||
"invalid branch start lsn: less than latest GC cutoff {}",
|
||||
*applied_gc_cutoff_lsn,
|
||||
*latest_gc_cutoff_lsn,
|
||||
))
|
||||
.map_err(CreateTimelineError::AncestorLsn)?;
|
||||
|
||||
@@ -4866,7 +4751,7 @@ impl Tenant {
|
||||
dst_prev,
|
||||
Some(src_id),
|
||||
start_lsn,
|
||||
*src_timeline.applied_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
|
||||
*src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
|
||||
src_timeline.initdb_lsn,
|
||||
src_timeline.pg_version,
|
||||
);
|
||||
@@ -5239,7 +5124,6 @@ impl Tenant {
|
||||
.create_timeline_struct(
|
||||
new_timeline_id,
|
||||
new_metadata,
|
||||
None,
|
||||
ancestor,
|
||||
resources,
|
||||
CreateTimelineCause::Load,
|
||||
@@ -6246,8 +6130,8 @@ mod tests {
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
|
||||
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
|
||||
let applied_gc_cutoff_lsn = tline.get_applied_gc_cutoff_lsn();
|
||||
assert!(*applied_gc_cutoff_lsn > Lsn(0x25));
|
||||
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
|
||||
assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
|
||||
match tline.get(*TEST_KEY, Lsn(0x25)) {
|
||||
Ok(_) => panic!("request for page should have failed"),
|
||||
Err(err) => assert!(err.to_string().contains("not found at")),
|
||||
@@ -8543,7 +8427,7 @@ mod tests {
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
@@ -8651,7 +8535,7 @@ mod tests {
|
||||
// increase GC horizon and compact again
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x40))
|
||||
.wait()
|
||||
@@ -8819,8 +8703,8 @@ mod tests {
|
||||
|
||||
// Force set disk consistent lsn so we can get the cutoff at `end_lsn`.
|
||||
info!(
|
||||
"applied_gc_cutoff_lsn: {}",
|
||||
*timeline.get_applied_gc_cutoff_lsn()
|
||||
"latest_gc_cutoff_lsn: {}",
|
||||
*timeline.get_latest_gc_cutoff_lsn()
|
||||
);
|
||||
timeline.force_set_disk_consistent_lsn(end_lsn);
|
||||
|
||||
@@ -8846,7 +8730,7 @@ mod tests {
|
||||
|
||||
// Make lease on a already GC-ed LSN.
|
||||
// 0/80 does not have a valid lease + is below latest_gc_cutoff
|
||||
assert!(Lsn(0x80) < *timeline.get_applied_gc_cutoff_lsn());
|
||||
assert!(Lsn(0x80) < *timeline.get_latest_gc_cutoff_lsn());
|
||||
timeline
|
||||
.init_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx)
|
||||
.expect_err("lease request on GC-ed LSN should fail");
|
||||
@@ -9037,7 +8921,7 @@ mod tests {
|
||||
};
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
@@ -9124,7 +9008,7 @@ mod tests {
|
||||
// increase GC horizon and compact again
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x40))
|
||||
.wait()
|
||||
@@ -9577,7 +9461,7 @@ mod tests {
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
@@ -9724,7 +9608,7 @@ mod tests {
|
||||
// increase GC horizon and compact again
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x38))
|
||||
.wait()
|
||||
@@ -9825,7 +9709,7 @@ mod tests {
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
@@ -10076,7 +9960,7 @@ mod tests {
|
||||
|
||||
{
|
||||
parent_tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x10))
|
||||
.wait()
|
||||
@@ -10096,7 +9980,7 @@ mod tests {
|
||||
|
||||
{
|
||||
branch_tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x50))
|
||||
.wait()
|
||||
@@ -10452,7 +10336,7 @@ mod tests {
|
||||
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
@@ -10837,7 +10721,7 @@ mod tests {
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
@@ -11088,7 +10972,7 @@ mod tests {
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.applied_gc_cutoff_lsn
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
|
||||
@@ -130,10 +130,7 @@ struct TimelineMetadataBodyV2 {
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
ancestor_timeline: Option<TimelineId>,
|
||||
ancestor_lsn: Lsn,
|
||||
|
||||
// The LSN at which GC was last executed. Synonym of [`Timeline::applied_gc_cutoff_lsn`].
|
||||
latest_gc_cutoff_lsn: Lsn,
|
||||
|
||||
initdb_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{collections::HashMap, time::SystemTime};
|
||||
use std::time::SystemTime;
|
||||
|
||||
use crate::tenant::{remote_timeline_client::index::LayerFileMetadata, storage_layer::LayerName};
|
||||
|
||||
@@ -8,7 +8,7 @@ use serde_with::{serde_as, DisplayFromStr, TimestampSeconds};
|
||||
use utils::{generation::Generation, id::TimelineId};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub(crate) struct HeatMapTenant {
|
||||
pub(super) struct HeatMapTenant {
|
||||
/// Generation of the attached location that uploaded the heatmap: this is not required
|
||||
/// for correctness, but acts as a hint to secondary locations in order to detect thrashing
|
||||
/// in the unlikely event that two attached locations are both uploading conflicting heatmaps.
|
||||
@@ -25,17 +25,8 @@ pub(crate) struct HeatMapTenant {
|
||||
pub(super) upload_period_ms: Option<u128>,
|
||||
}
|
||||
|
||||
impl HeatMapTenant {
|
||||
pub(crate) fn into_timelines_index(self) -> HashMap<TimelineId, HeatMapTimeline> {
|
||||
self.timelines
|
||||
.into_iter()
|
||||
.map(|htl| (htl.timeline_id, htl))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub(crate) struct HeatMapTimeline {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub(crate) timeline_id: TimelineId,
|
||||
@@ -44,13 +35,13 @@ pub(crate) struct HeatMapTimeline {
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub(crate) struct HeatMapLayer {
|
||||
pub(crate) name: LayerName,
|
||||
pub(crate) metadata: LayerFileMetadata,
|
||||
|
||||
#[serde_as(as = "TimestampSeconds<i64>")]
|
||||
pub(crate) access_time: SystemTime,
|
||||
pub(super) access_time: SystemTime,
|
||||
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
|
||||
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
|
||||
}
|
||||
|
||||
@@ -394,7 +394,7 @@ pub(super) async fn gather_inputs(
|
||||
ancestor_lsn,
|
||||
last_record: last_record_lsn,
|
||||
// this is not used above, because it might not have updated recently enough
|
||||
latest_gc_cutoff: *timeline.get_applied_gc_cutoff_lsn(),
|
||||
latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(),
|
||||
next_pitr_cutoff,
|
||||
retention_param_cutoff,
|
||||
lease_points,
|
||||
|
||||
@@ -136,22 +136,6 @@ pub(crate) fn local_layer_path(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum LastEviction {
|
||||
Never,
|
||||
At(std::time::Instant),
|
||||
Evicting,
|
||||
}
|
||||
|
||||
impl LastEviction {
|
||||
pub(crate) fn happened_after(&self, timepoint: std::time::Instant) -> bool {
|
||||
match self {
|
||||
LastEviction::Never => false,
|
||||
LastEviction::At(evicted_at) => evicted_at > &timepoint,
|
||||
LastEviction::Evicting => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer {
|
||||
/// Creates a layer value for a file we know to not be resident.
|
||||
pub(crate) fn for_evicted(
|
||||
@@ -421,17 +405,6 @@ impl Layer {
|
||||
self.0.metadata()
|
||||
}
|
||||
|
||||
pub(crate) fn last_evicted_at(&self) -> LastEviction {
|
||||
match self.0.last_evicted_at.try_lock() {
|
||||
Ok(lock) => match *lock {
|
||||
None => LastEviction::Never,
|
||||
Some(at) => LastEviction::At(at),
|
||||
},
|
||||
Err(std::sync::TryLockError::WouldBlock) => LastEviction::Evicting,
|
||||
Err(std::sync::TryLockError::Poisoned(p)) => panic!("Lock poisoned: {p}"),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
|
||||
self.0
|
||||
.timeline
|
||||
@@ -683,9 +656,7 @@ struct LayerInner {
|
||||
|
||||
/// When the Layer was last evicted but has not been downloaded since.
|
||||
///
|
||||
/// This is used for skipping evicted layers from the previous heatmap (see
|
||||
/// `[Timeline::generate_heatmap]`) and for updating metrics
|
||||
/// (see [`LayerImplMetrics::redownload_after`]).
|
||||
/// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
|
||||
last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -47,7 +47,7 @@ use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::{oneshot, watch};
|
||||
use tokio::sync::{oneshot, watch, Notify};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::critical;
|
||||
@@ -150,16 +150,16 @@ use super::{
|
||||
config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized,
|
||||
MaybeOffloaded,
|
||||
};
|
||||
use super::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, CompactionNotifier,
|
||||
HeatMapTimeline,
|
||||
};
|
||||
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
||||
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
|
||||
use super::{
|
||||
remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError,
|
||||
storage_layer::ReadableLayer,
|
||||
};
|
||||
use super::{secondary::heatmap::HeatMapLayer, GcError};
|
||||
use super::{
|
||||
secondary::heatmap::{HeatMapLayer, HeatMapTimeline},
|
||||
GcError,
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
use pageserver_api::value::Value;
|
||||
@@ -225,7 +225,7 @@ pub struct TimelineResources {
|
||||
pub remote_client: RemoteTimelineClient,
|
||||
pub pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
|
||||
pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
|
||||
pub l0_compaction_trigger: Arc<CompactionNotifier>,
|
||||
pub l0_compaction_trigger: Arc<Notify>,
|
||||
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
|
||||
}
|
||||
|
||||
@@ -352,11 +352,8 @@ pub struct Timeline {
|
||||
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
|
||||
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
|
||||
|
||||
// The LSN at which we have executed GC: whereas [`Self::gc_info`] records the LSN at which
|
||||
// we _intend_ to GC (i.e. the PITR cutoff), this LSN records where we actually last did it.
|
||||
// Because PITR interval is mutable, it's possible for this LSN to be earlier or later than
|
||||
// the planned GC cutoff.
|
||||
pub applied_gc_cutoff_lsn: Rcu<Lsn>,
|
||||
// Needed to ensure that we can't create a branch at a point that was already garbage collected
|
||||
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
|
||||
|
||||
pub(crate) gc_compaction_layer_update_lock: tokio::sync::RwLock<()>,
|
||||
|
||||
@@ -429,7 +426,7 @@ pub struct Timeline {
|
||||
compaction_failed: AtomicBool,
|
||||
|
||||
/// Notifies the tenant compaction loop that there is pending L0 compaction work.
|
||||
l0_compaction_trigger: Arc<CompactionNotifier>,
|
||||
l0_compaction_trigger: Arc<Notify>,
|
||||
|
||||
/// Make sure we only have one running gc at a time.
|
||||
///
|
||||
@@ -465,16 +462,6 @@ pub struct Timeline {
|
||||
|
||||
/// If Some, collects GetPage metadata for an ongoing PageTrace.
|
||||
pub(crate) page_trace: ArcSwapOption<Sender<PageTraceEvent>>,
|
||||
|
||||
previous_heatmap: ArcSwapOption<PreviousHeatmap>,
|
||||
}
|
||||
|
||||
pub(crate) enum PreviousHeatmap {
|
||||
Active {
|
||||
heatmap: HeatMapTimeline,
|
||||
read_at: std::time::Instant,
|
||||
},
|
||||
Obsolete,
|
||||
}
|
||||
|
||||
pub type TimelineDeleteProgress = Arc<tokio::sync::Mutex<DeleteTimelineFlow>>;
|
||||
@@ -1090,15 +1077,9 @@ impl Timeline {
|
||||
(history, gc_info.within_ancestor_pitr)
|
||||
}
|
||||
|
||||
/// Read timeline's GC cutoff: this is the LSN at which GC has started to happen
|
||||
pub(crate) fn get_applied_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
|
||||
self.applied_gc_cutoff_lsn.read()
|
||||
}
|
||||
|
||||
/// Read timeline's planned GC cutoff: this is the logical end of history that users
|
||||
/// are allowed to read (based on configured PITR), even if physically we have more history.
|
||||
pub(crate) fn get_gc_cutoff_lsn(&self) -> Lsn {
|
||||
self.gc_info.read().unwrap().cutoffs.time
|
||||
/// Lock and get timeline's GC cutoff
|
||||
pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
|
||||
self.latest_gc_cutoff_lsn.read()
|
||||
}
|
||||
|
||||
/// Look up given page version.
|
||||
@@ -1606,7 +1587,7 @@ impl Timeline {
|
||||
};
|
||||
|
||||
if init || validate {
|
||||
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff_lsn = self.get_latest_gc_cutoff_lsn();
|
||||
if lsn < *latest_gc_cutoff_lsn {
|
||||
bail!("tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn);
|
||||
}
|
||||
@@ -1966,8 +1947,6 @@ impl Timeline {
|
||||
// ... and inform any waiters for newer LSNs that there won't be any.
|
||||
self.last_record_lsn.shutdown();
|
||||
|
||||
self.l0_compaction_trigger.on_shutdown(self.timeline_id);
|
||||
|
||||
if let ShutdownMode::FreezeAndFlush = mode {
|
||||
let do_flush = if let Some((open, frozen)) = self
|
||||
.layers
|
||||
@@ -2580,7 +2559,6 @@ impl Timeline {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
|
||||
metadata: &TimelineMetadata,
|
||||
previous_heatmap: Option<PreviousHeatmap>,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -2681,7 +2659,7 @@ impl Timeline {
|
||||
LastImageLayerCreationStatus::default(),
|
||||
)),
|
||||
|
||||
applied_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
|
||||
latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
|
||||
initdb_lsn: metadata.initdb_lsn(),
|
||||
|
||||
current_logical_size: if disk_consistent_lsn.is_valid() {
|
||||
@@ -2743,8 +2721,6 @@ impl Timeline {
|
||||
create_idempotency,
|
||||
|
||||
page_trace: Default::default(),
|
||||
|
||||
previous_heatmap: ArcSwapOption::from_pointee(previous_heatmap),
|
||||
};
|
||||
|
||||
result.repartition_threshold =
|
||||
@@ -3483,52 +3459,12 @@ impl Timeline {
|
||||
|
||||
let guard = self.layers.read().await;
|
||||
|
||||
// Firstly, if there's any heatmap left over from when this location
|
||||
// was a secondary, take that into account. Keep layers that are:
|
||||
// * present in the layer map
|
||||
// * visible
|
||||
// * non-resident
|
||||
// * not evicted since we read the heatmap
|
||||
//
|
||||
// Without this, a new cold, attached location would clobber the previous
|
||||
// heatamp.
|
||||
let previous_heatmap = self.previous_heatmap.load();
|
||||
let visible_non_resident = match previous_heatmap.as_deref() {
|
||||
Some(PreviousHeatmap::Active { heatmap, read_at }) => {
|
||||
Some(heatmap.layers.iter().filter_map(|hl| {
|
||||
let desc: PersistentLayerDesc = hl.name.clone().into();
|
||||
let layer = guard.try_get_from_key(&desc.key())?;
|
||||
|
||||
if layer.visibility() == LayerVisibilityHint::Covered {
|
||||
return None;
|
||||
}
|
||||
|
||||
if layer.is_likely_resident() {
|
||||
return None;
|
||||
}
|
||||
|
||||
if layer.last_evicted_at().happened_after(*read_at) {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some((desc, hl.metadata.clone(), hl.access_time))
|
||||
}))
|
||||
}
|
||||
Some(PreviousHeatmap::Obsolete) => None,
|
||||
None => None,
|
||||
};
|
||||
|
||||
// Secondly, all currently visible, resident layers are included.
|
||||
let resident = guard.likely_resident_layers().filter_map(|layer| {
|
||||
match layer.visibility() {
|
||||
LayerVisibilityHint::Visible => {
|
||||
// Layer is visible to one or more read LSNs: elegible for inclusion in layer map
|
||||
let last_activity_ts = layer.latest_activity();
|
||||
Some((
|
||||
layer.layer_desc().clone(),
|
||||
layer.metadata(),
|
||||
last_activity_ts,
|
||||
))
|
||||
Some((layer.layer_desc(), layer.metadata(), last_activity_ts))
|
||||
}
|
||||
LayerVisibilityHint::Covered => {
|
||||
// Layer is resident but unlikely to be read: not elegible for inclusion in heatmap.
|
||||
@@ -3537,18 +3473,7 @@ impl Timeline {
|
||||
}
|
||||
});
|
||||
|
||||
let mut layers = match visible_non_resident {
|
||||
Some(non_resident) => {
|
||||
let mut non_resident = non_resident.peekable();
|
||||
if non_resident.peek().is_none() {
|
||||
self.previous_heatmap
|
||||
.store(Some(PreviousHeatmap::Obsolete.into()));
|
||||
}
|
||||
|
||||
non_resident.chain(resident).collect::<Vec<_>>()
|
||||
}
|
||||
None => resident.collect::<Vec<_>>(),
|
||||
};
|
||||
let mut layers = resident.collect::<Vec<_>>();
|
||||
|
||||
// Sort layers in order of which to download first. For a large set of layers to download, we
|
||||
// want to prioritize those layers which are most likely to still be in the resident many minutes
|
||||
@@ -3737,7 +3662,7 @@ impl Timeline {
|
||||
// the timeline, then it will remove layers that are required for fulfilling
|
||||
// the current get request (read-path cannot "look back" and notice the new
|
||||
// image layer).
|
||||
let _gc_cutoff_holder = timeline.get_applied_gc_cutoff_lsn();
|
||||
let _gc_cutoff_holder = timeline.get_latest_gc_cutoff_lsn();
|
||||
|
||||
// See `compaction::compact_with_gc` for why we need this.
|
||||
let _guard = timeline.gc_compaction_layer_update_lock.read().await;
|
||||
@@ -4122,8 +4047,6 @@ impl Timeline {
|
||||
if l0_count >= self.get_compaction_threshold() {
|
||||
self.l0_compaction_trigger.notify_one();
|
||||
}
|
||||
self.l0_compaction_trigger
|
||||
.on_l0_update(self.timeline_id, l0_count);
|
||||
|
||||
// Delay the next flush to backpressure if compaction can't keep up. We delay by the
|
||||
// flush duration such that the flush takes 2x as long. This is propagated up to WAL
|
||||
@@ -4426,7 +4349,7 @@ impl Timeline {
|
||||
let update = crate::tenant::metadata::MetadataUpdate::new(
|
||||
disk_consistent_lsn,
|
||||
ondisk_prev_record_lsn,
|
||||
*self.applied_gc_cutoff_lsn.read(),
|
||||
*self.latest_gc_cutoff_lsn.read(),
|
||||
);
|
||||
|
||||
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
|
||||
@@ -4629,74 +4552,71 @@ impl Timeline {
|
||||
let mut wrote_keys = false;
|
||||
|
||||
let mut key_request_accum = KeySpaceAccum::new();
|
||||
for range in &partition.ranges {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
// Decide whether to retain this key: usually we do, but sharded tenants may
|
||||
// need to drop keys that don't belong to them. If we retain the key, add it
|
||||
// to `key_request_accum` for later issuing a vectored get
|
||||
if self.shard_identity.is_key_disposable(&key) {
|
||||
debug!(
|
||||
"Dropping key {} during compaction (it belongs on shard {:?})",
|
||||
key,
|
||||
self.shard_identity.get_shard_number(&key)
|
||||
);
|
||||
} else {
|
||||
key_request_accum.add_key(key);
|
||||
|
||||
for (key, range_end) in partition.iter() {
|
||||
// Decide whether to retain this key: usually we do, but sharded tenants may
|
||||
// need to drop keys that don't belong to them. If we retain the key, add it
|
||||
// to `key_request_accum` for later issuing a vectored get
|
||||
if self.shard_identity.is_key_disposable(&key) {
|
||||
debug!(
|
||||
"Dropping key {} during compaction (it belongs on shard {:?})",
|
||||
key,
|
||||
self.shard_identity.get_shard_number(&key)
|
||||
);
|
||||
} else {
|
||||
key_request_accum.add_key(key);
|
||||
}
|
||||
|
||||
let last_key_in_range = key.next() == range_end;
|
||||
|
||||
// Maybe flush `key_rest_accum`
|
||||
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|
||||
|| (last_key_in_range && key_request_accum.raw_size() > 0)
|
||||
{
|
||||
let results = self
|
||||
.get_vectored(
|
||||
key_request_accum.consume_keyspace(),
|
||||
lsn,
|
||||
io_concurrency.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
}
|
||||
|
||||
let last_key_in_range = key.next() == range.end;
|
||||
key = key.next();
|
||||
|
||||
// Maybe flush `key_rest_accum`
|
||||
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|
||||
|| (last_key_in_range && key_request_accum.raw_size() > 0)
|
||||
{
|
||||
let results = self
|
||||
.get_vectored(
|
||||
key_request_accum.consume_keyspace(),
|
||||
lsn,
|
||||
io_concurrency.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
}
|
||||
|
||||
for (img_key, img) in results {
|
||||
let img = match img {
|
||||
Ok(img) => img,
|
||||
Err(err) => {
|
||||
// If we fail to reconstruct a VM or FSM page, we can zero the
|
||||
// page without losing any actual user data. That seems better
|
||||
// than failing repeatedly and getting stuck.
|
||||
//
|
||||
// We had a bug at one point, where we truncated the FSM and VM
|
||||
// in the pageserver, but the Postgres didn't know about that
|
||||
// and continued to generate incremental WAL records for pages
|
||||
// that didn't exist in the pageserver. Trying to replay those
|
||||
// WAL records failed to find the previous image of the page.
|
||||
// This special case allows us to recover from that situation.
|
||||
// See https://github.com/neondatabase/neon/issues/2601.
|
||||
//
|
||||
// Unfortunately we cannot do this for the main fork, or for
|
||||
// any metadata keys, keys, as that would lead to actual data
|
||||
// loss.
|
||||
if img_key.is_rel_fsm_block_key() || img_key.is_rel_vm_block_key() {
|
||||
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
|
||||
ZERO_PAGE.clone()
|
||||
} else {
|
||||
return Err(CreateImageLayersError::from(err));
|
||||
}
|
||||
for (img_key, img) in results {
|
||||
let img = match img {
|
||||
Ok(img) => img,
|
||||
Err(err) => {
|
||||
// If we fail to reconstruct a VM or FSM page, we can zero the
|
||||
// page without losing any actual user data. That seems better
|
||||
// than failing repeatedly and getting stuck.
|
||||
//
|
||||
// We had a bug at one point, where we truncated the FSM and VM
|
||||
// in the pageserver, but the Postgres didn't know about that
|
||||
// and continued to generate incremental WAL records for pages
|
||||
// that didn't exist in the pageserver. Trying to replay those
|
||||
// WAL records failed to find the previous image of the page.
|
||||
// This special case allows us to recover from that situation.
|
||||
// See https://github.com/neondatabase/neon/issues/2601.
|
||||
//
|
||||
// Unfortunately we cannot do this for the main fork, or for
|
||||
// any metadata keys, keys, as that would lead to actual data
|
||||
// loss.
|
||||
if img_key.is_rel_fsm_block_key() || img_key.is_rel_vm_block_key() {
|
||||
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
|
||||
ZERO_PAGE.clone()
|
||||
} else {
|
||||
return Err(CreateImageLayersError::from(err));
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
// Write all the keys we just read into our new image layer.
|
||||
image_layer_writer.put_image(img_key, img, ctx).await?;
|
||||
wrote_keys = true;
|
||||
}
|
||||
// Write all the keys we just read into our new image layer.
|
||||
image_layer_writer.put_image(img_key, img, ctx).await?;
|
||||
wrote_keys = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5072,22 +4992,20 @@ impl Timeline {
|
||||
// image layer generation taking too long time and blocking L0 compaction. So in this
|
||||
// mode, we also inspect the current number of L0 layers and skip image layer generation
|
||||
// if there are too many of them.
|
||||
if let Some((max_num_of_l0_layers, timeline_id)) =
|
||||
self.l0_compaction_trigger.get_max_l0_count()
|
||||
{
|
||||
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
|
||||
* self.get_compaction_threshold();
|
||||
if image_preempt_threshold != 0
|
||||
&& max_num_of_l0_layers >= image_preempt_threshold
|
||||
{
|
||||
tracing::info!(
|
||||
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {} on timeline {}",
|
||||
partition.start().unwrap(), partition.end().unwrap(), max_num_of_l0_layers, timeline_id
|
||||
);
|
||||
last_partition_processed = Some(partition.clone());
|
||||
all_generated = false;
|
||||
break;
|
||||
}
|
||||
let num_of_l0_layers = {
|
||||
let layers = self.layers.read().await;
|
||||
layers.layer_map()?.level0_deltas().len()
|
||||
};
|
||||
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
|
||||
* self.get_compaction_threshold();
|
||||
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
|
||||
tracing::info!(
|
||||
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
|
||||
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
|
||||
);
|
||||
last_partition_processed = Some(partition.clone());
|
||||
all_generated = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5466,13 +5384,8 @@ impl Timeline {
|
||||
self.remote_client
|
||||
.schedule_compaction_update(&remove_layers, new_deltas)?;
|
||||
|
||||
let l0_count = guard.layer_map()?.level0_deltas().len();
|
||||
|
||||
drop_wlock(guard);
|
||||
|
||||
self.l0_compaction_trigger
|
||||
.on_l0_update(self.timeline_id, l0_count);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -5661,7 +5574,7 @@ impl Timeline {
|
||||
// PITR interval is set & we didn't look up a timestamp successfully. Conservatively assume PITR
|
||||
// cannot advance beyond what was already GC'd, and respect space-based retention
|
||||
GcCutoffs {
|
||||
time: *self.get_applied_gc_cutoff_lsn(),
|
||||
time: *self.get_latest_gc_cutoff_lsn(),
|
||||
space: space_cutoff,
|
||||
}
|
||||
}
|
||||
@@ -5782,7 +5695,7 @@ impl Timeline {
|
||||
let mut result: GcResult = GcResult::default();
|
||||
|
||||
// Nothing to GC. Return early.
|
||||
let latest_gc_cutoff = *self.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
|
||||
if latest_gc_cutoff >= new_gc_cutoff {
|
||||
info!(
|
||||
"Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
|
||||
@@ -5796,7 +5709,7 @@ impl Timeline {
|
||||
//
|
||||
// The GC cutoff should only ever move forwards.
|
||||
let waitlist = {
|
||||
let write_guard = self.applied_gc_cutoff_lsn.lock_for_write();
|
||||
let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
|
||||
if *write_guard > new_gc_cutoff {
|
||||
return Err(GcError::BadLsn {
|
||||
why: format!(
|
||||
@@ -6736,32 +6649,18 @@ fn is_send() {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::value::Value;
|
||||
use tracing::Instrument;
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
|
||||
use crate::tenant::{
|
||||
harness::{test_img, TenantHarness},
|
||||
layer_map::LayerMap,
|
||||
storage_layer::{Layer, LayerName, LayerVisibilityHint},
|
||||
storage_layer::{Layer, LayerName},
|
||||
timeline::{DeltaLayerTestDesc, EvictionError},
|
||||
PreviousHeatmap, Timeline,
|
||||
Timeline,
|
||||
};
|
||||
|
||||
use super::HeatMapTimeline;
|
||||
|
||||
fn assert_heatmaps_have_same_layers(lhs: &HeatMapTimeline, rhs: &HeatMapTimeline) {
|
||||
assert_eq!(lhs.layers.len(), rhs.layers.len());
|
||||
let lhs_rhs = lhs.layers.iter().zip(rhs.layers.iter());
|
||||
for (l, r) in lhs_rhs {
|
||||
assert_eq!(l.name, r.name);
|
||||
assert_eq!(l.metadata, r.metadata);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_heatmap_generation() {
|
||||
let harness = TenantHarness::create("heatmap_generation").await.unwrap();
|
||||
@@ -6835,7 +6734,7 @@ mod tests {
|
||||
assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name());
|
||||
|
||||
let mut last_lsn = Lsn::MAX;
|
||||
for layer in &heatmap.layers {
|
||||
for layer in heatmap.layers {
|
||||
// Covered layer should be omitted
|
||||
assert!(layer.name != covered_delta.layer_name());
|
||||
|
||||
@@ -6850,144 +6749,6 @@ mod tests {
|
||||
last_lsn = layer_lsn;
|
||||
}
|
||||
}
|
||||
|
||||
// Evict all the layers and stash the old heatmap in the timeline.
|
||||
// This simulates a migration to a cold secondary location.
|
||||
|
||||
let guard = timeline.layers.read().await;
|
||||
let mut all_layers = Vec::new();
|
||||
let forever = std::time::Duration::from_secs(120);
|
||||
for layer in guard.likely_resident_layers() {
|
||||
all_layers.push(layer.clone());
|
||||
layer.evict_and_wait(forever).await.unwrap();
|
||||
}
|
||||
drop(guard);
|
||||
|
||||
timeline
|
||||
.previous_heatmap
|
||||
.store(Some(Arc::new(PreviousHeatmap::Active {
|
||||
heatmap: heatmap.clone(),
|
||||
read_at: std::time::Instant::now(),
|
||||
})));
|
||||
|
||||
// Generate a new heatmap and assert that it contains the same layers as the old one.
|
||||
let post_migration_heatmap = timeline.generate_heatmap().await.unwrap();
|
||||
assert_heatmaps_have_same_layers(&heatmap, &post_migration_heatmap);
|
||||
|
||||
// Download each layer one by one. Generate the heatmap at each step and check
|
||||
// that it's stable.
|
||||
for layer in all_layers {
|
||||
if layer.visibility() == LayerVisibilityHint::Covered {
|
||||
continue;
|
||||
}
|
||||
|
||||
eprintln!("Downloading {layer} and re-generating heatmap");
|
||||
|
||||
let _resident = layer
|
||||
.download_and_keep_resident()
|
||||
.instrument(tracing::info_span!(
|
||||
parent: None,
|
||||
"download_layer",
|
||||
tenant_id = %timeline.tenant_shard_id.tenant_id,
|
||||
shard_id = %timeline.tenant_shard_id.shard_slug(),
|
||||
timeline_id = %timeline.timeline_id
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let post_download_heatmap = timeline.generate_heatmap().await.unwrap();
|
||||
assert_heatmaps_have_same_layers(&heatmap, &post_download_heatmap);
|
||||
}
|
||||
|
||||
// Everything from the post-migration heatmap is now resident.
|
||||
// Check that we drop it from memory.
|
||||
assert!(matches!(
|
||||
timeline.previous_heatmap.load().as_deref(),
|
||||
Some(PreviousHeatmap::Obsolete)
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_previous_heatmap_obsoletion() {
|
||||
let harness = TenantHarness::create("heatmap_previous_heatmap_obsoletion")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let l0_delta = DeltaLayerTestDesc::new(
|
||||
Lsn(0x20)..Lsn(0x30),
|
||||
Key::from_hex("000000000000000000000000000000000000").unwrap()
|
||||
..Key::from_hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF").unwrap(),
|
||||
vec![(
|
||||
Key::from_hex("720000000033333333444444445500000000").unwrap(),
|
||||
Lsn(0x25),
|
||||
Value::Image(test_img("foo")),
|
||||
)],
|
||||
);
|
||||
|
||||
let image_layer = (
|
||||
Lsn(0x40),
|
||||
vec![(
|
||||
Key::from_hex("620000000033333333444444445500000000").unwrap(),
|
||||
test_img("bar"),
|
||||
)],
|
||||
);
|
||||
|
||||
let delta_layers = vec![l0_delta];
|
||||
let image_layers = vec![image_layer];
|
||||
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TimelineId::generate(),
|
||||
Lsn(0x10),
|
||||
14,
|
||||
&ctx,
|
||||
delta_layers,
|
||||
image_layers,
|
||||
Lsn(0x100),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Layer visibility is an input to heatmap generation, so refresh it first
|
||||
timeline.update_layer_visibility().await.unwrap();
|
||||
|
||||
let heatmap = timeline
|
||||
.generate_heatmap()
|
||||
.await
|
||||
.expect("Infallible while timeline is not shut down");
|
||||
|
||||
// Both layers should be in the heatmap
|
||||
assert!(!heatmap.layers.is_empty());
|
||||
|
||||
// Now simulate a migration.
|
||||
timeline
|
||||
.previous_heatmap
|
||||
.store(Some(Arc::new(PreviousHeatmap::Active {
|
||||
heatmap: heatmap.clone(),
|
||||
read_at: std::time::Instant::now(),
|
||||
})));
|
||||
|
||||
// Evict all the layers in the previous heatmap
|
||||
let guard = timeline.layers.read().await;
|
||||
let forever = std::time::Duration::from_secs(120);
|
||||
for layer in guard.likely_resident_layers() {
|
||||
layer.evict_and_wait(forever).await.unwrap();
|
||||
}
|
||||
drop(guard);
|
||||
|
||||
// Generate a new heatmap and check that the previous heatmap
|
||||
// has been marked obsolete.
|
||||
let post_eviction_heatmap = timeline
|
||||
.generate_heatmap()
|
||||
.await
|
||||
.expect("Infallible while timeline is not shut down");
|
||||
|
||||
assert!(post_eviction_heatmap.layers.is_empty());
|
||||
assert!(matches!(
|
||||
timeline.previous_heatmap.load().as_deref(),
|
||||
Some(PreviousHeatmap::Obsolete)
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -852,7 +852,7 @@ impl Timeline {
|
||||
//
|
||||
// Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
|
||||
// are rewriting layers.
|
||||
let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
|
||||
let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
|
||||
|
||||
tracing::info!(
|
||||
"latest_gc_cutoff: {}, pitr cutoff {}",
|
||||
@@ -2202,7 +2202,7 @@ impl Timeline {
|
||||
|
||||
// TODO: ensure the child branches will not use anything below the watermark, or consider
|
||||
// them when computing the watermark.
|
||||
gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
|
||||
gc_cutoff_lsn.min(*self.get_latest_gc_cutoff_lsn())
|
||||
}
|
||||
|
||||
/// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
|
||||
|
||||
@@ -294,7 +294,6 @@ impl DeleteTimelineFlow {
|
||||
timeline_id,
|
||||
local_metadata,
|
||||
None, // Ancestor is not needed for deletion.
|
||||
None, // Previous heatmap is not needed for deletion
|
||||
tenant.get_timeline_resources_for(remote_client),
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
// Thus we need to skip the validation here.
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
#include "utils/guc.h"
|
||||
|
||||
#include "extension_server.h"
|
||||
#include "extension_server.h"
|
||||
#include "neon_utils.h"
|
||||
|
||||
static int extension_server_port = 0;
|
||||
@@ -45,7 +45,7 @@ neon_download_extension_file_http(const char *filename, bool is_library)
|
||||
handle = alloc_curl_handle();
|
||||
|
||||
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
|
||||
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 60L /* seconds */ );
|
||||
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 3L /* seconds */ );
|
||||
}
|
||||
|
||||
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
|
||||
|
||||
@@ -3765,7 +3765,7 @@ neon_dbsize(Oid dbNode)
|
||||
* neon_truncate() -- Truncate relation to specified number of blocks.
|
||||
*/
|
||||
static void
|
||||
neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, BlockNumber nblocks)
|
||||
neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
|
||||
{
|
||||
XLogRecPtr lsn;
|
||||
|
||||
@@ -3780,7 +3780,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
case RELPERSISTENCE_UNLOGGED:
|
||||
mdtruncate(reln, forknum, old_blocks, nblocks);
|
||||
mdtruncate(reln, forknum, nblocks);
|
||||
return;
|
||||
|
||||
default:
|
||||
@@ -3818,7 +3818,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
mdtruncate(reln, forknum, old_blocks, nblocks);
|
||||
mdtruncate(reln, forknum, nblocks);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@@ -96,7 +96,7 @@ static void inmem_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum, BlockNumber nblocks);
|
||||
static BlockNumber inmem_nblocks(SMgrRelation reln, ForkNumber forknum);
|
||||
static void inmem_truncate(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber old_blocks, BlockNumber nblocks);
|
||||
BlockNumber nblocks);
|
||||
static void inmem_immedsync(SMgrRelation reln, ForkNumber forknum);
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
static void inmem_registersync(SMgrRelation reln, ForkNumber forknum);
|
||||
@@ -345,7 +345,7 @@ inmem_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
* inmem_truncate() -- Truncate relation to specified number of blocks.
|
||||
*/
|
||||
static void
|
||||
inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, BlockNumber nblocks)
|
||||
inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
@@ -501,7 +501,7 @@ impl Session {
|
||||
_guard: Metrics::get()
|
||||
.proxy
|
||||
.cancel_channel_size
|
||||
.guard(RedisMsgKind::HDel),
|
||||
.guard(RedisMsgKind::HSet),
|
||||
};
|
||||
|
||||
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
|
||||
use http_utils::error::HttpErrorBody;
|
||||
use reqwest::{IntoUrl, Method, StatusCode};
|
||||
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
|
||||
use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
|
||||
use std::error::Error as _;
|
||||
use utils::{
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
@@ -32,9 +32,6 @@ pub enum Error {
|
||||
/// Status is not ok; parsed error in body as `HttpErrorBody`.
|
||||
#[error("safekeeper API: {1}")]
|
||||
ApiError(StatusCode, String),
|
||||
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -127,10 +124,9 @@ impl Client {
|
||||
self.get(&uri).await
|
||||
}
|
||||
|
||||
pub async fn utilization(&self) -> Result<SafekeeperUtilization> {
|
||||
pub async fn utilization(&self) -> Result<reqwest::Response> {
|
||||
let uri = format!("{}/v1/utilization/", self.mgmt_api_endpoint);
|
||||
let resp = self.get(&uri).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
self.get(&uri).await
|
||||
}
|
||||
|
||||
async fn post<B: serde::Serialize, U: IntoUrl>(
|
||||
|
||||
@@ -310,12 +310,9 @@ impl WalBackupTask {
|
||||
retry_attempt = 0;
|
||||
}
|
||||
Err(e) => {
|
||||
// We might have managed to upload some segment even though
|
||||
// some later in the range failed, so log backup_lsn
|
||||
// separately.
|
||||
error!(
|
||||
"failed while offloading range {}-{}, backup_lsn {}: {:?}",
|
||||
backup_lsn, commit_lsn, backup_lsn, e
|
||||
"failed while offloading range {}-{}: {:?}",
|
||||
backup_lsn, commit_lsn, e
|
||||
);
|
||||
|
||||
retry_attempt = retry_attempt.saturating_add(1);
|
||||
@@ -341,13 +338,6 @@ async fn backup_lsn_range(
|
||||
let start_lsn = *backup_lsn;
|
||||
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
|
||||
|
||||
info!(
|
||||
"offloading segnos {:?} of range [{}-{})",
|
||||
segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
|
||||
start_lsn,
|
||||
end_lsn,
|
||||
);
|
||||
|
||||
// Pool of concurrent upload tasks. We use `FuturesOrdered` to
|
||||
// preserve order of uploads, and update `backup_lsn` only after
|
||||
// all previous uploads are finished.
|
||||
@@ -384,10 +374,10 @@ async fn backup_lsn_range(
|
||||
}
|
||||
|
||||
info!(
|
||||
"offloaded segnos {:?} of range [{}-{})",
|
||||
"offloaded segnos {:?} up to {}, previous backup_lsn {}",
|
||||
segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
|
||||
start_lsn,
|
||||
end_lsn,
|
||||
start_lsn,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -32,8 +32,6 @@ postgres_connection.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest = { workspace = true, features = ["stream"] }
|
||||
routerify.workspace = true
|
||||
safekeeper_api.workspace = true
|
||||
safekeeper_client.workspace = true
|
||||
rustls-native-certs.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -1,10 +1,6 @@
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use safekeeper_api::models::SafekeeperUtilization;
|
||||
use safekeeper_client::mgmt_api;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Debug,
|
||||
future::Future,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
@@ -13,15 +9,15 @@ use tokio_util::sync::CancellationToken;
|
||||
use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization};
|
||||
|
||||
use thiserror::Error;
|
||||
use utils::{id::NodeId, logging::SecretString};
|
||||
use utils::id::NodeId;
|
||||
|
||||
use crate::{node::Node, safekeeper::Safekeeper};
|
||||
use crate::node::Node;
|
||||
|
||||
struct HeartbeaterTask<Server, State> {
|
||||
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
|
||||
struct HeartbeaterTask {
|
||||
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
|
||||
cancel: CancellationToken,
|
||||
|
||||
state: HashMap<NodeId, State>,
|
||||
state: HashMap<NodeId, PageserverState>,
|
||||
|
||||
max_offline_interval: Duration,
|
||||
max_warming_up_interval: Duration,
|
||||
@@ -40,17 +36,8 @@ pub(crate) enum PageserverState {
|
||||
Offline,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) enum SafekeeperState {
|
||||
Available {
|
||||
last_seen_at: Instant,
|
||||
utilization: SafekeeperUtilization,
|
||||
},
|
||||
Offline,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct AvailablityDeltas<State>(pub Vec<(NodeId, State)>);
|
||||
pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>);
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum HeartbeaterError {
|
||||
@@ -58,28 +45,23 @@ pub(crate) enum HeartbeaterError {
|
||||
Cancel,
|
||||
}
|
||||
|
||||
struct HeartbeatRequest<Server, State> {
|
||||
servers: Arc<HashMap<NodeId, Server>>,
|
||||
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas<State>, HeartbeaterError>>,
|
||||
struct HeartbeatRequest {
|
||||
pageservers: Arc<HashMap<NodeId, Node>>,
|
||||
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas, HeartbeaterError>>,
|
||||
}
|
||||
|
||||
pub(crate) struct Heartbeater<Server, State> {
|
||||
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest<Server, State>>,
|
||||
pub(crate) struct Heartbeater {
|
||||
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest>,
|
||||
}
|
||||
|
||||
#[allow(private_bounds)]
|
||||
impl<Server: Send + Sync + 'static, State: Debug + Send + 'static> Heartbeater<Server, State>
|
||||
where
|
||||
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
|
||||
{
|
||||
impl Heartbeater {
|
||||
pub(crate) fn new(
|
||||
jwt_token: Option<String>,
|
||||
max_offline_interval: Duration,
|
||||
max_warming_up_interval: Duration,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
let (sender, receiver) =
|
||||
tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
|
||||
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
|
||||
let mut heartbeater = HeartbeaterTask::new(
|
||||
receiver,
|
||||
jwt_token,
|
||||
@@ -94,12 +76,12 @@ where
|
||||
|
||||
pub(crate) async fn heartbeat(
|
||||
&self,
|
||||
servers: Arc<HashMap<NodeId, Server>>,
|
||||
) -> Result<AvailablityDeltas<State>, HeartbeaterError> {
|
||||
pageservers: Arc<HashMap<NodeId, Node>>,
|
||||
) -> Result<AvailablityDeltas, HeartbeaterError> {
|
||||
let (sender, receiver) = tokio::sync::oneshot::channel();
|
||||
self.sender
|
||||
.send(HeartbeatRequest {
|
||||
servers,
|
||||
pageservers,
|
||||
reply: sender,
|
||||
})
|
||||
.map_err(|_| HeartbeaterError::Cancel)?;
|
||||
@@ -111,12 +93,9 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<Server, State: Debug> HeartbeaterTask<Server, State>
|
||||
where
|
||||
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
|
||||
{
|
||||
impl HeartbeaterTask {
|
||||
fn new(
|
||||
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
|
||||
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
|
||||
jwt_token: Option<String>,
|
||||
max_offline_interval: Duration,
|
||||
max_warming_up_interval: Duration,
|
||||
@@ -131,13 +110,14 @@ where
|
||||
jwt_token,
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(&mut self) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
request = self.receiver.recv() => {
|
||||
match request {
|
||||
Some(req) => {
|
||||
let res = self.heartbeat(req.servers).await;
|
||||
let res = self.heartbeat(req.pageservers).await;
|
||||
req.reply.send(res).unwrap();
|
||||
},
|
||||
None => { return; }
|
||||
@@ -147,20 +127,11 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait HeartBeat<Server, State> {
|
||||
fn heartbeat(
|
||||
&mut self,
|
||||
pageservers: Arc<HashMap<NodeId, Server>>,
|
||||
) -> impl Future<Output = Result<AvailablityDeltas<State>, HeartbeaterError>> + Send;
|
||||
}
|
||||
|
||||
impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState> {
|
||||
async fn heartbeat(
|
||||
&mut self,
|
||||
pageservers: Arc<HashMap<NodeId, Node>>,
|
||||
) -> Result<AvailablityDeltas<PageserverState>, HeartbeaterError> {
|
||||
) -> Result<AvailablityDeltas, HeartbeaterError> {
|
||||
let mut new_state = HashMap::new();
|
||||
|
||||
let mut heartbeat_futs = FuturesUnordered::new();
|
||||
@@ -301,121 +272,3 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
|
||||
Ok(AvailablityDeltas(deltas))
|
||||
}
|
||||
}
|
||||
|
||||
impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, SafekeeperState> {
|
||||
async fn heartbeat(
|
||||
&mut self,
|
||||
safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
|
||||
) -> Result<AvailablityDeltas<SafekeeperState>, HeartbeaterError> {
|
||||
let mut new_state = HashMap::new();
|
||||
|
||||
let mut heartbeat_futs = FuturesUnordered::new();
|
||||
for (node_id, sk) in &*safekeepers {
|
||||
heartbeat_futs.push({
|
||||
let jwt_token = self
|
||||
.jwt_token
|
||||
.as_ref()
|
||||
.map(|t| SecretString::from(t.to_owned()));
|
||||
let cancel = self.cancel.clone();
|
||||
|
||||
async move {
|
||||
let response = sk
|
||||
.with_client_retries(
|
||||
|client| async move { client.get_utilization().await },
|
||||
&jwt_token,
|
||||
3,
|
||||
3,
|
||||
Duration::from_secs(1),
|
||||
&cancel,
|
||||
)
|
||||
.await;
|
||||
|
||||
let status = match response {
|
||||
Ok(utilization) => SafekeeperState::Available {
|
||||
last_seen_at: Instant::now(),
|
||||
utilization,
|
||||
},
|
||||
Err(mgmt_api::Error::Cancelled) => {
|
||||
// This indicates cancellation of the request.
|
||||
// We ignore the node in this case.
|
||||
return None;
|
||||
}
|
||||
Err(_) => SafekeeperState::Offline,
|
||||
};
|
||||
|
||||
Some((*node_id, status))
|
||||
}
|
||||
});
|
||||
|
||||
loop {
|
||||
let maybe_status = tokio::select! {
|
||||
next = heartbeat_futs.next() => {
|
||||
match next {
|
||||
Some(result) => result,
|
||||
None => { break; }
|
||||
}
|
||||
},
|
||||
_ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
|
||||
};
|
||||
|
||||
if let Some((node_id, status)) = maybe_status {
|
||||
new_state.insert(node_id, status);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut offline = 0;
|
||||
for state in new_state.values() {
|
||||
match state {
|
||||
SafekeeperState::Offline { .. } => offline += 1,
|
||||
SafekeeperState::Available { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"Heartbeat round complete for {} safekeepers, {} offline",
|
||||
new_state.len(),
|
||||
offline
|
||||
);
|
||||
|
||||
let mut deltas = Vec::new();
|
||||
let now = Instant::now();
|
||||
for (node_id, sk_state) in new_state.iter_mut() {
|
||||
use std::collections::hash_map::Entry::*;
|
||||
let entry = self.state.entry(*node_id);
|
||||
|
||||
let mut needs_update = false;
|
||||
match entry {
|
||||
Occupied(ref occ) => match (occ.get(), &sk_state) {
|
||||
(SafekeeperState::Offline, SafekeeperState::Offline) => {}
|
||||
(SafekeeperState::Available { last_seen_at, .. }, SafekeeperState::Offline) => {
|
||||
if now - *last_seen_at >= self.max_offline_interval {
|
||||
deltas.push((*node_id, sk_state.clone()));
|
||||
needs_update = true;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
deltas.push((*node_id, sk_state.clone()));
|
||||
needs_update = true;
|
||||
}
|
||||
},
|
||||
Vacant(_) => {
|
||||
// This is a new node. Don't generate a delta for it.
|
||||
deltas.push((*node_id, sk_state.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
match entry {
|
||||
Occupied(mut occ) if needs_update => {
|
||||
(*occ.get_mut()) = sk_state.clone();
|
||||
}
|
||||
Vacant(vac) => {
|
||||
vac.insert(sk_state.clone());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(AvailablityDeltas(deltas))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,8 +17,6 @@ mod pageserver_client;
|
||||
mod peer_client;
|
||||
pub mod persistence;
|
||||
mod reconciler;
|
||||
mod safekeeper;
|
||||
mod safekeeper_client;
|
||||
mod scheduler;
|
||||
mod schema;
|
||||
pub mod service;
|
||||
|
||||
@@ -80,11 +80,6 @@ pub(crate) struct StorageControllerMetricGroup {
|
||||
pub(crate) storage_controller_pageserver_request_error:
|
||||
measured::CounterVec<PageserverRequestLabelGroupSet>,
|
||||
|
||||
/// Count of HTTP requests to the safekeeper that resulted in an error,
|
||||
/// broken down by the safekeeper node id, request name and method
|
||||
pub(crate) storage_controller_safekeeper_request_error:
|
||||
measured::CounterVec<PageserverRequestLabelGroupSet>,
|
||||
|
||||
/// Latency of HTTP requests to the pageserver, broken down by pageserver
|
||||
/// node id, request name and method. This include both successful and unsuccessful
|
||||
/// requests.
|
||||
@@ -92,13 +87,6 @@ pub(crate) struct StorageControllerMetricGroup {
|
||||
pub(crate) storage_controller_pageserver_request_latency:
|
||||
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
|
||||
|
||||
/// Latency of HTTP requests to the safekeeper, broken down by safekeeper
|
||||
/// node id, request name and method. This include both successful and unsuccessful
|
||||
/// requests.
|
||||
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
|
||||
pub(crate) storage_controller_safekeeper_request_latency:
|
||||
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
|
||||
|
||||
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
|
||||
/// broken down by the pageserver node id, request name and method
|
||||
pub(crate) storage_controller_passthrough_request_error:
|
||||
|
||||
@@ -1185,6 +1185,23 @@ impl Persistence {
|
||||
Ok(safekeepers)
|
||||
}
|
||||
|
||||
pub(crate) async fn safekeeper_get(
|
||||
&self,
|
||||
id: i64,
|
||||
) -> Result<SafekeeperPersistence, DatabaseError> {
|
||||
use crate::schema::safekeepers::dsl::{id as id_column, safekeepers};
|
||||
self.with_conn(move |conn| {
|
||||
Box::pin(async move {
|
||||
Ok(safekeepers
|
||||
.filter(id_column.eq(&id))
|
||||
.select(SafekeeperPersistence::as_select())
|
||||
.get_result(conn)
|
||||
.await?)
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn safekeeper_upsert(
|
||||
&self,
|
||||
record: SafekeeperUpsert,
|
||||
@@ -1537,21 +1554,6 @@ pub(crate) struct SafekeeperPersistence {
|
||||
}
|
||||
|
||||
impl SafekeeperPersistence {
|
||||
pub(crate) fn from_upsert(
|
||||
upsert: SafekeeperUpsert,
|
||||
scheduling_policy: SkSchedulingPolicy,
|
||||
) -> Self {
|
||||
crate::persistence::SafekeeperPersistence {
|
||||
id: upsert.id,
|
||||
region_id: upsert.region_id,
|
||||
version: upsert.version,
|
||||
host: upsert.host,
|
||||
port: upsert.port,
|
||||
http_port: upsert.http_port,
|
||||
availability_zone_id: upsert.availability_zone_id,
|
||||
scheduling_policy: String::from(scheduling_policy),
|
||||
}
|
||||
}
|
||||
pub(crate) fn as_describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
|
||||
let scheduling_policy =
|
||||
SkSchedulingPolicy::from_str(&self.scheduling_policy).map_err(|e| {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::pageserver_client::PageserverClient;
|
||||
use crate::persistence::Persistence;
|
||||
use crate::{compute_hook, service};
|
||||
use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
|
||||
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
|
||||
};
|
||||
@@ -162,22 +162,6 @@ impl ReconcilerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&MigrationConfig> for ReconcilerConfig {
|
||||
fn from(value: &MigrationConfig) -> Self {
|
||||
let mut builder = ReconcilerConfigBuilder::new();
|
||||
|
||||
if let Some(timeout) = value.secondary_warmup_timeout {
|
||||
builder = builder.secondary_warmup_timeout(timeout)
|
||||
}
|
||||
|
||||
if let Some(timeout) = value.secondary_download_request_timeout {
|
||||
builder = builder.secondary_download_request_timeout(timeout)
|
||||
}
|
||||
|
||||
builder.build()
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
|
||||
pub(crate) struct ReconcileUnits {
|
||||
_sem_units: tokio::sync::OwnedSemaphorePermit,
|
||||
|
||||
@@ -1,139 +0,0 @@
|
||||
use std::{str::FromStr, time::Duration};
|
||||
|
||||
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
|
||||
use reqwest::StatusCode;
|
||||
use safekeeper_client::mgmt_api;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{backoff, id::NodeId, logging::SecretString};
|
||||
|
||||
use crate::{
|
||||
heartbeater::SafekeeperState,
|
||||
persistence::{DatabaseError, SafekeeperPersistence},
|
||||
safekeeper_client::SafekeeperClient,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Safekeeper {
|
||||
pub(crate) skp: SafekeeperPersistence,
|
||||
cancel: CancellationToken,
|
||||
listen_http_addr: String,
|
||||
listen_http_port: u16,
|
||||
id: NodeId,
|
||||
availability: SafekeeperState,
|
||||
}
|
||||
|
||||
impl Safekeeper {
|
||||
pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self {
|
||||
Self {
|
||||
cancel,
|
||||
listen_http_addr: skp.host.clone(),
|
||||
listen_http_port: skp.http_port as u16,
|
||||
id: NodeId(skp.id as u64),
|
||||
skp,
|
||||
availability: SafekeeperState::Offline,
|
||||
}
|
||||
}
|
||||
pub(crate) fn base_url(&self) -> String {
|
||||
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
|
||||
}
|
||||
|
||||
pub(crate) fn get_id(&self) -> NodeId {
|
||||
self.id
|
||||
}
|
||||
pub(crate) fn describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
|
||||
self.skp.as_describe_response()
|
||||
}
|
||||
pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
|
||||
self.availability = availability;
|
||||
}
|
||||
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
|
||||
pub(crate) async fn with_client_retries<T, O, F>(
|
||||
&self,
|
||||
mut op: O,
|
||||
jwt: &Option<SecretString>,
|
||||
warn_threshold: u32,
|
||||
max_retries: u32,
|
||||
timeout: Duration,
|
||||
cancel: &CancellationToken,
|
||||
) -> mgmt_api::Result<T>
|
||||
where
|
||||
O: FnMut(SafekeeperClient) -> F,
|
||||
F: std::future::Future<Output = mgmt_api::Result<T>>,
|
||||
{
|
||||
fn is_fatal(e: &mgmt_api::Error) -> bool {
|
||||
use mgmt_api::Error::*;
|
||||
match e {
|
||||
ReceiveBody(_) | ReceiveErrorBody(_) => false,
|
||||
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
|
||||
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
|
||||
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
|
||||
ApiError(_, _) => true,
|
||||
Cancelled => true,
|
||||
}
|
||||
}
|
||||
|
||||
backoff::retry(
|
||||
|| {
|
||||
let http_client = reqwest::ClientBuilder::new()
|
||||
.timeout(timeout)
|
||||
.build()
|
||||
.expect("Failed to construct HTTP client");
|
||||
|
||||
let client = SafekeeperClient::from_client(
|
||||
self.get_id(),
|
||||
http_client,
|
||||
self.base_url(),
|
||||
jwt.clone(),
|
||||
);
|
||||
|
||||
let node_cancel_fut = self.cancel.cancelled();
|
||||
|
||||
let op_fut = op(client);
|
||||
|
||||
async {
|
||||
tokio::select! {
|
||||
r = op_fut=> {r},
|
||||
_ = node_cancel_fut => {
|
||||
Err(mgmt_api::Error::Cancelled)
|
||||
}}
|
||||
}
|
||||
},
|
||||
is_fatal,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
&format!(
|
||||
"Call to node {} ({}:{}) management API",
|
||||
self.id, self.listen_http_addr, self.listen_http_port
|
||||
),
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
.unwrap_or(Err(mgmt_api::Error::Cancelled))
|
||||
}
|
||||
|
||||
pub(crate) fn update_from_record(&mut self, record: crate::persistence::SafekeeperUpsert) {
|
||||
let crate::persistence::SafekeeperUpsert {
|
||||
active: _,
|
||||
availability_zone_id: _,
|
||||
host,
|
||||
http_port,
|
||||
id,
|
||||
port: _,
|
||||
region_id: _,
|
||||
version: _,
|
||||
} = record.clone();
|
||||
if id != self.id.0 as i64 {
|
||||
// The way the function is called ensures this. If we regress on that, it's a bug.
|
||||
panic!(
|
||||
"id can't be changed via update_from_record function: {id} != {}",
|
||||
self.id.0
|
||||
);
|
||||
}
|
||||
self.skp = crate::persistence::SafekeeperPersistence::from_upsert(
|
||||
record,
|
||||
SkSchedulingPolicy::from_str(&self.skp.scheduling_policy).unwrap(),
|
||||
);
|
||||
self.listen_http_port = http_port as u16;
|
||||
self.listen_http_addr = host;
|
||||
}
|
||||
}
|
||||
@@ -1,105 +0,0 @@
|
||||
use crate::metrics::PageserverRequestLabelGroup;
|
||||
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
|
||||
use safekeeper_client::mgmt_api::{Client, Result};
|
||||
use utils::{
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
logging::SecretString,
|
||||
};
|
||||
|
||||
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
|
||||
/// controller to collect metrics in a non-intrusive manner.
|
||||
///
|
||||
/// Analogous to [`crate::pageserver_client::PageserverClient`].
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct SafekeeperClient {
|
||||
inner: Client,
|
||||
node_id_label: String,
|
||||
}
|
||||
|
||||
macro_rules! measured_request {
|
||||
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
|
||||
let labels = PageserverRequestLabelGroup {
|
||||
pageserver_id: $node_id,
|
||||
path: $name,
|
||||
method: $method,
|
||||
};
|
||||
|
||||
let latency = &crate::metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_safekeeper_request_latency;
|
||||
let _timer_guard = latency.start_timer(labels.clone());
|
||||
|
||||
let res = $invoke;
|
||||
|
||||
if res.is_err() {
|
||||
let error_counters = &crate::metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_pageserver_request_error;
|
||||
error_counters.inc(labels)
|
||||
}
|
||||
|
||||
res
|
||||
}};
|
||||
}
|
||||
|
||||
impl SafekeeperClient {
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn new(
|
||||
node_id: NodeId,
|
||||
mgmt_api_endpoint: String,
|
||||
jwt: Option<SecretString>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
|
||||
node_id_label: node_id.0.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn from_client(
|
||||
node_id: NodeId,
|
||||
raw_client: reqwest::Client,
|
||||
mgmt_api_endpoint: String,
|
||||
jwt: Option<SecretString>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
|
||||
node_id_label: node_id.0.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn create_timeline(
|
||||
&self,
|
||||
req: &TimelineCreateRequest,
|
||||
) -> Result<TimelineStatus> {
|
||||
measured_request!(
|
||||
"create_timeline",
|
||||
crate::metrics::Method::Post,
|
||||
&self.node_id_label,
|
||||
self.inner.create_timeline(req).await
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn delete_timeline(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<TimelineStatus> {
|
||||
measured_request!(
|
||||
"delete_timeline",
|
||||
crate::metrics::Method::Delete,
|
||||
&self.node_id_label,
|
||||
self.inner.delete_timeline(tenant_id, timeline_id).await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_utilization(&self) -> Result<SafekeeperUtilization> {
|
||||
measured_request!(
|
||||
"utilization",
|
||||
crate::metrics::Method::Get,
|
||||
&self.node_id_label,
|
||||
self.inner.utilization().await
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,6 @@ pub mod chaos_injector;
|
||||
mod context_iterator;
|
||||
|
||||
use hyper::Uri;
|
||||
use safekeeper_api::models::SafekeeperUtilization;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
cmp::Ordering,
|
||||
@@ -21,7 +20,6 @@ use crate::{
|
||||
},
|
||||
compute_hook::{self, NotifyError},
|
||||
drain_utils::{self, TenantShardDrain, TenantShardIterator},
|
||||
heartbeater::SafekeeperState,
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
|
||||
leadership::Leadership,
|
||||
metrics,
|
||||
@@ -31,7 +29,6 @@ use crate::{
|
||||
ShardGenerationState, TenantFilter,
|
||||
},
|
||||
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
|
||||
safekeeper::Safekeeper,
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
|
||||
@@ -209,8 +206,6 @@ struct ServiceState {
|
||||
|
||||
nodes: Arc<HashMap<NodeId, Node>>,
|
||||
|
||||
safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
|
||||
|
||||
scheduler: Scheduler,
|
||||
|
||||
/// Ongoing background operation on the cluster if any is running.
|
||||
@@ -277,7 +272,6 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
|
||||
impl ServiceState {
|
||||
fn new(
|
||||
nodes: HashMap<NodeId, Node>,
|
||||
safekeepers: HashMap<NodeId, Safekeeper>,
|
||||
tenants: BTreeMap<TenantShardId, TenantShard>,
|
||||
scheduler: Scheduler,
|
||||
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
|
||||
@@ -289,7 +283,6 @@ impl ServiceState {
|
||||
leadership_status: initial_leadership_status,
|
||||
tenants,
|
||||
nodes: Arc::new(nodes),
|
||||
safekeepers: Arc::new(safekeepers),
|
||||
scheduler,
|
||||
ongoing_operation: None,
|
||||
delayed_reconcile_rx,
|
||||
@@ -306,23 +299,6 @@ impl ServiceState {
|
||||
(&mut self.nodes, &mut self.tenants, &mut self.scheduler)
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn parts_mut_sk(
|
||||
&mut self,
|
||||
) -> (
|
||||
&mut Arc<HashMap<NodeId, Node>>,
|
||||
&mut Arc<HashMap<NodeId, Safekeeper>>,
|
||||
&mut BTreeMap<TenantShardId, TenantShard>,
|
||||
&mut Scheduler,
|
||||
) {
|
||||
(
|
||||
&mut self.nodes,
|
||||
&mut self.safekeepers,
|
||||
&mut self.tenants,
|
||||
&mut self.scheduler,
|
||||
)
|
||||
}
|
||||
|
||||
fn get_leadership_status(&self) -> LeadershipStatus {
|
||||
self.leadership_status
|
||||
}
|
||||
@@ -421,8 +397,7 @@ pub struct Service {
|
||||
compute_hook: Arc<ComputeHook>,
|
||||
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
|
||||
|
||||
heartbeater_ps: Heartbeater<Node, PageserverState>,
|
||||
heartbeater_sk: Heartbeater<Safekeeper, SafekeeperState>,
|
||||
heartbeater: Heartbeater,
|
||||
|
||||
// Channel for background cleanup from failed operations that require cleanup, such as shard split
|
||||
abort_tx: tokio::sync::mpsc::UnboundedSender<TenantShardSplitAbort>,
|
||||
@@ -632,8 +607,7 @@ impl Service {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.nodes.clone()
|
||||
};
|
||||
let (mut nodes_online, mut sks_online) =
|
||||
self.initial_heartbeat_round(all_nodes.keys()).await;
|
||||
let mut nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await;
|
||||
|
||||
// List of tenants for which we will attempt to notify compute of their location at startup
|
||||
let mut compute_notifications = Vec::new();
|
||||
@@ -642,7 +616,7 @@ impl Service {
|
||||
tracing::info!("Populating tenant shards' states from initial pageserver scan...");
|
||||
let shard_count = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, safekeepers, tenants, scheduler) = locked.parts_mut_sk();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
// Mark nodes online if they responded to us: nodes are offline by default after a restart.
|
||||
let mut new_nodes = (**nodes).clone();
|
||||
@@ -654,17 +628,6 @@ impl Service {
|
||||
}
|
||||
*nodes = Arc::new(new_nodes);
|
||||
|
||||
let mut new_sks = (**safekeepers).clone();
|
||||
for (node_id, node) in new_sks.iter_mut() {
|
||||
if let Some((utilization, last_seen_at)) = sks_online.remove(node_id) {
|
||||
node.set_availability(SafekeeperState::Available {
|
||||
utilization,
|
||||
last_seen_at,
|
||||
});
|
||||
}
|
||||
}
|
||||
*safekeepers = Arc::new(new_sks);
|
||||
|
||||
for (tenant_shard_id, observed_state) in observed.0 {
|
||||
let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
|
||||
for node_id in observed_state.locations.keys() {
|
||||
@@ -773,10 +736,7 @@ impl Service {
|
||||
async fn initial_heartbeat_round<'a>(
|
||||
&self,
|
||||
node_ids: impl Iterator<Item = &'a NodeId>,
|
||||
) -> (
|
||||
HashMap<NodeId, PageserverUtilization>,
|
||||
HashMap<NodeId, (SafekeeperUtilization, Instant)>,
|
||||
) {
|
||||
) -> HashMap<NodeId, PageserverUtilization> {
|
||||
assert!(!self.startup_complete.is_ready());
|
||||
|
||||
let all_nodes = {
|
||||
@@ -796,20 +756,14 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
let all_sks = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.safekeepers.clone()
|
||||
};
|
||||
|
||||
tracing::info!("Sending initial heartbeats...");
|
||||
let res_ps = self
|
||||
.heartbeater_ps
|
||||
let res = self
|
||||
.heartbeater
|
||||
.heartbeat(Arc::new(nodes_to_heartbeat))
|
||||
.await;
|
||||
let res_sk = self.heartbeater_sk.heartbeat(all_sks).await;
|
||||
|
||||
let mut online_nodes = HashMap::new();
|
||||
if let Ok(deltas) = res_ps {
|
||||
if let Ok(deltas) = res {
|
||||
for (node_id, status) in deltas.0 {
|
||||
match status {
|
||||
PageserverState::Available { utilization, .. } => {
|
||||
@@ -823,22 +777,7 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
let mut online_sks = HashMap::new();
|
||||
if let Ok(deltas) = res_sk {
|
||||
for (node_id, status) in deltas.0 {
|
||||
match status {
|
||||
SafekeeperState::Available {
|
||||
utilization,
|
||||
last_seen_at,
|
||||
} => {
|
||||
online_sks.insert(node_id, (utilization, last_seen_at));
|
||||
}
|
||||
SafekeeperState::Offline => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(online_nodes, online_sks)
|
||||
online_nodes
|
||||
}
|
||||
|
||||
/// Used during [`Self::startup_reconcile`]: issue GETs to all nodes concurrently, with a deadline.
|
||||
@@ -1045,14 +984,8 @@ impl Service {
|
||||
locked.nodes.clone()
|
||||
};
|
||||
|
||||
let safekeepers = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.safekeepers.clone()
|
||||
};
|
||||
|
||||
let res_ps = self.heartbeater_ps.heartbeat(nodes).await;
|
||||
let res_sk = self.heartbeater_sk.heartbeat(safekeepers).await;
|
||||
if let Ok(deltas) = res_ps {
|
||||
let res = self.heartbeater.heartbeat(nodes).await;
|
||||
if let Ok(deltas) = res {
|
||||
let mut to_handle = Vec::default();
|
||||
|
||||
for (node_id, state) in deltas.0 {
|
||||
@@ -1153,18 +1086,6 @@ impl Service {
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Ok(deltas) = res_sk {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let mut safekeepers = (*locked.safekeepers).clone();
|
||||
for (id, state) in deltas.0 {
|
||||
let Some(sk) = safekeepers.get_mut(&id) else {
|
||||
tracing::info!("Couldn't update safekeeper safekeeper state for id {id} from heartbeat={state:?}");
|
||||
continue;
|
||||
};
|
||||
sk.set_availability(state);
|
||||
}
|
||||
locked.safekeepers = Arc::new(safekeepers);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1390,17 +1311,6 @@ impl Service {
|
||||
.storage_controller_pageserver_nodes
|
||||
.set(nodes.len() as i64);
|
||||
|
||||
tracing::info!("Loading safekeepers from database...");
|
||||
let safekeepers = persistence
|
||||
.list_safekeepers()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new()))
|
||||
.collect::<Vec<_>>();
|
||||
let safekeepers: HashMap<NodeId, Safekeeper> =
|
||||
safekeepers.into_iter().map(|n| (n.get_id(), n)).collect();
|
||||
tracing::info!("Loaded {} safekeepers from database.", safekeepers.len());
|
||||
|
||||
tracing::info!("Loading shards from database...");
|
||||
let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?;
|
||||
tracing::info!(
|
||||
@@ -1527,14 +1437,7 @@ impl Service {
|
||||
let cancel = CancellationToken::new();
|
||||
let reconcilers_cancel = cancel.child_token();
|
||||
|
||||
let heartbeater_ps = Heartbeater::new(
|
||||
config.jwt_token.clone(),
|
||||
config.max_offline_interval,
|
||||
config.max_warming_up_interval,
|
||||
cancel.clone(),
|
||||
);
|
||||
|
||||
let heartbeater_sk = Heartbeater::new(
|
||||
let heartbeater = Heartbeater::new(
|
||||
config.jwt_token.clone(),
|
||||
config.max_offline_interval,
|
||||
config.max_warming_up_interval,
|
||||
@@ -1550,7 +1453,6 @@ impl Service {
|
||||
let this = Arc::new(Self {
|
||||
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
|
||||
nodes,
|
||||
safekeepers,
|
||||
tenants,
|
||||
scheduler,
|
||||
delayed_reconcile_rx,
|
||||
@@ -1560,8 +1462,7 @@ impl Service {
|
||||
persistence,
|
||||
compute_hook: Arc::new(ComputeHook::new(config.clone())),
|
||||
result_tx,
|
||||
heartbeater_ps,
|
||||
heartbeater_sk,
|
||||
heartbeater,
|
||||
reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
|
||||
config.reconciler_concurrency,
|
||||
)),
|
||||
@@ -5213,12 +5114,7 @@ impl Service {
|
||||
shard.sequence = shard.sequence.next();
|
||||
}
|
||||
|
||||
let reconciler_config = match migrate_req.migration_config {
|
||||
Some(cfg) => (&cfg).into(),
|
||||
None => ReconcilerConfig::default(),
|
||||
};
|
||||
|
||||
self.maybe_configured_reconcile_shard(shard, nodes, reconciler_config)
|
||||
self.maybe_reconcile_shard(shard, nodes)
|
||||
};
|
||||
|
||||
if let Some(waiter) = waiter {
|
||||
@@ -7765,54 +7661,29 @@ impl Service {
|
||||
pub(crate) async fn safekeepers_list(
|
||||
&self,
|
||||
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut list = locked
|
||||
.safekeepers
|
||||
.iter()
|
||||
.map(|sk| sk.1.describe_response())
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
list.sort_by_key(|v| v.id);
|
||||
Ok(list)
|
||||
self.persistence
|
||||
.list_safekeepers()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|v| v.as_describe_response())
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
}
|
||||
|
||||
pub(crate) async fn get_safekeeper(
|
||||
&self,
|
||||
id: i64,
|
||||
) -> Result<SafekeeperDescribeResponse, DatabaseError> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let sk = locked
|
||||
.safekeepers
|
||||
.get(&NodeId(id as u64))
|
||||
.ok_or(diesel::result::Error::NotFound)?;
|
||||
sk.describe_response()
|
||||
self.persistence
|
||||
.safekeeper_get(id)
|
||||
.await
|
||||
.and_then(|v| v.as_describe_response())
|
||||
}
|
||||
|
||||
pub(crate) async fn upsert_safekeeper(
|
||||
&self,
|
||||
record: crate::persistence::SafekeeperUpsert,
|
||||
) -> Result<(), DatabaseError> {
|
||||
let node_id = NodeId(record.id as u64);
|
||||
self.persistence.safekeeper_upsert(record.clone()).await?;
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let mut safekeepers = (*locked.safekeepers).clone();
|
||||
match safekeepers.entry(node_id) {
|
||||
std::collections::hash_map::Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().update_from_record(record);
|
||||
}
|
||||
std::collections::hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert(Safekeeper::from_persistence(
|
||||
crate::persistence::SafekeeperPersistence::from_upsert(
|
||||
record,
|
||||
SkSchedulingPolicy::Pause,
|
||||
),
|
||||
CancellationToken::new(),
|
||||
));
|
||||
}
|
||||
}
|
||||
locked.safekeepers = Arc::new(safekeepers);
|
||||
}
|
||||
Ok(())
|
||||
self.persistence.safekeeper_upsert(record).await
|
||||
}
|
||||
|
||||
pub(crate) async fn set_safekeeper_scheduling_policy(
|
||||
@@ -7822,20 +7693,7 @@ impl Service {
|
||||
) -> Result<(), DatabaseError> {
|
||||
self.persistence
|
||||
.set_safekeeper_scheduling_policy(id, scheduling_policy)
|
||||
.await?;
|
||||
let node_id = NodeId(id as u64);
|
||||
// After the change has been persisted successfully, update the in-memory state
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let mut safekeepers = (*locked.safekeepers).clone();
|
||||
let sk = safekeepers
|
||||
.get_mut(&node_id)
|
||||
.ok_or(DatabaseError::Logical("Not found".to_string()))?;
|
||||
sk.skp.scheduling_policy = String::from(scheduling_policy);
|
||||
|
||||
locked.safekeepers = Arc::new(safekeepers);
|
||||
}
|
||||
Ok(())
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn update_shards_preferred_azs(
|
||||
|
||||
@@ -3,7 +3,6 @@ from __future__ import annotations
|
||||
import abc
|
||||
import asyncio
|
||||
import concurrent.futures
|
||||
import dataclasses
|
||||
import filecmp
|
||||
import json
|
||||
import os
|
||||
@@ -1676,12 +1675,6 @@ class StorageControllerLeadershipStatus(StrEnum):
|
||||
CANDIDATE = "candidate"
|
||||
|
||||
|
||||
@dataclass
|
||||
class StorageControllerMigrationConfig:
|
||||
secondary_warmup_timeout: str | None
|
||||
secondary_download_request_timeout: str | None
|
||||
|
||||
|
||||
class NeonStorageController(MetricsGetter, LogUtils):
|
||||
def __init__(self, env: NeonEnv, port: int, auth_enabled: bool):
|
||||
self.env = env
|
||||
@@ -2075,20 +2068,11 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
shards: list[TenantShardId] = body["new_shards"]
|
||||
return shards
|
||||
|
||||
def tenant_shard_migrate(
|
||||
self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
dest_ps_id: int,
|
||||
config: StorageControllerMigrationConfig | None = None,
|
||||
):
|
||||
payload = {"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id}
|
||||
if config is not None:
|
||||
payload["migration_config"] = dataclasses.asdict(config)
|
||||
|
||||
def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int):
|
||||
self.request(
|
||||
"PUT",
|
||||
f"{self.api}/control/v1/tenant/{tenant_shard_id}/migrate",
|
||||
json=payload,
|
||||
json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id},
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}")
|
||||
@@ -4988,13 +4972,8 @@ def check_restored_datadir_content(
|
||||
|
||||
restored_files = list_files_to_compare(restored_dir_path)
|
||||
|
||||
# pg_notify files are always ignored
|
||||
pgdata_files = [f for f in pgdata_files if not f.startswith("pg_notify")]
|
||||
restored_files = [f for f in restored_files if not f.startswith("pg_notify")]
|
||||
|
||||
# pg_xact and pg_multixact files are optional in basebackup: depending on our configuration they
|
||||
# may be omitted and loaded on demand.
|
||||
if pgdata_files != restored_files:
|
||||
# filter pg_xact and multixact files which are downloaded on demand
|
||||
pgdata_files = [
|
||||
f
|
||||
for f in pgdata_files
|
||||
|
||||
@@ -231,14 +231,14 @@ def test_pgdata_import_smoke(
|
||||
shard_zero_http = shard_zero_ps.http_client()
|
||||
shard_zero_timeline_info = shard_zero_http.timeline_detail(shard_zero["shard_id"], timeline_id)
|
||||
initdb_lsn = Lsn(shard_zero_timeline_info["initdb_lsn"])
|
||||
min_readable_lsn = Lsn(shard_zero_timeline_info["min_readable_lsn"])
|
||||
latest_gc_cutoff_lsn = Lsn(shard_zero_timeline_info["latest_gc_cutoff_lsn"])
|
||||
last_record_lsn = Lsn(shard_zero_timeline_info["last_record_lsn"])
|
||||
disk_consistent_lsn = Lsn(shard_zero_timeline_info["disk_consistent_lsn"])
|
||||
_remote_consistent_lsn = Lsn(shard_zero_timeline_info["remote_consistent_lsn"])
|
||||
remote_consistent_lsn_visible = Lsn(shard_zero_timeline_info["remote_consistent_lsn_visible"])
|
||||
# assert remote_consistent_lsn_visible == remote_consistent_lsn TODO: this fails initially and after restart, presumably because `UploadQueue::clean.1` is still `None`
|
||||
assert remote_consistent_lsn_visible == disk_consistent_lsn
|
||||
assert initdb_lsn == min_readable_lsn
|
||||
assert initdb_lsn == latest_gc_cutoff_lsn
|
||||
assert disk_consistent_lsn == initdb_lsn + 8
|
||||
assert last_record_lsn == disk_consistent_lsn
|
||||
# TODO: assert these values are the same everywhere
|
||||
|
||||
@@ -10,18 +10,14 @@ from typing import TYPE_CHECKING
|
||||
import pytest
|
||||
from fixtures.common_types import TenantId, TenantShardId, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
StorageControllerMigrationConfig,
|
||||
)
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
|
||||
from fixtures.pageserver.common_types import parse_layer_file_name
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_prefix_empty,
|
||||
wait_for_upload_queue_empty,
|
||||
)
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage, s3_storage
|
||||
from fixtures.utils import run_only_on_default_postgres, skip_in_debug_build, wait_until
|
||||
from fixtures.utils import skip_in_debug_build, wait_until
|
||||
from fixtures.workload import Workload
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
@@ -893,93 +889,3 @@ def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controll
|
||||
assert progress_3["heatmap_mtime"] is not None
|
||||
assert progress_3["layers_total"] == progress_3["layers_downloaded"]
|
||||
assert progress_3["bytes_total"] == progress_3["bytes_downloaded"]
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@run_only_on_default_postgres("PG version is not interesting here")
|
||||
def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
assert isinstance(env.pageserver_remote_storage, S3Storage) # Satisfy linter
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
env.create_tenant(tenant_id, timeline_id, conf=TENANT_CONF, placement_policy='{"Attached":1}')
|
||||
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
|
||||
ps_attached = env.get_pageserver(attached_to_id)
|
||||
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
|
||||
|
||||
# Generate a bunch of small layers (we will apply a slowdown failpoint that works on a per-layer basis)
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.write_rows(128, upload=True)
|
||||
workload.stop()
|
||||
|
||||
# Expect lots of layers
|
||||
assert len(ps_attached.list_layers(tenant_id, timeline_id)) > 10
|
||||
|
||||
# Simulate large data by making layer downloads artifically slow
|
||||
for ps in env.pageservers:
|
||||
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")])
|
||||
|
||||
# Upload a heatmap, so that secondaries have something to download
|
||||
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
|
||||
heatmap_before_migration = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
|
||||
# This has no chance to succeed: we have lots of layers and each one takes at least 1000ms.
|
||||
# However, it pulls the heatmap, which will be important later.
|
||||
http_client = env.storage_controller.pageserver_api()
|
||||
(status, progress) = http_client.tenant_secondary_download(tenant_id, wait_ms=4000)
|
||||
assert status == 202
|
||||
assert progress["heatmap_mtime"] is not None
|
||||
assert progress["layers_downloaded"] > 0
|
||||
assert progress["bytes_downloaded"] > 0
|
||||
assert progress["layers_total"] > progress["layers_downloaded"]
|
||||
assert progress["bytes_total"] > progress["bytes_downloaded"]
|
||||
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*Timed out.*downloading layers.*",
|
||||
]
|
||||
)
|
||||
|
||||
# Use a custom configuration that gives up earlier than usual.
|
||||
# We can't hydrate everything anyway because of the failpoints.
|
||||
config = StorageControllerMigrationConfig(
|
||||
secondary_warmup_timeout="5s", secondary_download_request_timeout="2s"
|
||||
)
|
||||
env.storage_controller.tenant_shard_migrate(
|
||||
TenantShardId(tenant_id, shard_number=0, shard_count=0), ps_secondary.id, config
|
||||
)
|
||||
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
assert env.storage_controller.locate(tenant_id)[0]["node_id"] == ps_secondary.id
|
||||
|
||||
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
|
||||
heatmap_after_migration = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
|
||||
assert len(heatmap_before_migration["timelines"][0]["layers"]) > 0
|
||||
|
||||
# The new layer map should contain all the layers in the pre-migration one
|
||||
# and a new in memory layer
|
||||
assert len(heatmap_before_migration["timelines"][0]["layers"]) + 1 == len(
|
||||
heatmap_after_migration["timelines"][0]["layers"]
|
||||
)
|
||||
|
||||
log.info(
|
||||
f'Heatmap size after cold migration is {len(heatmap_after_migration["timelines"][0]["layers"])}'
|
||||
)
|
||||
|
||||
# TODO: Once we have an endpoint for rescuing the cold location, exercise it here.
|
||||
|
||||
@@ -261,7 +261,7 @@ def test_isolation(
|
||||
pg_bin.run(pg_isolation_regress_command, env=env_vars, cwd=runpath)
|
||||
|
||||
# This fails with a mismatch on `pg_multixact/offsets/0000`
|
||||
post_checks(env, test_output_dir, DBNAME, endpoint)
|
||||
# post_checks(env, test_output_dir, DBNAME, endpoint)
|
||||
|
||||
|
||||
# Run extra Neon-specific pg_regress-based tests. The tests and their
|
||||
|
||||
@@ -287,7 +287,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
offset=offset,
|
||||
)
|
||||
|
||||
# Do some update so we can increment gc_cutoff
|
||||
# Do some update so we can increment latest_gc_cutoff
|
||||
generate_updates_on_main(env, ep_main, i, end=100)
|
||||
|
||||
# Wait for the existing lease to expire.
|
||||
|
||||
@@ -1821,7 +1821,7 @@ def test_sharding_gc(
|
||||
# TODO: remove when https://github.com/neondatabase/neon/issues/10720 is fixed
|
||||
ps.allowed_errors.extend(
|
||||
[
|
||||
".*could not find data for key.*",
|
||||
".*could not find data for key 020000000000000000000000000000000000.*",
|
||||
".*could not ingest record.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -318,7 +318,7 @@ def test_scrubber_physical_gc_ancestors(neon_env_builder: NeonEnvBuilder, shard_
|
||||
# TODO: remove when https://github.com/neondatabase/neon/issues/10720 is fixed
|
||||
ps.allowed_errors.extend(
|
||||
[
|
||||
".*could not find data for key.*",
|
||||
".*could not find data for key 020000000000000000000000000000000000.*",
|
||||
".*could not ingest record.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -566,14 +566,10 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder):
|
||||
assert_prefix_empty(neon_env_builder.safekeepers_remote_storage, prefix)
|
||||
|
||||
|
||||
# This test is flaky, probably because PUTs of local fs storage are not atomic.
|
||||
# Let's keep both remote storage kinds for a while to see if this is the case.
|
||||
# https://github.com/neondatabase/neon/issues/10761
|
||||
@pytest.mark.parametrize("remote_storage_kind", [s3_storage(), RemoteStorageKind.LOCAL_FS])
|
||||
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
|
||||
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
|
||||
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
|
||||
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 62a86dfc91...c0aedfd3ca
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 80ed91ce25...355a7c69d3
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 999cf81b10...13cf5d06c9
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 4d3a722312...4c45d78ad5
16
vendor/revisions.json
vendored
16
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.3",
|
||||
"4d3a722312b496ff7378156caa6d41c2e70c30e4"
|
||||
"17.2",
|
||||
"4c45d78ad587e4bcb4a5a7ef6931b88c6a3d575d"
|
||||
],
|
||||
"v16": [
|
||||
"16.7",
|
||||
"999cf81b101ead40e597d5cd729458d8200f4537"
|
||||
"16.6",
|
||||
"13cf5d06c98a8e9b0590ce6cdfd193a08d0a7792"
|
||||
],
|
||||
"v15": [
|
||||
"15.11",
|
||||
"80ed91ce255c765d25be0bb4a02c942fe6311fbf"
|
||||
"15.10",
|
||||
"355a7c69d3f907f3612eb406cc7b9c2f55d59b59"
|
||||
],
|
||||
"v14": [
|
||||
"14.16",
|
||||
"62a86dfc91e0c35a72f2ea5e99e6969b830c0c26"
|
||||
"14.15",
|
||||
"c0aedfd3cac447510a2db843b561f0c52901b679"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user