diff --git a/Cargo.lock b/Cargo.lock index 64231ed11c..f92da5ec51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4009,7 +4009,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" dependencies = [ "bytes", "fallible-iterator", @@ -4022,7 +4022,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" dependencies = [ "base64 0.20.0", "byteorder", @@ -4041,7 +4041,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" dependencies = [ "bytes", "fallible-iterator", @@ -5663,9 +5663,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "smol_str" @@ -6074,9 +6074,9 @@ dependencies = [ [[package]] name = "tikv-jemalloc-ctl" -version = "0.5.4" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c" +checksum = "f21f216790c8df74ce3ab25b534e0718da5a1916719771d3fec23315c99e468b" dependencies = [ "libc", "paste", @@ -6085,9 +6085,9 @@ dependencies = [ [[package]] name = "tikv-jemalloc-sys" -version = "0.5.4+5.3.0-patched" +version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1" +checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d" dependencies = [ "cc", "libc", @@ -6095,9 +6095,9 @@ dependencies = [ [[package]] name = "tikv-jemallocator" -version = "0.5.4" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca" +checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865" dependencies = [ "libc", "tikv-jemalloc-sys", @@ -6227,7 +6227,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" dependencies = [ "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index 8207726caa..dbda930535 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -168,8 +168,8 @@ sync_wrapper = "0.1.2" tar = "0.4" test-context = "0.3" thiserror = "1.0" -tikv-jemallocator = "0.5" -tikv-jemalloc-ctl = "0.5" +tikv-jemallocator = { version = "0.6", features = ["stats"] } +tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] } tokio = { version = "1.17", features = ["macros"] } tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" } tokio-io-timeout = "1.2.0" @@ -203,21 +203,10 @@ env_logger = "0.10" log = "0.4" ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed - -# We want to use the 'neon' branch for these, but there's currently one -# incompatible change on the branch. See: -# -# - PR #8076 which contained changes that depended on the new changes in -# the rust-postgres crate, and -# - PR #8654 which reverted those changes and made the code in proxy incompatible -# with the tip of the 'neon' branch again. -# -# When those proxy changes are re-applied (see PR #8747), we can switch using -# the tip of the 'neon' branch again. -postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } +postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } ## Local libraries compute_api = { version = "0.1", path = "./libs/compute_api/" } @@ -255,7 +244,7 @@ tonic-build = "0.12" [patch.crates-io] # Needed to get `tokio-postgres-rustls` to depend on our fork. -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" } ################# Binary contents sections diff --git a/compute/etc/neon_collector.jsonnet b/compute/etc/neon_collector.jsonnet index c6fa645b41..75d69c7b68 100644 --- a/compute/etc/neon_collector.jsonnet +++ b/compute/etc/neon_collector.jsonnet @@ -6,6 +6,7 @@ import 'sql_exporter/compute_backpressure_throttling_seconds.libsonnet', import 'sql_exporter/compute_current_lsn.libsonnet', import 'sql_exporter/compute_logical_snapshot_files.libsonnet', + import 'sql_exporter/compute_max_connections.libsonnet', import 'sql_exporter/compute_receive_lsn.libsonnet', import 'sql_exporter/compute_subscriptions_count.libsonnet', import 'sql_exporter/connection_counts.libsonnet', diff --git a/compute/etc/sql_exporter/compute_max_connections.libsonnet b/compute/etc/sql_exporter/compute_max_connections.libsonnet new file mode 100644 index 0000000000..69cfa1f19c --- /dev/null +++ b/compute/etc/sql_exporter/compute_max_connections.libsonnet @@ -0,0 +1,10 @@ +{ + metric_name: 'compute_max_connections', + type: 'gauge', + help: 'Max connections allowed for Postgres', + key_labels: null, + values: [ + 'max_connections', + ], + query: importstr 'sql_exporter/compute_max_connections.sql', +} diff --git a/compute/etc/sql_exporter/compute_max_connections.sql b/compute/etc/sql_exporter/compute_max_connections.sql new file mode 100644 index 0000000000..99a49483a6 --- /dev/null +++ b/compute/etc/sql_exporter/compute_max_connections.sql @@ -0,0 +1 @@ +SELECT current_setting('max_connections') as max_connections; diff --git a/deny.toml b/deny.toml index 327ac58db7..8bf643f4ba 100644 --- a/deny.toml +++ b/deny.toml @@ -37,6 +37,7 @@ allow = [ "BSD-2-Clause", "BSD-3-Clause", "CC0-1.0", + "CDDL-1.0", "ISC", "MIT", "MPL-2.0", diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index b3fcaae62f..4505101ea6 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -24,7 +24,7 @@ pub struct Key { /// When working with large numbers of Keys in-memory, it is more efficient to handle them as i128 than as /// a struct of fields. -#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd)] +#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] pub struct CompactKey(i128); /// The storage key size. diff --git a/libs/pageserver_api/src/record.rs b/libs/pageserver_api/src/record.rs index 5c3f3deb82..bb62b35d36 100644 --- a/libs/pageserver_api/src/record.rs +++ b/libs/pageserver_api/src/record.rs @@ -41,6 +41,11 @@ pub enum NeonWalRecord { file_path: String, content: Option, }, + // Truncate visibility map page + TruncateVisibilityMap { + trunc_byte: usize, + trunc_offs: usize, + }, /// A testing record for unit testing purposes. It supports append data to an existing image, or clear it. #[cfg(feature = "testing")] diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs index 010a9c2932..09d1fae221 100644 --- a/libs/pageserver_api/src/reltag.rs +++ b/libs/pageserver_api/src/reltag.rs @@ -24,7 +24,7 @@ use postgres_ffi::Oid; // FIXME: should move 'forknum' as last field to keep this consistent with Postgres. // Then we could replace the custom Ord and PartialOrd implementations below with // deriving them. This will require changes in walredoproc.c. -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)] pub struct RelTag { pub forknum: u8, pub spcnode: Oid, diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 497d011d7a..e343473d77 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -243,8 +243,11 @@ const FSM_LEAF_NODES_PER_PAGE: usize = FSM_NODES_PER_PAGE - FSM_NON_LEAF_NODES_P pub const SLOTS_PER_FSM_PAGE: u32 = FSM_LEAF_NODES_PER_PAGE as u32; /* From visibilitymap.c */ -pub const VM_HEAPBLOCKS_PER_PAGE: u32 = - (BLCKSZ as usize - SIZEOF_PAGE_HEADER_DATA) as u32 * (8 / 2); // MAPSIZE * (BITS_PER_BYTE / BITS_PER_HEAPBLOCK) + +pub const VM_MAPSIZE: usize = BLCKSZ as usize - MAXALIGN_SIZE_OF_PAGE_HEADER_DATA; +pub const VM_BITS_PER_HEAPBLOCK: usize = 2; +pub const VM_HEAPBLOCKS_PER_BYTE: usize = 8 / VM_BITS_PER_HEAPBLOCK; +pub const VM_HEAPBLOCKS_PER_PAGE: usize = VM_MAPSIZE * VM_HEAPBLOCKS_PER_BYTE; /* From origin.c */ pub const REPLICATION_STATE_MAGIC: u32 = 0x1257DADE; diff --git a/libs/postgres_ffi/src/walrecord.rs b/libs/postgres_ffi/src/walrecord.rs index dedbaef64d..b32106632a 100644 --- a/libs/postgres_ffi/src/walrecord.rs +++ b/libs/postgres_ffi/src/walrecord.rs @@ -16,7 +16,7 @@ use utils::bin_ser::DeserializeError; use utils::lsn::Lsn; #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlMultiXactCreate { pub mid: MultiXactId, /* new MultiXact's ID */ @@ -46,7 +46,7 @@ impl XlMultiXactCreate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlMultiXactTruncate { pub oldest_multi_db: Oid, /* to-be-truncated range of multixact offsets */ @@ -72,7 +72,7 @@ impl XlMultiXactTruncate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlRelmapUpdate { pub dbid: Oid, /* database ID, or 0 for shared map */ pub tsid: Oid, /* database's tablespace, or pg_global */ @@ -90,7 +90,7 @@ impl XlRelmapUpdate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlReploriginDrop { pub node_id: RepOriginId, } @@ -104,7 +104,7 @@ impl XlReploriginDrop { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlReploriginSet { pub remote_lsn: Lsn, pub node_id: RepOriginId, @@ -120,7 +120,7 @@ impl XlReploriginSet { } #[repr(C)] -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct RelFileNode { pub spcnode: Oid, /* tablespace */ pub dbnode: Oid, /* database */ @@ -911,7 +911,7 @@ impl XlSmgrCreate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlSmgrTruncate { pub blkno: BlockNumber, pub rnode: RelFileNode, @@ -984,7 +984,7 @@ impl XlDropDatabase { /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same /// struct for commits and aborts. /// -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlXactParsedRecord { pub xid: TransactionId, pub info: u8, diff --git a/libs/remote_storage/src/error.rs b/libs/remote_storage/src/error.rs index 17790e9f70..ec9f868998 100644 --- a/libs/remote_storage/src/error.rs +++ b/libs/remote_storage/src/error.rs @@ -15,6 +15,9 @@ pub enum DownloadError { /// /// Concurrency control is not timed within timeout. Timeout, + /// Some integrity/consistency check failed during download. This is used during + /// timeline loads to cancel the load of a tenant if some timeline detects fatal corruption. + Fatal(String), /// The file was found in the remote storage, but the download failed. Other(anyhow::Error), } @@ -29,6 +32,7 @@ impl std::fmt::Display for DownloadError { DownloadError::Unmodified => write!(f, "File was not modified"), DownloadError::Cancelled => write!(f, "Cancelled, shutting down"), DownloadError::Timeout => write!(f, "timeout"), + DownloadError::Fatal(why) => write!(f, "Fatal read error: {why}"), DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"), } } @@ -41,7 +45,7 @@ impl DownloadError { pub fn is_permanent(&self) -> bool { use DownloadError::*; match self { - BadInput(_) | NotFound | Unmodified | Cancelled => true, + BadInput(_) | NotFound | Unmodified | Fatal(_) | Cancelled => true, Timeout | Other(_) => false, } } diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs index 684718d220..1895f25bfc 100644 --- a/libs/wal_decoder/src/decoder.rs +++ b/libs/wal_decoder/src/decoder.rs @@ -19,7 +19,7 @@ impl InterpretedWalRecord { pub fn from_bytes_filtered( buf: Bytes, shard: &ShardIdentity, - record_end_lsn: Lsn, + next_record_lsn: Lsn, pg_version: u32, ) -> anyhow::Result { let mut decoded = DecodedWALRecord::default(); @@ -32,18 +32,18 @@ impl InterpretedWalRecord { FlushUncommittedRecords::No }; - let metadata_record = MetadataRecord::from_decoded(&decoded, record_end_lsn, pg_version)?; + let metadata_record = MetadataRecord::from_decoded(&decoded, next_record_lsn, pg_version)?; let batch = SerializedValueBatch::from_decoded_filtered( decoded, shard, - record_end_lsn, + next_record_lsn, pg_version, )?; Ok(InterpretedWalRecord { metadata_record, batch, - end_lsn: record_end_lsn, + next_record_lsn, flush_uncommitted, xid, }) @@ -53,7 +53,7 @@ impl InterpretedWalRecord { impl MetadataRecord { fn from_decoded( decoded: &DecodedWALRecord, - record_end_lsn: Lsn, + next_record_lsn: Lsn, pg_version: u32, ) -> anyhow::Result> { // Note: this doesn't actually copy the bytes since @@ -74,7 +74,9 @@ impl MetadataRecord { Ok(None) } pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version), - pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, record_end_lsn), + pg_constants::RM_XACT_ID => { + Self::decode_xact_record(&mut buf, decoded, next_record_lsn) + } pg_constants::RM_MULTIXACT_ID => { Self::decode_multixact_record(&mut buf, decoded, pg_version) } @@ -86,7 +88,9 @@ impl MetadataRecord { // // Alternatively, one can make the checkpoint part of the subscription protocol // to the pageserver. This should work fine, but can be done at a later point. - pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, record_end_lsn), + pg_constants::RM_XLOG_ID => { + Self::decode_xlog_record(&mut buf, decoded, next_record_lsn) + } pg_constants::RM_LOGICALMSG_ID => { Self::decode_logical_message_record(&mut buf, decoded) } diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 5d90eeb69c..c69f8c869a 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -32,16 +32,19 @@ use postgres_ffi::walrecord::{ XlSmgrTruncate, XlXactParsedRecord, }; use postgres_ffi::{Oid, TransactionId}; +use serde::{Deserialize, Serialize}; use utils::lsn::Lsn; use crate::serialized_batch::SerializedValueBatch; +#[derive(Serialize, Deserialize)] pub enum FlushUncommittedRecords { Yes, No, } /// An interpreted Postgres WAL record, ready to be handled by the pageserver +#[derive(Serialize, Deserialize)] pub struct InterpretedWalRecord { /// Optional metadata record - may cause writes to metadata keys /// in the storage engine @@ -49,8 +52,10 @@ pub struct InterpretedWalRecord { /// A pre-serialized batch along with the required metadata for ingestion /// by the pageserver pub batch: SerializedValueBatch, - /// Byte offset within WAL for the end of the original PG WAL record - pub end_lsn: Lsn, + /// Byte offset within WAL for the start of the next PG WAL record. + /// Usually this is the end LSN of the current record, but in case of + /// XLOG SWITCH records it will be within the next segment. + pub next_record_lsn: Lsn, /// Whether to flush all uncommitted modifications to the storage engine /// before ingesting this record. This is currently only used for legacy PG /// database creations which read pages from a template database. Such WAL @@ -62,6 +67,7 @@ pub struct InterpretedWalRecord { /// The interpreted part of the Postgres WAL record which requires metadata /// writes to the underlying storage engine. +#[derive(Serialize, Deserialize)] pub enum MetadataRecord { Heapam(HeapamRecord), Neonrmgr(NeonrmgrRecord), @@ -77,10 +83,12 @@ pub enum MetadataRecord { Replorigin(ReploriginRecord), } +#[derive(Serialize, Deserialize)] pub enum HeapamRecord { ClearVmBits(ClearVmBits), } +#[derive(Serialize, Deserialize)] pub struct ClearVmBits { pub new_heap_blkno: Option, pub old_heap_blkno: Option, @@ -88,24 +96,29 @@ pub struct ClearVmBits { pub flags: u8, } +#[derive(Serialize, Deserialize)] pub enum NeonrmgrRecord { ClearVmBits(ClearVmBits), } +#[derive(Serialize, Deserialize)] pub enum SmgrRecord { Create(SmgrCreate), Truncate(XlSmgrTruncate), } +#[derive(Serialize, Deserialize)] pub struct SmgrCreate { pub rel: RelTag, } +#[derive(Serialize, Deserialize)] pub enum DbaseRecord { Create(DbaseCreate), Drop(DbaseDrop), } +#[derive(Serialize, Deserialize)] pub struct DbaseCreate { pub db_id: Oid, pub tablespace_id: Oid, @@ -113,27 +126,32 @@ pub struct DbaseCreate { pub src_tablespace_id: Oid, } +#[derive(Serialize, Deserialize)] pub struct DbaseDrop { pub db_id: Oid, pub tablespace_ids: Vec, } +#[derive(Serialize, Deserialize)] pub enum ClogRecord { ZeroPage(ClogZeroPage), Truncate(ClogTruncate), } +#[derive(Serialize, Deserialize)] pub struct ClogZeroPage { pub segno: u32, pub rpageno: u32, } +#[derive(Serialize, Deserialize)] pub struct ClogTruncate { pub pageno: u32, pub oldest_xid: TransactionId, pub oldest_xid_db: Oid, } +#[derive(Serialize, Deserialize)] pub enum XactRecord { Commit(XactCommon), Abort(XactCommon), @@ -142,6 +160,7 @@ pub enum XactRecord { Prepare(XactPrepare), } +#[derive(Serialize, Deserialize)] pub struct XactCommon { pub parsed: XlXactParsedRecord, pub origin_id: u16, @@ -150,61 +169,73 @@ pub struct XactCommon { pub lsn: Lsn, } +#[derive(Serialize, Deserialize)] pub struct XactPrepare { pub xl_xid: TransactionId, pub data: Bytes, } +#[derive(Serialize, Deserialize)] pub enum MultiXactRecord { ZeroPage(MultiXactZeroPage), Create(XlMultiXactCreate), Truncate(XlMultiXactTruncate), } +#[derive(Serialize, Deserialize)] pub struct MultiXactZeroPage { pub slru_kind: SlruKind, pub segno: u32, pub rpageno: u32, } +#[derive(Serialize, Deserialize)] pub enum RelmapRecord { Update(RelmapUpdate), } +#[derive(Serialize, Deserialize)] pub struct RelmapUpdate { pub update: XlRelmapUpdate, pub buf: Bytes, } +#[derive(Serialize, Deserialize)] pub enum XlogRecord { Raw(RawXlogRecord), } +#[derive(Serialize, Deserialize)] pub struct RawXlogRecord { pub info: u8, pub lsn: Lsn, pub buf: Bytes, } +#[derive(Serialize, Deserialize)] pub enum LogicalMessageRecord { Put(PutLogicalMessage), #[cfg(feature = "testing")] Failpoint, } +#[derive(Serialize, Deserialize)] pub struct PutLogicalMessage { pub path: String, pub buf: Bytes, } +#[derive(Serialize, Deserialize)] pub enum StandbyRecord { RunningXacts(StandbyRunningXacts), } +#[derive(Serialize, Deserialize)] pub struct StandbyRunningXacts { pub oldest_running_xid: TransactionId, } +#[derive(Serialize, Deserialize)] pub enum ReploriginRecord { Set(XlReploriginSet), Drop(XlReploriginDrop), diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs index 8f33291023..9c0708ebbe 100644 --- a/libs/wal_decoder/src/serialized_batch.rs +++ b/libs/wal_decoder/src/serialized_batch.rs @@ -16,6 +16,7 @@ use pageserver_api::shard::ShardIdentity; use pageserver_api::{key::CompactKey, value::Value}; use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord}; use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ}; +use serde::{Deserialize, Serialize}; use utils::bin_ser::BeSer; use utils::lsn::Lsn; @@ -29,6 +30,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); /// relation sizes. In the case of "observed" values, we only need to know /// the key and LSN, so two types of metadata are supported to save on network /// bandwidth. +#[derive(Serialize, Deserialize)] pub enum ValueMeta { Serialized(SerializedValueMeta), Observed(ObservedValueMeta), @@ -75,6 +77,7 @@ impl PartialEq for OrderedValueMeta { impl Eq for OrderedValueMeta {} /// Metadata for a [`Value`] serialized into the batch. +#[derive(Serialize, Deserialize)] pub struct SerializedValueMeta { pub key: CompactKey, pub lsn: Lsn, @@ -86,12 +89,14 @@ pub struct SerializedValueMeta { } /// Metadata for a [`Value`] observed by the batch +#[derive(Serialize, Deserialize)] pub struct ObservedValueMeta { pub key: CompactKey, pub lsn: Lsn, } /// Batch of serialized [`Value`]s. +#[derive(Serialize, Deserialize)] pub struct SerializedValueBatch { /// [`Value`]s serialized in EphemeralFile's native format, /// ready for disk write by the pageserver @@ -132,7 +137,7 @@ impl SerializedValueBatch { pub(crate) fn from_decoded_filtered( decoded: DecodedWALRecord, shard: &ShardIdentity, - record_end_lsn: Lsn, + next_record_lsn: Lsn, pg_version: u32, ) -> anyhow::Result { // First determine how big the buffer needs to be and allocate it up-front. @@ -156,13 +161,17 @@ impl SerializedValueBatch { let key = rel_block_to_key(rel, blk.blkno); if !key.is_valid_key_on_write_path() { - anyhow::bail!("Unsupported key decoded at LSN {}: {}", record_end_lsn, key); + anyhow::bail!( + "Unsupported key decoded at LSN {}: {}", + next_record_lsn, + key + ); } let key_is_local = shard.is_key_local(&key); tracing::debug!( - lsn=%record_end_lsn, + lsn=%next_record_lsn, key=%key, "ingest: shard decision {}", if !key_is_local { "drop" } else { "keep" }, @@ -174,7 +183,7 @@ impl SerializedValueBatch { // its blkno in case it implicitly extends a relation. metadata.push(ValueMeta::Observed(ObservedValueMeta { key: key.to_compact(), - lsn: record_end_lsn, + lsn: next_record_lsn, })) } @@ -205,7 +214,7 @@ impl SerializedValueBatch { // that would corrupt the page. // if !page_is_new(&image) { - page_set_lsn(&mut image, record_end_lsn) + page_set_lsn(&mut image, next_record_lsn) } assert_eq!(image.len(), BLCKSZ as usize); @@ -224,12 +233,12 @@ impl SerializedValueBatch { metadata.push(ValueMeta::Serialized(SerializedValueMeta { key: key.to_compact(), - lsn: record_end_lsn, + lsn: next_record_lsn, batch_offset: relative_off, len: val_ser_size, will_init: val.will_init(), })); - max_lsn = std::cmp::max(max_lsn, record_end_lsn); + max_lsn = std::cmp::max(max_lsn, next_record_lsn); len += 1; } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index dde9c5dd0b..ab170679ba 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -324,6 +324,7 @@ impl From for ApiError { .into_boxed_str(), ), a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()), + Cancelled => ApiError::ResourceUnavailable("shutting down".into()), Other(e) => ApiError::InternalServerError(e), } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d0a96e78a6..909f99ea9d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -39,6 +39,7 @@ use remote_timeline_client::UploadQueueNotReadyError; use std::collections::BTreeMap; use std::fmt; use std::future::Future; +use std::sync::atomic::AtomicBool; use std::sync::Weak; use std::time::SystemTime; use storage_broker::BrokerClientChannel; @@ -524,6 +525,9 @@ pub struct OffloadedTimeline { /// Prevent two tasks from deleting the timeline at the same time. If held, the /// timeline is being deleted. If 'true', the timeline has already been deleted. pub delete_progress: TimelineDeleteProgress, + + /// Part of the `OffloadedTimeline` object's lifecycle: this needs to be set before we drop it + pub deleted_from_ancestor: AtomicBool, } impl OffloadedTimeline { @@ -533,9 +537,16 @@ impl OffloadedTimeline { /// the timeline is not in a stopped state. /// Panics if the timeline is not archived. fn from_timeline(timeline: &Timeline) -> Result { - let ancestor_retain_lsn = timeline - .get_ancestor_timeline_id() - .map(|_timeline_id| timeline.get_ancestor_lsn()); + let (ancestor_retain_lsn, ancestor_timeline_id) = + if let Some(ancestor_timeline) = timeline.ancestor_timeline() { + let ancestor_lsn = timeline.get_ancestor_lsn(); + let ancestor_timeline_id = ancestor_timeline.timeline_id; + let mut gc_info = ancestor_timeline.gc_info.write().unwrap(); + gc_info.insert_child(timeline.timeline_id, ancestor_lsn, MaybeOffloaded::Yes); + (Some(ancestor_lsn), Some(ancestor_timeline_id)) + } else { + (None, None) + }; let archived_at = timeline .remote_client .archived_at_stopped_queue()? @@ -543,14 +554,17 @@ impl OffloadedTimeline { Ok(Self { tenant_shard_id: timeline.tenant_shard_id, timeline_id: timeline.timeline_id, - ancestor_timeline_id: timeline.get_ancestor_timeline_id(), + ancestor_timeline_id, ancestor_retain_lsn, archived_at, delete_progress: timeline.delete_progress.clone(), + deleted_from_ancestor: AtomicBool::new(false), }) } fn from_manifest(tenant_shard_id: TenantShardId, manifest: &OffloadedTimelineManifest) -> Self { + // We expect to reach this case in tenant loading, where the `retain_lsn` is populated in the parent's `gc_info` + // by the `initialize_gc_info` function. let OffloadedTimelineManifest { timeline_id, ancestor_timeline_id, @@ -564,6 +578,7 @@ impl OffloadedTimeline { ancestor_retain_lsn, archived_at, delete_progress: TimelineDeleteProgress::default(), + deleted_from_ancestor: AtomicBool::new(false), } } fn manifest(&self) -> OffloadedTimelineManifest { @@ -581,6 +596,33 @@ impl OffloadedTimeline { archived_at: *archived_at, } } + /// Delete this timeline's retain_lsn from its ancestor, if present in the given tenant + fn delete_from_ancestor_with_timelines( + &self, + timelines: &std::sync::MutexGuard<'_, HashMap>>, + ) { + if let (Some(_retain_lsn), Some(ancestor_timeline_id)) = + (self.ancestor_retain_lsn, self.ancestor_timeline_id) + { + if let Some((_, ancestor_timeline)) = timelines + .iter() + .find(|(tid, _tl)| **tid == ancestor_timeline_id) + { + ancestor_timeline + .gc_info + .write() + .unwrap() + .remove_child_offloaded(self.timeline_id); + } + } + self.deleted_from_ancestor.store(true, Ordering::Release); + } + /// Call [`Self::delete_from_ancestor_with_timelines`] instead if possible. + /// + /// As the entire tenant is being dropped, don't bother deregistering the `retain_lsn` from the ancestor. + fn defuse_for_tenant_drop(&self) { + self.deleted_from_ancestor.store(true, Ordering::Release); + } } impl fmt::Debug for OffloadedTimeline { @@ -589,6 +631,17 @@ impl fmt::Debug for OffloadedTimeline { } } +impl Drop for OffloadedTimeline { + fn drop(&mut self) { + if !self.deleted_from_ancestor.load(Ordering::Acquire) { + tracing::warn!( + "offloaded timeline {} was dropped without having cleaned it up at the ancestor", + self.timeline_id + ); + } + } +} + #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)] pub enum MaybeOffloaded { Yes, @@ -700,6 +753,9 @@ pub enum DeleteTimelineError { #[error("Timeline deletion is already in progress")] AlreadyInProgress(Arc>), + #[error("Cancelled")] + Cancelled, + #[error(transparent)] Other(#[from] anyhow::Error), } @@ -710,6 +766,7 @@ impl Debug for DeleteTimelineError { Self::NotFound => write!(f, "NotFound"), Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(), Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(), + Self::Cancelled => f.debug_tuple("Cancelled").finish(), Self::Other(e) => f.debug_tuple("Other").field(e).finish(), } } @@ -1433,6 +1490,12 @@ impl Tenant { info!(%timeline_id, "index_part not found on remote"); continue; } + Err(DownloadError::Fatal(why)) => { + // If, while loading one remote timeline, we saw an indication that our generation + // number is likely invalid, then we should not load the whole tenant. + error!(%timeline_id, "Fatal error loading timeline: {why}"); + anyhow::bail!(why.to_string()); + } Err(e) => { // Some (possibly ephemeral) error happened during index_part download. // Pretend the timeline exists to not delete the timeline directory, @@ -1521,7 +1584,7 @@ impl Tenant { } // Complete deletions for offloaded timeline id's. offloaded_timelines_list - .retain(|(offloaded_id, _offloaded)| { + .retain(|(offloaded_id, offloaded)| { // At this point, offloaded_timeline_ids has the list of all offloaded timelines // without a prefix in S3, so they are inexistent. // In the end, existence of a timeline is finally determined by the existence of an index-part.json in remote storage. @@ -1529,6 +1592,7 @@ impl Tenant { let delete = offloaded_timeline_ids.contains(offloaded_id); if delete { tracing::info!("Removing offloaded timeline {offloaded_id} from manifest as no remote prefix was found"); + offloaded.defuse_for_tenant_drop(); } !delete }); @@ -1917,9 +1981,15 @@ impl Tenant { ))); }; let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap(); - if offloaded_timelines.remove(&timeline_id).is_none() { - warn!("timeline already removed from offloaded timelines"); + match offloaded_timelines.remove(&timeline_id) { + Some(offloaded) => { + offloaded.delete_from_ancestor_with_timelines(&timelines); + } + None => warn!("timeline already removed from offloaded timelines"), } + + self.initialize_gc_info(&timelines, &offloaded_timelines, Some(timeline_id)); + Arc::clone(timeline) }; @@ -2657,7 +2727,7 @@ impl Tenant { .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping())); // Before activation, populate each Timeline's GcInfo with information about its children - self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor); + self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None); // Spawn gc and compaction loops. The loops will shut themselves // down when they notice that the tenant is inactive. @@ -2772,8 +2842,14 @@ impl Tenant { let timeline_id = timeline.timeline_id; let span = tracing::info_span!("timeline_shutdown", %timeline_id, ?shutdown_mode); js.spawn(async move { timeline.shutdown(shutdown_mode).instrument(span).await }); - }) - }; + }); + } + { + let timelines_offloaded = self.timelines_offloaded.lock().unwrap(); + timelines_offloaded.values().for_each(|timeline| { + timeline.defuse_for_tenant_drop(); + }); + } // test_long_timeline_create_then_tenant_delete is leaning on this message tracing::info!("Waiting for timelines..."); while let Some(res) = js.join_next().await { @@ -3757,10 +3833,13 @@ impl Tenant { &self, timelines: &std::sync::MutexGuard>>, timelines_offloaded: &std::sync::MutexGuard>>, + restrict_to_timeline: Option, ) { - // This function must be called before activation: after activation timeline create/delete operations - // might happen, and this function is not safe to run concurrently with those. - assert!(!self.is_active()); + if restrict_to_timeline.is_none() { + // This function must be called before activation: after activation timeline create/delete operations + // might happen, and this function is not safe to run concurrently with those. + assert!(!self.is_active()); + } // Scan all timelines. For each timeline, remember the timeline ID and // the branch point where it was created. @@ -3793,7 +3872,12 @@ impl Tenant { let horizon = self.get_gc_horizon(); // Populate each timeline's GcInfo with information about its child branches - for timeline in timelines.values() { + let timelines_to_write = if let Some(timeline_id) = restrict_to_timeline { + itertools::Either::Left(timelines.get(&timeline_id).into_iter()) + } else { + itertools::Either::Right(timelines.values()) + }; + for timeline in timelines_to_write { let mut branchpoints: Vec<(Lsn, TimelineId, MaybeOffloaded)> = all_branchpoints .remove(&timeline.timeline_id) .unwrap_or_default(); @@ -9640,4 +9724,54 @@ mod tests { Ok(()) } + + #[cfg(feature = "testing")] + #[tokio::test] + async fn test_timeline_offload_retain_lsn() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_timeline_offload_retain_lsn") + .await + .unwrap(); + let (tenant, ctx) = harness.load().await; + let tline_parent = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + let tline_child = tenant + .branch_timeline_test(&tline_parent, NEW_TIMELINE_ID, Some(Lsn(0x20)), &ctx) + .await + .unwrap(); + { + let gc_info_parent = tline_parent.gc_info.read().unwrap(); + assert_eq!( + gc_info_parent.retain_lsns, + vec![(Lsn(0x20), tline_child.timeline_id, MaybeOffloaded::No)] + ); + } + // We have to directly call the remote_client instead of using the archive function to avoid constructing broker client... + tline_child + .remote_client + .schedule_index_upload_for_timeline_archival_state(TimelineArchivalState::Archived) + .unwrap(); + tline_child.remote_client.wait_completion().await.unwrap(); + offload_timeline(&tenant, &tline_child) + .instrument(tracing::info_span!(parent: None, "offload_test", tenant_id=%"test", shard_id=%"test", timeline_id=%"test")) + .await.unwrap(); + let child_timeline_id = tline_child.timeline_id; + Arc::try_unwrap(tline_child).unwrap(); + + { + let gc_info_parent = tline_parent.gc_info.read().unwrap(); + assert_eq!( + gc_info_parent.retain_lsns, + vec![(Lsn(0x20), child_timeline_id, MaybeOffloaded::Yes)] + ); + } + + tenant + .get_offloaded_timeline(child_timeline_id) + .unwrap() + .defuse_for_tenant_drop(); + + Ok(()) + } } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index b37c16e133..94f42c7827 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -243,7 +243,7 @@ use self::index::IndexPart; use super::metadata::MetadataUpdate; use super::storage_layer::{Layer, LayerName, ResidentLayer}; use super::upload_queue::{NotInitialized, SetDeletedFlagProgress}; -use super::Generation; +use super::{DeleteTimelineError, Generation}; pub(crate) use download::{ download_index_part, download_tenant_manifest, is_temp_download_file, @@ -574,12 +574,18 @@ impl RemoteTimelineClient { if latest_index_generation > index_generation { // Unexpected! Why are we loading such an old index if a more recent one exists? - tracing::warn!( + // We will refuse to proceed, as there is no reasonable scenario where this should happen, but + // there _is_ a clear bug/corruption scenario where it would happen (controller sets the generation + // backwards). + tracing::error!( ?index_generation, ?latest_index_generation, ?latest_index_mtime, "Found a newer index while loading an old one" ); + return Err(DownloadError::Fatal( + "Index age exceeds threshold and a newer index exists".into(), + )); } } @@ -1544,15 +1550,17 @@ impl RemoteTimelineClient { /// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfuly set. /// The function deletes layer files one by one, then lists the prefix to see if we leaked something /// deletes leaked files if any and proceeds with deletion of index file at the end. - pub(crate) async fn delete_all(self: &Arc) -> anyhow::Result<()> { + pub(crate) async fn delete_all(self: &Arc) -> Result<(), DeleteTimelineError> { debug_assert_current_span_has_tenant_and_timeline_id(); let layers: Vec = { let mut locked = self.upload_queue.lock().unwrap(); - let stopped = locked.stopped_mut()?; + let stopped = locked.stopped_mut().map_err(DeleteTimelineError::Other)?; if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) { - anyhow::bail!("deleted_at is not set") + return Err(DeleteTimelineError::Other(anyhow::anyhow!( + "deleted_at is not set" + ))); } debug_assert!(stopped.upload_queue_for_deletion.no_pending_work()); @@ -1587,7 +1595,10 @@ impl RemoteTimelineClient { }; let layer_deletion_count = layers.len(); - self.deletion_queue_client.push_immediate(layers).await?; + self.deletion_queue_client + .push_immediate(layers) + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; // Delete the initdb.tar.zst, which is not always present, but deletion attempts of // inexistant objects are not considered errors. @@ -1595,7 +1606,8 @@ impl RemoteTimelineClient { remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &self.timeline_id); self.deletion_queue_client .push_immediate(vec![initdb_path]) - .await?; + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; // Do not delete index part yet, it is needed for possible retry. If we remove it first // and retry will arrive to different pageserver there wont be any traces of it on remote storage @@ -1603,7 +1615,9 @@ impl RemoteTimelineClient { // Execute all pending deletions, so that when we proceed to do a listing below, we aren't // taking the burden of listing all the layers that we already know we should delete. - self.flush_deletion_queue().await?; + self.flush_deletion_queue() + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; let cancel = shutdown_token(); @@ -1666,28 +1680,32 @@ impl RemoteTimelineClient { if !remaining_layers.is_empty() { self.deletion_queue_client .push_immediate(remaining_layers) - .await?; + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; } fail::fail_point!("timeline-delete-before-index-delete", |_| { - Err(anyhow::anyhow!( + Err(DeleteTimelineError::Other(anyhow::anyhow!( "failpoint: timeline-delete-before-index-delete" - ))? + )))? }); debug!("enqueuing index part deletion"); self.deletion_queue_client .push_immediate([latest_index].to_vec()) - .await?; + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; // Timeline deletion is rare and we have probably emitted a reasonably number of objects: wait // for a flush to a persistent deletion list so that we may be sure deletion will occur. - self.flush_deletion_queue().await?; + self.flush_deletion_queue() + .await + .map_err(|_| DeleteTimelineError::Cancelled)?; fail::fail_point!("timeline-delete-after-index-delete", |_| { - Err(anyhow::anyhow!( + Err(DeleteTimelineError::Other(anyhow::anyhow!( "failpoint: timeline-delete-after-index-delete" - ))? + )))? }); info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json"); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 09ddb19765..2bc14ec317 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -477,8 +477,21 @@ impl GcInfo { self.retain_lsns.sort_by_key(|i| i.0); } - pub(super) fn remove_child(&mut self, child_id: TimelineId) { - self.retain_lsns.retain(|i| i.1 != child_id); + pub(super) fn remove_child_maybe_offloaded( + &mut self, + child_id: TimelineId, + maybe_offloaded: MaybeOffloaded, + ) { + self.retain_lsns + .retain(|i| !(i.1 == child_id && i.2 == maybe_offloaded)); + } + + pub(super) fn remove_child_not_offloaded(&mut self, child_id: TimelineId) { + self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::No); + } + + pub(super) fn remove_child_offloaded(&mut self, child_id: TimelineId) { + self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::Yes); } } @@ -4501,7 +4514,7 @@ impl Drop for Timeline { // This lock should never be poisoned, but in case it is we do a .map() instead of // an unwrap(), to avoid panicking in a destructor and thereby aborting the process. if let Ok(mut gc_info) = ancestor.gc_info.write() { - gc_info.remove_child(self.timeline_id) + gc_info.remove_child_not_offloaded(self.timeline_id) } } } @@ -5030,7 +5043,7 @@ impl Timeline { // 1. Is it newer than GC horizon cutoff point? if l.get_lsn_range().end > space_cutoff { - debug!( + info!( "keeping {} because it's newer than space_cutoff {}", l.layer_name(), space_cutoff, @@ -5041,7 +5054,7 @@ impl Timeline { // 2. It is newer than PiTR cutoff point? if l.get_lsn_range().end > time_cutoff { - debug!( + info!( "keeping {} because it's newer than time_cutoff {}", l.layer_name(), time_cutoff, @@ -5060,7 +5073,7 @@ impl Timeline { for retain_lsn in &retain_lsns { // start_lsn is inclusive if &l.get_lsn_range().start <= retain_lsn { - debug!( + info!( "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}", l.layer_name(), retain_lsn, @@ -5075,7 +5088,7 @@ impl Timeline { if let Some(lsn) = &max_lsn_with_valid_lease { // keep if layer start <= any of the lease if &l.get_lsn_range().start <= lsn { - debug!( + info!( "keeping {} because there is a valid lease preventing GC at {}", l.layer_name(), lsn, @@ -5107,13 +5120,13 @@ impl Timeline { if !layers .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff)) { - debug!("keeping {} because it is the latest layer", l.layer_name()); + info!("keeping {} because it is the latest layer", l.layer_name()); result.layers_not_updated += 1; continue 'outer; } // We didn't find any reason to keep this file, so remove it. - debug!( + info!( "garbage collecting {} is_dropped: xx is_incremental: {}", l.layer_name(), l.is_incremental(), diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 5a4c2d9da3..13a8dfa51a 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -5,6 +5,7 @@ use std::{ use anyhow::Context; use pageserver_api::{models::TimelineState, shard::TenantShardId}; +use remote_storage::DownloadError; use tokio::sync::OwnedMutexGuard; use tracing::{error, info, info_span, instrument, Instrument}; use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint}; @@ -16,7 +17,7 @@ use crate::{ metadata::TimelineMetadata, remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient}, CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant, - TimelineOrOffloaded, + TenantManifestError, TimelineOrOffloaded, }, virtual_file::MaybeFatalIo, }; @@ -110,13 +111,6 @@ pub(super) async fn delete_local_timeline_directory( info!("finished deleting layer files, releasing locks"); } -/// Removes remote layers and an index file after them. -async fn delete_remote_layers_and_index( - remote_client: &Arc, -) -> anyhow::Result<()> { - remote_client.delete_all().await.context("delete_all") -} - /// It is important that this gets called when DeletionGuard is being held. /// For more context see comments in [`DeleteTimelineFlow::prepare`] async fn remove_maybe_offloaded_timeline_from_tenant( @@ -147,9 +141,10 @@ async fn remove_maybe_offloaded_timeline_from_tenant( ); } TimelineOrOffloaded::Offloaded(timeline) => { - timelines_offloaded + let offloaded_timeline = timelines_offloaded .remove(&timeline.timeline_id) .expect("timeline that we were deleting was concurrently removed from 'timelines_offloaded' map"); + offloaded_timeline.delete_from_ancestor_with_timelines(&timelines); } } @@ -221,11 +216,24 @@ impl DeleteTimelineFlow { None => { let remote_client = tenant .build_timeline_client(timeline.timeline_id(), tenant.remote_storage.clone()); - let result = remote_client + let result = match remote_client .download_index_file(&tenant.cancel) .instrument(info_span!("download_index_file")) .await - .map_err(|e| DeleteTimelineError::Other(anyhow::anyhow!("error: {:?}", e)))?; + { + Ok(r) => r, + Err(DownloadError::NotFound) => { + // Deletion is already complete + tracing::info!("Timeline already deleted in remote storage"); + return Ok(()); + } + Err(e) => { + return Err(DeleteTimelineError::Other(anyhow::anyhow!( + "error: {:?}", + e + ))); + } + }; let index_part = match result { MaybeDeletedIndexPart::Deleted(p) => { tracing::info!("Timeline already set as deleted in remote index"); @@ -406,7 +414,12 @@ impl DeleteTimelineFlow { "timeline_delete", async move { if let Err(err) = Self::background(guard, conf, &tenant, &timeline, remote_client).await { - error!("Error: {err:#}"); + // Only log as an error if it's not a cancellation. + if matches!(err, DeleteTimelineError::Cancelled) { + info!("Shutdown during timeline deletion"); + }else { + error!("Error: {err:#}"); + } if let TimelineOrOffloaded::Timeline(timeline) = timeline { timeline.set_broken(format!("{err:#}")) } @@ -438,7 +451,7 @@ impl DeleteTimelineFlow { Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))? }); - delete_remote_layers_and_index(&remote_client).await?; + remote_client.delete_all().await?; pausable_failpoint!("in_progress_delete"); @@ -449,10 +462,10 @@ impl DeleteTimelineFlow { // So indeed, the tenant manifest might refer to an offloaded timeline which has already been deleted. // However, we handle this case in tenant loading code so the next time we attach, the issue is // resolved. - tenant - .store_tenant_manifest() - .await - .map_err(|e| DeleteTimelineError::Other(anyhow::anyhow!(e)))?; + tenant.store_tenant_manifest().await.map_err(|e| match e { + TenantManifestError::Cancelled => DeleteTimelineError::Cancelled, + _ => DeleteTimelineError::Other(e.into()), + })?; *guard = Self::Finished; diff --git a/pageserver/src/tenant/timeline/offload.rs b/pageserver/src/tenant/timeline/offload.rs index 1394843467..3595d743bc 100644 --- a/pageserver/src/tenant/timeline/offload.rs +++ b/pageserver/src/tenant/timeline/offload.rs @@ -66,7 +66,7 @@ pub(crate) async fn offload_timeline( let conf = &tenant.conf; delete_local_timeline_directory(conf, tenant.tenant_shard_id, &timeline).await; - remove_timeline_from_tenant(tenant, &timeline, &guard); + let remaining_refcount = remove_timeline_from_tenant(tenant, &timeline, &guard); { let mut offloaded_timelines = tenant.timelines_offloaded.lock().unwrap(); @@ -87,16 +87,20 @@ pub(crate) async fn offload_timeline( // not our actual state of offloaded timelines. tenant.store_tenant_manifest().await?; + tracing::info!("Timeline offload complete (remaining arc refcount: {remaining_refcount})"); + Ok(()) } /// It is important that this gets called when DeletionGuard is being held. /// For more context see comments in [`DeleteTimelineFlow::prepare`] +/// +/// Returns the strong count of the timeline `Arc` fn remove_timeline_from_tenant( tenant: &Tenant, timeline: &Timeline, _: &DeletionGuard, // using it as a witness -) { +) -> usize { // Remove the timeline from the map. let mut timelines = tenant.timelines.lock().unwrap(); let children_exist = timelines @@ -109,7 +113,9 @@ fn remove_timeline_from_tenant( panic!("Timeline grew children while we removed layer files"); } - timelines + let timeline = timelines .remove(&timeline.timeline_id) .expect("timeline that we were deleting was concurrently removed from 'timelines' map"); + + Arc::strong_count(&timeline) } diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 34bf959058..6ac6920d47 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -331,11 +331,11 @@ pub(super) async fn handle_walreceiver_connection( Ok(()) } - while let Some((record_end_lsn, recdata)) = waldecoder.poll_decode()? { + while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? { // It is important to deal with the aligned records as lsn in getPage@LSN is // aligned and can be several bytes bigger. Without this alignment we are // at risk of hitting a deadlock. - if !record_end_lsn.is_aligned() { + if !next_record_lsn.is_aligned() { return Err(WalReceiverError::Other(anyhow!("LSN not aligned"))); } @@ -343,7 +343,7 @@ pub(super) async fn handle_walreceiver_connection( let interpreted = InterpretedWalRecord::from_bytes_filtered( recdata, modification.tline.get_shard_identity(), - record_end_lsn, + next_record_lsn, modification.tline.pg_version, )?; @@ -367,10 +367,10 @@ pub(super) async fn handle_walreceiver_connection( .ingest_record(interpreted, &mut modification, &ctx) .await .with_context(|| { - format!("could not ingest record at {record_end_lsn}") + format!("could not ingest record at {next_record_lsn}") })?; if !ingested { - tracing::debug!("ingest: filtered out record @ LSN {record_end_lsn}"); + tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}"); WAL_INGEST.records_filtered.inc(); filtered_records += 1; } @@ -380,7 +380,7 @@ pub(super) async fn handle_walreceiver_connection( // to timeout the tests. fail_point!("walreceiver-after-ingest"); - last_rec_lsn = record_end_lsn; + last_rec_lsn = next_record_lsn; // Commit every ingest_batch_size records. Even if we filtered out // all records, we still need to call commit to advance the LSN. diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index c3ccd8a2e4..38d69760f2 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -154,7 +154,7 @@ impl WalIngest { WAL_INGEST.records_received.inc(); let prev_len = modification.len(); - modification.set_lsn(interpreted.end_lsn)?; + modification.set_lsn(interpreted.next_record_lsn)?; if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) { // Records of this type should always be preceded by a commit(), as they @@ -587,11 +587,29 @@ impl WalIngest { forknum: VISIBILITYMAP_FORKNUM, }; - let mut vm_page_no = blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE; - if blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 { - // Tail of last remaining vm page has to be zeroed. - // We are not precise here and instead of digging in VM bitmap format just clear the whole page. - modification.put_rel_page_image_zero(rel, vm_page_no)?; + // last remaining block, byte, and bit + let mut vm_page_no = blkno / (pg_constants::VM_HEAPBLOCKS_PER_PAGE as u32); + let trunc_byte = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_PAGE + / pg_constants::VM_HEAPBLOCKS_PER_BYTE; + let trunc_offs = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_BYTE + * pg_constants::VM_BITS_PER_HEAPBLOCK; + + // Unless the new size is exactly at a visibility map page boundary, the + // tail bits in the last remaining map page, representing truncated heap + // blocks, need to be cleared. This is not only tidy, but also necessary + // because we don't get a chance to clear the bits if the heap is extended + // again. + if (trunc_byte != 0 || trunc_offs != 0) + && self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no)) + { + modification.put_rel_wal_record( + rel, + vm_page_no, + NeonWalRecord::TruncateVisibilityMap { + trunc_byte, + trunc_offs, + }, + )?; vm_page_no += 1; } let nblocks = get_relsize(modification, rel, ctx).await?; diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs index 78601d87af..d62e325310 100644 --- a/pageserver/src/walredo/apply_neon.rs +++ b/pageserver/src/walredo/apply_neon.rs @@ -42,6 +42,34 @@ pub(crate) fn apply_in_neon( } => { anyhow::bail!("tried to pass postgres wal record to neon WAL redo"); } + // + // Code copied from PostgreSQL `visibilitymap_prepare_truncate` function in `visibilitymap.c` + // + NeonWalRecord::TruncateVisibilityMap { + trunc_byte, + trunc_offs, + } => { + // sanity check that this is modifying the correct relation + let (rel, _) = key.to_rel_block().context("invalid record")?; + assert!( + rel.forknum == VISIBILITYMAP_FORKNUM, + "TruncateVisibilityMap record on unexpected rel {}", + rel + ); + let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; + map[*trunc_byte + 1..].fill(0u8); + /*---- + * Mask out the unwanted bits of the last remaining byte. + * + * ((1 << 0) - 1) = 00000000 + * ((1 << 1) - 1) = 00000001 + * ... + * ((1 << 6) - 1) = 00111111 + * ((1 << 7) - 1) = 01111111 + *---- + */ + map[*trunc_byte] &= (1 << *trunc_offs) - 1; + } NeonWalRecord::ClearVisibilityMapFlags { new_heap_blkno, old_heap_blkno, diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index dc87d79e87..f207ed61f9 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -512,7 +512,7 @@ neon_shmem_startup_hook(void) if (prev_shmem_startup_hook) prev_shmem_startup_hook(); -#if PG_PG_MAJORVERSION_NUM >= 17 +#if PG_MAJORVERSION_NUM >= 17 WAIT_EVENT_NEON_LFC_MAINTENANCE = WaitEventExtensionNew("Neon/FileCache_Maintenance"); WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read"); WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate"); diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index a0a96c6e99..2edcc4ef6f 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -562,6 +562,9 @@ impl WalAcceptor { // Don't flush the WAL on every append, only periodically via flush_ticker. // This batches multiple appends per fsync. If the channel is empty after // sending the reply, we'll schedule an immediate flush. + // + // Note that a flush can still happen on segment bounds, which will result + // in an AppendResponse. if let ProposerAcceptorMessage::AppendRequest(append_request) = msg { msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request); dirty = true; diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index f4983d44d0..6eb69f0b7c 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -947,6 +947,7 @@ where // while first connection still gets some packets later. It might be // better to not log this as error! above. let write_lsn = self.wal_store.write_lsn(); + let flush_lsn = self.wal_store.flush_lsn(); if write_lsn > msg.h.begin_lsn { bail!( "append request rewrites WAL written before, write_lsn={}, msg lsn={}", @@ -1004,7 +1005,9 @@ where ); // If flush_lsn hasn't updated, AppendResponse is not very useful. - if !require_flush { + // This is the common case for !require_flush, but a flush can still + // happen on segment bounds. + if !require_flush && flush_lsn == self.flush_lsn() { return Ok(None); } diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 11f372bceb..e338d70731 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -113,6 +113,13 @@ pub struct PhysicalStorage { /// non-aligned chunks of data. write_record_lsn: Lsn, + /// The last LSN flushed to disk. May be in the middle of a record. + /// + /// NB: when the rest of the system refers to `flush_lsn`, it usually + /// actually refers to `flush_record_lsn`. This ambiguity can be dangerous + /// and should be resolved. + flush_lsn: Lsn, + /// The LSN of the last WAL record flushed to disk. flush_record_lsn: Lsn, @@ -127,23 +134,29 @@ pub struct PhysicalStorage { /// - doesn't point to the end of the segment file: Option, - /// When false, we have just initialized storage using the LSN from find_end_of_wal(). - /// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular, - /// there can be a case with unexpected .partial file. + /// When true, WAL truncation potentially has been interrupted and we need + /// to finish it before allowing WAL writes; see truncate_wal for details. + /// In this case [`write_lsn`] can be less than actually written WAL on + /// disk. In particular, there can be a case with unexpected .partial file. /// /// Imagine the following: /// - 000000010000000000000001 - /// - it was fully written, but the last record is split between 2 segments - /// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment - /// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0 + /// - it was fully written, but the last record is split between 2 + /// segments + /// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in + /// the end of this segment + /// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were + /// initialized to 0/1FFFFF0 /// - 000000010000000000000002.partial - /// - it has only 1 byte written, which is not enough to make a full WAL record + /// - it has only 1 byte written, which is not enough to make a full WAL + /// record /// - /// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal(). - /// This flag will be set to true after the first truncate_wal() call. + /// Partial segment 002 has no WAL records, and it will be removed by the + /// next truncate_wal(). This flag will be set to true after the first + /// truncate_wal() call. /// /// [`write_lsn`]: Self::write_lsn - is_truncated_after_restart: bool, + pending_wal_truncation: bool, } impl PhysicalStorage { @@ -205,10 +218,11 @@ impl PhysicalStorage { system_id: state.server.system_id, write_lsn, write_record_lsn: write_lsn, + flush_lsn, flush_record_lsn: flush_lsn, decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000), file: None, - is_truncated_after_restart: false, + pending_wal_truncation: true, }) } @@ -289,8 +303,9 @@ impl PhysicalStorage { } } - /// Write WAL bytes, which are known to be located in a single WAL segment. - async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> { + /// Write WAL bytes, which are known to be located in a single WAL segment. Returns true if the + /// segment was completed, closed, and flushed to disk. + async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result { let mut file = if let Some(file) = self.file.take() { file } else { @@ -314,20 +329,24 @@ impl PhysicalStorage { let (wal_file_path, wal_file_partial_path) = wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size); fs::rename(wal_file_partial_path, wal_file_path).await?; + Ok(true) } else { // otherwise, file can be reused later self.file = Some(file); + Ok(false) } - - Ok(()) } /// Writes WAL to the segment files, until everything is writed. If some segments /// are fully written, they are flushed to disk. The last (partial) segment can /// be flushed separately later. /// - /// Updates `write_lsn`. + /// Updates `write_lsn` and `flush_lsn`. async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> { + // TODO: this shouldn't be possible, except possibly with write_lsn == 0. + // Rename this method to `append_exact`, and make it append-only, removing + // the `pos` parameter and this check. For this reason, we don't update + // `flush_lsn` here. if self.write_lsn != pos { // need to flush the file before discarding it if let Some(file) = self.file.take() { @@ -349,9 +368,13 @@ impl PhysicalStorage { buf.len() }; - self.write_in_segment(segno, xlogoff, &buf[..bytes_write]) + let flushed = self + .write_in_segment(segno, xlogoff, &buf[..bytes_write]) .await?; self.write_lsn += bytes_write as u64; + if flushed { + self.flush_lsn = self.write_lsn; + } buf = &buf[bytes_write..]; } @@ -365,6 +388,9 @@ impl Storage for PhysicalStorage { self.write_lsn } /// flush_lsn returns LSN of last durably stored WAL record. + /// + /// TODO: flush_lsn() returns flush_record_lsn, but write_lsn() returns write_lsn: confusing. + #[allow(clippy::misnamed_getters)] fn flush_lsn(&self) -> Lsn { self.flush_record_lsn } @@ -405,14 +431,22 @@ impl Storage for PhysicalStorage { startpos ); } + if self.pending_wal_truncation { + bail!( + "write_wal called with pending WAL truncation, write_lsn={}, startpos={}", + self.write_lsn, + startpos + ); + } let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?; // WAL is written, updating write metrics self.metrics.observe_write_seconds(write_seconds); self.metrics.observe_write_bytes(buf.len()); - // figure out last record's end lsn for reporting (if we got the - // whole record) + // Figure out the last record's end LSN and update `write_record_lsn` + // (if we got a whole record). The write may also have closed and + // flushed a segment, so update `flush_record_lsn` as well. if self.decoder.available() != startpos { info!( "restart decoder from {} to {}", @@ -423,12 +457,15 @@ impl Storage for PhysicalStorage { self.decoder = WalStreamDecoder::new(startpos, pg_version); } self.decoder.feed_bytes(buf); - loop { - match self.decoder.poll_decode()? { - None => break, // no full record yet - Some((lsn, _rec)) => { - self.write_record_lsn = lsn; - } + + if self.write_record_lsn <= self.flush_lsn { + // We may have flushed a previously written record. + self.flush_record_lsn = self.write_record_lsn; + } + while let Some((lsn, _rec)) = self.decoder.poll_decode()? { + self.write_record_lsn = lsn; + if lsn <= self.flush_lsn { + self.flush_record_lsn = lsn; } } @@ -445,19 +482,17 @@ impl Storage for PhysicalStorage { self.fdatasync_file(&unflushed_file).await?; self.file = Some(unflushed_file); } else { - // We have unflushed data (write_lsn != flush_lsn), but no file. - // This should only happen if last file was fully written and flushed, - // but haven't updated flush_lsn yet. - if self.write_lsn.segment_offset(self.wal_seg_size) != 0 { - bail!( - "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}", - self.write_lsn, - self.flush_record_lsn - ); - } + // We have unflushed data (write_lsn != flush_lsn), but no file. This + // shouldn't happen, since the segment is flushed on close. + bail!( + "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}", + self.write_lsn, + self.flush_record_lsn + ); } // everything is flushed now, let's update flush_lsn + self.flush_lsn = self.write_lsn; self.flush_record_lsn = self.write_record_lsn; Ok(()) } @@ -479,15 +514,35 @@ impl Storage for PhysicalStorage { ); } - // Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on - // disk (this happens on each connect). - if self.is_truncated_after_restart + // Quick exit if nothing to do and we know that the state is clean to + // avoid writing up to 16 MiB of zeros on disk (this happens on each + // connect). + if !self.pending_wal_truncation && end_pos == self.write_lsn && end_pos == self.flush_record_lsn { return Ok(()); } + // Atomicity: we start with LSNs reset because once on disk deletion is + // started it can't be reversed. However, we might crash/error in the + // middle, leaving garbage above the truncation point. In theory, + // concatenated with previous records it might form bogus WAL (though + // very unlikely in practice because CRC would guard from that). To + // protect, set pending_wal_truncation flag before beginning: it means + // truncation must be retried and WAL writes are prohibited until it + // succeeds. Flag is also set on boot because we don't know if the last + // state was clean. + // + // Protocol (HandleElected before first AppendRequest) ensures we'll + // always try to ensure clean truncation before any writes. + self.pending_wal_truncation = true; + + self.write_lsn = end_pos; + self.flush_lsn = end_pos; + self.write_record_lsn = end_pos; + self.flush_record_lsn = end_pos; + // Close previously opened file, if any if let Some(unflushed_file) = self.file.take() { self.fdatasync_file(&unflushed_file).await?; @@ -513,11 +568,7 @@ impl Storage for PhysicalStorage { fs::rename(wal_file_path, wal_file_partial_path).await?; } - // Update LSNs - self.write_lsn = end_pos; - self.write_record_lsn = end_pos; - self.flush_record_lsn = end_pos; - self.is_truncated_after_restart = true; + self.pending_wal_truncation = false; Ok(()) } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index e3a147bc06..446c476b99 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -3642,6 +3642,7 @@ impl Service { match res { Ok(ok) => Ok(ok), Err(mgmt_api::Error::ApiError(StatusCode::CONFLICT, _)) => Ok(StatusCode::CONFLICT), + Err(mgmt_api::Error::ApiError(StatusCode::SERVICE_UNAVAILABLE, msg)) => Err(ApiError::ResourceUnavailable(msg.into())), Err(e) => { Err( ApiError::InternalServerError(anyhow::anyhow!( @@ -6355,6 +6356,19 @@ impl Service { // Pick the biggest tenant to split first top_n.sort_by_key(|i| i.resident_size); + + // Filter out tenants in a prohibiting scheduling mode + { + let locked = self.inner.read().unwrap(); + top_n.retain(|i| { + if let Some(shard) = locked.tenants.get(&i.id) { + matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) + } else { + false + } + }); + } + let Some(split_candidate) = top_n.into_iter().next() else { tracing::debug!("No split-elegible shards found"); return; diff --git a/test_runner/fixtures/neon_api.py b/test_runner/fixtures/neon_api.py index 89c1f324b4..9de6681beb 100644 --- a/test_runner/fixtures/neon_api.py +++ b/test_runner/fixtures/neon_api.py @@ -5,6 +5,8 @@ from typing import TYPE_CHECKING, cast, final import requests +from fixtures.log_helper import log + if TYPE_CHECKING: from typing import Any, Literal, Optional @@ -30,7 +32,11 @@ class NeonAPI: kwargs["headers"] = {} kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}" - return requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs) + resp = requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs) + log.debug("%s %s returned a %d: %s", method, endpoint, resp.status_code, resp.text) + resp.raise_for_status() + + return resp def create_project( self, @@ -66,8 +72,6 @@ class NeonAPI: json=data, ) - assert resp.status_code == 201 - return cast("dict[str, Any]", resp.json()) def get_project_details(self, project_id: str) -> dict[str, Any]: @@ -79,7 +83,7 @@ class NeonAPI: "Content-Type": "application/json", }, ) - assert resp.status_code == 200 + return cast("dict[str, Any]", resp.json()) def delete_project( @@ -95,8 +99,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def start_endpoint( @@ -112,8 +114,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def suspend_endpoint( @@ -129,8 +129,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def restart_endpoint( @@ -146,8 +144,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def create_endpoint( @@ -178,8 +174,6 @@ class NeonAPI: json=data, ) - assert resp.status_code == 201 - return cast("dict[str, Any]", resp.json()) def get_connection_uri( @@ -206,8 +200,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def get_branches(self, project_id: str) -> dict[str, Any]: @@ -219,8 +211,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def get_endpoints(self, project_id: str) -> dict[str, Any]: @@ -232,8 +222,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def get_operations(self, project_id: str) -> dict[str, Any]: @@ -246,8 +234,6 @@ class NeonAPI: }, ) - assert resp.status_code == 200 - return cast("dict[str, Any]", resp.json()) def wait_for_operation_to_finish(self, project_id: str): diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 990db1aed0..205a47a9d5 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2379,6 +2379,17 @@ class NeonPageserver(PgProtocol, LogUtils): # # The entries in the list are regular experessions. self.allowed_errors: list[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS) + # Store persistent failpoints that should be reapplied on each start + self._persistent_failpoints: dict[str, str] = {} + + def add_persistent_failpoint(self, name: str, action: str): + """ + Add a failpoint that will be automatically reapplied each time the pageserver starts. + The failpoint will be set immediately if the pageserver is running. + """ + self._persistent_failpoints[name] = action + if self.running: + self.http_client().configure_failpoints([(name, action)]) def timeline_dir( self, @@ -2446,6 +2457,15 @@ class NeonPageserver(PgProtocol, LogUtils): """ assert self.running is False + if self._persistent_failpoints: + # Tests shouldn't use this mechanism _and_ set FAILPOINTS explicitly + assert extra_env_vars is None or "FAILPOINTS" not in extra_env_vars + if extra_env_vars is None: + extra_env_vars = {} + extra_env_vars["FAILPOINTS"] = ",".join( + f"{k}={v}" for (k, v) in self._persistent_failpoints.items() + ) + storage = self.env.pageserver_remote_storage if isinstance(storage, S3Storage): s3_env_vars = storage.access_env_vars() @@ -4522,7 +4542,7 @@ def pytest_addoption(parser: Parser): SMALL_DB_FILE_NAME_REGEX: re.Pattern[str] = re.compile( - r"config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql|conf)" + r"config-v1|heatmap-v1|tenant-manifest|metadata|.+\.(?:toml|pid|json|sql|conf)" ) diff --git a/test_runner/performance/test_logical_replication.py b/test_runner/performance/test_logical_replication.py index 050c09c1e5..9d653d1a1e 100644 --- a/test_runner/performance/test_logical_replication.py +++ b/test_runner/performance/test_logical_replication.py @@ -1,6 +1,8 @@ from __future__ import annotations import time +from collections.abc import Iterator +from contextlib import contextmanager from typing import TYPE_CHECKING, cast import psycopg2 @@ -18,7 +20,7 @@ if TYPE_CHECKING: from fixtures.benchmark_fixture import NeonBenchmarker from fixtures.neon_api import NeonApiEndpoint from fixtures.neon_fixtures import NeonEnv, PgBin, VanillaPostgres - from psycopg2.extensions import cursor + from psycopg2.extensions import connection, cursor @pytest.mark.timeout(1000) @@ -292,6 +294,48 @@ def test_snap_files( then runs pgbench inserts while generating large numbers of snapfiles. Then restarts the node and tries to peek the replication changes. """ + + @contextmanager + def replication_slot(conn: connection, slot_name: str) -> Iterator[None]: + """ + Make sure that the replication slot doesn't outlive the test. Normally + we wouldn't want this behavior, but since the test creates and drops + the replication slot, we do. + + We've had problems in the past where this slot sticking around caused + issues with the publisher retaining WAL during the execution of the + other benchmarks in this suite. + """ + + def __drop_replication_slot(c: cursor) -> None: + c.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 + FROM pg_replication_slots + WHERE slot_name = %(slot_name)s + ) THEN + PERFORM pg_drop_replication_slot(%(slot_name)s); + END IF; + END $$; + """, + {"slot_name": slot_name}, + ) + + with conn.cursor() as c: + __drop_replication_slot(c) + c.execute( + "SELECT pg_create_logical_replication_slot(%(slot_name)s, 'test_decoding')", + {"slot_name": slot_name}, + ) + + yield + + with conn.cursor() as c: + __drop_replication_slot(c) + test_duration_min = 60 test_interval_min = 5 pgbench_duration = f"-T{test_duration_min * 60 * 2}" @@ -314,48 +358,35 @@ def test_snap_files( conn = psycopg2.connect(connstr) conn.autocommit = True - with conn.cursor() as cur: - cur.execute( - """ - DO $$ - BEGIN - IF EXISTS ( - SELECT 1 - FROM pg_replication_slots - WHERE slot_name = 'slotter' - ) THEN - PERFORM pg_drop_replication_slot('slotter'); - END IF; - END $$; - """ + with replication_slot(conn, "slotter"): + workload = pg_bin.run_nonblocking( + ["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env ) - cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')") + try: + start = time.time() + prev_measurement = time.time() + while time.time() - start < test_duration_min * 60: + conn = psycopg2.connect(connstr) + conn.autocommit = True + + with conn.cursor() as cur: + cur.execute( + "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s" + ) + check_pgbench_still_running(workload) + cur.execute( + "SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())" + ) + + conn.close() + + # Measure storage + if time.time() - prev_measurement > test_interval_min * 60: + storage = benchmark_project_pub.get_synthetic_storage_size() + zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER) + prev_measurement = time.time() + time.sleep(test_interval_min * 60 / 3) + finally: + workload.terminate() conn.close() - - workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env) - try: - start = time.time() - prev_measurement = time.time() - while time.time() - start < test_duration_min * 60: - conn = psycopg2.connect(connstr) - conn.autocommit = True - - with conn.cursor() as cur: - cur.execute( - "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s" - ) - check_pgbench_still_running(workload) - cur.execute("SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())") - - conn.close() - - # Measure storage - if time.time() - prev_measurement > test_interval_min * 60: - storage = benchmark_project_pub.get_synthetic_storage_size() - zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER) - prev_measurement = time.time() - time.sleep(test_interval_min * 60 / 3) - - finally: - workload.terminate() diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index 8f6c9f16fd..4f59efb8b3 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -35,9 +35,10 @@ from fixtures.pageserver.utils import ( wait_for_upload, ) from fixtures.remote_storage import ( + LocalFsStorage, RemoteStorageKind, ) -from fixtures.utils import wait_until +from fixtures.utils import run_only_on_default_postgres, wait_until from fixtures.workload import Workload if TYPE_CHECKING: @@ -728,3 +729,68 @@ def test_upgrade_generationless_local_file_paths( ) # We should download into the same local path we started with assert os.path.exists(victim_path) + + +@run_only_on_default_postgres("Only tests index logic") +def test_old_index_time_threshold( + neon_env_builder: NeonEnvBuilder, +): + """ + Exercise pageserver's detection of trying to load an ancient non-latest index. + (see https://github.com/neondatabase/neon/issues/6951) + """ + + # Run with local_fs because we will interfere with mtimes by local filesystem access + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + env = neon_env_builder.init_start() + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + workload = Workload(env, tenant_id, timeline_id) + workload.init() + workload.write_rows(32) + + # Remember generation 1's index path + assert isinstance(env.pageserver_remote_storage, LocalFsStorage) + index_path = env.pageserver_remote_storage.index_path(tenant_id, timeline_id) + + # Increment generation by detaching+attaching, and write+flush some data to get a new remote index + env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"}) + env.storage_controller.tenant_policy_update(tenant_id, {"placement": {"Attached": 0}}) + env.storage_controller.reconcile_until_idle() + workload.churn_rows(32) + + # A new index should have been written + assert env.pageserver_remote_storage.index_path(tenant_id, timeline_id) != index_path + + # Hack the mtime on the generation 1 index + log.info(f"Setting old mtime on {index_path}") + os.utime(index_path, times=(time.time(), time.time() - 30 * 24 * 3600)) + env.pageserver.allowed_errors.extend( + [ + ".*Found a newer index while loading an old one.*", + ".*Index age exceeds threshold and a newer index exists.*", + ] + ) + + # Detach from storage controller + attach in an old generation directly on the pageserver. + workload.stop() + env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"}) + env.storage_controller.reconcile_until_idle() + env.storage_controller.tenant_policy_update(tenant_id, {"scheduling": "Stop"}) + env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy") + + # The controller would not do this (attach in an old generation): we are doing it to simulate + # a hypothetical profound bug in the controller. + env.pageserver.http_client().tenant_location_conf( + tenant_id, {"generation": 1, "mode": "AttachedSingle", "tenant_conf": {}} + ) + + # The pageserver should react to this situation by refusing to attach the tenant and putting + # it into Broken state + env.pageserver.allowed_errors.append(".*tenant is broken.*") + with pytest.raises( + PageserverApiException, + match="tenant is broken: Index age exceeds threshold and a newer index exists", + ): + env.pageserver.http_client().timeline_detail(tenant_id, timeline_id) diff --git a/test_runner/regress/test_readonly_node.py b/test_runner/regress/test_readonly_node.py index f257f0853b..826136d5f9 100644 --- a/test_runner/regress/test_readonly_node.py +++ b/test_runner/regress/test_readonly_node.py @@ -122,6 +122,7 @@ def test_readonly_node(neon_simple_env: NeonEnv): ) +@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/9754") def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): """ Test static endpoint is protected from GC by acquiring and renewing lsn leases. diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py index d3839e3d2c..ba4e79c343 100644 --- a/test_runner/regress/test_timeline_archive.py +++ b/test_runner/regress/test_timeline_archive.py @@ -1,22 +1,33 @@ from __future__ import annotations import json +import random +import threading +import time from typing import Optional import pytest -from fixtures.common_types import TenantId, TimelineArchivalState, TimelineId +import requests +from fixtures.common_types import TenantId, TenantShardId, TimelineArchivalState, TimelineId from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, last_flush_lsn_upload, ) from fixtures.pageserver.http import PageserverApiException -from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty, list_prefix +from fixtures.pageserver.utils import ( + assert_prefix_empty, + assert_prefix_not_empty, + list_prefix, + wait_until_tenant_active, +) +from fixtures.pg_version import PgVersion from fixtures.remote_storage import S3Storage, s3_storage -from fixtures.utils import wait_until +from fixtures.utils import run_only_on_default_postgres, wait_until from mypy_boto3_s3.type_defs import ( ObjectTypeDef, ) +from psycopg2.errors import IoError, UndefinedTable @pytest.mark.parametrize("shard_count", [0, 4]) @@ -378,8 +389,279 @@ def test_timeline_offload_persist(neon_env_builder: NeonEnvBuilder, delete_timel ) -@pytest.mark.parametrize("offload_child", ["offload", "offload-corrupt", "archive", None]) -def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Optional[str]): +@run_only_on_default_postgres("this test isn't sensitive to the contents of timelines") +def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder): + """ + A general consistency check on archival/offload timeline state, and its intersection + with tenant migrations and timeline deletions. + """ + + # Offloading is off by default at time of writing: remove this line when it's on by default + neon_env_builder.pageserver_config_override = "timeline_offloading = true" + neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + + # We will exercise migrations, so need multiple pageservers + neon_env_builder.num_pageservers = 2 + + env = neon_env_builder.init_start( + initial_tenant_conf={ + "compaction_period": "1s", + } + ) + tenant_id = env.initial_tenant + tenant_shard_id = TenantShardId(tenant_id, 0, 0) + + # Unavailable pageservers during timeline CRUD operations can be logged as errors on the storage controller + env.storage_controller.allowed_errors.extend( + [ + ".*error sending request.*", + # FIXME: the pageserver should not return 500s on cancellation (https://github.com/neondatabase/neon/issues/97680) + ".*InternalServerError(Error deleting timeline .* on .* on .*: pageserver API: error: Cancelled", + ] + ) + + for ps in env.pageservers: + # We will do unclean restarts, which results in these messages when cleaning up files + ps.allowed_errors.extend( + [ + ".*removing local file.*because it has unexpected length.*", + ".*__temp.*", + # FIXME: there are still anyhow::Error paths in timeline creation/deletion which + # generate 500 results when called during shutdown (https://github.com/neondatabase/neon/issues/9768) + ".*InternalServerError.*", + # FIXME: there are still anyhow::Error paths in timeline deletion that generate + # log lines at error severity (https://github.com/neondatabase/neon/issues/9768) + ".*delete_timeline.*Error", + ] + ) + + class TimelineState: + def __init__(self): + self.timeline_id = TimelineId.generate() + self.created = False + self.archived = False + self.offloaded = False + self.deleted = False + + controller_ps_api = env.storage_controller.pageserver_api() + + shutdown = threading.Event() + + violations = [] + + timelines_deleted = [] + + def list_timelines(tenant_id) -> tuple[set[TimelineId], set[TimelineId]]: + """Get the list of active and offloaded TimelineId""" + listing = controller_ps_api.timeline_and_offloaded_list(tenant_id) + active_ids = set([TimelineId(t["timeline_id"]) for t in listing.timelines]) + offloaded_ids = set([TimelineId(t["timeline_id"]) for t in listing.offloaded]) + + return (active_ids, offloaded_ids) + + def timeline_objects(tenant_shard_id, timeline_id): + response = list_prefix( + env.pageserver_remote_storage, # type: ignore + prefix="/".join( + ( + "tenants", + str(tenant_shard_id), + "timelines", + str(timeline_id), + ) + ) + + "/", + ) + + return [k["Key"] for k in response.get("Contents", [])] + + def worker(): + """ + Background thread which drives timeline lifecycle operations, and checks that between steps + it obeys invariants. This should detect errors in pageserver persistence and in errors in + concurrent operations on different timelines when it is run many times in parallel. + """ + state = TimelineState() + + # Jitter worker startup, we're not interested in exercising lots of concurrent creations + # as we know that's I/O bound. + shutdown.wait(random.random() * 10) + + while not shutdown.is_set(): + # A little wait between actions to jitter out the API calls rather than having them + # all queue up at once + shutdown.wait(random.random()) + + try: + if not state.created: + log.info(f"Creating timeline {state.timeline_id}") + controller_ps_api.timeline_create( + PgVersion.NOT_SET, tenant_id=tenant_id, new_timeline_id=state.timeline_id + ) + state.created = True + + if ( + timeline_objects( + tenant_shard_id=tenant_shard_id, timeline_id=state.timeline_id + ) + == [] + ): + msg = f"Timeline {state.timeline_id} unexpectedly not present in remote storage" + violations.append(msg) + + elif state.deleted: + # Try to confirm its deletion completed. + # Deleted timeline should not appear in listing API, either as offloaded or active + (active_ids, offloaded_ids) = list_timelines(tenant_id) + if state.timeline_id in active_ids or state.timeline_id in offloaded_ids: + msg = f"Timeline {state.timeline_id} appeared in listing after deletion was acked" + violations.append(msg) + raise RuntimeError(msg) + + objects = timeline_objects(tenant_shard_id, state.timeline_id) + if len(objects) == 0: + log.info(f"Confirmed deletion of timeline {state.timeline_id}") + timelines_deleted.append(state.timeline_id) + state = TimelineState() # A new timeline ID to create on next iteration + else: + # Deletion of objects doesn't have to be synchronous, we will keep polling + log.info(f"Timeline {state.timeline_id} objects still exist: {objects}") + shutdown.wait(random.random()) + else: + # The main lifetime of a timeline: proceed active->archived->offloaded->deleted + if not state.archived: + log.info(f"Archiving timeline {state.timeline_id}") + controller_ps_api.timeline_archival_config( + tenant_id, state.timeline_id, TimelineArchivalState.ARCHIVED + ) + state.archived = True + elif state.archived and not state.offloaded: + log.info(f"Waiting for offload of timeline {state.timeline_id}") + # Wait for offload: this should happen fast because we configured a short compaction interval + while not shutdown.is_set(): + (active_ids, offloaded_ids) = list_timelines(tenant_id) + if state.timeline_id in active_ids: + log.info(f"Timeline {state.timeline_id} is still active") + shutdown.wait(0.5) + elif state.timeline_id in offloaded_ids: + log.info(f"Timeline {state.timeline_id} is now offloaded") + state.offloaded = True + break + else: + # Timeline is neither offloaded nor active, this is unexpected: the pageserver + # should ensure that the timeline appears in either the offloaded list or main list + msg = f"Timeline {state.timeline_id} disappeared!" + violations.append(msg) + raise RuntimeError(msg) + elif state.offloaded: + # Once it's offloaded it should only be in offloaded or deleted state: check + # it didn't revert back to active. This tests that the manfiest is doing its + # job to suppress loading of offloaded timelines as active. + (active_ids, offloaded_ids) = list_timelines(tenant_id) + if state.timeline_id in active_ids: + msg = f"Timeline {state.timeline_id} is active, should be offloaded or deleted" + violations.append(msg) + raise RuntimeError(msg) + + log.info(f"Deleting timeline {state.timeline_id}") + controller_ps_api.timeline_delete(tenant_id, state.timeline_id) + state.deleted = True + else: + raise RuntimeError("State should be unreachable") + except PageserverApiException as e: + # This is expected: we are injecting chaos, API calls will sometimes fail. + # TODO: can we narrow this to assert we are getting friendly 503s? + log.info(f"Iteration error, will retry: {e}") + shutdown.wait(random.random()) + except requests.exceptions.RetryError as e: + # Retryable error repeated more times than `requests` is configured to tolerate, this + # is expected when a pageserver remains unavailable for a couple seconds + log.info(f"Iteration error, will retry: {e}") + shutdown.wait(random.random()) + except Exception as e: + log.warning( + f"Unexpected worker exception (current timeline {state.timeline_id}): {e}" + ) + else: + # In the non-error case, use a jitterd but small wait, we want to keep + # a high rate of operations going + shutdown.wait(random.random() * 0.1) + + n_workers = 4 + threads = [] + for _i in range(0, n_workers): + t = threading.Thread(target=worker) + t.start() + threads.append(t) + + # Set delay failpoints so that deletions and migrations take some time, and have a good + # chance to interact with other concurrent timeline mutations. + env.storage_controller.configure_failpoints( + [("reconciler-live-migrate-pre-await-lsn", "sleep(1)")] + ) + for ps in env.pageservers: + ps.add_persistent_failpoint("in_progress_delete", "sleep(1)") + + # Generate some chaos, while our workers are trying to complete their timeline operations + rng = random.Random() + try: + chaos_rounds = 48 + for _i in range(0, chaos_rounds): + action = rng.choice([0, 1]) + if action == 0: + # Pick a random pageserver to gracefully restart + pageserver = rng.choice(env.pageservers) + + # Whether to use a graceful shutdown or SIGKILL + immediate = random.choice([True, False]) + log.info(f"Restarting pageserver {pageserver.id}, immediate={immediate}") + + t1 = time.time() + pageserver.restart(immediate=immediate) + restart_duration = time.time() - t1 + + # Make sure we're up for as long as we spent restarting, to ensure operations can make progress + log.info(f"Staying alive for {restart_duration}s") + time.sleep(restart_duration) + else: + # Migrate our tenant between pageservers + origin_ps = env.get_tenant_pageserver(tenant_shard_id) + dest_ps = rng.choice([ps for ps in env.pageservers if ps.id != origin_ps.id]) + log.info(f"Migrating {tenant_shard_id} {origin_ps.id}->{dest_ps.id}") + env.storage_controller.tenant_shard_migrate( + tenant_shard_id=tenant_shard_id, dest_ps_id=dest_ps.id + ) + + log.info(f"Full timeline lifecycles so far: {len(timelines_deleted)}") + finally: + shutdown.set() + + for thread in threads: + thread.join() + + # Sanity check that during our run we did exercise some full timeline lifecycles, in case + # one of our workers got stuck + assert len(timelines_deleted) > 10 + + # That no invariant-violations were reported by workers + assert violations == [] + + +@pytest.mark.parametrize("with_intermediary", [False, True]) +@pytest.mark.parametrize( + "offload_child", + [ + "offload", + "offload-corrupt", + "offload-no-restart", + "offload-parent", + "archive", + None, + ], +) +def test_timeline_retain_lsn( + neon_env_builder: NeonEnvBuilder, with_intermediary: bool, offload_child: Optional[str] +): """ Ensure that retain_lsn functionality for timelines works, both for offloaded and non-offloaded ones """ @@ -387,6 +669,7 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op # Our corruption code only works with S3 compatible storage neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + neon_env_builder.rust_log_override = "info,[gc_timeline]=debug" env = neon_env_builder.init_start() ps_http = env.pageserver.http_client() @@ -394,22 +677,30 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op tenant_id, root_timeline_id = env.create_tenant( conf={ # small checkpointing and compaction targets to ensure we generate many upload operations - "checkpoint_distance": 128 * 1024, + "checkpoint_distance": 32 * 1024, "compaction_threshold": 1, - "compaction_target_size": 128 * 1024, + "compaction_target_size": 32 * 1024, # set small image creation thresholds so that gc deletes data - "image_creation_threshold": 2, + "image_creation_threshold": 1, # disable background compaction and GC. We invoke it manually when we want it to happen. "gc_period": "0s", "compaction_period": "0s", # Disable pitr, we only want the latest lsn "pitr_interval": "0s", + "gc_horizon": 0, # Don't rely on endpoint lsn leases "lsn_lease_length": "0s", } ) - with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: + if with_intermediary: + parent_branch_name = "test_archived_parent" + parent_timeline_id = env.create_branch("test_archived_parent", tenant_id) + else: + parent_branch_name = "main" + parent_timeline_id = root_timeline_id + + with env.endpoints.create_start(parent_branch_name, tenant_id=tenant_id) as endpoint: endpoint.safe_psql_many( [ "CREATE TABLE foo(v int, key serial primary key, t text default 'data_content')", @@ -419,14 +710,16 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op ) pre_branch_sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200") log.info(f"Pre branch sum: {pre_branch_sum}") - last_flush_lsn_upload(env, endpoint, tenant_id, root_timeline_id) + last_flush_lsn_upload(env, endpoint, tenant_id, parent_timeline_id) # Create a branch and write some additional data to the parent - child_timeline_id = env.create_branch("test_archived_branch", tenant_id) + child_timeline_id = env.create_branch( + "test_archived_branch", tenant_id, ancestor_branch_name=parent_branch_name + ) - with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: - # Do some churn of the data. This is important so that we can overwrite image layers. - for i in range(10): + with env.endpoints.create_start(parent_branch_name, tenant_id=tenant_id) as endpoint: + # Do some overwriting churn with compactions in between. This is important so that we can overwrite image layers. + for i in range(5): endpoint.safe_psql_many( [ f"SELECT setseed(0.23{i})", @@ -435,9 +728,9 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op "UPDATE foo SET v=(random() * 409600)::int WHERE v % 3 = 0", ] ) + last_flush_lsn_upload(env, endpoint, tenant_id, parent_timeline_id) post_branch_sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200") log.info(f"Post branch sum: {post_branch_sum}") - last_flush_lsn_upload(env, endpoint, tenant_id, root_timeline_id) if offload_child is not None: ps_http.timeline_archival_config( @@ -452,9 +745,19 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op assert leaf_detail["is_archived"] is True if "offload" in offload_child: ps_http.timeline_offload(tenant_id, child_timeline_id) + if "offload-parent" in offload_child: + # Also offload the parent to ensure the retain_lsn of the child + # is entered in the parent at unoffloading + ps_http.timeline_archival_config( + tenant_id, + parent_timeline_id, + state=TimelineArchivalState.ARCHIVED, + ) + ps_http.timeline_offload(tenant_id, parent_timeline_id) # Do a restart to get rid of any in-memory objects (we only init gc info once, at attach) - env.pageserver.stop() + if offload_child is None or "no-restart" not in offload_child: + env.pageserver.stop() if offload_child == "offload-corrupt": assert isinstance(env.pageserver_remote_storage, S3Storage) listing = list_prefix( @@ -489,13 +792,21 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op ".*page_service_conn_main.*could not find data for key.*", ] ) - env.pageserver.start() + if offload_child is None or "no-restart" not in offload_child: + env.pageserver.start() + if offload_child == "offload-parent": + wait_until_tenant_active(ps_http, tenant_id=tenant_id) + ps_http.timeline_archival_config( + tenant_id, + parent_timeline_id, + state=TimelineArchivalState.UNARCHIVED, + ) # Do an agressive gc and compaction of the parent branch - ps_http.timeline_gc(tenant_id=tenant_id, timeline_id=root_timeline_id, gc_horizon=0) + ps_http.timeline_gc(tenant_id=tenant_id, timeline_id=parent_timeline_id, gc_horizon=0) ps_http.timeline_checkpoint( tenant_id, - root_timeline_id, + parent_timeline_id, force_l0_compaction=True, force_repartition=True, wait_until_uploaded=True, @@ -511,10 +822,15 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op # Now, after unarchival, the child timeline should still have its data accessible (or corrupted) if offload_child == "offload-corrupt": - with pytest.raises(RuntimeError, match=".*failed to get basebackup.*"): - env.endpoints.create_start( + if with_intermediary: + error_regex = "(.*could not read .* from page server.*|.*relation .* does not exist)" + else: + error_regex = ".*failed to get basebackup.*" + with pytest.raises((RuntimeError, IoError, UndefinedTable), match=error_regex): + with env.endpoints.create_start( "test_archived_branch", tenant_id=tenant_id, basebackup_request_tries=1 - ) + ) as endpoint: + endpoint.safe_psql("SELECT sum(key) from foo where v < 51200") else: with env.endpoints.create_start("test_archived_branch", tenant_id=tenant_id) as endpoint: sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200") diff --git a/test_runner/regress/test_vm_truncate.py b/test_runner/regress/test_vm_truncate.py new file mode 100644 index 0000000000..43b4f2d8b1 --- /dev/null +++ b/test_runner/regress/test_vm_truncate.py @@ -0,0 +1,33 @@ +from fixtures.neon_fixtures import NeonEnv + + +# +# Test that VM is properly truncated +# +def test_vm_truncate(neon_simple_env: NeonEnv): + env = neon_simple_env + + endpoint = env.endpoints.create_start("main") + con = endpoint.connect() + cur = con.cursor() + cur.execute("CREATE EXTENSION neon_test_utils") + cur.execute("CREATE EXTENSION pageinspect") + + cur.execute( + "create table t(pk integer primary key, counter integer default 0, filler text default repeat('?', 200))" + ) + cur.execute("insert into t (pk) values (generate_series(1,1000))") + cur.execute("delete from t where pk>10") + cur.execute("vacuum t") # truncates the relation, including its VM and FSM + # get image of the first block of the VM excluding the page header. It's expected + # to still be in the buffer cache. + # ignore page header (24 bytes, 48 - it's hex representation) + cur.execute("select substr(encode(get_raw_page('t', 'vm', 0), 'hex'), 48)") + pg_bitmap = cur.fetchall()[0][0] + # flush shared buffers + cur.execute("SELECT clear_buffer_cache()") + # now download the first block of the VM from the pageserver ... + cur.execute("select substr(encode(get_raw_page('t', 'vm', 0), 'hex'), 48)") + ps_bitmap = cur.fetchall()[0][0] + # and check that content of bitmaps are equal, i.e. PS is producing the same VM page as Postgres + assert pg_bitmap == ps_bitmap diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index ae4018a884..53d3a7364b 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -58,7 +58,7 @@ num-integer = { version = "0.1", features = ["i128"] } num-traits = { version = "0.2", features = ["i128", "libm"] } once_cell = { version = "1" } parquet = { version = "53", default-features = false, features = ["zstd"] } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", default-features = false, features = ["with-serde_json-1"] } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", default-features = false, features = ["with-serde_json-1"] } prost = { version = "0.13", features = ["prost-derive"] } rand = { version = "0.8", features = ["small_rng"] } regex = { version = "1" } @@ -75,10 +75,10 @@ smallvec = { version = "1", default-features = false, features = ["const_new", " spki = { version = "0.7", default-features = false, features = ["pem", "std"] } subtle = { version = "2" } sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] } -tikv-jemalloc-sys = { version = "0.5" } +tikv-jemalloc-sys = { version = "0.6", features = ["stats"] } time = { version = "0.3", features = ["macros", "serde-well-known"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2", features = ["with-serde_json-1"] } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", features = ["with-serde_json-1"] } tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] } tokio-stream = { version = "0.1", features = ["net"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }