mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 20:20:38 +00:00
Compare commits
1 Commits
erik/safek
...
tristan957
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3dd61ede6f |
116
.github/dependabot.yml
vendored
Normal file
116
.github/dependabot.yml
vendored
Normal file
@@ -0,0 +1,116 @@
|
||||
version: 2
|
||||
|
||||
updates:
|
||||
- directory: /
|
||||
package-ecosystem: cargo
|
||||
schedule:
|
||||
interval: daily
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: /
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: daily
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: /
|
||||
package-ecosystem: github-actions
|
||||
schedule:
|
||||
interval: daily
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: /
|
||||
package-ecosystem: pip
|
||||
schedule:
|
||||
interval: daily
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/csharp/npgsql
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/csharp/npgsql
|
||||
package-ecosystem: nuget
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/java/jdbc/
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/python/asyncpg/
|
||||
package-ecosystem: pip
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/python/pg8000/
|
||||
package-ecosystem: pip
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/rust/tokio-postgres/
|
||||
package-ecosystem: cargo
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/rust/tokio-postgres/
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/swift/PostgresNIOExample/
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/swift/PostgresNIOExample/
|
||||
package-ecosystem: swift
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/swift/PostgresClientKitExample/
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/swift/PostgresClientKitExample/
|
||||
package-ecosystem: swift
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/typescript/postgresql-client/
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/typescript/postgresql-client/
|
||||
package-ecosystem: npm
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/typescript/serverless-driver/
|
||||
package-ecosystem: docker
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
|
||||
- directory: test_runner/pg_clients/typescript/serverless-driver/
|
||||
package-ecosystem: npm
|
||||
schedule:
|
||||
interval: weekly
|
||||
rebase-strategy: auto
|
||||
@@ -431,11 +431,14 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
COPY compute/patches/rum.patch /rum.patch
|
||||
|
||||
# supports v17 since https://github.com/postgrespro/rum/commit/cb1edffc57736cd2a4455f8d0feab0d69928da25
|
||||
# doesn't use releases since 1.3.13 - Sep 19, 2022
|
||||
# use latest commit from the master branch
|
||||
RUN wget https://github.com/postgrespro/rum/archive/cb1edffc57736cd2a4455f8d0feab0d69928da25.tar.gz -O rum.tar.gz && \
|
||||
echo "65e0a752e99f4c3226400c9b899f997049e93503db8bf5c8072efa136d32fd83 rum.tar.gz" | sha256sum --check && \
|
||||
# maybe version-specific
|
||||
# support for v17 is unknown
|
||||
# last release 1.3.13 - Sep 19, 2022
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/postgrespro/rum/archive/refs/tags/1.3.13.tar.gz -O rum.tar.gz && \
|
||||
echo "6ab370532c965568df6210bd844ac6ba649f53055e48243525b0b7e5c4d69a7d rum.tar.gz" | sha256sum --check && \
|
||||
mkdir rum-src && cd rum-src && tar xzf ../rum.tar.gz --strip-components=1 -C . && \
|
||||
patch -p1 < /rum.patch && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
@@ -956,31 +959,21 @@ RUN apt-get install -y protobuf-compiler && \
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build-pgrx12 AS pg-jsonschema-pg-build
|
||||
FROM rust-extensions-build AS pg-jsonschema-pg-build
|
||||
ARG PG_VERSION
|
||||
# version 0.3.3 supports v17
|
||||
# last release v0.3.3 - Oct 16, 2024
|
||||
#
|
||||
# there were no breaking changes
|
||||
# so we can use the same version for all postgres versions
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14" | "v15" | "v16" | "v17") \
|
||||
export PG_JSONSCHEMA_VERSION=0.3.3 \
|
||||
export PG_JSONSCHEMA_CHECKSUM=40c2cffab4187e0233cb8c3bde013be92218c282f95f4469c5282f6b30d64eac \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pg_jsonschema does not yet have a release that supports pg17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v${PG_JSONSCHEMA_VERSION}.tar.gz -O pg_jsonschema.tar.gz && \
|
||||
echo "${PG_JSONSCHEMA_CHECKSUM} pg_jsonschema.tar.gz" | sha256sum --check && \
|
||||
wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.3.1.tar.gz -O pg_jsonschema.tar.gz && \
|
||||
echo "61df3db1ed83cf24f6aa39c826f8818bfa4f0bd33b587fd6b2b1747985642297 pg_jsonschema.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
|
||||
# see commit 252b3685a27a0f4c31a0f91e983c6314838e89e8
|
||||
# `unsafe-postgres` feature allows to build pgx extensions
|
||||
# against postgres forks that decided to change their ABI name (like us).
|
||||
# With that we can build extensions without forking them and using stock
|
||||
# pgx. As this feature is new few manual version bumps were required.
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/pgrx = "0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
|
||||
|
||||
@@ -991,27 +984,16 @@ RUN case "${PG_VERSION}" in \
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build-pgrx12 AS pg-graphql-pg-build
|
||||
FROM rust-extensions-build AS pg-graphql-pg-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# version 1.5.9 supports v17
|
||||
# last release v1.5.9 - Oct 16, 2024
|
||||
#
|
||||
# there were no breaking changes
|
||||
# so we can use the same version for all postgres versions
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14" | "v15" | "v16" | "v17") \
|
||||
export PG_GRAPHQL_VERSION=1.5.9 \
|
||||
export PG_GRAPHQL_CHECKSUM=cf768385a41278be1333472204fc0328118644ae443182cf52f7b9b23277e497 \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pg_graphql does not yet have a release that supports pg17 as of now" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/supabase/pg_graphql/archive/refs/tags/v${PG_GRAPHQL_VERSION}.tar.gz -O pg_graphql.tar.gz && \
|
||||
echo "${PG_GRAPHQL_CHECKSUM} pg_graphql.tar.gz" | sha256sum --check && \
|
||||
wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.5.7.tar.gz -O pg_graphql.tar.gz && \
|
||||
echo "2b3e567a5b31019cb97ae0e33263c1bcc28580be5a444ac4c8ece5c4be2aea41 pg_graphql.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_graphql-src && cd pg_graphql-src && tar xzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "=0.12.6"/pgrx = { version = "0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/pgrx = "=0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release && \
|
||||
# it's needed to enable extension because it uses untrusted C language
|
||||
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_graphql.control && \
|
||||
@@ -1024,13 +1006,15 @@ RUN case "${PG_VERSION}" in \
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build-pgrx12 AS pg-tiktoken-pg-build
|
||||
FROM rust-extensions-build AS pg-tiktoken-pg-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# doesn't use releases
|
||||
# 9118dd4549b7d8c0bbc98e04322499f7bf2fa6f7 - on Oct 29, 2024
|
||||
RUN wget https://github.com/kelvich/pg_tiktoken/archive/9118dd4549b7d8c0bbc98e04322499f7bf2fa6f7.tar.gz -O pg_tiktoken.tar.gz && \
|
||||
echo "a5bc447e7920ee149d3c064b8b9f0086c0e83939499753178f7d35788416f628 pg_tiktoken.tar.gz" | sha256sum --check && \
|
||||
# 26806147b17b60763039c6a6878884c41a262318 made on 26/09/2023
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pg_tiktoken does not have versions, nor support for pg17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/kelvich/pg_tiktoken/archive/26806147b17b60763039c6a6878884c41a262318.tar.gz -O pg_tiktoken.tar.gz && \
|
||||
echo "e64e55aaa38c259512d3e27c572da22c4637418cf124caba904cd50944e5004e pg_tiktoken.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_tiktoken-src && cd pg_tiktoken-src && tar xzf ../pg_tiktoken.tar.gz --strip-components=1 -C . && \
|
||||
# TODO update pgrx version in the pg_tiktoken repo and remove this line
|
||||
sed -i 's/pgrx = { version = "=0.10.2",/pgrx = { version = "0.11.3",/g' Cargo.toml && \
|
||||
@@ -1048,8 +1032,6 @@ RUN wget https://github.com/kelvich/pg_tiktoken/archive/9118dd4549b7d8c0bbc98e04
|
||||
FROM rust-extensions-build AS pg-pgx-ulid-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# doesn't support v17 yet
|
||||
# https://github.com/pksunkara/pgx_ulid/pull/52
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pgx_ulid does not support pg17 as of the latest version (0.1.5)" && exit 0;; \
|
||||
esac && \
|
||||
@@ -1067,16 +1049,16 @@ RUN case "${PG_VERSION}" in "v17") \
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM rust-extensions-build-pgrx12 AS pg-session-jwt-build
|
||||
FROM rust-extensions-build AS pg-session-jwt-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# NOTE: local_proxy depends on the version of pg_session_jwt
|
||||
# Do not update without approve from proxy team
|
||||
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.1.2-v17.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "c8ecbed9cb8c6441bce5134a176002b043018adf9d05a08e457dda233090a86e pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pg_session_jwt does not yet have a release that supports pg17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/neondatabase/pg_session_jwt/archive/e1310b08ba51377a19e0559e4d1194883b9b2ba2.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "837932a077888d5545fd54b0abcc79e5f8e37017c2769a930afc2f5c94df6f4e pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "=0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/pgrx = "=0.11.3"/pgrx = { version = "=0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release
|
||||
|
||||
#########################################################################################
|
||||
|
||||
@@ -562,7 +562,6 @@ pub enum BeMessage<'a> {
|
||||
options: &'a [&'a str],
|
||||
},
|
||||
KeepAlive(WalSndKeepAlive),
|
||||
NeonInterpretedWalRecord(&'a [u8]), // TODO: use appropriate fields
|
||||
}
|
||||
|
||||
/// Common shorthands.
|
||||
@@ -997,17 +996,6 @@ impl BeMessage<'_> {
|
||||
Ok(())
|
||||
})?
|
||||
}
|
||||
|
||||
// Neon extension: send interpreted WAL records to relevant pageservers. This is
|
||||
// temporary until we move to a different protocol for Safekeeper->Pageserver WAL
|
||||
// (possibly gRPC).
|
||||
BeMessage::NeonInterpretedWalRecord(data) => {
|
||||
buf.put_u8(b'z'); // arbitrary unused value
|
||||
write_body(buf, |buf| {
|
||||
buf.put_u64(data.len() as u64);
|
||||
buf.put_slice(data);
|
||||
})
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,970 +1 @@
|
||||
//! This module contains logic for decoding and interpreting
|
||||
//! raw bytes which represent a raw Postgres WAL record.
|
||||
|
||||
use crate::models::*;
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_api::value::Value;
|
||||
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
|
||||
use postgres_ffi::walrecord::*;
|
||||
use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
impl InterpretedWalRecord {
|
||||
/// Decode and interpreted raw bytes which represent one Postgres WAL record.
|
||||
/// Data blocks which do not match the provided shard identity are filtered out.
|
||||
/// Shard 0 is a special case since it tracks all relation sizes. We only give it
|
||||
/// the keys that are being written as that is enough for updating relation sizes.
|
||||
pub fn from_bytes_filtered(
|
||||
buf: Bytes,
|
||||
shard: &ShardIdentity,
|
||||
lsn: Lsn,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<InterpretedWalRecord> {
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(buf, &mut decoded, pg_version)?;
|
||||
|
||||
let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) {
|
||||
FlushUncommittedRecords::Yes
|
||||
} else {
|
||||
FlushUncommittedRecords::No
|
||||
};
|
||||
|
||||
let metadata_record = MetadataRecord::from_decoded(&decoded, lsn, pg_version)?;
|
||||
|
||||
let mut blocks = Vec::default();
|
||||
for blk in decoded.blocks.iter() {
|
||||
let rel = RelTag {
|
||||
spcnode: blk.rnode_spcnode,
|
||||
dbnode: blk.rnode_dbnode,
|
||||
relnode: blk.rnode_relnode,
|
||||
forknum: blk.forknum,
|
||||
};
|
||||
|
||||
let key = rel_block_to_key(rel, blk.blkno);
|
||||
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!("Unsupported key decoded at LSN {}: {}", lsn, key);
|
||||
}
|
||||
|
||||
let key_is_local = shard.is_key_local(&key);
|
||||
|
||||
tracing::debug!(
|
||||
lsn=%lsn,
|
||||
key=%key,
|
||||
"ingest: shard decision {}",
|
||||
if !key_is_local { "drop" } else { "keep" },
|
||||
);
|
||||
|
||||
if !key_is_local {
|
||||
if shard.is_shard_zero() {
|
||||
// Shard 0 tracks relation sizes. Although we will not store this block, we will observe
|
||||
// its blkno in case it implicitly extends a relation.
|
||||
blocks.push((key.to_compact(), None));
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// Instead of storing full-page-image WAL record,
|
||||
// it is better to store extracted image: we can skip wal-redo
|
||||
// in this case. Also some FPI records may contain multiple (up to 32) pages,
|
||||
// so them have to be copied multiple times.
|
||||
//
|
||||
let value = if blk.apply_image
|
||||
&& blk.has_image
|
||||
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
|
||||
&& (decoded.xl_info == pg_constants::XLOG_FPI
|
||||
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|
||||
// compression of WAL is not yet supported: fall back to storing the original WAL record
|
||||
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
|
||||
// do not materialize null pages because them most likely be soon replaced with real data
|
||||
&& blk.bimg_len != 0
|
||||
{
|
||||
// Extract page image from FPI record
|
||||
let img_len = blk.bimg_len as usize;
|
||||
let img_offs = blk.bimg_offset as usize;
|
||||
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
|
||||
// TODO(vlad): skip the copy
|
||||
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
|
||||
|
||||
if blk.hole_length != 0 {
|
||||
let tail = image.split_off(blk.hole_offset as usize);
|
||||
image.resize(image.len() + blk.hole_length as usize, 0u8);
|
||||
image.unsplit(tail);
|
||||
}
|
||||
//
|
||||
// Match the logic of XLogReadBufferForRedoExtended:
|
||||
// The page may be uninitialized. If so, we can't set the LSN because
|
||||
// that would corrupt the page.
|
||||
//
|
||||
if !page_is_new(&image) {
|
||||
page_set_lsn(&mut image, lsn)
|
||||
}
|
||||
assert_eq!(image.len(), BLCKSZ as usize);
|
||||
|
||||
Value::Image(image.freeze())
|
||||
} else {
|
||||
Value::WalRecord(NeonWalRecord::Postgres {
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
rec: decoded.record.clone(),
|
||||
})
|
||||
};
|
||||
|
||||
blocks.push((key.to_compact(), Some(value)));
|
||||
}
|
||||
|
||||
Ok(InterpretedWalRecord {
|
||||
metadata_record,
|
||||
blocks,
|
||||
lsn,
|
||||
flush_uncommitted,
|
||||
xid: decoded.xl_xid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl MetadataRecord {
|
||||
fn from_decoded(
|
||||
decoded: &DecodedWALRecord,
|
||||
lsn: Lsn,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
// Note: this doesn't actually copy the bytes since
|
||||
// the [`Bytes`] type implements it via a level of indirection.
|
||||
let mut buf = decoded.record.clone();
|
||||
buf.advance(decoded.main_data_offset);
|
||||
|
||||
match decoded.xl_rmid {
|
||||
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
|
||||
Self::decode_heapam_record(&mut buf, decoded, pg_version)
|
||||
}
|
||||
pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version),
|
||||
// Handle other special record types
|
||||
pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded),
|
||||
pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version),
|
||||
pg_constants::RM_TBLSPC_ID => {
|
||||
tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
|
||||
Ok(None)
|
||||
}
|
||||
pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version),
|
||||
pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, lsn),
|
||||
pg_constants::RM_MULTIXACT_ID => {
|
||||
Self::decode_multixact_record(&mut buf, decoded, pg_version)
|
||||
}
|
||||
pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded),
|
||||
// This is an odd duck. It needs to go to all shards.
|
||||
// Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
|
||||
// in WalIngest::new), we have to send the whole DecodedWalRecord::record to
|
||||
// the pageserver and decode it there.
|
||||
//
|
||||
// Alternatively, one can make the checkpoint part of the subscription protocol
|
||||
// to the pageserver. This should work fine, but can be done at a later point.
|
||||
pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, lsn),
|
||||
pg_constants::RM_LOGICALMSG_ID => {
|
||||
Self::decode_logical_message_record(&mut buf, decoded)
|
||||
}
|
||||
pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded),
|
||||
pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded),
|
||||
_unexpected => {
|
||||
// TODO: consider failing here instead of blindly doing something without
|
||||
// understanding the protocol
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_heapam_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
// Handle VM bit updates that are implicitly part of heap records.
|
||||
|
||||
// First, look at the record to determine which VM bits need
|
||||
// to be cleared. If either of these variables is set, we
|
||||
// need to clear the corresponding bits in the visibility map.
|
||||
let mut new_heap_blkno: Option<u32> = None;
|
||||
let mut old_heap_blkno: Option<u32> = None;
|
||||
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
|
||||
|
||||
match pg_version {
|
||||
14 => {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
|
||||
if info == pg_constants::XLOG_HEAP_INSERT {
|
||||
let xlrec = v14::XlHeapInsert::decode(buf);
|
||||
assert_eq!(0, buf.remaining());
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_DELETE {
|
||||
let xlrec = v14::XlHeapDelete::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_UPDATE
|
||||
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
|
||||
{
|
||||
let xlrec = v14::XlHeapUpdate::decode(buf);
|
||||
// the size of tuple data is inferred from the size of the record.
|
||||
// we can't validate the remaining number of bytes without parsing
|
||||
// the tuple data.
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
|
||||
}
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
|
||||
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
|
||||
// non-HOT update where the new tuple goes to different page than
|
||||
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
|
||||
// set.
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_LOCK {
|
||||
let xlrec = v14::XlHeapLock::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
|
||||
let xlrec = v14::XlHeapMultiInsert::decode(buf);
|
||||
|
||||
let offset_array_len =
|
||||
if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
|
||||
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
|
||||
0
|
||||
} else {
|
||||
size_of::<u16>() * xlrec.ntuples as usize
|
||||
};
|
||||
assert_eq!(offset_array_len, buf.remaining());
|
||||
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
|
||||
let xlrec = v14::XlHeapLockUpdated::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||
}
|
||||
}
|
||||
15 => {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
|
||||
if info == pg_constants::XLOG_HEAP_INSERT {
|
||||
let xlrec = v15::XlHeapInsert::decode(buf);
|
||||
assert_eq!(0, buf.remaining());
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_DELETE {
|
||||
let xlrec = v15::XlHeapDelete::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_UPDATE
|
||||
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
|
||||
{
|
||||
let xlrec = v15::XlHeapUpdate::decode(buf);
|
||||
// the size of tuple data is inferred from the size of the record.
|
||||
// we can't validate the remaining number of bytes without parsing
|
||||
// the tuple data.
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
|
||||
}
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
|
||||
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
|
||||
// non-HOT update where the new tuple goes to different page than
|
||||
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
|
||||
// set.
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_LOCK {
|
||||
let xlrec = v15::XlHeapLock::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
|
||||
let xlrec = v15::XlHeapMultiInsert::decode(buf);
|
||||
|
||||
let offset_array_len =
|
||||
if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
|
||||
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
|
||||
0
|
||||
} else {
|
||||
size_of::<u16>() * xlrec.ntuples as usize
|
||||
};
|
||||
assert_eq!(offset_array_len, buf.remaining());
|
||||
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
|
||||
let xlrec = v15::XlHeapLockUpdated::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||
}
|
||||
}
|
||||
16 => {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
|
||||
if info == pg_constants::XLOG_HEAP_INSERT {
|
||||
let xlrec = v16::XlHeapInsert::decode(buf);
|
||||
assert_eq!(0, buf.remaining());
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_DELETE {
|
||||
let xlrec = v16::XlHeapDelete::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_UPDATE
|
||||
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
|
||||
{
|
||||
let xlrec = v16::XlHeapUpdate::decode(buf);
|
||||
// the size of tuple data is inferred from the size of the record.
|
||||
// we can't validate the remaining number of bytes without parsing
|
||||
// the tuple data.
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
|
||||
}
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
|
||||
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
|
||||
// non-HOT update where the new tuple goes to different page than
|
||||
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
|
||||
// set.
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_LOCK {
|
||||
let xlrec = v16::XlHeapLock::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
|
||||
let xlrec = v16::XlHeapMultiInsert::decode(buf);
|
||||
|
||||
let offset_array_len =
|
||||
if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
|
||||
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
|
||||
0
|
||||
} else {
|
||||
size_of::<u16>() * xlrec.ntuples as usize
|
||||
};
|
||||
assert_eq!(offset_array_len, buf.remaining());
|
||||
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
|
||||
let xlrec = v16::XlHeapLockUpdated::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||
}
|
||||
}
|
||||
17 => {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
|
||||
if info == pg_constants::XLOG_HEAP_INSERT {
|
||||
let xlrec = v17::XlHeapInsert::decode(buf);
|
||||
assert_eq!(0, buf.remaining());
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_DELETE {
|
||||
let xlrec = v17::XlHeapDelete::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_UPDATE
|
||||
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
|
||||
{
|
||||
let xlrec = v17::XlHeapUpdate::decode(buf);
|
||||
// the size of tuple data is inferred from the size of the record.
|
||||
// we can't validate the remaining number of bytes without parsing
|
||||
// the tuple data.
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
|
||||
}
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
|
||||
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
|
||||
// non-HOT update where the new tuple goes to different page than
|
||||
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
|
||||
// set.
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_LOCK {
|
||||
let xlrec = v17::XlHeapLock::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
|
||||
let xlrec = v17::XlHeapMultiInsert::decode(buf);
|
||||
|
||||
let offset_array_len =
|
||||
if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
|
||||
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
|
||||
0
|
||||
} else {
|
||||
size_of::<u16>() * xlrec.ntuples as usize
|
||||
};
|
||||
assert_eq!(offset_array_len, buf.remaining());
|
||||
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
|
||||
let xlrec = v17::XlHeapLockUpdated::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
|
||||
let vm_rel = RelTag {
|
||||
forknum: VISIBILITYMAP_FORKNUM,
|
||||
spcnode: decoded.blocks[0].rnode_spcnode,
|
||||
dbnode: decoded.blocks[0].rnode_dbnode,
|
||||
relnode: decoded.blocks[0].rnode_relnode,
|
||||
};
|
||||
|
||||
Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits(
|
||||
ClearVmBits {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
vm_rel,
|
||||
flags,
|
||||
},
|
||||
))))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_neonmgr_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
// Handle VM bit updates that are implicitly part of heap records.
|
||||
|
||||
// First, look at the record to determine which VM bits need
|
||||
// to be cleared. If either of these variables is set, we
|
||||
// need to clear the corresponding bits in the visibility map.
|
||||
let mut new_heap_blkno: Option<u32> = None;
|
||||
let mut old_heap_blkno: Option<u32> = None;
|
||||
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
|
||||
|
||||
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
|
||||
|
||||
match pg_version {
|
||||
16 | 17 => {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
|
||||
match info {
|
||||
pg_constants::XLOG_NEON_HEAP_INSERT => {
|
||||
let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf);
|
||||
assert_eq!(0, buf.remaining());
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
}
|
||||
pg_constants::XLOG_NEON_HEAP_DELETE => {
|
||||
let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
}
|
||||
pg_constants::XLOG_NEON_HEAP_UPDATE
|
||||
| pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
|
||||
let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf);
|
||||
// the size of tuple data is inferred from the size of the record.
|
||||
// we can't validate the remaining number of bytes without parsing
|
||||
// the tuple data.
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
|
||||
}
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
|
||||
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
|
||||
// non-HOT update where the new tuple goes to different page than
|
||||
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
|
||||
// set.
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
}
|
||||
pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
|
||||
let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf);
|
||||
|
||||
let offset_array_len =
|
||||
if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
|
||||
// the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
|
||||
0
|
||||
} else {
|
||||
size_of::<u16>() * xlrec.ntuples as usize
|
||||
};
|
||||
assert_eq!(offset_array_len, buf.remaining());
|
||||
|
||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
}
|
||||
}
|
||||
pg_constants::XLOG_NEON_HEAP_LOCK => {
|
||||
let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf);
|
||||
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
}
|
||||
info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
|
||||
}
|
||||
}
|
||||
_ => anyhow::bail!(
|
||||
"Neon RMGR has no known compatibility with PostgreSQL version {}",
|
||||
pg_version
|
||||
),
|
||||
}
|
||||
|
||||
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
|
||||
let vm_rel = RelTag {
|
||||
forknum: VISIBILITYMAP_FORKNUM,
|
||||
spcnode: decoded.blocks[0].rnode_spcnode,
|
||||
dbnode: decoded.blocks[0].rnode_dbnode,
|
||||
relnode: decoded.blocks[0].rnode_relnode,
|
||||
};
|
||||
|
||||
Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(
|
||||
ClearVmBits {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
vm_rel,
|
||||
flags,
|
||||
},
|
||||
))))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_smgr_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_SMGR_CREATE {
|
||||
let create = XlSmgrCreate::decode(buf);
|
||||
let rel = RelTag {
|
||||
spcnode: create.rnode.spcnode,
|
||||
dbnode: create.rnode.dbnode,
|
||||
relnode: create.rnode.relnode,
|
||||
forknum: create.forknum,
|
||||
};
|
||||
|
||||
return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate {
|
||||
rel,
|
||||
}))));
|
||||
} else if info == pg_constants::XLOG_SMGR_TRUNCATE {
|
||||
let truncate = XlSmgrTruncate::decode(buf);
|
||||
return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate))));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_dbase_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
// TODO: Refactor this to avoid the duplication between postgres versions.
|
||||
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
|
||||
|
||||
if pg_version == 14 {
|
||||
if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
|
||||
let createdb = XlCreateDatabase::decode(buf);
|
||||
tracing::debug!("XLOG_DBASE_CREATE v14");
|
||||
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
|
||||
db_id: createdb.db_id,
|
||||
tablespace_id: createdb.tablespace_id,
|
||||
src_db_id: createdb.src_db_id,
|
||||
src_tablespace_id: createdb.src_tablespace_id,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
} else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
|
||||
let dropdb = XlDropDatabase::decode(buf);
|
||||
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
|
||||
db_id: dropdb.db_id,
|
||||
tablespace_ids: dropdb.tablespace_ids,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
}
|
||||
} else if pg_version == 15 {
|
||||
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
|
||||
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
|
||||
// The XLOG record was renamed between v14 and v15,
|
||||
// but the record format is the same.
|
||||
// So we can reuse XlCreateDatabase here.
|
||||
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
|
||||
|
||||
let createdb = XlCreateDatabase::decode(buf);
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
|
||||
db_id: createdb.db_id,
|
||||
tablespace_id: createdb.tablespace_id,
|
||||
src_db_id: createdb.src_db_id,
|
||||
src_tablespace_id: createdb.src_tablespace_id,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
|
||||
let dropdb = XlDropDatabase::decode(buf);
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
|
||||
db_id: dropdb.db_id,
|
||||
tablespace_ids: dropdb.tablespace_ids,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
}
|
||||
} else if pg_version == 16 {
|
||||
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
|
||||
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
|
||||
// The XLOG record was renamed between v14 and v15,
|
||||
// but the record format is the same.
|
||||
// So we can reuse XlCreateDatabase here.
|
||||
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
|
||||
|
||||
let createdb = XlCreateDatabase::decode(buf);
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
|
||||
db_id: createdb.db_id,
|
||||
tablespace_id: createdb.tablespace_id,
|
||||
src_db_id: createdb.src_db_id,
|
||||
src_tablespace_id: createdb.src_tablespace_id,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
|
||||
let dropdb = XlDropDatabase::decode(buf);
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
|
||||
db_id: dropdb.db_id,
|
||||
tablespace_ids: dropdb.tablespace_ids,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
}
|
||||
} else if pg_version == 17 {
|
||||
if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
|
||||
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
|
||||
// The XLOG record was renamed between v14 and v15,
|
||||
// but the record format is the same.
|
||||
// So we can reuse XlCreateDatabase here.
|
||||
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
|
||||
|
||||
let createdb = XlCreateDatabase::decode(buf);
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
|
||||
db_id: createdb.db_id,
|
||||
tablespace_id: createdb.tablespace_id,
|
||||
src_db_id: createdb.src_db_id,
|
||||
src_tablespace_id: createdb.src_tablespace_id,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
|
||||
let dropdb = XlDropDatabase::decode(buf);
|
||||
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
|
||||
db_id: dropdb.db_id,
|
||||
tablespace_ids: dropdb.tablespace_ids,
|
||||
}));
|
||||
|
||||
return Ok(Some(record));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_clog_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
|
||||
|
||||
if info == pg_constants::CLOG_ZEROPAGE {
|
||||
let pageno = if pg_version < 17 {
|
||||
buf.get_u32_le()
|
||||
} else {
|
||||
buf.get_u64_le() as u32
|
||||
};
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
|
||||
Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage(
|
||||
ClogZeroPage { segno, rpageno },
|
||||
))))
|
||||
} else {
|
||||
assert!(info == pg_constants::CLOG_TRUNCATE);
|
||||
let xlrec = XlClogTruncate::decode(buf, pg_version);
|
||||
|
||||
Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
|
||||
ClogTruncate {
|
||||
pageno: xlrec.pageno,
|
||||
oldest_xid: xlrec.oldest_xid,
|
||||
oldest_xid_db: xlrec.oldest_xid_db,
|
||||
},
|
||||
))))
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_xact_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
let origin_id = decoded.origin_id;
|
||||
let xl_xid = decoded.xl_xid;
|
||||
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
|
||||
return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon {
|
||||
parsed,
|
||||
origin_id,
|
||||
xl_xid,
|
||||
lsn,
|
||||
}))));
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
||||
let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
|
||||
return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon {
|
||||
parsed,
|
||||
origin_id,
|
||||
xl_xid,
|
||||
lsn,
|
||||
}))));
|
||||
} else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
|
||||
let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
|
||||
return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared(
|
||||
XactCommon {
|
||||
parsed,
|
||||
origin_id,
|
||||
xl_xid,
|
||||
lsn,
|
||||
},
|
||||
))));
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
|
||||
let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
|
||||
return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared(
|
||||
XactCommon {
|
||||
parsed,
|
||||
origin_id,
|
||||
xl_xid,
|
||||
lsn,
|
||||
},
|
||||
))));
|
||||
} else if info == pg_constants::XLOG_XACT_PREPARE {
|
||||
return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare(
|
||||
XactPrepare {
|
||||
xl_xid: decoded.xl_xid,
|
||||
data: Bytes::copy_from_slice(&buf[..]),
|
||||
},
|
||||
))));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_multixact_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
|
||||
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
|
||||
|| info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
|
||||
{
|
||||
let pageno = if pg_version < 17 {
|
||||
buf.get_u32_le()
|
||||
} else {
|
||||
buf.get_u64_le() as u32
|
||||
};
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
|
||||
let slru_kind = match info {
|
||||
pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
|
||||
pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage(
|
||||
MultiXactZeroPage {
|
||||
slru_kind,
|
||||
segno,
|
||||
rpageno,
|
||||
},
|
||||
))));
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
|
||||
let xlrec = XlMultiXactCreate::decode(buf);
|
||||
return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create(
|
||||
xlrec,
|
||||
))));
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
|
||||
let xlrec = XlMultiXactTruncate::decode(buf);
|
||||
return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate(
|
||||
xlrec,
|
||||
))));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_relmap_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let update = XlRelmapUpdate::decode(buf);
|
||||
|
||||
let mut buf = decoded.record.clone();
|
||||
buf.advance(decoded.main_data_offset);
|
||||
// skip xl_relmap_update
|
||||
buf.advance(12);
|
||||
|
||||
Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update(
|
||||
RelmapUpdate {
|
||||
update,
|
||||
buf: Bytes::copy_from_slice(&buf[..]),
|
||||
},
|
||||
))))
|
||||
}
|
||||
|
||||
fn decode_xlog_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord {
|
||||
info,
|
||||
lsn,
|
||||
buf: buf.clone(),
|
||||
}))))
|
||||
}
|
||||
|
||||
fn decode_logical_message_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_LOGICAL_MESSAGE {
|
||||
let xlrec = XlLogicalMessage::decode(buf);
|
||||
let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
if prefix == "neon-test" {
|
||||
return Ok(Some(MetadataRecord::LogicalMessage(
|
||||
LogicalMessageRecord::Failpoint,
|
||||
)));
|
||||
}
|
||||
|
||||
if let Some(path) = prefix.strip_prefix("neon-file:") {
|
||||
let buf_size = xlrec.prefix_size + xlrec.message_size;
|
||||
let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
|
||||
return Ok(Some(MetadataRecord::LogicalMessage(
|
||||
LogicalMessageRecord::Put(PutLogicalMessage {
|
||||
path: path.to_string(),
|
||||
buf,
|
||||
}),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_standby_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_RUNNING_XACTS {
|
||||
let xlrec = XlRunningXacts::decode(buf);
|
||||
return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts(
|
||||
StandbyRunningXacts {
|
||||
oldest_running_xid: xlrec.oldest_running_xid,
|
||||
},
|
||||
))));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn decode_replorigin_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_REPLORIGIN_SET {
|
||||
let xlrec = XlReploriginSet::decode(buf);
|
||||
return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set(
|
||||
xlrec,
|
||||
))));
|
||||
} else if info == pg_constants::XLOG_REPLORIGIN_DROP {
|
||||
let xlrec = XlReploriginDrop::decode(buf);
|
||||
return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop(
|
||||
xlrec,
|
||||
))));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,9 +25,7 @@
|
||||
//! |--> write to KV store within the pageserver
|
||||
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::CompactKey;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use pageserver_api::value::Value;
|
||||
use postgres_ffi::walrecord::{
|
||||
XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet,
|
||||
XlSmgrTruncate, XlXactParsedRecord,
|
||||
@@ -35,48 +33,6 @@ use postgres_ffi::walrecord::{
|
||||
use postgres_ffi::{Oid, TransactionId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub enum FlushUncommittedRecords {
|
||||
Yes,
|
||||
No,
|
||||
}
|
||||
|
||||
/// An interpreted Postgres WAL record, ready to be handled by the pageserver
|
||||
pub struct InterpretedWalRecord {
|
||||
/// Optional metadata record - may cause writes to metadata keys
|
||||
/// in the storage engine
|
||||
pub metadata_record: Option<MetadataRecord>,
|
||||
/// Images or deltas for blocks modified in the original WAL record.
|
||||
/// The [`Value`] is optional to avoid sending superfluous data to
|
||||
/// shard 0 for relation size tracking.
|
||||
pub blocks: Vec<(CompactKey, Option<Value>)>,
|
||||
/// Byte offset within WAL for the end of the original PG WAL record
|
||||
pub lsn: Lsn,
|
||||
/// Whether to flush all uncommitted modifications to the storage engine
|
||||
/// before ingesting this record. This is currently only used for legacy PG
|
||||
/// database creations which read pages from a template database. Such WAL
|
||||
/// records require reading data blocks while ingesting, hence the need to flush.
|
||||
pub flush_uncommitted: FlushUncommittedRecords,
|
||||
/// Transaction id of the original PG WAL record
|
||||
pub xid: TransactionId,
|
||||
}
|
||||
|
||||
/// The interpreted part of the Postgres WAL record which requires metadata
|
||||
/// writes to the underlying storage engine.
|
||||
pub enum MetadataRecord {
|
||||
Heapam(HeapamRecord),
|
||||
Neonrmgr(NeonrmgrRecord),
|
||||
Smgr(SmgrRecord),
|
||||
Dbase(DbaseRecord),
|
||||
Clog(ClogRecord),
|
||||
Xact(XactRecord),
|
||||
MultiXact(MultiXactRecord),
|
||||
Relmap(RelmapRecord),
|
||||
Xlog(XlogRecord),
|
||||
LogicalMessage(LogicalMessageRecord),
|
||||
Standby(StandbyRecord),
|
||||
Replorigin(ReploriginRecord),
|
||||
}
|
||||
|
||||
pub enum HeapamRecord {
|
||||
ClearVmBits(ClearVmBits),
|
||||
}
|
||||
|
||||
@@ -398,7 +398,9 @@ fn start_pageserver(
|
||||
ControllerUpcallClient::new(conf, &shutdown_pageserver),
|
||||
conf,
|
||||
);
|
||||
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
|
||||
if let Some(deletion_workers) = deletion_workers {
|
||||
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
|
||||
}
|
||||
|
||||
// Up to this point no significant I/O has been done: this should have been fast. Record
|
||||
// duration prior to starting I/O intensive phase of startup.
|
||||
|
||||
@@ -14,7 +14,6 @@ use itertools::Itertools as _;
|
||||
use pageserver_api::models::TenantState;
|
||||
use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
|
||||
use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime};
|
||||
@@ -36,62 +35,12 @@ const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
/// upload attempts.
|
||||
type RawMetric = (MetricsKey, (EventType, u64));
|
||||
|
||||
/// The new serializable metrics format
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct NewMetricsRoot {
|
||||
version: usize,
|
||||
metrics: Vec<NewRawMetric>,
|
||||
}
|
||||
|
||||
impl NewMetricsRoot {
|
||||
pub fn is_v2_metrics(json_value: &serde_json::Value) -> bool {
|
||||
if let Some(ver) = json_value.get("version") {
|
||||
if let Some(2) = ver.as_u64() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// The new serializable metrics format
|
||||
#[derive(Serialize)]
|
||||
struct NewMetricsRefRoot<'a> {
|
||||
version: usize,
|
||||
metrics: &'a [NewRawMetric],
|
||||
}
|
||||
|
||||
impl<'a> NewMetricsRefRoot<'a> {
|
||||
fn new(metrics: &'a [NewRawMetric]) -> Self {
|
||||
Self {
|
||||
version: 2,
|
||||
metrics,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The new serializable metrics format
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||
struct NewRawMetric {
|
||||
key: MetricsKey,
|
||||
kind: EventType,
|
||||
value: u64,
|
||||
// TODO: add generation field and check against generations
|
||||
}
|
||||
|
||||
impl NewRawMetric {
|
||||
#[cfg(test)]
|
||||
fn to_kv_pair(&self) -> (MetricsKey, NewRawMetric) {
|
||||
(self.key, self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// Caches the [`RawMetric`]s
|
||||
///
|
||||
/// In practice, during startup, last sent values are stored here to be used in calculating new
|
||||
/// ones. After successful uploading, the cached values are updated to cache. This used to be used
|
||||
/// for deduplication, but that is no longer needed.
|
||||
type Cache = HashMap<MetricsKey, NewRawMetric>;
|
||||
type Cache = HashMap<MetricsKey, (EventType, u64)>;
|
||||
|
||||
pub async fn run(
|
||||
conf: &'static PageServerConf,
|
||||
@@ -282,14 +231,11 @@ async fn restore_and_reschedule(
|
||||
// collect_all_metrics
|
||||
let earlier_metric_at = found_some
|
||||
.iter()
|
||||
.map(|item| item.kind.recorded_at())
|
||||
.map(|(_, (et, _))| et.recorded_at())
|
||||
.copied()
|
||||
.next();
|
||||
|
||||
let cached = found_some
|
||||
.into_iter()
|
||||
.map(|item| (item.key, item))
|
||||
.collect::<Cache>();
|
||||
let cached = found_some.into_iter().collect::<Cache>();
|
||||
|
||||
(cached, earlier_metric_at)
|
||||
}
|
||||
|
||||
@@ -2,33 +2,11 @@ use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::consumption_metrics::NewMetricsRefRoot;
|
||||
|
||||
use super::{NewMetricsRoot, NewRawMetric, RawMetric};
|
||||
|
||||
pub(super) fn read_metrics_from_serde_value(
|
||||
json_value: serde_json::Value,
|
||||
) -> anyhow::Result<Vec<NewRawMetric>> {
|
||||
if NewMetricsRoot::is_v2_metrics(&json_value) {
|
||||
let root = serde_json::from_value::<NewMetricsRoot>(json_value)?;
|
||||
Ok(root.metrics)
|
||||
} else {
|
||||
let all_metrics = serde_json::from_value::<Vec<RawMetric>>(json_value)?;
|
||||
let all_metrics = all_metrics
|
||||
.into_iter()
|
||||
.map(|(key, (event_type, value))| NewRawMetric {
|
||||
key,
|
||||
kind: event_type,
|
||||
value,
|
||||
})
|
||||
.collect();
|
||||
Ok(all_metrics)
|
||||
}
|
||||
}
|
||||
use super::RawMetric;
|
||||
|
||||
pub(super) async fn read_metrics_from_disk(
|
||||
path: Arc<Utf8PathBuf>,
|
||||
) -> anyhow::Result<Vec<NewRawMetric>> {
|
||||
) -> anyhow::Result<Vec<RawMetric>> {
|
||||
// do not add context to each error, callsite will log with full path
|
||||
let span = tracing::Span::current();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
@@ -42,8 +20,7 @@ pub(super) async fn read_metrics_from_disk(
|
||||
|
||||
let mut file = std::fs::File::open(&*path)?;
|
||||
let reader = std::io::BufReader::new(&mut file);
|
||||
let json_value = serde_json::from_reader::<_, serde_json::Value>(reader)?;
|
||||
read_metrics_from_serde_value(json_value)
|
||||
anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
|
||||
})
|
||||
.await
|
||||
.context("read metrics join error")
|
||||
@@ -86,7 +63,7 @@ fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
|
||||
}
|
||||
|
||||
pub(super) async fn flush_metrics_to_disk(
|
||||
current_metrics: &Arc<Vec<NewRawMetric>>,
|
||||
current_metrics: &Arc<Vec<RawMetric>>,
|
||||
path: &Arc<Utf8PathBuf>,
|
||||
) -> anyhow::Result<()> {
|
||||
use std::io::Write;
|
||||
@@ -116,11 +93,8 @@ pub(super) async fn flush_metrics_to_disk(
|
||||
// write out all of the raw metrics, to be read out later on restart as cached values
|
||||
{
|
||||
let mut writer = std::io::BufWriter::new(&mut tempfile);
|
||||
serde_json::to_writer(
|
||||
&mut writer,
|
||||
&NewMetricsRefRoot::new(current_metrics.as_ref()),
|
||||
)
|
||||
.context("serialize metrics")?;
|
||||
serde_json::to_writer(&mut writer, &*current_metrics)
|
||||
.context("serialize metrics")?;
|
||||
writer
|
||||
.into_inner()
|
||||
.map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
|
||||
|
||||
@@ -9,7 +9,7 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use super::{Cache, NewRawMetric};
|
||||
use super::{Cache, RawMetric};
|
||||
|
||||
/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
|
||||
/// instead of static str.
|
||||
@@ -64,21 +64,11 @@ impl MetricsKey {
|
||||
struct AbsoluteValueFactory(MetricsKey);
|
||||
|
||||
impl AbsoluteValueFactory {
|
||||
#[cfg(test)]
|
||||
const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
|
||||
const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
|
||||
let key = self.0;
|
||||
(key, (EventType::Absolute { time }, val))
|
||||
}
|
||||
|
||||
const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
|
||||
let key = self.0;
|
||||
NewRawMetric {
|
||||
key,
|
||||
kind: EventType::Absolute { time },
|
||||
value: val,
|
||||
}
|
||||
}
|
||||
|
||||
fn key(&self) -> &MetricsKey {
|
||||
&self.0
|
||||
}
|
||||
@@ -94,28 +84,7 @@ impl IncrementalValueFactory {
|
||||
prev_end: DateTime<Utc>,
|
||||
up_to: DateTime<Utc>,
|
||||
val: u64,
|
||||
) -> NewRawMetric {
|
||||
let key = self.0;
|
||||
// cannot assert prev_end < up_to because these are realtime clock based
|
||||
let when = EventType::Incremental {
|
||||
start_time: prev_end,
|
||||
stop_time: up_to,
|
||||
};
|
||||
NewRawMetric {
|
||||
key,
|
||||
kind: when,
|
||||
value: val,
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::wrong_self_convention)]
|
||||
#[cfg(test)]
|
||||
const fn from_until_old_format(
|
||||
self,
|
||||
prev_end: DateTime<Utc>,
|
||||
up_to: DateTime<Utc>,
|
||||
val: u64,
|
||||
) -> super::RawMetric {
|
||||
) -> RawMetric {
|
||||
let key = self.0;
|
||||
// cannot assert prev_end < up_to because these are realtime clock based
|
||||
let when = EventType::Incremental {
|
||||
@@ -216,7 +185,7 @@ pub(super) async fn collect_all_metrics(
|
||||
tenant_manager: &Arc<TenantManager>,
|
||||
cached_metrics: &Cache,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<NewRawMetric> {
|
||||
) -> Vec<RawMetric> {
|
||||
use pageserver_api::models::TenantState;
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
@@ -251,11 +220,11 @@ pub(super) async fn collect_all_metrics(
|
||||
res
|
||||
}
|
||||
|
||||
async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
|
||||
async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
|
||||
where
|
||||
S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
|
||||
{
|
||||
let mut current_metrics: Vec<NewRawMetric> = Vec::new();
|
||||
let mut current_metrics: Vec<RawMetric> = Vec::new();
|
||||
|
||||
let mut tenants = std::pin::pin!(tenants);
|
||||
|
||||
@@ -322,7 +291,7 @@ impl TenantSnapshot {
|
||||
tenant_id: TenantId,
|
||||
now: DateTime<Utc>,
|
||||
cached: &Cache,
|
||||
metrics: &mut Vec<NewRawMetric>,
|
||||
metrics: &mut Vec<RawMetric>,
|
||||
) {
|
||||
let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
|
||||
|
||||
@@ -333,9 +302,9 @@ impl TenantSnapshot {
|
||||
let mut synthetic_size = self.synthetic_size;
|
||||
|
||||
if synthetic_size == 0 {
|
||||
if let Some(item) = cached.get(factory.key()) {
|
||||
// use the latest value from previous session, TODO: check generation number
|
||||
synthetic_size = item.value;
|
||||
if let Some((_, value)) = cached.get(factory.key()) {
|
||||
// use the latest value from previous session
|
||||
synthetic_size = *value;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -412,36 +381,37 @@ impl TimelineSnapshot {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
metrics: &mut Vec<NewRawMetric>,
|
||||
metrics: &mut Vec<RawMetric>,
|
||||
cache: &Cache,
|
||||
) {
|
||||
let timeline_written_size = u64::from(self.last_record_lsn);
|
||||
|
||||
let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
|
||||
|
||||
let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
|
||||
item.kind
|
||||
.incremental_timerange()
|
||||
.expect("never create EventType::Absolute for written_size_delta")
|
||||
.end
|
||||
});
|
||||
let last_stop_time = cache
|
||||
.get(written_size_delta_key.key())
|
||||
.map(|(until, _val)| {
|
||||
until
|
||||
.incremental_timerange()
|
||||
.expect("never create EventType::Absolute for written_size_delta")
|
||||
.end
|
||||
});
|
||||
|
||||
let written_size_now =
|
||||
let (key, written_size_now) =
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
|
||||
|
||||
// by default, use the last sent written_size as the basis for
|
||||
// calculating the delta. if we don't yet have one, use the load time value.
|
||||
let prev: (DateTime<Utc>, u64) = cache
|
||||
.get(&written_size_now.key)
|
||||
.map(|item| {
|
||||
let prev = cache
|
||||
.get(&key)
|
||||
.map(|(prev_at, prev)| {
|
||||
// use the prev time from our last incremental update, or default to latest
|
||||
// absolute update on the first round.
|
||||
let prev_at = item
|
||||
.kind
|
||||
let prev_at = prev_at
|
||||
.absolute_time()
|
||||
.expect("never create EventType::Incremental for written_size");
|
||||
let prev_at = last_stop_time.unwrap_or(prev_at);
|
||||
(*prev_at, item.value)
|
||||
(*prev_at, *prev)
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
// if we don't have a previous point of comparison, compare to the load time
|
||||
@@ -452,28 +422,24 @@ impl TimelineSnapshot {
|
||||
|
||||
let up_to = now;
|
||||
|
||||
if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
|
||||
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
|
||||
let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
|
||||
// written_size_delta
|
||||
metrics.push(key_value);
|
||||
// written_size
|
||||
metrics.push(written_size_now);
|
||||
metrics.push((key, written_size_now));
|
||||
} else {
|
||||
// the cached value was ahead of us, report zero until we've caught up
|
||||
metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
|
||||
// the cached value was ahead of us, report the same until we've caught up
|
||||
metrics.push(NewRawMetric {
|
||||
key: written_size_now.key,
|
||||
kind: written_size_now.kind,
|
||||
value: prev.1,
|
||||
});
|
||||
metrics.push((key, (written_size_now.0, prev.1)));
|
||||
}
|
||||
|
||||
{
|
||||
let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
|
||||
let current_or_previous = self
|
||||
.current_exact_logical_size
|
||||
.or_else(|| cache.get(factory.key()).map(|item| item.value));
|
||||
.or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
|
||||
|
||||
if let Some(size) = current_or_previous {
|
||||
metrics.push(factory.at(now, size));
|
||||
@@ -486,4 +452,4 @@ impl TimelineSnapshot {
|
||||
mod tests;
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) use tests::{metric_examples, metric_examples_old};
|
||||
pub(crate) use tests::metric_examples;
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use crate::consumption_metrics::RawMetric;
|
||||
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
|
||||
@@ -52,9 +50,9 @@ fn startup_collected_timeline_metrics_second_round() {
|
||||
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
let cache = HashMap::from([MetricsKey::written_size(tenant_id, timeline_id)
|
||||
.at(before, disk_consistent_lsn.0)
|
||||
.to_kv_pair()]);
|
||||
let cache = HashMap::from([
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0)
|
||||
]);
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
loaded_at: (disk_consistent_lsn, init),
|
||||
@@ -91,13 +89,9 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
let mut metrics = Vec::new();
|
||||
let cache = HashMap::from([
|
||||
// at t=before was the last time the last_record_lsn changed
|
||||
MetricsKey::written_size(tenant_id, timeline_id)
|
||||
.at(before, disk_consistent_lsn.0)
|
||||
.to_kv_pair(),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
|
||||
// end time of this event is used for the next ones
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_until(before, just_before, 0)
|
||||
.to_kv_pair(),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, just_before, 0),
|
||||
]);
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
@@ -144,17 +138,13 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
};
|
||||
|
||||
let mut cache = HashMap::from([
|
||||
MetricsKey::written_size(tenant_id, timeline_id)
|
||||
.at(before_restart, 100)
|
||||
.to_kv_pair(),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_until(
|
||||
way_before,
|
||||
before_restart,
|
||||
// not taken into account, but the timestamps are important
|
||||
999_999_999,
|
||||
)
|
||||
.to_kv_pair(),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(before_restart, 100),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
way_before,
|
||||
before_restart,
|
||||
// not taken into account, but the timestamps are important
|
||||
999_999_999,
|
||||
),
|
||||
]);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
@@ -173,7 +163,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
);
|
||||
|
||||
// now if we cache these metrics, and re-run while "still in recovery"
|
||||
cache.extend(metrics.drain(..).map(|x| x.to_kv_pair()));
|
||||
cache.extend(metrics.drain(..));
|
||||
|
||||
// "still in recovery", because our snapshot did not change
|
||||
snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
|
||||
@@ -204,14 +194,14 @@ fn post_restart_current_exact_logical_size_uses_cached() {
|
||||
current_exact_logical_size: None,
|
||||
};
|
||||
|
||||
let cache = HashMap::from([MetricsKey::timeline_logical_size(tenant_id, timeline_id)
|
||||
.at(before_restart, 100)
|
||||
.to_kv_pair()]);
|
||||
let cache = HashMap::from([
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(before_restart, 100)
|
||||
]);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
|
||||
metrics.retain(|item| item.key.metric == Name::LogicalSize);
|
||||
metrics.retain(|(key, _)| key.metric == Name::LogicalSize);
|
||||
|
||||
assert_eq!(
|
||||
metrics,
|
||||
@@ -234,9 +224,7 @@ fn post_restart_synthetic_size_uses_cached_if_available() {
|
||||
let before_restart = DateTime::<Utc>::from(now - std::time::Duration::from_secs(5 * 60));
|
||||
let now = DateTime::<Utc>::from(now);
|
||||
|
||||
let cached = HashMap::from([MetricsKey::synthetic_size(tenant_id)
|
||||
.at(before_restart, 1000)
|
||||
.to_kv_pair()]);
|
||||
let cached = HashMap::from([MetricsKey::synthetic_size(tenant_id).at(before_restart, 1000)]);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
ts.to_metrics(tenant_id, now, &cached, &mut metrics);
|
||||
@@ -290,29 +278,12 @@ fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
|
||||
times
|
||||
}
|
||||
|
||||
pub(crate) const fn metric_examples_old(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
before: DateTime<Utc>,
|
||||
) -> [RawMetric; 6] {
|
||||
[
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at_old_format(now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_until_old_format(before, now, 0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at_old_format(now, 0),
|
||||
MetricsKey::remote_storage_size(tenant_id).at_old_format(now, 0),
|
||||
MetricsKey::resident_size(tenant_id).at_old_format(now, 0),
|
||||
MetricsKey::synthetic_size(tenant_id).at_old_format(now, 1),
|
||||
]
|
||||
}
|
||||
|
||||
pub(crate) const fn metric_examples(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
before: DateTime<Utc>,
|
||||
) -> [NewRawMetric; 6] {
|
||||
) -> [RawMetric; 6] {
|
||||
[
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
|
||||
|
||||
@@ -7,7 +7,7 @@ use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
|
||||
use super::{metrics::Name, Cache, MetricsKey, NewRawMetric, RawMetric};
|
||||
use super::{metrics::Name, Cache, MetricsKey, RawMetric};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
/// How the metrics from pageserver are identified.
|
||||
@@ -24,7 +24,7 @@ pub(super) async fn upload_metrics_http(
|
||||
client: &reqwest::Client,
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
cancel: &CancellationToken,
|
||||
metrics: &[NewRawMetric],
|
||||
metrics: &[RawMetric],
|
||||
cached_metrics: &mut Cache,
|
||||
idempotency_keys: &[IdempotencyKey<'_>],
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -53,8 +53,8 @@ pub(super) async fn upload_metrics_http(
|
||||
|
||||
match res {
|
||||
Ok(()) => {
|
||||
for item in chunk {
|
||||
cached_metrics.insert(item.key, item.clone());
|
||||
for (curr_key, curr_val) in chunk {
|
||||
cached_metrics.insert(*curr_key, *curr_val);
|
||||
}
|
||||
uploaded += chunk.len();
|
||||
}
|
||||
@@ -86,7 +86,7 @@ pub(super) async fn upload_metrics_bucket(
|
||||
client: &GenericRemoteStorage,
|
||||
cancel: &CancellationToken,
|
||||
node_id: &str,
|
||||
metrics: &[NewRawMetric],
|
||||
metrics: &[RawMetric],
|
||||
idempotency_keys: &[IdempotencyKey<'_>],
|
||||
) -> anyhow::Result<()> {
|
||||
if metrics.is_empty() {
|
||||
@@ -140,16 +140,16 @@ pub(super) async fn upload_metrics_bucket(
|
||||
/// across different metrics sinks), and must have the same length as input.
|
||||
fn serialize_in_chunks<'a>(
|
||||
chunk_size: usize,
|
||||
input: &'a [NewRawMetric],
|
||||
input: &'a [RawMetric],
|
||||
idempotency_keys: &'a [IdempotencyKey<'a>],
|
||||
) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
|
||||
) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
|
||||
{
|
||||
use bytes::BufMut;
|
||||
|
||||
assert_eq!(input.len(), idempotency_keys.len());
|
||||
|
||||
struct Iter<'a> {
|
||||
inner: std::slice::Chunks<'a, NewRawMetric>,
|
||||
inner: std::slice::Chunks<'a, RawMetric>,
|
||||
idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
|
||||
chunk_size: usize,
|
||||
|
||||
@@ -160,7 +160,7 @@ fn serialize_in_chunks<'a>(
|
||||
}
|
||||
|
||||
impl<'a> Iterator for Iter<'a> {
|
||||
type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;
|
||||
type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let chunk = self.inner.next()?;
|
||||
@@ -269,58 +269,6 @@ impl RawMetricExt for RawMetric {
|
||||
}
|
||||
}
|
||||
|
||||
impl RawMetricExt for NewRawMetric {
|
||||
fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
|
||||
let MetricsKey {
|
||||
metric,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} = self.key;
|
||||
|
||||
let kind = self.kind;
|
||||
let value = self.value;
|
||||
|
||||
Event {
|
||||
kind,
|
||||
metric,
|
||||
idempotency_key: key.to_string(),
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
|
||||
use std::fmt::Write;
|
||||
|
||||
let MetricsKey {
|
||||
metric,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} = self.key;
|
||||
|
||||
let kind = self.kind;
|
||||
let value = self.value;
|
||||
|
||||
*event = Event {
|
||||
kind,
|
||||
metric,
|
||||
idempotency_key: {
|
||||
event.idempotency_key.clear();
|
||||
write!(event.idempotency_key, "{key}").unwrap();
|
||||
std::mem::take(&mut event.idempotency_key)
|
||||
},
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait KeyGen<'a> {
|
||||
fn generate(&self) -> IdempotencyKey<'a>;
|
||||
}
|
||||
@@ -433,10 +381,6 @@ async fn upload(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::consumption_metrics::{
|
||||
disk_cache::read_metrics_from_serde_value, NewMetricsRefRoot,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use chrono::{DateTime, Utc};
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -529,49 +473,23 @@ mod tests {
|
||||
let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
|
||||
let examples = examples.into_iter().zip(metric_samples());
|
||||
|
||||
for ((line, expected), item) in examples {
|
||||
for ((line, expected), (key, (kind, value))) in examples {
|
||||
let e = consumption_metrics::Event {
|
||||
kind: item.kind,
|
||||
metric: item.key.metric,
|
||||
kind,
|
||||
metric: key.metric,
|
||||
idempotency_key: idempotency_key.to_string(),
|
||||
value: item.value,
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id: item.key.tenant_id,
|
||||
timeline_id: item.key.timeline_id,
|
||||
tenant_id: key.tenant_id,
|
||||
timeline_id: key.timeline_id,
|
||||
},
|
||||
};
|
||||
let actual = serde_json::to_string(&e).unwrap();
|
||||
assert_eq!(
|
||||
expected, actual,
|
||||
"example for {:?} from line {line}",
|
||||
item.kind
|
||||
);
|
||||
assert_eq!(expected, actual, "example for {kind:?} from line {line}");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn disk_format_upgrade() {
|
||||
let old_samples_json = serde_json::to_value(metric_samples_old()).unwrap();
|
||||
let new_samples =
|
||||
serde_json::to_value(NewMetricsRefRoot::new(metric_samples().as_ref())).unwrap();
|
||||
let upgraded_samples = read_metrics_from_serde_value(old_samples_json).unwrap();
|
||||
let new_samples = read_metrics_from_serde_value(new_samples).unwrap();
|
||||
assert_eq!(upgraded_samples, new_samples);
|
||||
}
|
||||
|
||||
fn metric_samples_old() -> [RawMetric; 6] {
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
|
||||
.unwrap()
|
||||
.into();
|
||||
let [now, before] = [*SAMPLES_NOW, before];
|
||||
|
||||
super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
|
||||
}
|
||||
|
||||
fn metric_samples() -> [NewRawMetric; 6] {
|
||||
fn metric_samples() -> [RawMetric; 6] {
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
|
||||
@@ -618,11 +618,13 @@ impl DeletionQueue {
|
||||
/// Caller may use the returned object to construct clients with new_client.
|
||||
/// Caller should tokio::spawn the background() members of the two worker objects returned:
|
||||
/// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
|
||||
///
|
||||
/// If remote_storage is None, then the returned workers will also be None.
|
||||
pub fn new<C>(
|
||||
remote_storage: GenericRemoteStorage,
|
||||
controller_upcall_client: Option<C>,
|
||||
conf: &'static PageServerConf,
|
||||
) -> (Self, DeletionQueueWorkers<C>)
|
||||
) -> (Self, Option<DeletionQueueWorkers<C>>)
|
||||
where
|
||||
C: ControlPlaneGenerationsApi + Send + Sync,
|
||||
{
|
||||
@@ -654,7 +656,7 @@ impl DeletionQueue {
|
||||
},
|
||||
cancel: cancel.clone(),
|
||||
},
|
||||
DeletionQueueWorkers {
|
||||
Some(DeletionQueueWorkers {
|
||||
frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()),
|
||||
backend: Validator::new(
|
||||
conf,
|
||||
@@ -665,7 +667,7 @@ impl DeletionQueue {
|
||||
cancel.clone(),
|
||||
),
|
||||
executor: Deleter::new(remote_storage, executor_rx, cancel.clone()),
|
||||
},
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -740,7 +742,9 @@ mod test {
|
||||
);
|
||||
|
||||
tracing::debug!("Spawning worker for new queue queue");
|
||||
let worker_join = workers.spawn_with(&tokio::runtime::Handle::current());
|
||||
let worker_join = workers
|
||||
.unwrap()
|
||||
.spawn_with(&tokio::runtime::Handle::current());
|
||||
|
||||
let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
|
||||
let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
|
||||
@@ -851,6 +855,7 @@ mod test {
|
||||
harness.conf,
|
||||
);
|
||||
|
||||
let worker = worker.unwrap();
|
||||
let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
|
||||
|
||||
Ok(TestSetup {
|
||||
|
||||
@@ -80,7 +80,6 @@ use crate::tenant::size::ModelInputs;
|
||||
use crate::tenant::storage_layer::LayerAccessStatsReset;
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::tenant::timeline::offload::offload_timeline;
|
||||
use crate::tenant::timeline::offload::OffloadError;
|
||||
use crate::tenant::timeline::CompactFlags;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::timeline::Timeline;
|
||||
@@ -2005,12 +2004,7 @@ async fn timeline_offload_handler(
|
||||
}
|
||||
offload_timeline(&tenant, &timeline)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
match e {
|
||||
OffloadError::Cancelled => ApiError::ResourceUnavailable("Timeline shutting down".into()),
|
||||
_ => ApiError::InternalServerError(anyhow!(e))
|
||||
}
|
||||
})?;
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
@@ -2066,7 +2060,6 @@ async fn timeline_checkpoint_handler(
|
||||
.map_err(|e|
|
||||
match e {
|
||||
CompactionError::ShuttingDown => ApiError::ShuttingDown,
|
||||
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::Other(e) => ApiError::InternalServerError(e)
|
||||
}
|
||||
)?;
|
||||
|
||||
@@ -12,7 +12,6 @@ use pageserver_api::key::rel_block_to_key;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt};
|
||||
use tokio_tar::Archive;
|
||||
use tracing::*;
|
||||
use wal_decoder::models::InterpretedWalRecord;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
@@ -24,6 +23,7 @@ use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::relfile_utils::*;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use postgres_ffi::walrecord::{decode_wal_record, DecodedWALRecord};
|
||||
use postgres_ffi::ControlFileData;
|
||||
use postgres_ffi::DBState_DB_SHUTDOWNED;
|
||||
use postgres_ffi::Oid;
|
||||
@@ -312,15 +312,11 @@ async fn import_wal(
|
||||
let mut modification = tline.begin_modification(last_lsn);
|
||||
while last_lsn <= endpoint {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
tline.get_shard_identity(),
|
||||
lsn,
|
||||
tline.pg_version,
|
||||
)?;
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
|
||||
|
||||
walingest
|
||||
.ingest_record(interpreted, &mut modification, ctx)
|
||||
.ingest_record(decoded, lsn, &mut modification, ctx)
|
||||
.await?;
|
||||
WAL_INGEST.records_committed.inc();
|
||||
|
||||
@@ -457,15 +453,10 @@ pub async fn import_wal_from_tar(
|
||||
let mut modification = tline.begin_modification(last_lsn);
|
||||
while last_lsn <= end_lsn {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
tline.get_shard_identity(),
|
||||
lsn,
|
||||
tline.pg_version,
|
||||
)?;
|
||||
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
|
||||
walingest
|
||||
.ingest_record(interpreted, &mut modification, ctx)
|
||||
.ingest_record(decoded, lsn, &mut modification, ctx)
|
||||
.await?;
|
||||
modification.commit(ctx).await?;
|
||||
last_lsn = lsn;
|
||||
|
||||
@@ -521,6 +521,13 @@ pub struct OffloadedTimeline {
|
||||
/// Present for future flattening deliberations.
|
||||
pub archived_at: NaiveDateTime,
|
||||
|
||||
/// Lazily constructed remote client for the timeline
|
||||
///
|
||||
/// If we offload a timeline, we keep around the remote client
|
||||
/// for the duration of the process. If we find it through the
|
||||
/// manifest, we don't construct it up until it's needed (deletion).
|
||||
pub remote_client: Option<Arc<RemoteTimelineClient>>,
|
||||
|
||||
/// Prevent two tasks from deleting the timeline at the same time. If held, the
|
||||
/// timeline is being deleted. If 'true', the timeline has already been deleted.
|
||||
pub delete_progress: TimelineDeleteProgress,
|
||||
@@ -547,6 +554,7 @@ impl OffloadedTimeline {
|
||||
ancestor_retain_lsn,
|
||||
archived_at,
|
||||
|
||||
remote_client: Some(timeline.remote_client.clone()),
|
||||
delete_progress: timeline.delete_progress.clone(),
|
||||
})
|
||||
}
|
||||
@@ -563,6 +571,7 @@ impl OffloadedTimeline {
|
||||
ancestor_timeline_id,
|
||||
ancestor_retain_lsn,
|
||||
archived_at,
|
||||
remote_client: None,
|
||||
delete_progress: TimelineDeleteProgress::default(),
|
||||
}
|
||||
}
|
||||
@@ -627,7 +636,7 @@ impl TimelineOrOffloaded {
|
||||
fn maybe_remote_client(&self) -> Option<Arc<RemoteTimelineClient>> {
|
||||
match self {
|
||||
TimelineOrOffloaded::Timeline(timeline) => Some(timeline.remote_client.clone()),
|
||||
TimelineOrOffloaded::Offloaded(_offloaded) => None,
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => offloaded.remote_client.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2529,11 +2538,6 @@ impl Tenant {
|
||||
.await
|
||||
.inspect_err(|e| match e {
|
||||
timeline::CompactionError::ShuttingDown => (),
|
||||
timeline::CompactionError::Offload(_) => {
|
||||
// Failures to offload timelines do not trip the circuit breaker, because
|
||||
// they do not do lots of writes the way compaction itself does: it is cheap
|
||||
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
|
||||
}
|
||||
timeline::CompactionError::Other(e) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
@@ -2549,7 +2553,8 @@ impl Tenant {
|
||||
if pending_task_left == Some(false) && *can_offload {
|
||||
offload_timeline(self, timeline)
|
||||
.instrument(info_span!("offload_timeline", %timeline_id))
|
||||
.await?;
|
||||
.await
|
||||
.map_err(timeline::CompactionError::Other)?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -279,7 +279,6 @@ fn log_compaction_error(
|
||||
|
||||
let decision = match e {
|
||||
ShuttingDown => None,
|
||||
Offload(_) => Some(LooksLike::Error),
|
||||
_ if task_cancelled => Some(LooksLike::Info),
|
||||
Other(e) => {
|
||||
let root_cause = e.root_cause();
|
||||
|
||||
@@ -20,7 +20,6 @@ use chrono::{DateTime, Utc};
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use handle::ShardTimelineId;
|
||||
use offload::OffloadError;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
key::{
|
||||
@@ -4476,23 +4475,11 @@ impl Drop for Timeline {
|
||||
pub(crate) enum CompactionError {
|
||||
#[error("The timeline or pageserver is shutting down")]
|
||||
ShuttingDown,
|
||||
/// Compaction tried to offload a timeline and failed
|
||||
#[error("Failed to offload timeline: {0}")]
|
||||
Offload(OffloadError),
|
||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<OffloadError> for CompactionError {
|
||||
fn from(e: OffloadError) -> Self {
|
||||
match e {
|
||||
OffloadError::Cancelled => Self::ShuttingDown,
|
||||
_ => Self::Offload(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
matches!(self, CompactionError::ShuttingDown)
|
||||
|
||||
@@ -18,7 +18,6 @@ use crate::{
|
||||
CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant,
|
||||
TimelineOrOffloaded,
|
||||
},
|
||||
virtual_file::MaybeFatalIo,
|
||||
};
|
||||
|
||||
use super::{Timeline, TimelineResources};
|
||||
@@ -63,10 +62,10 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
conf: &PageServerConf,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline: &Timeline,
|
||||
) {
|
||||
) -> anyhow::Result<()> {
|
||||
// Always ensure the lock order is compaction -> gc.
|
||||
let compaction_lock = timeline.compaction_lock.lock();
|
||||
let _compaction_lock = crate::timed(
|
||||
let compaction_lock = crate::timed(
|
||||
compaction_lock,
|
||||
"acquires compaction lock",
|
||||
std::time::Duration::from_secs(5),
|
||||
@@ -74,7 +73,7 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
.await;
|
||||
|
||||
let gc_lock = timeline.gc_lock.lock();
|
||||
let _gc_lock = crate::timed(
|
||||
let gc_lock = crate::timed(
|
||||
gc_lock,
|
||||
"acquires gc lock",
|
||||
std::time::Duration::from_secs(5),
|
||||
@@ -86,15 +85,24 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
|
||||
let local_timeline_directory = conf.timeline_path(&tenant_shard_id, &timeline.timeline_id);
|
||||
|
||||
fail::fail_point!("timeline-delete-before-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
|
||||
});
|
||||
|
||||
// NB: This need not be atomic because the deleted flag in the IndexPart
|
||||
// will be observed during tenant/timeline load. The deletion will be resumed there.
|
||||
//
|
||||
// ErrorKind::NotFound can happen e.g. if we race with tenant detach, because,
|
||||
// Note that here we do not bail out on std::io::ErrorKind::NotFound.
|
||||
// This can happen if we're called a second time, e.g.,
|
||||
// because of a previous failure/cancellation at/after
|
||||
// failpoint timeline-delete-after-rm.
|
||||
//
|
||||
// ErrorKind::NotFound can also happen if we race with tenant detach, because,
|
||||
// no locks are shared.
|
||||
tokio::fs::remove_dir_all(local_timeline_directory)
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.fatal_err("removing timeline directory");
|
||||
.context("remove local timeline directory")?;
|
||||
|
||||
// Make sure previous deletions are ordered before mark removal.
|
||||
// Otherwise there is no guarantee that they reach the disk before mark deletion.
|
||||
@@ -105,9 +113,17 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
let timeline_path = conf.timelines_path(&tenant_shard_id);
|
||||
crashsafe::fsync_async(timeline_path)
|
||||
.await
|
||||
.fatal_err("fsync after removing timeline directory");
|
||||
.context("fsync_pre_mark_remove")?;
|
||||
|
||||
info!("finished deleting layer files, releasing locks");
|
||||
drop(gc_lock);
|
||||
drop(compaction_lock);
|
||||
|
||||
fail::fail_point!("timeline-delete-after-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removes remote layers and an index file after them.
|
||||
@@ -198,8 +214,7 @@ impl DeleteTimelineFlow {
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
super::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let allow_offloaded_children = false;
|
||||
let (timeline, mut guard) = Self::prepare(tenant, timeline_id, allow_offloaded_children)?;
|
||||
let (timeline, mut guard) = Self::prepare(tenant, timeline_id)?;
|
||||
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
@@ -325,7 +340,6 @@ impl DeleteTimelineFlow {
|
||||
pub(super) fn prepare(
|
||||
tenant: &Tenant,
|
||||
timeline_id: TimelineId,
|
||||
allow_offloaded_children: bool,
|
||||
) -> Result<(TimelineOrOffloaded, DeletionGuard), DeleteTimelineError> {
|
||||
// Note the interaction between this guard and deletion guard.
|
||||
// Here we attempt to lock deletion guard when we're holding a lock on timelines.
|
||||
@@ -338,27 +352,30 @@ impl DeleteTimelineFlow {
|
||||
// T1: acquire deletion lock, do another `DeleteTimelineFlow::run`
|
||||
// For more context see this discussion: `https://github.com/neondatabase/neon/pull/4552#discussion_r1253437346`
|
||||
let timelines = tenant.timelines.lock().unwrap();
|
||||
let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
|
||||
|
||||
let timeline = match timelines.get(&timeline_id) {
|
||||
Some(t) => TimelineOrOffloaded::Timeline(Arc::clone(t)),
|
||||
None => match timelines_offloaded.get(&timeline_id) {
|
||||
Some(t) => TimelineOrOffloaded::Offloaded(Arc::clone(t)),
|
||||
None => return Err(DeleteTimelineError::NotFound),
|
||||
},
|
||||
None => {
|
||||
let offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
|
||||
match offloaded_timelines.get(&timeline_id) {
|
||||
Some(t) => TimelineOrOffloaded::Offloaded(Arc::clone(t)),
|
||||
None => return Err(DeleteTimelineError::NotFound),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Ensure that there are no child timelines, because we are about to remove files,
|
||||
// which will break child branches
|
||||
let mut children = Vec::new();
|
||||
if !allow_offloaded_children {
|
||||
children.extend(timelines_offloaded.iter().filter_map(|(id, entry)| {
|
||||
(entry.ancestor_timeline_id == Some(timeline_id)).then_some(*id)
|
||||
}));
|
||||
}
|
||||
children.extend(timelines.iter().filter_map(|(id, entry)| {
|
||||
(entry.get_ancestor_timeline_id() == Some(timeline_id)).then_some(*id)
|
||||
}));
|
||||
// Ensure that there are no child timelines **attached to that pageserver**,
|
||||
// because detach removes files, which will break child branches
|
||||
let children: Vec<TimelineId> = timelines
|
||||
.iter()
|
||||
.filter_map(|(id, entry)| {
|
||||
if entry.get_ancestor_timeline_id() == Some(timeline_id) {
|
||||
Some(*id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
if !children.is_empty() {
|
||||
return Err(DeleteTimelineError::HasChildren(children));
|
||||
@@ -424,20 +441,12 @@ impl DeleteTimelineFlow {
|
||||
timeline: &TimelineOrOffloaded,
|
||||
remote_client: Arc<RemoteTimelineClient>,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
fail::fail_point!("timeline-delete-before-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
|
||||
});
|
||||
|
||||
// Offloaded timelines have no local state
|
||||
// TODO: once we persist offloaded information, delete the timeline from there, too
|
||||
if let TimelineOrOffloaded::Timeline(timeline) = timeline {
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await;
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await?;
|
||||
}
|
||||
|
||||
fail::fail_point!("timeline-delete-after-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
|
||||
});
|
||||
|
||||
delete_remote_layers_and_index(&remote_client).await?;
|
||||
|
||||
pausable_failpoint!("in_progress_delete");
|
||||
|
||||
@@ -3,40 +3,16 @@ use std::sync::Arc;
|
||||
use super::delete::{delete_local_timeline_directory, DeleteTimelineFlow, DeletionGuard};
|
||||
use super::Timeline;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::{OffloadedTimeline, Tenant, TenantManifestError, TimelineOrOffloaded};
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum OffloadError {
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
#[error("Timeline is not archived")]
|
||||
NotArchived,
|
||||
#[error(transparent)]
|
||||
RemoteStorage(anyhow::Error),
|
||||
#[error("Unexpected offload error: {0}")]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<TenantManifestError> for OffloadError {
|
||||
fn from(e: TenantManifestError) -> Self {
|
||||
match e {
|
||||
TenantManifestError::Cancelled => Self::Cancelled,
|
||||
TenantManifestError::RemoteStorage(e) => Self::RemoteStorage(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
use crate::tenant::{OffloadedTimeline, Tenant, TimelineOrOffloaded};
|
||||
|
||||
pub(crate) async fn offload_timeline(
|
||||
tenant: &Tenant,
|
||||
timeline: &Arc<Timeline>,
|
||||
) -> Result<(), OffloadError> {
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
tracing::info!("offloading archived timeline");
|
||||
|
||||
let allow_offloaded_children = true;
|
||||
let (timeline, guard) =
|
||||
DeleteTimelineFlow::prepare(tenant, timeline.timeline_id, allow_offloaded_children)
|
||||
.map_err(|e| OffloadError::Other(anyhow::anyhow!(e)))?;
|
||||
let (timeline, guard) = DeleteTimelineFlow::prepare(tenant, timeline.timeline_id)?;
|
||||
|
||||
let TimelineOrOffloaded::Timeline(timeline) = timeline else {
|
||||
tracing::error!("timeline already offloaded, but given timeline object");
|
||||
@@ -48,15 +24,14 @@ pub(crate) async fn offload_timeline(
|
||||
Some(true) => (),
|
||||
Some(false) => {
|
||||
tracing::warn!(?is_archived, "tried offloading a non-archived timeline");
|
||||
return Err(OffloadError::NotArchived);
|
||||
anyhow::bail!("timeline isn't archived");
|
||||
}
|
||||
None => {
|
||||
// This is legal: calls to this function can race with the timeline shutting down
|
||||
tracing::info!(
|
||||
tracing::warn!(
|
||||
?is_archived,
|
||||
"tried offloading a timeline whose remote storage is not initialized"
|
||||
"tried offloading a timeline where manifest is not yet available"
|
||||
);
|
||||
return Err(OffloadError::Cancelled);
|
||||
anyhow::bail!("timeline manifest hasn't been loaded yet");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,9 +42,9 @@ pub(crate) async fn offload_timeline(
|
||||
// to make deletions possible while offloading is in progress
|
||||
|
||||
let conf = &tenant.conf;
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, &timeline).await;
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, &timeline).await?;
|
||||
|
||||
remove_timeline_from_tenant(tenant, &timeline, &guard);
|
||||
remove_timeline_from_tenant(tenant, &timeline, &guard).await?;
|
||||
|
||||
{
|
||||
let mut offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
|
||||
@@ -88,18 +63,21 @@ pub(crate) async fn offload_timeline(
|
||||
// at the next restart attach it again.
|
||||
// For that to happen, we'd need to make the manifest reflect our *intended* state,
|
||||
// not our actual state of offloaded timelines.
|
||||
tenant.store_tenant_manifest().await?;
|
||||
tenant
|
||||
.store_tenant_manifest()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// It is important that this gets called when DeletionGuard is being held.
|
||||
/// For more context see comments in [`DeleteTimelineFlow::prepare`]
|
||||
fn remove_timeline_from_tenant(
|
||||
async fn remove_timeline_from_tenant(
|
||||
tenant: &Tenant,
|
||||
timeline: &Timeline,
|
||||
_: &DeletionGuard, // using it as a witness
|
||||
) {
|
||||
) -> anyhow::Result<()> {
|
||||
// Remove the timeline from the map.
|
||||
let mut timelines = tenant.timelines.lock().unwrap();
|
||||
let children_exist = timelines
|
||||
@@ -115,4 +93,8 @@ fn remove_timeline_from_tenant(
|
||||
timelines
|
||||
.remove(&timeline.timeline_id)
|
||||
.expect("timeline that we were deleting was concurrently removed from 'timelines' map");
|
||||
|
||||
drop(timelines);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -141,9 +141,7 @@ impl Drop for UninitializedTimeline<'_> {
|
||||
fn drop(&mut self) {
|
||||
if let Some((_, create_guard)) = self.raw_timeline.take() {
|
||||
let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
|
||||
// This is unusual, but can happen harmlessly if the pageserver is stopped while
|
||||
// creating a timeline.
|
||||
info!("Timeline got dropped without initializing, cleaning its files");
|
||||
error!("Timeline got dropped without initializing, cleaning its files");
|
||||
cleanup_timeline_directory(create_guard);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@ use tokio::{select, sync::watch, time};
|
||||
use tokio_postgres::{replication::ReplicationStream, Client};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, trace, warn, Instrument};
|
||||
use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord};
|
||||
|
||||
use super::TaskStateUpdate;
|
||||
use crate::{
|
||||
@@ -36,6 +35,7 @@ use crate::{
|
||||
use postgres_backend::is_expected_io_error;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use postgres_ffi::walrecord::{decode_wal_record, DecodedWALRecord};
|
||||
use utils::{id::NodeId, lsn::Lsn};
|
||||
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
|
||||
|
||||
@@ -339,15 +339,11 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
|
||||
}
|
||||
|
||||
// Deserialize and interpret WAL record
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
modification.tline.get_shard_identity(),
|
||||
lsn,
|
||||
modification.tline.pg_version,
|
||||
)?;
|
||||
// Deserialize WAL record
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
decode_wal_record(recdata, &mut decoded, modification.tline.pg_version)?;
|
||||
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
if decoded.is_dbase_create_copy(timeline.pg_version)
|
||||
&& uncommitted_records > 0
|
||||
{
|
||||
// Special case: legacy PG database creations operate by reading pages from a 'template' database:
|
||||
@@ -364,7 +360,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
// Ingest the records without immediately committing them.
|
||||
let ingested = walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.ingest_record(decoded, lsn, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| format!("could not ingest record at {lsn}"))?;
|
||||
if !ingested {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -67,10 +67,7 @@ pub(crate) fn apply_in_neon(
|
||||
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
|
||||
|
||||
map[map_byte as usize] &= !(flags << map_offset);
|
||||
// The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
|
||||
if !postgres_ffi::page_is_new(page) {
|
||||
postgres_ffi::page_set_lsn(page, lsn);
|
||||
}
|
||||
postgres_ffi::page_set_lsn(page, lsn);
|
||||
}
|
||||
|
||||
// Repeat for 'old_heap_blkno', if any
|
||||
@@ -84,10 +81,7 @@ pub(crate) fn apply_in_neon(
|
||||
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
|
||||
|
||||
map[map_byte as usize] &= !(flags << map_offset);
|
||||
// The page should never be empty, but we're checking it anyway as a precaution, so that if it is empty for some reason anyway, we don't make matters worse by setting the LSN on it.
|
||||
if !postgres_ffi::page_is_new(page) {
|
||||
postgres_ffi::page_set_lsn(page, lsn);
|
||||
}
|
||||
postgres_ffi::page_set_lsn(page, lsn);
|
||||
}
|
||||
}
|
||||
// Non-relational WAL records are handled here, with custom code that has the
|
||||
|
||||
@@ -294,11 +294,6 @@ pub(crate) fn poll_http2_client(
|
||||
conn_id,
|
||||
aux: aux.clone(),
|
||||
});
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_pool_opened_connections
|
||||
.get_metric()
|
||||
.inc();
|
||||
|
||||
Arc::downgrade(&pool)
|
||||
}
|
||||
@@ -311,7 +306,7 @@ pub(crate) fn poll_http2_client(
|
||||
let res = connection.await;
|
||||
match res {
|
||||
Ok(()) => info!("connection closed"),
|
||||
Err(e) => error!(%session_id, "connection error: {e:?}"),
|
||||
Err(e) => error!(%session_id, "connection error: {}", e),
|
||||
}
|
||||
|
||||
// remove from connection pool
|
||||
|
||||
@@ -67,7 +67,7 @@ exclude = [
|
||||
check_untyped_defs = true
|
||||
# Help mypy find imports when running against list of individual files.
|
||||
# Without this line it would behave differently when executed on the entire project.
|
||||
mypy_path = "$MYPY_CONFIG_FILE_DIR:$MYPY_CONFIG_FILE_DIR/test_runner:$MYPY_CONFIG_FILE_DIR/test_runner/stubs"
|
||||
mypy_path = "$MYPY_CONFIG_FILE_DIR:$MYPY_CONFIG_FILE_DIR/test_runner"
|
||||
|
||||
disallow_incomplete_defs = false
|
||||
disallow_untyped_calls = false
|
||||
|
||||
@@ -46,16 +46,10 @@ pub struct SafekeeperPostgresHandler {
|
||||
/// Parsed Postgres command.
|
||||
enum SafekeeperPostgresCommand {
|
||||
StartWalPush,
|
||||
StartReplication {
|
||||
start_lsn: Lsn,
|
||||
term: Option<Term>,
|
||||
interpret_wal: bool,
|
||||
},
|
||||
StartReplication { start_lsn: Lsn, term: Option<Term> },
|
||||
IdentifySystem,
|
||||
TimelineStatus,
|
||||
JSONCtrl {
|
||||
cmd: AppendLogicalMessage,
|
||||
},
|
||||
JSONCtrl { cmd: AppendLogicalMessage },
|
||||
}
|
||||
|
||||
fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
||||
@@ -64,7 +58,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
||||
} else if cmd.starts_with("START_REPLICATION") {
|
||||
let re = Regex::new(
|
||||
// We follow postgres START_REPLICATION LOGICAL options to pass term.
|
||||
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?( interpret_wal)",
|
||||
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?",
|
||||
)
|
||||
.unwrap();
|
||||
let caps = re
|
||||
@@ -77,12 +71,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let interpret_wal = caps.get(3).is_some();
|
||||
Ok(SafekeeperPostgresCommand::StartReplication {
|
||||
start_lsn,
|
||||
term,
|
||||
interpret_wal,
|
||||
})
|
||||
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn, term })
|
||||
} else if cmd.starts_with("IDENTIFY_SYSTEM") {
|
||||
Ok(SafekeeperPostgresCommand::IdentifySystem)
|
||||
} else if cmd.starts_with("TIMELINE_STATUS") {
|
||||
@@ -241,12 +230,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
.instrument(info_span!("WAL receiver"))
|
||||
.await
|
||||
}
|
||||
SafekeeperPostgresCommand::StartReplication {
|
||||
start_lsn,
|
||||
term,
|
||||
interpret_wal,
|
||||
} => {
|
||||
self.handle_start_replication(pgb, start_lsn, term, interpret_wal)
|
||||
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
|
||||
self.handle_start_replication(pgb, start_lsn, term)
|
||||
.instrument(info_span!("WAL sender"))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -26,11 +26,10 @@ use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::sync::mpsc::error::SendTimeoutError;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::task;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{Duration, Instant, MissedTickBehavior};
|
||||
use tokio::time::{Duration, MissedTickBehavior};
|
||||
use tracing::*;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -385,29 +384,9 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
msg_tx: Sender<ProposerAcceptorMessage>,
|
||||
mut next_msg: ProposerAcceptorMessage,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
/// Threshold for logging slow WalAcceptor sends.
|
||||
const SLOW_THRESHOLD: Duration = Duration::from_secs(5);
|
||||
|
||||
loop {
|
||||
let started = Instant::now();
|
||||
match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await {
|
||||
Ok(()) => {}
|
||||
// Slow send, log a message and keep trying. Log context has timeline ID.
|
||||
Err(SendTimeoutError::Timeout(next_msg)) => {
|
||||
warn!(
|
||||
"slow WalAcceptor send blocked for {:.3}s",
|
||||
Instant::now().duration_since(started).as_secs_f64()
|
||||
);
|
||||
if msg_tx.send(next_msg).await.is_err() {
|
||||
return Ok(()); // WalAcceptor terminated
|
||||
}
|
||||
warn!(
|
||||
"slow WalAcceptor send completed after {:.3}s",
|
||||
Instant::now().duration_since(started).as_secs_f64()
|
||||
)
|
||||
}
|
||||
// WalAcceptor terminated.
|
||||
Err(SendTimeoutError::Closed(_)) => return Ok(()),
|
||||
if msg_tx.send(next_msg).await.is_err() {
|
||||
return Ok(()); // chan closed, WalAcceptor terminated
|
||||
}
|
||||
next_msg = read_message(pgb_reader).await?;
|
||||
}
|
||||
|
||||
@@ -380,21 +380,17 @@ impl SafekeeperPostgresHandler {
|
||||
/// Wrapper around handle_start_replication_guts handling result. Error is
|
||||
/// handled here while we're still in walsender ttid span; with API
|
||||
/// extension, this can probably be moved into postgres_backend.
|
||||
///
|
||||
/// If interpret_wal is true, change the protocol to send custom Neon InterpretedWalRecord
|
||||
/// instead of XLogData, for ingestion by Pageservers.
|
||||
pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
start_pos: Lsn,
|
||||
term: Option<Term>,
|
||||
interpret_wal: bool,
|
||||
) -> Result<(), QueryError> {
|
||||
let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
|
||||
let residence_guard = tli.wal_residence_guard().await?;
|
||||
|
||||
if let Err(end) = self
|
||||
.handle_start_replication_guts(pgb, start_pos, term, interpret_wal, residence_guard)
|
||||
.handle_start_replication_guts(pgb, start_pos, term, residence_guard)
|
||||
.await
|
||||
{
|
||||
let info = tli.get_safekeeper_info(&self.conf).await;
|
||||
@@ -411,7 +407,6 @@ impl SafekeeperPostgresHandler {
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
start_pos: Lsn,
|
||||
term: Option<Term>,
|
||||
interpret_wal: bool,
|
||||
tli: WalResidentTimeline,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
let appname = self.appname.clone();
|
||||
@@ -469,7 +464,6 @@ impl SafekeeperPostgresHandler {
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
interpret_wal,
|
||||
end_watch,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
@@ -549,8 +543,6 @@ struct WalSender<'a, IO> {
|
||||
/// in. Streaming is stopped if local term changes to a different (higher)
|
||||
/// value.
|
||||
term: Option<Term>,
|
||||
/// If true, decode and filter WAL records and send InterpretedWalRecord instead of XLogRecord.
|
||||
interpret_wal: bool,
|
||||
/// Watch channel receiver to learn end of available WAL (and wait for its advancement).
|
||||
end_watch: EndWatch,
|
||||
ws_guard: Arc<WalSenderGuard>,
|
||||
@@ -579,49 +571,45 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
"nothing to send after waiting for WAL"
|
||||
);
|
||||
|
||||
let (msg, send_size) = if self.interpret_wal {
|
||||
(BeMessage::NeonInterpretedWalRecord(&[]), 0) // TODO
|
||||
// try to send as much as available, capped by MAX_SEND_SIZE
|
||||
let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
|
||||
// if we went behind available WAL, back off
|
||||
if chunk_end_pos >= self.end_pos {
|
||||
chunk_end_pos = self.end_pos;
|
||||
} else {
|
||||
// try to send as much as available, capped by MAX_SEND_SIZE
|
||||
let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
|
||||
// if we went behind available WAL, back off
|
||||
if chunk_end_pos >= self.end_pos {
|
||||
chunk_end_pos = self.end_pos;
|
||||
// If sending not up to end pos, round down to page boundary to
|
||||
// avoid breaking WAL record not at page boundary, as protocol
|
||||
// demands. See walsender.c (XLogSendPhysical).
|
||||
chunk_end_pos = chunk_end_pos
|
||||
.checked_sub(chunk_end_pos.block_offset())
|
||||
.unwrap();
|
||||
}
|
||||
let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
|
||||
let send_buf = &mut self.send_buf[..send_size];
|
||||
let send_size: usize;
|
||||
{
|
||||
// If uncommitted part is being pulled, check that the term is
|
||||
// still the expected one.
|
||||
let _term_guard = if let Some(t) = self.term {
|
||||
Some(self.tli.acquire_term(t).await?)
|
||||
} else {
|
||||
// If sending not up to end pos, round down to page boundary to
|
||||
// avoid breaking WAL record not at page boundary, as protocol
|
||||
// demands. See walsender.c (XLogSendPhysical).
|
||||
chunk_end_pos = chunk_end_pos
|
||||
.checked_sub(chunk_end_pos.block_offset())
|
||||
.unwrap();
|
||||
}
|
||||
let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
|
||||
let send_buf = &mut self.send_buf[..send_size];
|
||||
let send_size: usize;
|
||||
{
|
||||
// If uncommitted part is being pulled, check that the term is
|
||||
// still the expected one.
|
||||
let _term_guard = if let Some(t) = self.term {
|
||||
Some(self.tli.acquire_term(t).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Read WAL into buffer. send_size can be additionally capped to
|
||||
// segment boundary here.
|
||||
send_size = self.wal_reader.read(send_buf).await?
|
||||
None
|
||||
};
|
||||
let send_buf = &send_buf[..send_size];
|
||||
let msg = BeMessage::XLogData(XLogDataBody {
|
||||
// Read WAL into buffer. send_size can be additionally capped to
|
||||
// segment boundary here.
|
||||
send_size = self.wal_reader.read(send_buf).await?
|
||||
};
|
||||
let send_buf = &send_buf[..send_size];
|
||||
|
||||
// and send it
|
||||
self.pgb
|
||||
.write_message(&BeMessage::XLogData(XLogDataBody {
|
||||
wal_start: self.start_pos.0,
|
||||
wal_end: self.end_pos.0,
|
||||
timestamp: get_current_timestamp(),
|
||||
data: send_buf,
|
||||
});
|
||||
(msg, send_size)
|
||||
};
|
||||
|
||||
// and send it
|
||||
self.pgb.write_message(&msg).await?;
|
||||
}))
|
||||
.await?;
|
||||
|
||||
if let Some(appname) = &self.appname {
|
||||
if appname == "replica" {
|
||||
|
||||
@@ -658,7 +658,7 @@ async fn handle_node_register(req: Request<Body>) -> Result<Response<Body>, ApiE
|
||||
}
|
||||
|
||||
async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Infra)?;
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
@@ -737,7 +737,7 @@ async fn handle_node_configure(req: Request<Body>) -> Result<Response<Body>, Api
|
||||
}
|
||||
|
||||
async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Infra)?;
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
@@ -786,7 +786,7 @@ async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
}
|
||||
|
||||
async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Infra)?;
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
@@ -804,7 +804,7 @@ async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
}
|
||||
|
||||
async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Infra)?;
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
@@ -822,7 +822,7 @@ async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>,
|
||||
}
|
||||
|
||||
async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Infra)?;
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
@@ -840,7 +840,7 @@ async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError
|
||||
}
|
||||
|
||||
async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Infra)?;
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use pageserver_api::controller_api::ShardSchedulingPolicy;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -48,16 +47,6 @@ impl ChaosInjector {
|
||||
.get_mut(victim)
|
||||
.expect("Held lock between choosing ID and this get");
|
||||
|
||||
if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
|
||||
// Skip non-active scheduling policies, so that a shard with a policy like Pause can
|
||||
// be pinned without being disrupted by us.
|
||||
tracing::info!(
|
||||
"Skipping shard {victim}: scheduling policy is {:?}",
|
||||
shard.get_scheduling_policy()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Pick a secondary to promote
|
||||
let Some(new_location) = shard
|
||||
.intent
|
||||
@@ -74,8 +63,6 @@ impl ChaosInjector {
|
||||
continue;
|
||||
};
|
||||
|
||||
tracing::info!("Injecting chaos: migrate {victim} {old_location}->{new_location}");
|
||||
|
||||
shard.intent.demote_attached(scheduler, old_location);
|
||||
shard.intent.promote_attached(scheduler, new_location);
|
||||
self.service.maybe_reconcile_shard(shard, nodes);
|
||||
|
||||
@@ -1397,7 +1397,7 @@ def neon_simple_env(
|
||||
pageserver_virtual_file_io_mode: Optional[str],
|
||||
) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
Simple Neon environment, with 1 safekeeper and 1 pageserver. No authentication, no fsync.
|
||||
Simple Neon environment, with no authentication and no safekeepers.
|
||||
|
||||
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
|
||||
"""
|
||||
@@ -4701,7 +4701,6 @@ def tenant_get_shards(
|
||||
|
||||
If the caller provides `pageserver_id`, it will be used for all shards, even
|
||||
if the shard is indicated by storage controller to be on some other pageserver.
|
||||
If the storage controller is not running, assume an unsharded tenant.
|
||||
|
||||
Caller should over the response to apply their per-pageserver action to
|
||||
each shard
|
||||
@@ -4711,17 +4710,17 @@ def tenant_get_shards(
|
||||
else:
|
||||
override_pageserver = None
|
||||
|
||||
if not env.storage_controller.running and override_pageserver is not None:
|
||||
log.warning(f"storage controller not running, assuming unsharded tenant {tenant_id}")
|
||||
return [(TenantShardId(tenant_id, 0, 0), override_pageserver)]
|
||||
|
||||
return [
|
||||
(
|
||||
TenantShardId.parse(s["shard_id"]),
|
||||
override_pageserver or env.get_pageserver(s["node_id"]),
|
||||
)
|
||||
for s in env.storage_controller.locate(tenant_id)
|
||||
]
|
||||
if len(env.pageservers) > 1:
|
||||
return [
|
||||
(
|
||||
TenantShardId.parse(s["shard_id"]),
|
||||
override_pageserver or env.get_pageserver(s["node_id"]),
|
||||
)
|
||||
for s in env.storage_controller.locate(tenant_id)
|
||||
]
|
||||
else:
|
||||
# Assume an unsharded tenant
|
||||
return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]
|
||||
|
||||
|
||||
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import closing
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.common_types import Lsn, TenantShardId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
tenant_get_shards,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.parametrize("shard_count", [1, 8, 32])
|
||||
def test_sharded_ingest(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
shard_count: int,
|
||||
):
|
||||
"""
|
||||
Benchmarks sharded ingestion throughput, by ingesting a large amount of WAL into a Safekeeper
|
||||
and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case
|
||||
(shard_count=1) to the sharded case indicates the overhead of sharding.
|
||||
"""
|
||||
|
||||
ROW_COUNT = 100_000_000 # about 7 GB of WAL
|
||||
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Create a sharded tenant and timeline, and migrate it to the respective pageservers. Ensure
|
||||
# the storage controller doesn't mess with shard placements.
|
||||
#
|
||||
# TODO: there should be a way to disable storage controller background reconciliations.
|
||||
# Currently, disabling reconciliation also disables foreground operations.
|
||||
tenant_id, timeline_id = env.create_tenant(shard_count=shard_count)
|
||||
|
||||
for shard_number in range(0, shard_count):
|
||||
tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
|
||||
pageserver_id = shard_number + 1
|
||||
env.storage_controller.tenant_shard_migrate(tenant_shard_id, pageserver_id)
|
||||
|
||||
shards = tenant_get_shards(env, tenant_id)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
assert tenant_get_shards(env, tenant_id) == shards, "shards moved"
|
||||
|
||||
# Start the endpoint.
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
|
||||
# Ingest data and measure WAL volume and duration.
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
log.info("Ingesting data")
|
||||
cur.execute("set statement_timeout = 0")
|
||||
cur.execute("create table huge (i int, j int)")
|
||||
|
||||
with zenbenchmark.record_duration("pageserver_ingest"):
|
||||
with zenbenchmark.record_duration("wal_ingest"):
|
||||
cur.execute(f"insert into huge values (generate_series(1, {ROW_COUNT}), 0)")
|
||||
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
|
||||
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
|
||||
|
||||
assert tenant_get_shards(env, tenant_id) == shards, "shards moved"
|
||||
@@ -103,6 +103,7 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Failed to process timeline dir contents.*Timeline has no ancestor and no layer files.*",
|
||||
".*Timeline got dropped without initializing, cleaning its files.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -144,6 +145,7 @@ def test_timeline_init_break_before_checkpoint_recreate(
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Failed to process timeline dir contents.*Timeline has no ancestor and no layer files.*",
|
||||
".*Timeline got dropped without initializing, cleaning its files.*",
|
||||
".*Failed to load index_part from remote storage, failed creation?.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -91,6 +91,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
[
|
||||
".*Failed to import basebackup.*",
|
||||
".*unexpected non-zero bytes after the tar archive.*",
|
||||
".*Timeline got dropped without initializing, cleaning its files.*",
|
||||
".*InternalServerError.*timeline not found.*",
|
||||
".*InternalServerError.*Tenant .* not found.*",
|
||||
".*InternalServerError.*Timeline .* not found.*",
|
||||
|
||||
@@ -146,6 +146,8 @@ def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonE
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
# happens with the cancellation bailing flushing loop earlier, leaving disk_consistent_lsn at zero
|
||||
".*Timeline got dropped without initializing, cleaning its files",
|
||||
# the response hit_pausable_failpoint_and_later_fail
|
||||
f".*Error processing HTTP request: InternalServerError\\(new timeline {env.initial_tenant}/{env.initial_timeline} has invalid disk_consistent_lsn",
|
||||
]
|
||||
|
||||
@@ -213,13 +213,6 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
|
||||
wait_until(30, 1, leaf_offloaded)
|
||||
wait_until(30, 1, parent_offloaded)
|
||||
|
||||
# Offloaded child timelines should still prevent deletion
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match=f".* timeline which has child timelines: \\[{leaf_timeline_id}\\]",
|
||||
):
|
||||
ps_http.timeline_delete(tenant_id, parent_timeline_id)
|
||||
|
||||
ps_http.timeline_archival_config(
|
||||
tenant_id,
|
||||
grandparent_timeline_id,
|
||||
|
||||
Reference in New Issue
Block a user