Compare commits

..

1 Commits

Author SHA1 Message Date
Owen Brady
c93ac95d25 pageserver_api: implement keyspace range iterator (#6435) 2025-02-12 20:07:39 +00:00
63 changed files with 460 additions and 1545 deletions

View File

@@ -892,7 +892,7 @@ jobs:
docker-hub-password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
push-neon-image-prod:
if: github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
if: github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
needs: [ generate-image-maps, neon-image, test-images ]
uses: ./.github/workflows/_push-to-container-registry.yml
with:
@@ -909,7 +909,7 @@ jobs:
docker-hub-password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
push-compute-image-prod:
if: github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
if: github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
needs: [ generate-image-maps, vm-compute-node-image, test-images ]
uses: ./.github/workflows/_push-to-container-registry.yml
with:

View File

@@ -88,7 +88,7 @@ jobs:
BUILD_AND_TEST_RUN_ID=${TAG}
while true; do
gh run --repo ${GITHUB_REPOSITORY} view ${BUILD_AND_TEST_RUN_ID} --json jobs --jq '[.jobs[] | select((.name | startswith("push-neon-image-dev")) or (.name | startswith("push-compute-image-dev"))) | {"name": .name, "conclusion": .conclusion, "url": .url}]' > jobs.json
if [ $(jq '[.[] | select(.conclusion == "success")] | length' jobs.json) -eq 2 ]; then
if [ $(jq '[.[] | select(.conclusion == "success")]' jobs.json) -eq 2 ]; then
break
fi
jq -c '.[]' jobs.json | while read -r job; do

4
Cargo.lock generated
View File

@@ -1293,7 +1293,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"jsonwebtoken",
"regex",
"remote_storage",
"serde",
@@ -1321,7 +1320,6 @@ dependencies = [
"flate2",
"futures",
"http 1.1.0",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
"notify",
@@ -6466,8 +6464,6 @@ dependencies = [
"routerify",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"safekeeper_api",
"safekeeper_client",
"scoped-futures",
"scopeguard",
"serde",

View File

@@ -50,14 +50,6 @@ RUN set -e \
&& rm -rf pg_install/build \
&& tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz .
# Prepare cargo-chef recipe
FROM $REPOSITORY/$IMAGE:$TAG AS plan
WORKDIR /home/nonroot
COPY --chown=nonroot . .
RUN cargo chef prepare --recipe-path recipe.json
# Build neon binaries
FROM $REPOSITORY/$IMAGE:$TAG AS build
WORKDIR /home/nonroot
@@ -71,15 +63,9 @@ COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_i
COPY --from=pg-build /home/nonroot/pg_install/v17/include/postgresql/server pg_install/v17/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib
COPY --from=pg-build /home/nonroot/pg_install/v17/lib pg_install/v17/lib
COPY --from=plan /home/nonroot/recipe.json recipe.json
ARG ADDITIONAL_RUSTFLAGS=""
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo chef cook --locked --release --recipe-path recipe.json
COPY --chown=nonroot . .
ARG ADDITIONAL_RUSTFLAGS
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
--bin pg_sni_router \

View File

@@ -300,7 +300,6 @@ ARG CARGO_HAKARI_VERSION=0.9.33
ARG CARGO_DENY_VERSION=0.16.2
ARG CARGO_HACK_VERSION=0.6.33
ARG CARGO_NEXTEST_VERSION=0.9.85
ARG CARGO_CHEF_VERSION=0.1.71
ARG CARGO_DIESEL_CLI_VERSION=2.2.6
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
chmod +x rustup-init && \
@@ -315,7 +314,6 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
cargo install cargo-hack --version ${CARGO_HACK_VERSION} && \
cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} && \
cargo install cargo-chef --locked --version ${CARGO_CHEF_VERSION} && \
cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} \
--features postgres-bundled --no-default-features && \
rm -rf /home/nonroot/.cargo/registry && \

View File

@@ -1750,7 +1750,7 @@ COPY --from=pg_graphql-src /ext-src/ /ext-src/
COPY --from=hypopg-src /ext-src/ /ext-src/
COPY --from=pg_hashids-src /ext-src/ /ext-src/
COPY --from=rum-src /ext-src/ /ext-src/
COPY --from=pgtap-src /ext-src/ /ext-src/
#COPY --from=pgtap-src /ext-src/ /ext-src/
COPY --from=ip4r-src /ext-src/ /ext-src/
COPY --from=prefix-src /ext-src/ /ext-src/
COPY --from=hll-src /ext-src/ /ext-src/

View File

@@ -24,7 +24,6 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
jsonwebtoken.workspace = true
metrics.workspace = true
nix.workspace = true
notify.workspace = true

View File

@@ -55,7 +55,7 @@ use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info, warn};
use url::Url;
use compute_api::responses::{ComputeCtlConfig, ComputeStatus};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeSpec;
use compute_tools::compute::{
@@ -281,7 +281,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
info!("got spec from cli argument {}", spec_json);
return Ok(CliSpecParams {
spec: Some(serde_json::from_str(spec_json)?),
compute_ctl_config: ComputeCtlConfig::default(),
live_config_allowed: false,
});
}
@@ -291,7 +290,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
let file = File::open(Path::new(spec_path))?;
return Ok(CliSpecParams {
spec: Some(serde_json::from_reader(file)?),
compute_ctl_config: ComputeCtlConfig::default(),
live_config_allowed: true,
});
}
@@ -301,9 +299,8 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
};
match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
Ok(resp) => Ok(CliSpecParams {
spec: resp.0,
compute_ctl_config: resp.1,
Ok(spec) => Ok(CliSpecParams {
spec,
live_config_allowed: true,
}),
Err(e) => {
@@ -320,8 +317,6 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
struct CliSpecParams {
/// If a spec was provided via CLI or file, the [`ComputeSpec`]
spec: Option<ComputeSpec>,
#[allow(dead_code)]
compute_ctl_config: ComputeCtlConfig,
live_config_allowed: bool,
}
@@ -331,7 +326,6 @@ fn wait_spec(
CliSpecParams {
spec,
live_config_allowed,
compute_ctl_config: _,
}: CliSpecParams,
) -> Result<Arc<ComputeNode>> {
let mut new_state = ComputeState::new();

View File

@@ -11,9 +11,7 @@ use crate::migration::MigrationRunner;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
use compute_api::responses::{
ComputeCtlConfig, ControlPlaneComputeStatus, ControlPlaneSpecResponse,
};
use compute_api::responses::{ControlPlaneComputeStatus, ControlPlaneSpecResponse};
use compute_api::spec::ComputeSpec;
// Do control plane request and return response if any. In case of error it
@@ -75,13 +73,14 @@ fn do_control_plane_request(
pub fn get_spec_from_control_plane(
base_uri: &str,
compute_id: &str,
) -> Result<(Option<ComputeSpec>, ComputeCtlConfig)> {
) -> Result<Option<ComputeSpec>> {
let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
Ok(v) => v,
Err(_) => "".to_string(),
};
let mut attempt = 1;
let mut spec: Result<Option<ComputeSpec>> = Ok(None);
info!("getting spec from control plane: {}", cp_uri);
@@ -91,7 +90,7 @@ pub fn get_spec_from_control_plane(
// - no spec for compute yet (Empty state) -> return Ok(None)
// - got spec -> return Ok(Some(spec))
while attempt < 4 {
let result = match do_control_plane_request(&cp_uri, &jwt) {
spec = match do_control_plane_request(&cp_uri, &jwt) {
Ok(spec_resp) => {
CPLANE_REQUESTS_TOTAL
.with_label_values(&[
@@ -100,10 +99,10 @@ pub fn get_spec_from_control_plane(
])
.inc();
match spec_resp.status {
ControlPlaneComputeStatus::Empty => Ok((None, spec_resp.compute_ctl_config)),
ControlPlaneComputeStatus::Empty => Ok(None),
ControlPlaneComputeStatus::Attached => {
if let Some(spec) = spec_resp.spec {
Ok((Some(spec), spec_resp.compute_ctl_config))
Ok(Some(spec))
} else {
bail!("compute is attached, but spec is empty")
}
@@ -122,10 +121,10 @@ pub fn get_spec_from_control_plane(
}
};
if let Err(e) = &result {
if let Err(e) = &spec {
error!("attempt {} to get spec failed with: {}", attempt, e);
} else {
return result;
return spec;
}
attempt += 1;
@@ -133,9 +132,7 @@ pub fn get_spec_from_control_plane(
}
// All attempts failed, return error.
Err(anyhow::anyhow!(
"Exhausted all attempts to retrieve the spec from the control plane"
))
spec
}
/// Check `pg_hba.conf` and update if needed to allow external connections.

View File

@@ -48,8 +48,6 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::Database;
use compute_api::spec::PgIdent;
use compute_api::spec::RemoteExtSpec;
@@ -882,13 +880,10 @@ impl Endpoint {
self.external_http_address.port()
))
.header(CONTENT_TYPE.as_str(), "application/json")
.body(
serde_json::to_string(&ConfigurationRequest {
spec,
compute_ctl_config: ComputeCtlConfig::default(),
})
.unwrap(),
)
.body(format!(
"{{\"spec\":{}}}",
serde_json::to_string_pretty(&spec)?
))
.send()
.await?;

View File

@@ -838,10 +838,7 @@ impl StorageController {
self.dispatch(
Method::PUT,
format!("control/v1/tenant/{tenant_shard_id}/migrate"),
Some(TenantShardMigrateRequest {
node_id,
migration_config: None,
}),
Some(TenantShardMigrateRequest { node_id }),
)
.await
}

View File

@@ -609,10 +609,7 @@ async fn main() -> anyhow::Result<()> {
tenant_shard_id,
node,
} => {
let req = TenantShardMigrateRequest {
node_id: node,
migration_config: None,
};
let req = TenantShardMigrateRequest { node_id: node };
storcon_client
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
@@ -626,10 +623,7 @@ async fn main() -> anyhow::Result<()> {
tenant_shard_id,
node,
} => {
let req = TenantShardMigrateRequest {
node_id: node,
migration_config: None,
};
let req = TenantShardMigrateRequest { node_id: node };
storcon_client
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
@@ -1088,10 +1082,7 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
Method::PUT,
format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
Some(TenantShardMigrateRequest {
node_id: mv.to,
migration_config: None,
}),
Some(TenantShardMigrateRequest { node_id: mv.to }),
)
.await
.map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))

View File

@@ -71,7 +71,7 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
cat ../compute/patches/contrib_pg${pg_version}.patch | docker exec -i $TEST_CONTAINER_NAME bash -c "(cd /postgres && patch -p1)"
# We are running tests now
rm -f testout.txt testout_contrib.txt
docker exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pg_tiktoken-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src \
docker exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pgtap-src,pg_tiktoken-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src \
$TEST_CONTAINER_NAME /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
docker exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
$TEST_CONTAINER_NAME /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0

View File

@@ -1,15 +0,0 @@
diff --git a/test/schedule/create.sql b/test/schedule/create.sql
index ba355ed..7e250f5 100644
--- a/test/schedule/create.sql
+++ b/test/schedule/create.sql
@@ -1,3 +1,2 @@
\unset ECHO
\i test/psql.sql
-CREATE EXTENSION pgtap;
diff --git a/test/schedule/main.sch b/test/schedule/main.sch
index a8a5fbc..0463fc4 100644
--- a/test/schedule/main.sch
+++ b/test/schedule/main.sch
@@ -1,2 +1 @@
-test: build
test: create

View File

@@ -1,6 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
patch -p1 <test-upgrade.patch
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
${PG_REGRESS} --inputdir=./ --bindir='/usr/local/pgsql/bin' --inputdir=test --max-connections=86 --schedule test/schedule/main.sch --schedule test/build/run.sch --dbname contrib_regression --use-existing

View File

@@ -41,8 +41,7 @@ EXTENSIONS='[
{"extname": "roaringbitmap", "extdir": "pg_roaringbitmap-src"},
{"extname": "semver", "extdir": "pg_semver-src"},
{"extname": "pg_ivm", "extdir": "pg_ivm-src"},
{"extname": "pgjwt", "extdir": "pgjwt-src"},
{"extname": "pgtap", "extdir": "pgtap-src"}
{"extname": "pgjwt", "extdir": "pgjwt-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
TAG=${NEWTAG} docker compose --profile test-extensions up --quiet-pull --build -d

View File

@@ -7,7 +7,6 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
chrono.workspace = true
jsonwebtoken.workspace = true
serde.workspace = true
serde_json.workspace = true
regex.workspace = true

View File

@@ -1,20 +1,18 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use crate::{
privilege::Privilege,
responses::ComputeCtlConfig,
spec::{ComputeSpec, ExtVersion, PgIdent},
};
use serde::{Deserialize, Serialize};
use serde::Deserialize;
/// Request of the /configure API
///
/// We now pass only `spec` in the configuration request, but later we can
/// extend it and something like `restart: bool` or something else. So put
/// `spec` into a struct initially to be more flexible in the future.
#[derive(Debug, Deserialize, Serialize)]
#[derive(Deserialize, Debug)]
pub struct ConfigurationRequest {
pub spec: ComputeSpec,
pub compute_ctl_config: ComputeCtlConfig,
}
#[derive(Deserialize, Debug)]

View File

@@ -3,7 +3,6 @@
use std::fmt::Display;
use chrono::{DateTime, Utc};
use jsonwebtoken::jwk::JwkSet;
use serde::{Deserialize, Serialize, Serializer};
use crate::{
@@ -136,27 +135,13 @@ pub struct CatalogObjects {
pub databases: Vec<Database>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct ComputeCtlConfig {
pub jwks: JwkSet,
}
impl Default for ComputeCtlConfig {
fn default() -> Self {
Self {
jwks: JwkSet {
keys: Vec::default(),
},
}
}
}
/// Response of the `/computes/{compute_id}/spec` control-plane API.
/// This is not actually a compute API response, so consider moving
/// to a different place.
#[derive(Deserialize, Debug)]
pub struct ControlPlaneSpecResponse {
pub spec: Option<ComputeSpec>,
pub status: ControlPlaneComputeStatus,
pub compute_ctl_config: ComputeCtlConfig,
}
#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)]

View File

@@ -182,18 +182,6 @@ pub struct TenantDescribeResponseShard {
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateRequest {
pub node_id: NodeId,
#[serde(default)]
pub migration_config: Option<MigrationConfig>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MigrationConfig {
#[serde(default)]
#[serde(with = "humantime_serde")]
pub secondary_warmup_timeout: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub secondary_download_request_timeout: Option<Duration>,
}
#[derive(Serialize, Clone, Debug)]

View File

@@ -27,6 +27,55 @@ impl std::fmt::Display for KeySpace {
}
}
pub struct KeySpaceIter<'a> {
ranges: &'a [Range<Key>],
current_range: usize,
current_key: Option<Key>,
}
impl<'a> Iterator for KeySpaceIter<'a> {
// (current_key, range_end)
type Item = (Key, Key);
fn next(&mut self) -> Option<Self::Item> {
// if we've gone through all ranges, stop iteration
if self.current_range >= self.ranges.len() {
return None;
}
let range = &self.ranges[self.current_range];
// if current_key is None, initialize it with range.start
if self.current_key.is_none() {
self.current_key = Some(range.start);
}
if let Some(current_key) = self.current_key {
// check if the current_key is still within the range
if current_key < range.end {
// get the next key
let next_key = current_key.next();
// move current_key forward
self.current_key = Some(next_key);
// return the range from current_key to the current range_end
return Some((current_key, range.end));
}
}
// move to the next range
self.current_range += 1;
// if there are more ranges, initialize the next key from the start of the next range
if self.current_range < self.ranges.len() {
self.current_key = Some(self.ranges[self.current_range].start);
self.next() // Recurse to continue iterating with the new range
} else {
// no more ranges, end iteration
None
}
}
}
/// A wrapper type for sparse keyspaces.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SparseKeySpace(pub KeySpace);
@@ -435,6 +484,14 @@ impl KeySpace {
pub fn contains(&self, key: &Key) -> bool {
self.overlaps(&(*key..key.next()))
}
pub fn iter(&self) -> KeySpaceIter {
KeySpaceIter {
ranges: &self.ranges,
current_range: 0,
current_key: self.ranges.first().map(|r| r.start),
}
}
}
///
@@ -619,6 +676,92 @@ mod tests {
use super::*;
use std::fmt::Write;
fn key(field1: u8, field2: u32, field6: u32) -> Key {
Key {
field1,
field2,
field3: 0,
field4: 0,
field5: 0,
field6,
}
}
#[test]
fn test_iter_single_range() {
let key_range = Range {
start: key(1, 0, 0),
end: key(1, 0, 5),
};
let keyspace = KeySpace {
ranges: vec![key_range],
};
let collected_ranges: Vec<_> = keyspace.iter().collect();
assert_eq!(
collected_ranges,
vec![
(key(1, 0, 0), key(1, 0, 5)),
(key(1, 0, 1), key(1, 0, 5)),
(key(1, 0, 2), key(1, 0, 5)),
(key(1, 0, 3), key(1, 0, 5)),
(key(1, 0, 4), key(1, 0, 5)), // Stops at field6 = 5
]
);
}
#[test]
fn test_iter_multiple_ranges() {
let ranges = vec![
Range {
start: key(1, 0, 0),
end: key(1, 0, 3),
},
Range {
start: key(2, 0, 0),
end: key(2, 0, 2),
},
];
let keyspace = KeySpace { ranges };
let collected_ranges: Vec<_> = keyspace.iter().collect();
assert_eq!(
collected_ranges,
vec![
(key(1, 0, 0), key(1, 0, 3)),
(key(1, 0, 1), key(1, 0, 3)),
(key(1, 0, 2), key(1, 0, 3)), // End of first range
(key(2, 0, 0), key(2, 0, 2)),
(key(2, 0, 1), key(2, 0, 2)), // End of second range
]
);
}
#[test]
fn test_iter_empty_keyspace() {
let keyspace = KeySpace { ranges: vec![] };
let mut iter = keyspace.iter();
assert_eq!(iter.next(), None);
}
#[test]
fn test_iter_range_with_single_key() {
let key_range = Range {
start: key(1, 42, 0),
end: key(1, 42, 1), // Only one step
};
let keyspace = KeySpace {
ranges: vec![key_range],
};
let mut iter = keyspace.iter();
assert_eq!(iter.next(), Some((key(1, 42, 0), key(1, 42, 1))));
assert_eq!(iter.next(), None);
}
// Helper function to create a key range.
//
// Make the tests below less verbose.

View File

@@ -1136,24 +1136,7 @@ pub struct TimelineInfo {
pub ancestor_lsn: Option<Lsn>,
pub last_record_lsn: Lsn,
pub prev_record_lsn: Option<Lsn>,
/// Legacy field for compat with control plane. Synonym of `min_readable_lsn`.
/// TODO: remove once control plane no longer reads it.
pub latest_gc_cutoff_lsn: Lsn,
/// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
/// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
/// as it is easier to reason about.
pub applied_gc_cutoff_lsn: Lsn,
/// The upper bound of data which is either already GC'ed, or elegible to be GC'ed at any time based on PITR interval.
/// This LSN represents the "end of history" for this timeline, and callers should use it to figure out the oldest
/// LSN at which it is legal to create a branch or ephemeral endpoint.
///
/// Note that holders of valid LSN leases may be able to create branches and read pages earlier
/// than this LSN, but new leases may not be taken out earlier than this LSN.
pub min_readable_lsn: Lsn,
pub disk_consistent_lsn: Lsn,
/// The LSN that we have succesfully uploaded to remote storage

View File

@@ -42,8 +42,8 @@ use utils::lsn::Lsn;
pub enum BasebackupError {
#[error("basebackup pageserver error {0:#}")]
Server(#[from] anyhow::Error),
#[error("basebackup client error {0:#} when {1}")]
Client(#[source] io::Error, &'static str),
#[error("basebackup client error {0:#}")]
Client(#[source] io::Error),
}
/// Create basebackup with non-rel data in it.
@@ -234,7 +234,7 @@ where
self.ar
.append(&header, self.buf.as_slice())
.await
.map_err(|e| BasebackupError::Client(e, "flush"))?;
.map_err(BasebackupError::Client)?;
self.total_blocks += nblocks;
debug!("Added to basebackup slru {} relsize {}", segname, nblocks);
@@ -273,9 +273,9 @@ where
for dir in subdirs.iter() {
let header = new_tar_header_dir(dir)?;
self.ar
.append(&header, io::empty())
.append(&header, &mut io::empty())
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball"))?;
.context("could not add directory to basebackup tarball")?;
}
// Send config files.
@@ -286,13 +286,13 @@ where
self.ar
.append(&header, data)
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball,pg_hba.conf"))?;
.context("could not add config file to basebackup tarball")?;
} else {
let header = new_tar_header(filepath, 0)?;
self.ar
.append(&header, io::empty())
.append(&header, &mut io::empty())
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball,add_config_file"))?;
.context("could not add config file to basebackup tarball")?;
}
}
if !lazy_slru_download {
@@ -406,7 +406,7 @@ where
self.ar
.append(&header, &*content)
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball,add_aux_file"))?;
.context("could not add aux file to basebackup tarball")?;
}
if min_restart_lsn != Lsn::MAX {
@@ -419,7 +419,7 @@ where
self.ar
.append(&header, &data[..])
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball,restart.lsn"))?;
.context("could not add restart.lsn file to basebackup tarball")?;
}
for xid in self
.timeline
@@ -451,9 +451,9 @@ where
let crc32 = crc32c::crc32c(&content);
content.extend_from_slice(&crc32.to_le_bytes());
let header = new_tar_header("pg_logical/replorigin_checkpoint", content.len() as u64)?;
self.ar.append(&header, &*content).await.map_err(|e| {
BasebackupError::Client(e, "send_tarball,pg_logical/replorigin_checkpoint")
})?;
self.ar.append(&header, &*content).await.context(
"could not add pg_logical/replorigin_checkpoint file to basebackup tarball",
)?;
}
fail_point!("basebackup-before-control-file", |_| {
@@ -464,10 +464,7 @@ where
// Generate pg_control and bootstrap WAL segment.
self.add_pgcontrol_file().await?;
self.ar
.finish()
.await
.map_err(|e| BasebackupError::Client(e, "send_tarball,finish"))?;
self.ar.finish().await.map_err(BasebackupError::Client)?;
debug!("all tarred up!");
Ok(())
}
@@ -485,9 +482,9 @@ where
let file_name = dst.to_segfile_name(0);
let header = new_tar_header(&file_name, 0)?;
self.ar
.append(&header, io::empty())
.append(&header, &mut io::empty())
.await
.map_err(|e| BasebackupError::Client(e, "add_rel,empty"))?;
.map_err(BasebackupError::Client)?;
return Ok(());
}
@@ -518,7 +515,7 @@ where
self.ar
.append(&header, segment_data.as_slice())
.await
.map_err(|e| BasebackupError::Client(e, "add_rel,segment"))?;
.map_err(BasebackupError::Client)?;
seg += 1;
startblk = endblk;
@@ -569,7 +566,7 @@ where
self.ar
.append(&header, pg_version_str.as_bytes())
.await
.map_err(|e| BasebackupError::Client(e, "add_dbdir,PG_VERSION"))?;
.map_err(BasebackupError::Client)?;
info!("timeline.pg_version {}", self.timeline.pg_version);
@@ -579,7 +576,7 @@ where
self.ar
.append(&header, &img[..])
.await
.map_err(|e| BasebackupError::Client(e, "add_dbdir,global/pg_filenode.map"))?;
.map_err(BasebackupError::Client)?;
} else {
warn!("global/pg_filenode.map is missing");
}
@@ -615,9 +612,9 @@ where
let path = format!("base/{}", dbnode);
let header = new_tar_header_dir(&path)?;
self.ar
.append(&header, io::empty())
.append(&header, &mut io::empty())
.await
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base"))?;
.map_err(BasebackupError::Client)?;
if let Some(img) = relmap_img {
let dst_path = format!("base/{}/PG_VERSION", dbnode);
@@ -630,14 +627,14 @@ where
self.ar
.append(&header, pg_version_str.as_bytes())
.await
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base/PG_VERSION"))?;
.map_err(BasebackupError::Client)?;
let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
let header = new_tar_header(&relmap_path, img.len() as u64)?;
self.ar
.append(&header, &img[..])
.await
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base/pg_filenode.map"))?;
.map_err(BasebackupError::Client)?;
}
};
Ok(())
@@ -666,7 +663,7 @@ where
self.ar
.append(&header, &buf[..])
.await
.map_err(|e| BasebackupError::Client(e, "add_twophase_file"))?;
.map_err(BasebackupError::Client)?;
Ok(())
}
@@ -696,7 +693,7 @@ where
zenith_signal.as_bytes(),
)
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?;
.map_err(BasebackupError::Client)?;
let checkpoint_bytes = self
.timeline
@@ -721,7 +718,7 @@ where
self.ar
.append(&header, &pg_control_bytes[..])
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,pg_control"))?;
.map_err(BasebackupError::Client)?;
//send wal segment
let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
@@ -745,7 +742,7 @@ where
self.ar
.append(&header, &wal_seg[..])
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,wal_segment"))?;
.map_err(BasebackupError::Client)?;
Ok(())
}
}

View File

@@ -1080,10 +1080,7 @@ components:
type: integer
state:
type: string
min_readable_lsn:
type: string
format: hex
applied_gc_cutoff_lsn:
latest_gc_cutoff_lsn:
type: string
format: hex

View File

@@ -482,11 +482,6 @@ async fn build_timeline_info_common(
let (pitr_history_size, within_ancestor_pitr) = timeline.get_pitr_history_stats();
let min_readable_lsn = std::cmp::max(
timeline.get_gc_cutoff_lsn(),
*timeline.get_applied_gc_cutoff_lsn(),
);
let info = TimelineInfo {
tenant_id: timeline.tenant_shard_id,
timeline_id: timeline.timeline_id,
@@ -498,12 +493,7 @@ async fn build_timeline_info_common(
initdb_lsn,
last_record_lsn,
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
// Externally, expose the lowest LSN that can be used to create a branch as the "GC cutoff", although internally
// we distinguish between the "planned" GC cutoff (PITR point) and the "latest" GC cutoff (where we
// actually trimmed data to), which can pass each other when PITR is changed.
latest_gc_cutoff_lsn: min_readable_lsn,
min_readable_lsn,
applied_gc_cutoff_lsn: *timeline.get_applied_gc_cutoff_lsn(),
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
current_logical_size_is_accurate: match current_logical_size.accuracy() {
tenant::timeline::logical_size::Accuracy::Approximate => false,

View File

@@ -914,7 +914,7 @@ impl PageServerHandler {
&shard,
req.hdr.request_lsn,
req.hdr.not_modified_since,
&shard.get_applied_gc_cutoff_lsn(),
&shard.get_latest_gc_cutoff_lsn(),
ctx,
)
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
@@ -1810,7 +1810,7 @@ impl PageServerHandler {
req: &PagestreamExistsRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.hdr.request_lsn,
@@ -1837,7 +1837,7 @@ impl PageServerHandler {
req: &PagestreamNblocksRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.hdr.request_lsn,
@@ -1864,7 +1864,7 @@ impl PageServerHandler {
req: &PagestreamDbSizeRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.hdr.request_lsn,
@@ -1954,7 +1954,7 @@ impl PageServerHandler {
req: &PagestreamGetSlruSegmentRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
timeline,
req.hdr.request_lsn,
@@ -2050,8 +2050,7 @@ impl PageServerHandler {
{
fn map_basebackup_error(err: BasebackupError) -> QueryError {
match err {
// TODO: passthrough the error site to the final error message?
BasebackupError::Client(e, _) => QueryError::Disconnected(ConnectionError::Io(e)),
BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
BasebackupError::Server(e) => QueryError::Other(e),
}
}
@@ -2072,7 +2071,7 @@ impl PageServerHandler {
//return Err(QueryError::NotFound("timeline is archived".into()))
}
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {
// Backup was requested at a particular LSN. Wait for it to arrive.
info!("waiting for {}", lsn);
@@ -2152,12 +2151,10 @@ impl PageServerHandler {
.await
.map_err(map_basebackup_error)?;
}
writer.flush().await.map_err(|e| {
map_basebackup_error(BasebackupError::Client(
e,
"handle_basebackup_request,flush",
))
})?;
writer
.flush()
.await
.map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
}
pgb.write_message_noflush(&BeMessage::CopyDone)

View File

@@ -611,7 +611,7 @@ impl Timeline {
) -> Result<LsnForTimestamp, PageReconstructError> {
pausable_failpoint!("find-lsn-for-timestamp-pausable");
let gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
let gc_cutoff_planned = {
let gc_info = self.gc_info.read().unwrap();
gc_info.min_cutoff()

View File

@@ -40,8 +40,6 @@ use remote_timeline_client::manifest::{
use remote_timeline_client::UploadQueueNotReadyError;
use remote_timeline_client::FAILED_REMOTE_OP_RETRIES;
use remote_timeline_client::FAILED_UPLOAD_WARN_THRESHOLD;
use secondary::heatmap::HeatMapTenant;
use secondary::heatmap::HeatMapTimeline;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
@@ -57,7 +55,6 @@ use timeline::offload::OffloadError;
use timeline::CompactFlags;
use timeline::CompactOptions;
use timeline::CompactionError;
use timeline::PreviousHeatmap;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
@@ -265,7 +262,6 @@ struct TimelinePreload {
timeline_id: TimelineId,
client: RemoteTimelineClient,
index_part: Result<MaybeDeletedIndexPart, DownloadError>,
previous_heatmap: Option<PreviousHeatmap>,
}
pub(crate) struct TenantPreload {
@@ -283,53 +279,6 @@ pub(crate) enum SpawnMode {
Lazy,
}
/// A notifier that can be used to trigger compaction shared by all timelines of a tenant.
///
/// It is used to notify the compaction loop that there is work to be done. It is also used
/// to balance the load of compaction between timelines. If there are pending L0 compaction in
/// one of the timeline, it could preempt long-running compaction jobs (e.g., image compaction)
/// on other timelines.
pub struct CompactionNotifier {
notify: Notify,
l0_count: std::sync::Mutex<HashMap<TimelineId, usize>>,
}
impl CompactionNotifier {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self {
notify: Notify::new(),
l0_count: std::sync::Mutex::new(HashMap::new()),
}
}
pub fn notify_one(&self) {
self.notify.notify_one();
}
pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
self.notify.notified()
}
pub fn on_l0_update(&self, timeline_id: TimelineId, l0_count: usize) {
let mut guard = self.l0_count.lock().unwrap();
guard.insert(timeline_id, l0_count);
}
pub fn on_shutdown(&self, timeline_id: TimelineId) {
let mut guard = self.l0_count.lock().unwrap();
guard.remove(&timeline_id);
}
pub fn get_max_l0_count(&self) -> Option<(usize, TimelineId)> {
let guard = self.l0_count.lock().unwrap();
guard
.iter()
.max_by_key(|(_, count)| *count)
.map(|(timeline_id, count)| (*count, *timeline_id))
}
}
///
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
@@ -405,7 +354,7 @@ pub struct Tenant {
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
/// Signals the tenant compaction loop that there is L0 compaction work to be done.
pub(crate) l0_compaction_trigger: Arc<CompactionNotifier>,
pub(crate) l0_compaction_trigger: Arc<Notify>,
/// Scheduled gc-compaction tasks.
scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,
@@ -1179,7 +1128,6 @@ impl Tenant {
resources: TimelineResources,
mut index_part: IndexPart,
metadata: TimelineMetadata,
previous_heatmap: Option<PreviousHeatmap>,
ancestor: Option<Arc<Timeline>>,
cause: LoadTimelineCause,
ctx: &RequestContext,
@@ -1210,7 +1158,6 @@ impl Tenant {
let timeline = self.create_timeline_struct(
timeline_id,
&metadata,
previous_heatmap,
ancestor.clone(),
resources,
CreateTimelineCause::Load,
@@ -1610,18 +1557,8 @@ impl Tenant {
}
}
// TODO(vlad): Could go to S3 if the secondary is freezing cold and hasn't even
// pulled the first heatmap. Not entirely necessary since the storage controller
// will kick the secondary in any case and cause a download.
let maybe_heatmap_at = self.read_on_disk_heatmap().await;
let timelines = self
.load_timelines_metadata(
remote_timeline_ids,
remote_storage,
maybe_heatmap_at,
cancel,
)
.load_timelines_metadata(remote_timeline_ids, remote_storage, cancel)
.await?;
Ok(TenantPreload {
@@ -1634,26 +1571,6 @@ impl Tenant {
})
}
async fn read_on_disk_heatmap(&self) -> Option<(HeatMapTenant, std::time::Instant)> {
let on_disk_heatmap_path = self.conf.tenant_heatmap_path(&self.tenant_shard_id);
match tokio::fs::read_to_string(on_disk_heatmap_path).await {
Ok(heatmap) => match serde_json::from_str::<HeatMapTenant>(&heatmap) {
Ok(heatmap) => Some((heatmap, std::time::Instant::now())),
Err(err) => {
error!("Failed to deserialize old heatmap: {err}");
None
}
},
Err(err) => match err.kind() {
std::io::ErrorKind::NotFound => None,
_ => {
error!("Unexpected IO error reading old heatmap: {err}");
None
}
},
}
}
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
@@ -1741,10 +1658,7 @@ impl Tenant {
match index_part {
MaybeDeletedIndexPart::IndexPart(index_part) => {
timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
remote_index_and_client.insert(
timeline_id,
(index_part, preload.client, preload.previous_heatmap),
);
remote_index_and_client.insert(timeline_id, (index_part, preload.client));
}
MaybeDeletedIndexPart::Deleted(index_part) => {
info!(
@@ -1763,7 +1677,7 @@ impl Tenant {
// layer file.
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
for (timeline_id, remote_metadata) in sorted_timelines {
let (index_part, remote_client, previous_heatmap) = remote_index_and_client
let (index_part, remote_client) = remote_index_and_client
.remove(&timeline_id)
.expect("just put it in above");
@@ -1783,7 +1697,6 @@ impl Tenant {
timeline_id,
index_part,
remote_metadata,
previous_heatmap,
self.get_timeline_resources_for(remote_client),
LoadTimelineCause::Attach,
ctx,
@@ -1933,13 +1846,11 @@ impl Tenant {
}
#[instrument(skip_all, fields(timeline_id=%timeline_id))]
#[allow(clippy::too_many_arguments)]
async fn load_remote_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
index_part: IndexPart,
remote_metadata: TimelineMetadata,
previous_heatmap: Option<PreviousHeatmap>,
resources: TimelineResources,
cause: LoadTimelineCause,
ctx: &RequestContext,
@@ -1969,7 +1880,6 @@ impl Tenant {
resources,
index_part,
remote_metadata,
previous_heatmap,
ancestor,
cause,
ctx,
@@ -1981,29 +1891,14 @@ impl Tenant {
self: &Arc<Tenant>,
timeline_ids: HashSet<TimelineId>,
remote_storage: &GenericRemoteStorage,
heatmap: Option<(HeatMapTenant, std::time::Instant)>,
cancel: CancellationToken,
) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
let mut timeline_heatmaps = heatmap.map(|h| (h.0.into_timelines_index(), h.1));
let mut part_downloads = JoinSet::new();
for timeline_id in timeline_ids {
let cancel_clone = cancel.clone();
let previous_timeline_heatmap = timeline_heatmaps.as_mut().and_then(|hs| {
hs.0.remove(&timeline_id).map(|h| PreviousHeatmap::Active {
heatmap: h,
read_at: hs.1,
})
});
part_downloads.spawn(
self.load_timeline_metadata(
timeline_id,
remote_storage.clone(),
previous_timeline_heatmap,
cancel_clone,
)
.instrument(info_span!("download_index_part", %timeline_id)),
self.load_timeline_metadata(timeline_id, remote_storage.clone(), cancel_clone)
.instrument(info_span!("download_index_part", %timeline_id)),
);
}
@@ -2051,7 +1946,6 @@ impl Tenant {
self: &Arc<Tenant>,
timeline_id: TimelineId,
remote_storage: GenericRemoteStorage,
previous_heatmap: Option<PreviousHeatmap>,
cancel: CancellationToken,
) -> impl Future<Output = TimelinePreload> {
let client = self.build_timeline_client(timeline_id, remote_storage);
@@ -2067,7 +1961,6 @@ impl Tenant {
client,
timeline_id,
index_part,
previous_heatmap,
}
}
}
@@ -2179,12 +2072,7 @@ impl Tenant {
})?;
let timeline_preload = self
.load_timeline_metadata(
timeline_id,
self.remote_storage.clone(),
None,
cancel.clone(),
)
.load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel.clone())
.await;
let index_part = match timeline_preload.index_part {
@@ -2218,7 +2106,6 @@ impl Tenant {
timeline_id,
index_part,
remote_metadata,
None,
timeline_resources,
LoadTimelineCause::Unoffload,
&ctx,
@@ -2934,7 +2821,7 @@ impl Tenant {
};
let metadata = index_part.metadata.clone();
self
.load_remote_timeline(timeline_id, index_part, metadata, None, resources, LoadTimelineCause::ImportPgdata{
.load_remote_timeline(timeline_id, index_part, metadata, resources, LoadTimelineCause::ImportPgdata{
create_guard: timeline_create_guard, activate, }, &ctx)
.await?
.ready_to_activate()
@@ -4143,7 +4030,6 @@ impl Tenant {
&self,
new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata,
previous_heatmap: Option<PreviousHeatmap>,
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
cause: CreateTimelineCause,
@@ -4167,7 +4053,6 @@ impl Tenant {
self.conf,
Arc::clone(&self.tenant_conf),
new_metadata,
previous_heatmap,
ancestor,
new_timeline_id,
self.tenant_shard_id,
@@ -4289,7 +4174,7 @@ impl Tenant {
// use an extremely long backoff.
Some(Duration::from_secs(3600 * 24)),
)),
l0_compaction_trigger: Arc::new(CompactionNotifier::new()),
l0_compaction_trigger: Arc::new(Notify::new()),
scheduled_compaction_tasks: Mutex::new(Default::default()),
activate_now_sem: tokio::sync::Semaphore::new(0),
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
@@ -4810,24 +4695,24 @@ impl Tenant {
// We check it against both the planned GC cutoff stored in 'gc_info',
// and the 'latest_gc_cutoff' of the last GC that was performed. The
// planned GC cutoff in 'gc_info' is normally larger than
// 'applied_gc_cutoff_lsn', but beware of corner cases like if you just
// 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
// changed the GC settings for the tenant to make the PITR window
// larger, but some of the data was already removed by an earlier GC
// iteration.
// check against last actual 'latest_gc_cutoff' first
let applied_gc_cutoff_lsn = src_timeline.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
{
let gc_info = src_timeline.gc_info.read().unwrap();
let planned_cutoff = gc_info.min_cutoff();
if gc_info.lsn_covered_by_lease(start_lsn) {
tracing::info!("skipping comparison of {start_lsn} with gc cutoff {} and planned gc cutoff {planned_cutoff} due to lsn lease", *applied_gc_cutoff_lsn);
tracing::info!("skipping comparison of {start_lsn} with gc cutoff {} and planned gc cutoff {planned_cutoff} due to lsn lease", *latest_gc_cutoff_lsn);
} else {
src_timeline
.check_lsn_is_in_scope(start_lsn, &applied_gc_cutoff_lsn)
.check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
.context(format!(
"invalid branch start lsn: less than latest GC cutoff {}",
*applied_gc_cutoff_lsn,
*latest_gc_cutoff_lsn,
))
.map_err(CreateTimelineError::AncestorLsn)?;
@@ -4866,7 +4751,7 @@ impl Tenant {
dst_prev,
Some(src_id),
start_lsn,
*src_timeline.applied_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
*src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
src_timeline.initdb_lsn,
src_timeline.pg_version,
);
@@ -5239,7 +5124,6 @@ impl Tenant {
.create_timeline_struct(
new_timeline_id,
new_metadata,
None,
ancestor,
resources,
CreateTimelineCause::Load,
@@ -6246,8 +6130,8 @@ mod tests {
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
let applied_gc_cutoff_lsn = tline.get_applied_gc_cutoff_lsn();
assert!(*applied_gc_cutoff_lsn > Lsn(0x25));
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
match tline.get(*TEST_KEY, Lsn(0x25)) {
Ok(_) => panic!("request for page should have failed"),
Err(err) => assert!(err.to_string().contains("not found at")),
@@ -8543,7 +8427,7 @@ mod tests {
.await?;
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
@@ -8651,7 +8535,7 @@ mod tests {
// increase GC horizon and compact again
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x40))
.wait()
@@ -8819,8 +8703,8 @@ mod tests {
// Force set disk consistent lsn so we can get the cutoff at `end_lsn`.
info!(
"applied_gc_cutoff_lsn: {}",
*timeline.get_applied_gc_cutoff_lsn()
"latest_gc_cutoff_lsn: {}",
*timeline.get_latest_gc_cutoff_lsn()
);
timeline.force_set_disk_consistent_lsn(end_lsn);
@@ -8846,7 +8730,7 @@ mod tests {
// Make lease on a already GC-ed LSN.
// 0/80 does not have a valid lease + is below latest_gc_cutoff
assert!(Lsn(0x80) < *timeline.get_applied_gc_cutoff_lsn());
assert!(Lsn(0x80) < *timeline.get_latest_gc_cutoff_lsn());
timeline
.init_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx)
.expect_err("lease request on GC-ed LSN should fail");
@@ -9037,7 +8921,7 @@ mod tests {
};
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
@@ -9124,7 +9008,7 @@ mod tests {
// increase GC horizon and compact again
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x40))
.wait()
@@ -9577,7 +9461,7 @@ mod tests {
.await?;
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
@@ -9724,7 +9608,7 @@ mod tests {
// increase GC horizon and compact again
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x38))
.wait()
@@ -9825,7 +9709,7 @@ mod tests {
.await?;
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
@@ -10076,7 +9960,7 @@ mod tests {
{
parent_tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x10))
.wait()
@@ -10096,7 +9980,7 @@ mod tests {
{
branch_tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x50))
.wait()
@@ -10452,7 +10336,7 @@ mod tests {
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
@@ -10837,7 +10721,7 @@ mod tests {
.await?;
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
@@ -11088,7 +10972,7 @@ mod tests {
.await?;
{
tline
.applied_gc_cutoff_lsn
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()

View File

@@ -130,10 +130,7 @@ struct TimelineMetadataBodyV2 {
prev_record_lsn: Option<Lsn>,
ancestor_timeline: Option<TimelineId>,
ancestor_lsn: Lsn,
// The LSN at which GC was last executed. Synonym of [`Timeline::applied_gc_cutoff_lsn`].
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
}

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, time::SystemTime};
use std::time::SystemTime;
use crate::tenant::{remote_timeline_client::index::LayerFileMetadata, storage_layer::LayerName};
@@ -8,7 +8,7 @@ use serde_with::{serde_as, DisplayFromStr, TimestampSeconds};
use utils::{generation::Generation, id::TimelineId};
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapTenant {
pub(super) struct HeatMapTenant {
/// Generation of the attached location that uploaded the heatmap: this is not required
/// for correctness, but acts as a hint to secondary locations in order to detect thrashing
/// in the unlikely event that two attached locations are both uploading conflicting heatmaps.
@@ -25,17 +25,8 @@ pub(crate) struct HeatMapTenant {
pub(super) upload_period_ms: Option<u128>,
}
impl HeatMapTenant {
pub(crate) fn into_timelines_index(self) -> HashMap<TimelineId, HeatMapTimeline> {
self.timelines
.into_iter()
.map(|htl| (htl.timeline_id, htl))
.collect()
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapTimeline {
#[serde_as(as = "DisplayFromStr")]
pub(crate) timeline_id: TimelineId,
@@ -44,13 +35,13 @@ pub(crate) struct HeatMapTimeline {
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapLayer {
pub(crate) name: LayerName,
pub(crate) metadata: LayerFileMetadata,
#[serde_as(as = "TimestampSeconds<i64>")]
pub(crate) access_time: SystemTime,
pub(super) access_time: SystemTime,
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
}

View File

@@ -394,7 +394,7 @@ pub(super) async fn gather_inputs(
ancestor_lsn,
last_record: last_record_lsn,
// this is not used above, because it might not have updated recently enough
latest_gc_cutoff: *timeline.get_applied_gc_cutoff_lsn(),
latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(),
next_pitr_cutoff,
retention_param_cutoff,
lease_points,

View File

@@ -136,22 +136,6 @@ pub(crate) fn local_layer_path(
}
}
pub(crate) enum LastEviction {
Never,
At(std::time::Instant),
Evicting,
}
impl LastEviction {
pub(crate) fn happened_after(&self, timepoint: std::time::Instant) -> bool {
match self {
LastEviction::Never => false,
LastEviction::At(evicted_at) => evicted_at > &timepoint,
LastEviction::Evicting => true,
}
}
}
impl Layer {
/// Creates a layer value for a file we know to not be resident.
pub(crate) fn for_evicted(
@@ -421,17 +405,6 @@ impl Layer {
self.0.metadata()
}
pub(crate) fn last_evicted_at(&self) -> LastEviction {
match self.0.last_evicted_at.try_lock() {
Ok(lock) => match *lock {
None => LastEviction::Never,
Some(at) => LastEviction::At(at),
},
Err(std::sync::TryLockError::WouldBlock) => LastEviction::Evicting,
Err(std::sync::TryLockError::Poisoned(p)) => panic!("Lock poisoned: {p}"),
}
}
pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
self.0
.timeline
@@ -683,9 +656,7 @@ struct LayerInner {
/// When the Layer was last evicted but has not been downloaded since.
///
/// This is used for skipping evicted layers from the previous heatmap (see
/// `[Timeline::generate_heatmap]`) and for updating metrics
/// (see [`LayerImplMetrics::redownload_after`]).
/// This is used solely for updating metrics. See [`LayerImplMetrics::redownload_after`].
last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
#[cfg(test)]

View File

@@ -47,7 +47,7 @@ use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::runtime::Handle;
use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, watch};
use tokio::sync::{oneshot, watch, Notify};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::critical;
@@ -150,16 +150,16 @@ use super::{
config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized,
MaybeOffloaded,
};
use super::{
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, CompactionNotifier,
HeatMapTimeline,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{
remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError,
storage_layer::ReadableLayer,
};
use super::{secondary::heatmap::HeatMapLayer, GcError};
use super::{
secondary::heatmap::{HeatMapLayer, HeatMapTimeline},
GcError,
};
#[cfg(test)]
use pageserver_api::value::Value;
@@ -225,7 +225,7 @@ pub struct TimelineResources {
pub remote_client: RemoteTimelineClient,
pub pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
pub l0_compaction_trigger: Arc<CompactionNotifier>,
pub l0_compaction_trigger: Arc<Notify>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
@@ -352,11 +352,8 @@ pub struct Timeline {
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
// The LSN at which we have executed GC: whereas [`Self::gc_info`] records the LSN at which
// we _intend_ to GC (i.e. the PITR cutoff), this LSN records where we actually last did it.
// Because PITR interval is mutable, it's possible for this LSN to be earlier or later than
// the planned GC cutoff.
pub applied_gc_cutoff_lsn: Rcu<Lsn>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
pub(crate) gc_compaction_layer_update_lock: tokio::sync::RwLock<()>,
@@ -429,7 +426,7 @@ pub struct Timeline {
compaction_failed: AtomicBool,
/// Notifies the tenant compaction loop that there is pending L0 compaction work.
l0_compaction_trigger: Arc<CompactionNotifier>,
l0_compaction_trigger: Arc<Notify>,
/// Make sure we only have one running gc at a time.
///
@@ -465,16 +462,6 @@ pub struct Timeline {
/// If Some, collects GetPage metadata for an ongoing PageTrace.
pub(crate) page_trace: ArcSwapOption<Sender<PageTraceEvent>>,
previous_heatmap: ArcSwapOption<PreviousHeatmap>,
}
pub(crate) enum PreviousHeatmap {
Active {
heatmap: HeatMapTimeline,
read_at: std::time::Instant,
},
Obsolete,
}
pub type TimelineDeleteProgress = Arc<tokio::sync::Mutex<DeleteTimelineFlow>>;
@@ -1090,15 +1077,9 @@ impl Timeline {
(history, gc_info.within_ancestor_pitr)
}
/// Read timeline's GC cutoff: this is the LSN at which GC has started to happen
pub(crate) fn get_applied_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
self.applied_gc_cutoff_lsn.read()
}
/// Read timeline's planned GC cutoff: this is the logical end of history that users
/// are allowed to read (based on configured PITR), even if physically we have more history.
pub(crate) fn get_gc_cutoff_lsn(&self) -> Lsn {
self.gc_info.read().unwrap().cutoffs.time
/// Lock and get timeline's GC cutoff
pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
self.latest_gc_cutoff_lsn.read()
}
/// Look up given page version.
@@ -1606,7 +1587,7 @@ impl Timeline {
};
if init || validate {
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = self.get_latest_gc_cutoff_lsn();
if lsn < *latest_gc_cutoff_lsn {
bail!("tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn);
}
@@ -1966,8 +1947,6 @@ impl Timeline {
// ... and inform any waiters for newer LSNs that there won't be any.
self.last_record_lsn.shutdown();
self.l0_compaction_trigger.on_shutdown(self.timeline_id);
if let ShutdownMode::FreezeAndFlush = mode {
let do_flush = if let Some((open, frozen)) = self
.layers
@@ -2580,7 +2559,6 @@ impl Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
metadata: &TimelineMetadata,
previous_heatmap: Option<PreviousHeatmap>,
ancestor: Option<Arc<Timeline>>,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
@@ -2681,7 +2659,7 @@ impl Timeline {
LastImageLayerCreationStatus::default(),
)),
applied_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(),
current_logical_size: if disk_consistent_lsn.is_valid() {
@@ -2743,8 +2721,6 @@ impl Timeline {
create_idempotency,
page_trace: Default::default(),
previous_heatmap: ArcSwapOption::from_pointee(previous_heatmap),
};
result.repartition_threshold =
@@ -3483,52 +3459,12 @@ impl Timeline {
let guard = self.layers.read().await;
// Firstly, if there's any heatmap left over from when this location
// was a secondary, take that into account. Keep layers that are:
// * present in the layer map
// * visible
// * non-resident
// * not evicted since we read the heatmap
//
// Without this, a new cold, attached location would clobber the previous
// heatamp.
let previous_heatmap = self.previous_heatmap.load();
let visible_non_resident = match previous_heatmap.as_deref() {
Some(PreviousHeatmap::Active { heatmap, read_at }) => {
Some(heatmap.layers.iter().filter_map(|hl| {
let desc: PersistentLayerDesc = hl.name.clone().into();
let layer = guard.try_get_from_key(&desc.key())?;
if layer.visibility() == LayerVisibilityHint::Covered {
return None;
}
if layer.is_likely_resident() {
return None;
}
if layer.last_evicted_at().happened_after(*read_at) {
return None;
}
Some((desc, hl.metadata.clone(), hl.access_time))
}))
}
Some(PreviousHeatmap::Obsolete) => None,
None => None,
};
// Secondly, all currently visible, resident layers are included.
let resident = guard.likely_resident_layers().filter_map(|layer| {
match layer.visibility() {
LayerVisibilityHint::Visible => {
// Layer is visible to one or more read LSNs: elegible for inclusion in layer map
let last_activity_ts = layer.latest_activity();
Some((
layer.layer_desc().clone(),
layer.metadata(),
last_activity_ts,
))
Some((layer.layer_desc(), layer.metadata(), last_activity_ts))
}
LayerVisibilityHint::Covered => {
// Layer is resident but unlikely to be read: not elegible for inclusion in heatmap.
@@ -3537,18 +3473,7 @@ impl Timeline {
}
});
let mut layers = match visible_non_resident {
Some(non_resident) => {
let mut non_resident = non_resident.peekable();
if non_resident.peek().is_none() {
self.previous_heatmap
.store(Some(PreviousHeatmap::Obsolete.into()));
}
non_resident.chain(resident).collect::<Vec<_>>()
}
None => resident.collect::<Vec<_>>(),
};
let mut layers = resident.collect::<Vec<_>>();
// Sort layers in order of which to download first. For a large set of layers to download, we
// want to prioritize those layers which are most likely to still be in the resident many minutes
@@ -3737,7 +3662,7 @@ impl Timeline {
// the timeline, then it will remove layers that are required for fulfilling
// the current get request (read-path cannot "look back" and notice the new
// image layer).
let _gc_cutoff_holder = timeline.get_applied_gc_cutoff_lsn();
let _gc_cutoff_holder = timeline.get_latest_gc_cutoff_lsn();
// See `compaction::compact_with_gc` for why we need this.
let _guard = timeline.gc_compaction_layer_update_lock.read().await;
@@ -4122,8 +4047,6 @@ impl Timeline {
if l0_count >= self.get_compaction_threshold() {
self.l0_compaction_trigger.notify_one();
}
self.l0_compaction_trigger
.on_l0_update(self.timeline_id, l0_count);
// Delay the next flush to backpressure if compaction can't keep up. We delay by the
// flush duration such that the flush takes 2x as long. This is propagated up to WAL
@@ -4426,7 +4349,7 @@ impl Timeline {
let update = crate::tenant::metadata::MetadataUpdate::new(
disk_consistent_lsn,
ondisk_prev_record_lsn,
*self.applied_gc_cutoff_lsn.read(),
*self.latest_gc_cutoff_lsn.read(),
);
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
@@ -4629,74 +4552,71 @@ impl Timeline {
let mut wrote_keys = false;
let mut key_request_accum = KeySpaceAccum::new();
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
// Decide whether to retain this key: usually we do, but sharded tenants may
// need to drop keys that don't belong to them. If we retain the key, add it
// to `key_request_accum` for later issuing a vectored get
if self.shard_identity.is_key_disposable(&key) {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
} else {
key_request_accum.add_key(key);
for (key, range_end) in partition.iter() {
// Decide whether to retain this key: usually we do, but sharded tenants may
// need to drop keys that don't belong to them. If we retain the key, add it
// to `key_request_accum` for later issuing a vectored get
if self.shard_identity.is_key_disposable(&key) {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
} else {
key_request_accum.add_key(key);
}
let last_key_in_range = key.next() == range_end;
// Maybe flush `key_rest_accum`
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let results = self
.get_vectored(
key_request_accum.consume_keyspace(),
lsn,
io_concurrency.clone(),
ctx,
)
.await?;
if self.cancel.is_cancelled() {
return Err(CreateImageLayersError::Cancelled);
}
let last_key_in_range = key.next() == range.end;
key = key.next();
// Maybe flush `key_rest_accum`
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let results = self
.get_vectored(
key_request_accum.consume_keyspace(),
lsn,
io_concurrency.clone(),
ctx,
)
.await?;
if self.cancel.is_cancelled() {
return Err(CreateImageLayersError::Cancelled);
}
for (img_key, img) in results {
let img = match img {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if img_key.is_rel_fsm_block_key() || img_key.is_rel_vm_block_key() {
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(CreateImageLayersError::from(err));
}
for (img_key, img) in results {
let img = match img {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if img_key.is_rel_fsm_block_key() || img_key.is_rel_vm_block_key() {
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(CreateImageLayersError::from(err));
}
};
}
};
// Write all the keys we just read into our new image layer.
image_layer_writer.put_image(img_key, img, ctx).await?;
wrote_keys = true;
}
// Write all the keys we just read into our new image layer.
image_layer_writer.put_image(img_key, img, ctx).await?;
wrote_keys = true;
}
}
}
@@ -5072,22 +4992,20 @@ impl Timeline {
// image layer generation taking too long time and blocking L0 compaction. So in this
// mode, we also inspect the current number of L0 layers and skip image layer generation
// if there are too many of them.
if let Some((max_num_of_l0_layers, timeline_id)) =
self.l0_compaction_trigger.get_max_l0_count()
{
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
* self.get_compaction_threshold();
if image_preempt_threshold != 0
&& max_num_of_l0_layers >= image_preempt_threshold
{
tracing::info!(
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {} on timeline {}",
partition.start().unwrap(), partition.end().unwrap(), max_num_of_l0_layers, timeline_id
);
last_partition_processed = Some(partition.clone());
all_generated = false;
break;
}
let num_of_l0_layers = {
let layers = self.layers.read().await;
layers.layer_map()?.level0_deltas().len()
};
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
* self.get_compaction_threshold();
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
tracing::info!(
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
);
last_partition_processed = Some(partition.clone());
all_generated = false;
break;
}
}
}
@@ -5466,13 +5384,8 @@ impl Timeline {
self.remote_client
.schedule_compaction_update(&remove_layers, new_deltas)?;
let l0_count = guard.layer_map()?.level0_deltas().len();
drop_wlock(guard);
self.l0_compaction_trigger
.on_l0_update(self.timeline_id, l0_count);
Ok(())
}
@@ -5661,7 +5574,7 @@ impl Timeline {
// PITR interval is set & we didn't look up a timestamp successfully. Conservatively assume PITR
// cannot advance beyond what was already GC'd, and respect space-based retention
GcCutoffs {
time: *self.get_applied_gc_cutoff_lsn(),
time: *self.get_latest_gc_cutoff_lsn(),
space: space_cutoff,
}
}
@@ -5782,7 +5695,7 @@ impl Timeline {
let mut result: GcResult = GcResult::default();
// Nothing to GC. Return early.
let latest_gc_cutoff = *self.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
if latest_gc_cutoff >= new_gc_cutoff {
info!(
"Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
@@ -5796,7 +5709,7 @@ impl Timeline {
//
// The GC cutoff should only ever move forwards.
let waitlist = {
let write_guard = self.applied_gc_cutoff_lsn.lock_for_write();
let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
if *write_guard > new_gc_cutoff {
return Err(GcError::BadLsn {
why: format!(
@@ -6736,32 +6649,18 @@ fn is_send() {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use pageserver_api::key::Key;
use pageserver_api::value::Value;
use tracing::Instrument;
use utils::{id::TimelineId, lsn::Lsn};
use crate::tenant::{
harness::{test_img, TenantHarness},
layer_map::LayerMap,
storage_layer::{Layer, LayerName, LayerVisibilityHint},
storage_layer::{Layer, LayerName},
timeline::{DeltaLayerTestDesc, EvictionError},
PreviousHeatmap, Timeline,
Timeline,
};
use super::HeatMapTimeline;
fn assert_heatmaps_have_same_layers(lhs: &HeatMapTimeline, rhs: &HeatMapTimeline) {
assert_eq!(lhs.layers.len(), rhs.layers.len());
let lhs_rhs = lhs.layers.iter().zip(rhs.layers.iter());
for (l, r) in lhs_rhs {
assert_eq!(l.name, r.name);
assert_eq!(l.metadata, r.metadata);
}
}
#[tokio::test]
async fn test_heatmap_generation() {
let harness = TenantHarness::create("heatmap_generation").await.unwrap();
@@ -6835,7 +6734,7 @@ mod tests {
assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name());
let mut last_lsn = Lsn::MAX;
for layer in &heatmap.layers {
for layer in heatmap.layers {
// Covered layer should be omitted
assert!(layer.name != covered_delta.layer_name());
@@ -6850,144 +6749,6 @@ mod tests {
last_lsn = layer_lsn;
}
}
// Evict all the layers and stash the old heatmap in the timeline.
// This simulates a migration to a cold secondary location.
let guard = timeline.layers.read().await;
let mut all_layers = Vec::new();
let forever = std::time::Duration::from_secs(120);
for layer in guard.likely_resident_layers() {
all_layers.push(layer.clone());
layer.evict_and_wait(forever).await.unwrap();
}
drop(guard);
timeline
.previous_heatmap
.store(Some(Arc::new(PreviousHeatmap::Active {
heatmap: heatmap.clone(),
read_at: std::time::Instant::now(),
})));
// Generate a new heatmap and assert that it contains the same layers as the old one.
let post_migration_heatmap = timeline.generate_heatmap().await.unwrap();
assert_heatmaps_have_same_layers(&heatmap, &post_migration_heatmap);
// Download each layer one by one. Generate the heatmap at each step and check
// that it's stable.
for layer in all_layers {
if layer.visibility() == LayerVisibilityHint::Covered {
continue;
}
eprintln!("Downloading {layer} and re-generating heatmap");
let _resident = layer
.download_and_keep_resident()
.instrument(tracing::info_span!(
parent: None,
"download_layer",
tenant_id = %timeline.tenant_shard_id.tenant_id,
shard_id = %timeline.tenant_shard_id.shard_slug(),
timeline_id = %timeline.timeline_id
))
.await
.unwrap();
let post_download_heatmap = timeline.generate_heatmap().await.unwrap();
assert_heatmaps_have_same_layers(&heatmap, &post_download_heatmap);
}
// Everything from the post-migration heatmap is now resident.
// Check that we drop it from memory.
assert!(matches!(
timeline.previous_heatmap.load().as_deref(),
Some(PreviousHeatmap::Obsolete)
));
}
#[tokio::test]
async fn test_previous_heatmap_obsoletion() {
let harness = TenantHarness::create("heatmap_previous_heatmap_obsoletion")
.await
.unwrap();
let l0_delta = DeltaLayerTestDesc::new(
Lsn(0x20)..Lsn(0x30),
Key::from_hex("000000000000000000000000000000000000").unwrap()
..Key::from_hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF").unwrap(),
vec![(
Key::from_hex("720000000033333333444444445500000000").unwrap(),
Lsn(0x25),
Value::Image(test_img("foo")),
)],
);
let image_layer = (
Lsn(0x40),
vec![(
Key::from_hex("620000000033333333444444445500000000").unwrap(),
test_img("bar"),
)],
);
let delta_layers = vec![l0_delta];
let image_layers = vec![image_layer];
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline_with_layers(
TimelineId::generate(),
Lsn(0x10),
14,
&ctx,
delta_layers,
image_layers,
Lsn(0x100),
)
.await
.unwrap();
// Layer visibility is an input to heatmap generation, so refresh it first
timeline.update_layer_visibility().await.unwrap();
let heatmap = timeline
.generate_heatmap()
.await
.expect("Infallible while timeline is not shut down");
// Both layers should be in the heatmap
assert!(!heatmap.layers.is_empty());
// Now simulate a migration.
timeline
.previous_heatmap
.store(Some(Arc::new(PreviousHeatmap::Active {
heatmap: heatmap.clone(),
read_at: std::time::Instant::now(),
})));
// Evict all the layers in the previous heatmap
let guard = timeline.layers.read().await;
let forever = std::time::Duration::from_secs(120);
for layer in guard.likely_resident_layers() {
layer.evict_and_wait(forever).await.unwrap();
}
drop(guard);
// Generate a new heatmap and check that the previous heatmap
// has been marked obsolete.
let post_eviction_heatmap = timeline
.generate_heatmap()
.await
.expect("Infallible while timeline is not shut down");
assert!(post_eviction_heatmap.layers.is_empty());
assert!(matches!(
timeline.previous_heatmap.load().as_deref(),
Some(PreviousHeatmap::Obsolete)
));
}
#[tokio::test]

View File

@@ -852,7 +852,7 @@ impl Timeline {
//
// Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
// are rewriting layers.
let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
tracing::info!(
"latest_gc_cutoff: {}, pitr cutoff {}",
@@ -2202,7 +2202,7 @@ impl Timeline {
// TODO: ensure the child branches will not use anything below the watermark, or consider
// them when computing the watermark.
gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
gc_cutoff_lsn.min(*self.get_latest_gc_cutoff_lsn())
}
/// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.

View File

@@ -294,7 +294,6 @@ impl DeleteTimelineFlow {
timeline_id,
local_metadata,
None, // Ancestor is not needed for deletion.
None, // Previous heatmap is not needed for deletion
tenant.get_timeline_resources_for(remote_client),
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.

View File

@@ -14,7 +14,7 @@
#include "utils/guc.h"
#include "extension_server.h"
#include "extension_server.h"
#include "neon_utils.h"
static int extension_server_port = 0;
@@ -45,7 +45,7 @@ neon_download_extension_file_http(const char *filename, bool is_library)
handle = alloc_curl_handle();
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 60L /* seconds */ );
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 3L /* seconds */ );
}
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",

View File

@@ -3765,7 +3765,7 @@ neon_dbsize(Oid dbNode)
* neon_truncate() -- Truncate relation to specified number of blocks.
*/
static void
neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, BlockNumber nblocks)
neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
{
XLogRecPtr lsn;
@@ -3780,7 +3780,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdtruncate(reln, forknum, old_blocks, nblocks);
mdtruncate(reln, forknum, nblocks);
return;
default:
@@ -3818,7 +3818,7 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
mdtruncate(reln, forknum, old_blocks, nblocks);
mdtruncate(reln, forknum, nblocks);
#endif
}

View File

@@ -96,7 +96,7 @@ static void inmem_writeback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber nblocks);
static BlockNumber inmem_nblocks(SMgrRelation reln, ForkNumber forknum);
static void inmem_truncate(SMgrRelation reln, ForkNumber forknum,
BlockNumber old_blocks, BlockNumber nblocks);
BlockNumber nblocks);
static void inmem_immedsync(SMgrRelation reln, ForkNumber forknum);
#if PG_MAJORVERSION_NUM >= 17
static void inmem_registersync(SMgrRelation reln, ForkNumber forknum);
@@ -345,7 +345,7 @@ inmem_nblocks(SMgrRelation reln, ForkNumber forknum)
* inmem_truncate() -- Truncate relation to specified number of blocks.
*/
static void
inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, BlockNumber nblocks)
inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
{
}

View File

@@ -501,7 +501,7 @@ impl Session {
_guard: Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::HDel),
.guard(RedisMsgKind::HSet),
};
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {

View File

@@ -5,7 +5,7 @@
use http_utils::error::HttpErrorBody;
use reqwest::{IntoUrl, Method, StatusCode};
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
use std::error::Error as _;
use utils::{
id::{NodeId, TenantId, TimelineId},
@@ -32,9 +32,6 @@ pub enum Error {
/// Status is not ok; parsed error in body as `HttpErrorBody`.
#[error("safekeeper API: {1}")]
ApiError(StatusCode, String),
#[error("Cancelled")]
Cancelled,
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -127,10 +124,9 @@ impl Client {
self.get(&uri).await
}
pub async fn utilization(&self) -> Result<SafekeeperUtilization> {
pub async fn utilization(&self) -> Result<reqwest::Response> {
let uri = format!("{}/v1/utilization/", self.mgmt_api_endpoint);
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
self.get(&uri).await
}
async fn post<B: serde::Serialize, U: IntoUrl>(

View File

@@ -310,12 +310,9 @@ impl WalBackupTask {
retry_attempt = 0;
}
Err(e) => {
// We might have managed to upload some segment even though
// some later in the range failed, so log backup_lsn
// separately.
error!(
"failed while offloading range {}-{}, backup_lsn {}: {:?}",
backup_lsn, commit_lsn, backup_lsn, e
"failed while offloading range {}-{}: {:?}",
backup_lsn, commit_lsn, e
);
retry_attempt = retry_attempt.saturating_add(1);
@@ -341,13 +338,6 @@ async fn backup_lsn_range(
let start_lsn = *backup_lsn;
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
info!(
"offloading segnos {:?} of range [{}-{})",
segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
start_lsn,
end_lsn,
);
// Pool of concurrent upload tasks. We use `FuturesOrdered` to
// preserve order of uploads, and update `backup_lsn` only after
// all previous uploads are finished.
@@ -384,10 +374,10 @@ async fn backup_lsn_range(
}
info!(
"offloaded segnos {:?} of range [{}-{})",
"offloaded segnos {:?} up to {}, previous backup_lsn {}",
segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
start_lsn,
end_lsn,
start_lsn,
);
Ok(())
}

View File

@@ -32,8 +32,6 @@ postgres_connection.workspace = true
rand.workspace = true
reqwest = { workspace = true, features = ["stream"] }
routerify.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
rustls-native-certs.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -1,10 +1,6 @@
use futures::{stream::FuturesUnordered, StreamExt};
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_client::mgmt_api;
use std::{
collections::HashMap,
fmt::Debug,
future::Future,
sync::Arc,
time::{Duration, Instant},
};
@@ -13,15 +9,15 @@ use tokio_util::sync::CancellationToken;
use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization};
use thiserror::Error;
use utils::{id::NodeId, logging::SecretString};
use utils::id::NodeId;
use crate::{node::Node, safekeeper::Safekeeper};
use crate::node::Node;
struct HeartbeaterTask<Server, State> {
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
struct HeartbeaterTask {
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
cancel: CancellationToken,
state: HashMap<NodeId, State>,
state: HashMap<NodeId, PageserverState>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
@@ -40,17 +36,8 @@ pub(crate) enum PageserverState {
Offline,
}
#[derive(Debug, Clone)]
pub(crate) enum SafekeeperState {
Available {
last_seen_at: Instant,
utilization: SafekeeperUtilization,
},
Offline,
}
#[derive(Debug)]
pub(crate) struct AvailablityDeltas<State>(pub Vec<(NodeId, State)>);
pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>);
#[derive(Debug, Error)]
pub(crate) enum HeartbeaterError {
@@ -58,28 +45,23 @@ pub(crate) enum HeartbeaterError {
Cancel,
}
struct HeartbeatRequest<Server, State> {
servers: Arc<HashMap<NodeId, Server>>,
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas<State>, HeartbeaterError>>,
struct HeartbeatRequest {
pageservers: Arc<HashMap<NodeId, Node>>,
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas, HeartbeaterError>>,
}
pub(crate) struct Heartbeater<Server, State> {
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest<Server, State>>,
pub(crate) struct Heartbeater {
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest>,
}
#[allow(private_bounds)]
impl<Server: Send + Sync + 'static, State: Debug + Send + 'static> Heartbeater<Server, State>
where
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
{
impl Heartbeater {
pub(crate) fn new(
jwt_token: Option<String>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
) -> Self {
let (sender, receiver) =
tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
let mut heartbeater = HeartbeaterTask::new(
receiver,
jwt_token,
@@ -94,12 +76,12 @@ where
pub(crate) async fn heartbeat(
&self,
servers: Arc<HashMap<NodeId, Server>>,
) -> Result<AvailablityDeltas<State>, HeartbeaterError> {
pageservers: Arc<HashMap<NodeId, Node>>,
) -> Result<AvailablityDeltas, HeartbeaterError> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.sender
.send(HeartbeatRequest {
servers,
pageservers,
reply: sender,
})
.map_err(|_| HeartbeaterError::Cancel)?;
@@ -111,12 +93,9 @@ where
}
}
impl<Server, State: Debug> HeartbeaterTask<Server, State>
where
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
{
impl HeartbeaterTask {
fn new(
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
jwt_token: Option<String>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
@@ -131,13 +110,14 @@ where
jwt_token,
}
}
async fn run(&mut self) {
loop {
tokio::select! {
request = self.receiver.recv() => {
match request {
Some(req) => {
let res = self.heartbeat(req.servers).await;
let res = self.heartbeat(req.pageservers).await;
req.reply.send(res).unwrap();
},
None => { return; }
@@ -147,20 +127,11 @@ where
}
}
}
}
pub(crate) trait HeartBeat<Server, State> {
fn heartbeat(
&mut self,
pageservers: Arc<HashMap<NodeId, Server>>,
) -> impl Future<Output = Result<AvailablityDeltas<State>, HeartbeaterError>> + Send;
}
impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState> {
async fn heartbeat(
&mut self,
pageservers: Arc<HashMap<NodeId, Node>>,
) -> Result<AvailablityDeltas<PageserverState>, HeartbeaterError> {
) -> Result<AvailablityDeltas, HeartbeaterError> {
let mut new_state = HashMap::new();
let mut heartbeat_futs = FuturesUnordered::new();
@@ -301,121 +272,3 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
Ok(AvailablityDeltas(deltas))
}
}
impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, SafekeeperState> {
async fn heartbeat(
&mut self,
safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
) -> Result<AvailablityDeltas<SafekeeperState>, HeartbeaterError> {
let mut new_state = HashMap::new();
let mut heartbeat_futs = FuturesUnordered::new();
for (node_id, sk) in &*safekeepers {
heartbeat_futs.push({
let jwt_token = self
.jwt_token
.as_ref()
.map(|t| SecretString::from(t.to_owned()));
let cancel = self.cancel.clone();
async move {
let response = sk
.with_client_retries(
|client| async move { client.get_utilization().await },
&jwt_token,
3,
3,
Duration::from_secs(1),
&cancel,
)
.await;
let status = match response {
Ok(utilization) => SafekeeperState::Available {
last_seen_at: Instant::now(),
utilization,
},
Err(mgmt_api::Error::Cancelled) => {
// This indicates cancellation of the request.
// We ignore the node in this case.
return None;
}
Err(_) => SafekeeperState::Offline,
};
Some((*node_id, status))
}
});
loop {
let maybe_status = tokio::select! {
next = heartbeat_futs.next() => {
match next {
Some(result) => result,
None => { break; }
}
},
_ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
};
if let Some((node_id, status)) = maybe_status {
new_state.insert(node_id, status);
}
}
}
let mut offline = 0;
for state in new_state.values() {
match state {
SafekeeperState::Offline { .. } => offline += 1,
SafekeeperState::Available { .. } => {}
}
}
tracing::info!(
"Heartbeat round complete for {} safekeepers, {} offline",
new_state.len(),
offline
);
let mut deltas = Vec::new();
let now = Instant::now();
for (node_id, sk_state) in new_state.iter_mut() {
use std::collections::hash_map::Entry::*;
let entry = self.state.entry(*node_id);
let mut needs_update = false;
match entry {
Occupied(ref occ) => match (occ.get(), &sk_state) {
(SafekeeperState::Offline, SafekeeperState::Offline) => {}
(SafekeeperState::Available { last_seen_at, .. }, SafekeeperState::Offline) => {
if now - *last_seen_at >= self.max_offline_interval {
deltas.push((*node_id, sk_state.clone()));
needs_update = true;
}
}
_ => {
deltas.push((*node_id, sk_state.clone()));
needs_update = true;
}
},
Vacant(_) => {
// This is a new node. Don't generate a delta for it.
deltas.push((*node_id, sk_state.clone()));
}
}
match entry {
Occupied(mut occ) if needs_update => {
(*occ.get_mut()) = sk_state.clone();
}
Vacant(vac) => {
vac.insert(sk_state.clone());
}
_ => {}
}
}
Ok(AvailablityDeltas(deltas))
}
}

View File

@@ -17,8 +17,6 @@ mod pageserver_client;
mod peer_client;
pub mod persistence;
mod reconciler;
mod safekeeper;
mod safekeeper_client;
mod scheduler;
mod schema;
pub mod service;

View File

@@ -80,11 +80,6 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Count of HTTP requests to the safekeeper that resulted in an error,
/// broken down by the safekeeper node id, request name and method
pub(crate) storage_controller_safekeeper_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Latency of HTTP requests to the pageserver, broken down by pageserver
/// node id, request name and method. This include both successful and unsuccessful
/// requests.
@@ -92,13 +87,6 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Latency of HTTP requests to the safekeeper, broken down by safekeeper
/// node id, request name and method. This include both successful and unsuccessful
/// requests.
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
pub(crate) storage_controller_safekeeper_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
/// broken down by the pageserver node id, request name and method
pub(crate) storage_controller_passthrough_request_error:

View File

@@ -1185,6 +1185,23 @@ impl Persistence {
Ok(safekeepers)
}
pub(crate) async fn safekeeper_get(
&self,
id: i64,
) -> Result<SafekeeperPersistence, DatabaseError> {
use crate::schema::safekeepers::dsl::{id as id_column, safekeepers};
self.with_conn(move |conn| {
Box::pin(async move {
Ok(safekeepers
.filter(id_column.eq(&id))
.select(SafekeeperPersistence::as_select())
.get_result(conn)
.await?)
})
})
.await
}
pub(crate) async fn safekeeper_upsert(
&self,
record: SafekeeperUpsert,
@@ -1537,21 +1554,6 @@ pub(crate) struct SafekeeperPersistence {
}
impl SafekeeperPersistence {
pub(crate) fn from_upsert(
upsert: SafekeeperUpsert,
scheduling_policy: SkSchedulingPolicy,
) -> Self {
crate::persistence::SafekeeperPersistence {
id: upsert.id,
region_id: upsert.region_id,
version: upsert.version,
host: upsert.host,
port: upsert.port,
http_port: upsert.http_port,
availability_zone_id: upsert.availability_zone_id,
scheduling_policy: String::from(scheduling_policy),
}
}
pub(crate) fn as_describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
let scheduling_policy =
SkSchedulingPolicy::from_str(&self.scheduling_policy).map_err(|e| {

View File

@@ -1,7 +1,7 @@
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::{compute_hook, service};
use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
};
@@ -162,22 +162,6 @@ impl ReconcilerConfig {
}
}
impl From<&MigrationConfig> for ReconcilerConfig {
fn from(value: &MigrationConfig) -> Self {
let mut builder = ReconcilerConfigBuilder::new();
if let Some(timeout) = value.secondary_warmup_timeout {
builder = builder.secondary_warmup_timeout(timeout)
}
if let Some(timeout) = value.secondary_download_request_timeout {
builder = builder.secondary_download_request_timeout(timeout)
}
builder.build()
}
}
/// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
pub(crate) struct ReconcileUnits {
_sem_units: tokio::sync::OwnedSemaphorePermit,

View File

@@ -1,139 +0,0 @@
use std::{str::FromStr, time::Duration};
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
use reqwest::StatusCode;
use safekeeper_client::mgmt_api;
use tokio_util::sync::CancellationToken;
use utils::{backoff, id::NodeId, logging::SecretString};
use crate::{
heartbeater::SafekeeperState,
persistence::{DatabaseError, SafekeeperPersistence},
safekeeper_client::SafekeeperClient,
};
#[derive(Clone)]
pub struct Safekeeper {
pub(crate) skp: SafekeeperPersistence,
cancel: CancellationToken,
listen_http_addr: String,
listen_http_port: u16,
id: NodeId,
availability: SafekeeperState,
}
impl Safekeeper {
pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self {
Self {
cancel,
listen_http_addr: skp.host.clone(),
listen_http_port: skp.http_port as u16,
id: NodeId(skp.id as u64),
skp,
availability: SafekeeperState::Offline,
}
}
pub(crate) fn base_url(&self) -> String {
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
}
pub(crate) fn get_id(&self) -> NodeId {
self.id
}
pub(crate) fn describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
self.skp.as_describe_response()
}
pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
self.availability = availability;
}
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
jwt: &Option<SecretString>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
cancel: &CancellationToken,
) -> mgmt_api::Result<T>
where
O: FnMut(SafekeeperClient) -> F,
F: std::future::Future<Output = mgmt_api::Result<T>>,
{
fn is_fatal(e: &mgmt_api::Error) -> bool {
use mgmt_api::Error::*;
match e {
ReceiveBody(_) | ReceiveErrorBody(_) => false,
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
}
}
backoff::retry(
|| {
let http_client = reqwest::ClientBuilder::new()
.timeout(timeout)
.build()
.expect("Failed to construct HTTP client");
let client = SafekeeperClient::from_client(
self.get_id(),
http_client,
self.base_url(),
jwt.clone(),
);
let node_cancel_fut = self.cancel.cancelled();
let op_fut = op(client);
async {
tokio::select! {
r = op_fut=> {r},
_ = node_cancel_fut => {
Err(mgmt_api::Error::Cancelled)
}}
}
},
is_fatal,
warn_threshold,
max_retries,
&format!(
"Call to node {} ({}:{}) management API",
self.id, self.listen_http_addr, self.listen_http_port
),
cancel,
)
.await
.unwrap_or(Err(mgmt_api::Error::Cancelled))
}
pub(crate) fn update_from_record(&mut self, record: crate::persistence::SafekeeperUpsert) {
let crate::persistence::SafekeeperUpsert {
active: _,
availability_zone_id: _,
host,
http_port,
id,
port: _,
region_id: _,
version: _,
} = record.clone();
if id != self.id.0 as i64 {
// The way the function is called ensures this. If we regress on that, it's a bug.
panic!(
"id can't be changed via update_from_record function: {id} != {}",
self.id.0
);
}
self.skp = crate::persistence::SafekeeperPersistence::from_upsert(
record,
SkSchedulingPolicy::from_str(&self.skp.scheduling_policy).unwrap(),
);
self.listen_http_port = http_port as u16;
self.listen_http_addr = host;
}
}

View File

@@ -1,105 +0,0 @@
use crate::metrics::PageserverRequestLabelGroup;
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
use safekeeper_client::mgmt_api::{Client, Result};
use utils::{
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
};
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
///
/// Analogous to [`crate::pageserver_client::PageserverClient`].
#[derive(Debug, Clone)]
pub(crate) struct SafekeeperClient {
inner: Client,
node_id_label: String,
}
macro_rules! measured_request {
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
let labels = PageserverRequestLabelGroup {
pageserver_id: $node_id,
path: $name,
method: $method,
};
let latency = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_request_latency;
let _timer_guard = latency.start_timer(labels.clone());
let res = $invoke;
if res.is_err() {
let error_counters = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_pageserver_request_error;
error_counters.inc(labels)
}
res
}};
}
impl SafekeeperClient {
#[allow(dead_code)]
pub(crate) fn new(
node_id: NodeId,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
pub(crate) fn from_client(
node_id: NodeId,
raw_client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
#[allow(dead_code)]
pub(crate) async fn create_timeline(
&self,
req: &TimelineCreateRequest,
) -> Result<TimelineStatus> {
measured_request!(
"create_timeline",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.create_timeline(req).await
)
}
#[allow(dead_code)]
pub(crate) async fn delete_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<TimelineStatus> {
measured_request!(
"delete_timeline",
crate::metrics::Method::Delete,
&self.node_id_label,
self.inner.delete_timeline(tenant_id, timeline_id).await
)
}
pub(crate) async fn get_utilization(&self) -> Result<SafekeeperUtilization> {
measured_request!(
"utilization",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner.utilization().await
)
}
}

View File

@@ -2,7 +2,6 @@ pub mod chaos_injector;
mod context_iterator;
use hyper::Uri;
use safekeeper_api::models::SafekeeperUtilization;
use std::{
borrow::Cow,
cmp::Ordering,
@@ -21,7 +20,6 @@ use crate::{
},
compute_hook::{self, NotifyError},
drain_utils::{self, TenantShardDrain, TenantShardIterator},
heartbeater::SafekeeperState,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
leadership::Leadership,
metrics,
@@ -31,7 +29,6 @@ use crate::{
ShardGenerationState, TenantFilter,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
safekeeper::Safekeeper,
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
@@ -209,8 +206,6 @@ struct ServiceState {
nodes: Arc<HashMap<NodeId, Node>>,
safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
scheduler: Scheduler,
/// Ongoing background operation on the cluster if any is running.
@@ -277,7 +272,6 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
impl ServiceState {
fn new(
nodes: HashMap<NodeId, Node>,
safekeepers: HashMap<NodeId, Safekeeper>,
tenants: BTreeMap<TenantShardId, TenantShard>,
scheduler: Scheduler,
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
@@ -289,7 +283,6 @@ impl ServiceState {
leadership_status: initial_leadership_status,
tenants,
nodes: Arc::new(nodes),
safekeepers: Arc::new(safekeepers),
scheduler,
ongoing_operation: None,
delayed_reconcile_rx,
@@ -306,23 +299,6 @@ impl ServiceState {
(&mut self.nodes, &mut self.tenants, &mut self.scheduler)
}
#[allow(clippy::type_complexity)]
fn parts_mut_sk(
&mut self,
) -> (
&mut Arc<HashMap<NodeId, Node>>,
&mut Arc<HashMap<NodeId, Safekeeper>>,
&mut BTreeMap<TenantShardId, TenantShard>,
&mut Scheduler,
) {
(
&mut self.nodes,
&mut self.safekeepers,
&mut self.tenants,
&mut self.scheduler,
)
}
fn get_leadership_status(&self) -> LeadershipStatus {
self.leadership_status
}
@@ -421,8 +397,7 @@ pub struct Service {
compute_hook: Arc<ComputeHook>,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
heartbeater_ps: Heartbeater<Node, PageserverState>,
heartbeater_sk: Heartbeater<Safekeeper, SafekeeperState>,
heartbeater: Heartbeater,
// Channel for background cleanup from failed operations that require cleanup, such as shard split
abort_tx: tokio::sync::mpsc::UnboundedSender<TenantShardSplitAbort>,
@@ -632,8 +607,7 @@ impl Service {
let locked = self.inner.read().unwrap();
locked.nodes.clone()
};
let (mut nodes_online, mut sks_online) =
self.initial_heartbeat_round(all_nodes.keys()).await;
let mut nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await;
// List of tenants for which we will attempt to notify compute of their location at startup
let mut compute_notifications = Vec::new();
@@ -642,7 +616,7 @@ impl Service {
tracing::info!("Populating tenant shards' states from initial pageserver scan...");
let shard_count = {
let mut locked = self.inner.write().unwrap();
let (nodes, safekeepers, tenants, scheduler) = locked.parts_mut_sk();
let (nodes, tenants, scheduler) = locked.parts_mut();
// Mark nodes online if they responded to us: nodes are offline by default after a restart.
let mut new_nodes = (**nodes).clone();
@@ -654,17 +628,6 @@ impl Service {
}
*nodes = Arc::new(new_nodes);
let mut new_sks = (**safekeepers).clone();
for (node_id, node) in new_sks.iter_mut() {
if let Some((utilization, last_seen_at)) = sks_online.remove(node_id) {
node.set_availability(SafekeeperState::Available {
utilization,
last_seen_at,
});
}
}
*safekeepers = Arc::new(new_sks);
for (tenant_shard_id, observed_state) in observed.0 {
let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
for node_id in observed_state.locations.keys() {
@@ -773,10 +736,7 @@ impl Service {
async fn initial_heartbeat_round<'a>(
&self,
node_ids: impl Iterator<Item = &'a NodeId>,
) -> (
HashMap<NodeId, PageserverUtilization>,
HashMap<NodeId, (SafekeeperUtilization, Instant)>,
) {
) -> HashMap<NodeId, PageserverUtilization> {
assert!(!self.startup_complete.is_ready());
let all_nodes = {
@@ -796,20 +756,14 @@ impl Service {
}
}
let all_sks = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
};
tracing::info!("Sending initial heartbeats...");
let res_ps = self
.heartbeater_ps
let res = self
.heartbeater
.heartbeat(Arc::new(nodes_to_heartbeat))
.await;
let res_sk = self.heartbeater_sk.heartbeat(all_sks).await;
let mut online_nodes = HashMap::new();
if let Ok(deltas) = res_ps {
if let Ok(deltas) = res {
for (node_id, status) in deltas.0 {
match status {
PageserverState::Available { utilization, .. } => {
@@ -823,22 +777,7 @@ impl Service {
}
}
let mut online_sks = HashMap::new();
if let Ok(deltas) = res_sk {
for (node_id, status) in deltas.0 {
match status {
SafekeeperState::Available {
utilization,
last_seen_at,
} => {
online_sks.insert(node_id, (utilization, last_seen_at));
}
SafekeeperState::Offline => {}
}
}
}
(online_nodes, online_sks)
online_nodes
}
/// Used during [`Self::startup_reconcile`]: issue GETs to all nodes concurrently, with a deadline.
@@ -1045,14 +984,8 @@ impl Service {
locked.nodes.clone()
};
let safekeepers = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
};
let res_ps = self.heartbeater_ps.heartbeat(nodes).await;
let res_sk = self.heartbeater_sk.heartbeat(safekeepers).await;
if let Ok(deltas) = res_ps {
let res = self.heartbeater.heartbeat(nodes).await;
if let Ok(deltas) = res {
let mut to_handle = Vec::default();
for (node_id, state) in deltas.0 {
@@ -1153,18 +1086,6 @@ impl Service {
}
}
}
if let Ok(deltas) = res_sk {
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
for (id, state) in deltas.0 {
let Some(sk) = safekeepers.get_mut(&id) else {
tracing::info!("Couldn't update safekeeper safekeeper state for id {id} from heartbeat={state:?}");
continue;
};
sk.set_availability(state);
}
locked.safekeepers = Arc::new(safekeepers);
}
}
}
@@ -1390,17 +1311,6 @@ impl Service {
.storage_controller_pageserver_nodes
.set(nodes.len() as i64);
tracing::info!("Loading safekeepers from database...");
let safekeepers = persistence
.list_safekeepers()
.await?
.into_iter()
.map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new()))
.collect::<Vec<_>>();
let safekeepers: HashMap<NodeId, Safekeeper> =
safekeepers.into_iter().map(|n| (n.get_id(), n)).collect();
tracing::info!("Loaded {} safekeepers from database.", safekeepers.len());
tracing::info!("Loading shards from database...");
let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?;
tracing::info!(
@@ -1527,14 +1437,7 @@ impl Service {
let cancel = CancellationToken::new();
let reconcilers_cancel = cancel.child_token();
let heartbeater_ps = Heartbeater::new(
config.jwt_token.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
);
let heartbeater_sk = Heartbeater::new(
let heartbeater = Heartbeater::new(
config.jwt_token.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
@@ -1550,7 +1453,6 @@ impl Service {
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
nodes,
safekeepers,
tenants,
scheduler,
delayed_reconcile_rx,
@@ -1560,8 +1462,7 @@ impl Service {
persistence,
compute_hook: Arc::new(ComputeHook::new(config.clone())),
result_tx,
heartbeater_ps,
heartbeater_sk,
heartbeater,
reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
config.reconciler_concurrency,
)),
@@ -5213,12 +5114,7 @@ impl Service {
shard.sequence = shard.sequence.next();
}
let reconciler_config = match migrate_req.migration_config {
Some(cfg) => (&cfg).into(),
None => ReconcilerConfig::default(),
};
self.maybe_configured_reconcile_shard(shard, nodes, reconciler_config)
self.maybe_reconcile_shard(shard, nodes)
};
if let Some(waiter) = waiter {
@@ -7765,54 +7661,29 @@ impl Service {
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
let locked = self.inner.read().unwrap();
let mut list = locked
.safekeepers
.iter()
.map(|sk| sk.1.describe_response())
.collect::<Result<Vec<_>, _>>()?;
list.sort_by_key(|v| v.id);
Ok(list)
self.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|v| v.as_describe_response())
.collect::<Result<Vec<_>, _>>()
}
pub(crate) async fn get_safekeeper(
&self,
id: i64,
) -> Result<SafekeeperDescribeResponse, DatabaseError> {
let locked = self.inner.read().unwrap();
let sk = locked
.safekeepers
.get(&NodeId(id as u64))
.ok_or(diesel::result::Error::NotFound)?;
sk.describe_response()
self.persistence
.safekeeper_get(id)
.await
.and_then(|v| v.as_describe_response())
}
pub(crate) async fn upsert_safekeeper(
&self,
record: crate::persistence::SafekeeperUpsert,
) -> Result<(), DatabaseError> {
let node_id = NodeId(record.id as u64);
self.persistence.safekeeper_upsert(record.clone()).await?;
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
match safekeepers.entry(node_id) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().update_from_record(record);
}
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(Safekeeper::from_persistence(
crate::persistence::SafekeeperPersistence::from_upsert(
record,
SkSchedulingPolicy::Pause,
),
CancellationToken::new(),
));
}
}
locked.safekeepers = Arc::new(safekeepers);
}
Ok(())
self.persistence.safekeeper_upsert(record).await
}
pub(crate) async fn set_safekeeper_scheduling_policy(
@@ -7822,20 +7693,7 @@ impl Service {
) -> Result<(), DatabaseError> {
self.persistence
.set_safekeeper_scheduling_policy(id, scheduling_policy)
.await?;
let node_id = NodeId(id as u64);
// After the change has been persisted successfully, update the in-memory state
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
let sk = safekeepers
.get_mut(&node_id)
.ok_or(DatabaseError::Logical("Not found".to_string()))?;
sk.skp.scheduling_policy = String::from(scheduling_policy);
locked.safekeepers = Arc::new(safekeepers);
}
Ok(())
.await
}
pub(crate) async fn update_shards_preferred_azs(

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
import abc
import asyncio
import concurrent.futures
import dataclasses
import filecmp
import json
import os
@@ -1676,12 +1675,6 @@ class StorageControllerLeadershipStatus(StrEnum):
CANDIDATE = "candidate"
@dataclass
class StorageControllerMigrationConfig:
secondary_warmup_timeout: str | None
secondary_download_request_timeout: str | None
class NeonStorageController(MetricsGetter, LogUtils):
def __init__(self, env: NeonEnv, port: int, auth_enabled: bool):
self.env = env
@@ -2075,20 +2068,11 @@ class NeonStorageController(MetricsGetter, LogUtils):
shards: list[TenantShardId] = body["new_shards"]
return shards
def tenant_shard_migrate(
self,
tenant_shard_id: TenantShardId,
dest_ps_id: int,
config: StorageControllerMigrationConfig | None = None,
):
payload = {"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id}
if config is not None:
payload["migration_config"] = dataclasses.asdict(config)
def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int):
self.request(
"PUT",
f"{self.api}/control/v1/tenant/{tenant_shard_id}/migrate",
json=payload,
json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id},
headers=self.headers(TokenScope.ADMIN),
)
log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}")
@@ -4988,13 +4972,8 @@ def check_restored_datadir_content(
restored_files = list_files_to_compare(restored_dir_path)
# pg_notify files are always ignored
pgdata_files = [f for f in pgdata_files if not f.startswith("pg_notify")]
restored_files = [f for f in restored_files if not f.startswith("pg_notify")]
# pg_xact and pg_multixact files are optional in basebackup: depending on our configuration they
# may be omitted and loaded on demand.
if pgdata_files != restored_files:
# filter pg_xact and multixact files which are downloaded on demand
pgdata_files = [
f
for f in pgdata_files

View File

@@ -231,14 +231,14 @@ def test_pgdata_import_smoke(
shard_zero_http = shard_zero_ps.http_client()
shard_zero_timeline_info = shard_zero_http.timeline_detail(shard_zero["shard_id"], timeline_id)
initdb_lsn = Lsn(shard_zero_timeline_info["initdb_lsn"])
min_readable_lsn = Lsn(shard_zero_timeline_info["min_readable_lsn"])
latest_gc_cutoff_lsn = Lsn(shard_zero_timeline_info["latest_gc_cutoff_lsn"])
last_record_lsn = Lsn(shard_zero_timeline_info["last_record_lsn"])
disk_consistent_lsn = Lsn(shard_zero_timeline_info["disk_consistent_lsn"])
_remote_consistent_lsn = Lsn(shard_zero_timeline_info["remote_consistent_lsn"])
remote_consistent_lsn_visible = Lsn(shard_zero_timeline_info["remote_consistent_lsn_visible"])
# assert remote_consistent_lsn_visible == remote_consistent_lsn TODO: this fails initially and after restart, presumably because `UploadQueue::clean.1` is still `None`
assert remote_consistent_lsn_visible == disk_consistent_lsn
assert initdb_lsn == min_readable_lsn
assert initdb_lsn == latest_gc_cutoff_lsn
assert disk_consistent_lsn == initdb_lsn + 8
assert last_record_lsn == disk_consistent_lsn
# TODO: assert these values are the same everywhere

View File

@@ -10,18 +10,14 @@ from typing import TYPE_CHECKING
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
NeonPageserver,
StorageControllerMigrationConfig,
)
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
from fixtures.pageserver.common_types import parse_layer_file_name
from fixtures.pageserver.utils import (
assert_prefix_empty,
wait_for_upload_queue_empty,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage, s3_storage
from fixtures.utils import run_only_on_default_postgres, skip_in_debug_build, wait_until
from fixtures.utils import skip_in_debug_build, wait_until
from fixtures.workload import Workload
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@@ -893,93 +889,3 @@ def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controll
assert progress_3["heatmap_mtime"] is not None
assert progress_3["layers_total"] == progress_3["layers_downloaded"]
assert progress_3["bytes_total"] == progress_3["bytes_downloaded"]
@skip_in_debug_build("only run with release build")
@run_only_on_default_postgres("PG version is not interesting here")
def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_configs()
env.start()
assert isinstance(env.pageserver_remote_storage, S3Storage) # Satisfy linter
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.create_tenant(tenant_id, timeline_id, conf=TENANT_CONF, placement_policy='{"Attached":1}')
env.storage_controller.reconcile_until_idle()
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
ps_attached = env.get_pageserver(attached_to_id)
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
# Generate a bunch of small layers (we will apply a slowdown failpoint that works on a per-layer basis)
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(128, upload=True)
workload.write_rows(128, upload=True)
workload.write_rows(128, upload=True)
workload.write_rows(128, upload=True)
workload.stop()
# Expect lots of layers
assert len(ps_attached.list_layers(tenant_id, timeline_id)) > 10
# Simulate large data by making layer downloads artifically slow
for ps in env.pageservers:
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")])
# Upload a heatmap, so that secondaries have something to download
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
heatmap_before_migration = env.pageserver_remote_storage.heatmap_content(tenant_id)
# This has no chance to succeed: we have lots of layers and each one takes at least 1000ms.
# However, it pulls the heatmap, which will be important later.
http_client = env.storage_controller.pageserver_api()
(status, progress) = http_client.tenant_secondary_download(tenant_id, wait_ms=4000)
assert status == 202
assert progress["heatmap_mtime"] is not None
assert progress["layers_downloaded"] > 0
assert progress["bytes_downloaded"] > 0
assert progress["layers_total"] > progress["layers_downloaded"]
assert progress["bytes_total"] > progress["bytes_downloaded"]
env.storage_controller.allowed_errors.extend(
[
".*Timed out.*downloading layers.*",
]
)
# Use a custom configuration that gives up earlier than usual.
# We can't hydrate everything anyway because of the failpoints.
config = StorageControllerMigrationConfig(
secondary_warmup_timeout="5s", secondary_download_request_timeout="2s"
)
env.storage_controller.tenant_shard_migrate(
TenantShardId(tenant_id, shard_number=0, shard_count=0), ps_secondary.id, config
)
env.storage_controller.reconcile_until_idle()
assert env.storage_controller.locate(tenant_id)[0]["node_id"] == ps_secondary.id
ps_secondary.http_client().tenant_heatmap_upload(tenant_id)
heatmap_after_migration = env.pageserver_remote_storage.heatmap_content(tenant_id)
assert len(heatmap_before_migration["timelines"][0]["layers"]) > 0
# The new layer map should contain all the layers in the pre-migration one
# and a new in memory layer
assert len(heatmap_before_migration["timelines"][0]["layers"]) + 1 == len(
heatmap_after_migration["timelines"][0]["layers"]
)
log.info(
f'Heatmap size after cold migration is {len(heatmap_after_migration["timelines"][0]["layers"])}'
)
# TODO: Once we have an endpoint for rescuing the cold location, exercise it here.

View File

@@ -261,7 +261,7 @@ def test_isolation(
pg_bin.run(pg_isolation_regress_command, env=env_vars, cwd=runpath)
# This fails with a mismatch on `pg_multixact/offsets/0000`
post_checks(env, test_output_dir, DBNAME, endpoint)
# post_checks(env, test_output_dir, DBNAME, endpoint)
# Run extra Neon-specific pg_regress-based tests. The tests and their

View File

@@ -287,7 +287,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
offset=offset,
)
# Do some update so we can increment gc_cutoff
# Do some update so we can increment latest_gc_cutoff
generate_updates_on_main(env, ep_main, i, end=100)
# Wait for the existing lease to expire.

View File

@@ -1821,7 +1821,7 @@ def test_sharding_gc(
# TODO: remove when https://github.com/neondatabase/neon/issues/10720 is fixed
ps.allowed_errors.extend(
[
".*could not find data for key.*",
".*could not find data for key 020000000000000000000000000000000000.*",
".*could not ingest record.*",
]
)

View File

@@ -318,7 +318,7 @@ def test_scrubber_physical_gc_ancestors(neon_env_builder: NeonEnvBuilder, shard_
# TODO: remove when https://github.com/neondatabase/neon/issues/10720 is fixed
ps.allowed_errors.extend(
[
".*could not find data for key.*",
".*could not find data for key 020000000000000000000000000000000000.*",
".*could not ingest record.*",
]
)

View File

@@ -566,14 +566,10 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder):
assert_prefix_empty(neon_env_builder.safekeepers_remote_storage, prefix)
# This test is flaky, probably because PUTs of local fs storage are not atomic.
# Let's keep both remote storage kinds for a while to see if this is the case.
# https://github.com/neondatabase/neon/issues/10761
@pytest.mark.parametrize("remote_storage_kind", [s3_storage(), RemoteStorageKind.LOCAL_FS])
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant

16
vendor/revisions.json vendored
View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.3",
"4d3a722312b496ff7378156caa6d41c2e70c30e4"
"17.2",
"4c45d78ad587e4bcb4a5a7ef6931b88c6a3d575d"
],
"v16": [
"16.7",
"999cf81b101ead40e597d5cd729458d8200f4537"
"16.6",
"13cf5d06c98a8e9b0590ce6cdfd193a08d0a7792"
],
"v15": [
"15.11",
"80ed91ce255c765d25be0bb4a02c942fe6311fbf"
"15.10",
"355a7c69d3f907f3612eb406cc7b9c2f55d59b59"
],
"v14": [
"14.16",
"62a86dfc91e0c35a72f2ea5e99e6969b830c0c26"
"14.15",
"c0aedfd3cac447510a2db843b561f0c52901b679"
]
}