diff --git a/.github/actions/run-python-test-set/action.yml b/.github/actions/run-python-test-set/action.yml index c6ea52ba88..a2aae0772b 100644 --- a/.github/actions/run-python-test-set/action.yml +++ b/.github/actions/run-python-test-set/action.yml @@ -114,6 +114,7 @@ runs: export PLATFORM=${PLATFORM:-github-actions-selfhosted} export POSTGRES_DISTRIB_DIR=${POSTGRES_DISTRIB_DIR:-/tmp/neon/pg_install} export DEFAULT_PG_VERSION=${PG_VERSION#v} + export LD_LIBRARY_PATH=${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/lib if [ "${BUILD_TYPE}" = "remote" ]; then export REMOTE_ENV=1 diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml index db4209500f..0e748adeb6 100644 --- a/.github/workflows/benchmarking.yml +++ b/.github/workflows/benchmarking.yml @@ -379,6 +379,10 @@ jobs: - name: Add Postgres binaries to PATH run: | + LD_LIBRARY_PATH="${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/lib" + export LD_LIBRARY_PATH + echo "LD_LIBRARY_PATH=${LD_LIBRARY_PATH}" >> $GITHUB_ENV + ${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH diff --git a/.github/workflows/build-build-tools-image.yml b/.github/workflows/build-build-tools-image.yml index 5a94dd8e6f..f1c39e7e4f 100644 --- a/.github/workflows/build-build-tools-image.yml +++ b/.github/workflows/build-build-tools-image.yml @@ -82,6 +82,7 @@ jobs: tags: neondatabase/build-tools:${{ inputs.image-tag }}-${{ matrix.arch }} - name: Remove custom docker config directory + if: always() run: | rm -rf /tmp/.docker-custom diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index bf6b904506..e8221be168 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -345,6 +345,8 @@ jobs: - name: Run cargo build run: | + PQ_LIB_DIR=$(pwd)/pg_install/v16/lib + export PQ_LIB_DIR ${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests - run: sccache --show-stats @@ -395,6 +397,11 @@ jobs: env: NEXTEST_RETRIES: 3 run: | + PQ_LIB_DIR=$(pwd)/pg_install/v16/lib + export PQ_LIB_DIR + LD_LIBRARY_PATH=$(pwd)/pg_install/v16/lib + export LD_LIBRARY_PATH + #nextest does not yet support running doctests cargo test --doc $CARGO_FLAGS $CARGO_FEATURES diff --git a/.github/workflows/neon_extra_builds.yml b/.github/workflows/neon_extra_builds.yml index ce66aac549..a99e5684fb 100644 --- a/.github/workflows/neon_extra_builds.yml +++ b/.github/workflows/neon_extra_builds.yml @@ -232,12 +232,19 @@ jobs: - name: Run cargo build run: | + PQ_LIB_DIR=$(pwd)/pg_install/v16/lib + export PQ_LIB_DIR mold -run cargo build --locked $CARGO_FLAGS $CARGO_FEATURES --bins --tests -j$(nproc) - name: Run cargo test env: NEXTEST_RETRIES: 3 run: | + PQ_LIB_DIR=$(pwd)/pg_install/v16/lib + export PQ_LIB_DIR + LD_LIBRARY_PATH=$(pwd)/pg_install/v16/lib + export LD_LIBRARY_PATH + cargo nextest run $CARGO_FEATURES -j$(nproc) # Run separate tests for real S3 diff --git a/Dockerfile b/Dockerfile index cded75ec7a..ccb870ad04 100644 --- a/Dockerfile +++ b/Dockerfile @@ -41,12 +41,13 @@ ARG AWS_SECRET_ACCESS_KEY COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server +COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib COPY --chown=nonroot . . # Show build caching stats to check if it was used in the end. # Has to be the part of the same RUN since sccache daemon is killed in the end of this RUN, losing the compilation stats. RUN set -e \ - && RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \ + && PQ_LIB_DIR=$(pwd)/pg_install/v16/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \ --bin pg_sni_router \ --bin pageserver \ --bin pagectl \ diff --git a/Dockerfile.build-tools b/Dockerfile.build-tools index 7fe1ee9102..3255258c0b 100644 --- a/Dockerfile.build-tools +++ b/Dockerfile.build-tools @@ -26,7 +26,6 @@ RUN set -e \ liblzma-dev \ libncurses5-dev \ libncursesw5-dev \ - libpq-dev \ libreadline-dev \ libseccomp-dev \ libsqlite3-dev \ diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 6634274d2a..3ac3ce21df 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -325,11 +325,16 @@ impl LocalEnv { } } - pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result { - Ok(self.pg_distrib_dir(pg_version)?.join("bin")) + pub fn pg_dir(&self, pg_version: u32, dir_name: &str) -> anyhow::Result { + Ok(self.pg_distrib_dir(pg_version)?.join(dir_name)) } + + pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result { + self.pg_dir(pg_version, "bin") + } + pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result { - Ok(self.pg_distrib_dir(pg_version)?.join("lib")) + self.pg_dir(pg_version, "lib") } pub fn pageserver_bin(&self) -> PathBuf { diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 5ca1b13b2a..47103a2e0a 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -155,16 +155,16 @@ impl StorageController { .expect("non-Unicode path") } - /// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl` + /// Find the directory containing postgres subdirectories, such `bin` and `lib` /// /// This usually uses STORAGE_CONTROLLER_POSTGRES_VERSION of postgres, but will fall back /// to other versions if that one isn't found. Some automated tests create circumstances /// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`. - pub async fn get_pg_bin_dir(&self) -> anyhow::Result { + async fn get_pg_dir(&self, dir_name: &str) -> anyhow::Result { let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 15, 14]; for v in prefer_versions { - let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap(); + let path = Utf8PathBuf::from_path_buf(self.env.pg_dir(v, dir_name)?).unwrap(); if tokio::fs::try_exists(&path).await? { return Ok(path); } @@ -172,11 +172,20 @@ impl StorageController { // Fall through anyhow::bail!( - "Postgres binaries not found in {}", - self.env.pg_distrib_dir.display() + "Postgres directory '{}' not found in {}", + dir_name, + self.env.pg_distrib_dir.display(), ); } + pub async fn get_pg_bin_dir(&self) -> anyhow::Result { + self.get_pg_dir("bin").await + } + + pub async fn get_pg_lib_dir(&self) -> anyhow::Result { + self.get_pg_dir("lib").await + } + /// Readiness check for our postgres process async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result { let bin_path = pg_bin_dir.join("pg_isready"); @@ -229,12 +238,17 @@ impl StorageController { .unwrap() .join("storage_controller_db"); let pg_bin_dir = self.get_pg_bin_dir().await?; + let pg_lib_dir = self.get_pg_lib_dir().await?; let pg_log_path = pg_data_path.join("postgres.log"); if !tokio::fs::try_exists(&pg_data_path).await? { // Initialize empty database let initdb_path = pg_bin_dir.join("initdb"); let mut child = Command::new(&initdb_path) + .envs(vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ]) .args(["-D", pg_data_path.as_ref()]) .spawn() .expect("Failed to spawn initdb"); @@ -269,7 +283,10 @@ impl StorageController { &self.env.base_data_dir, pg_bin_dir.join("pg_ctl").as_std_path(), db_start_args, - [], + vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ], background_process::InitialPidFile::Create(self.postgres_pid_file()), retry_timeout, || self.pg_isready(&pg_bin_dir), @@ -324,7 +341,10 @@ impl StorageController { &self.env.base_data_dir, &self.env.storage_controller_bin(), args, - [], + vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ], background_process::InitialPidFile::Create(self.pid_file()), retry_timeout, || async { diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 0bbb91afc2..d25b23663b 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -356,6 +356,28 @@ impl CheckPoint { } false } + + /// Advance next multi-XID/offset to those given in arguments. + /// + /// It's important that this handles wraparound correctly. This should match the + /// MultiXactAdvanceNextMXact() logic in PostgreSQL's xlog_redo() function. + /// + /// Returns 'true' if the Checkpoint was updated. + pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool { + let mut modified = false; + + if multi_xid.wrapping_sub(self.nextMulti) as i32 > 0 { + self.nextMulti = multi_xid; + modified = true; + } + + if multi_offset.wrapping_sub(self.nextMultiOffset) as i32 > 0 { + self.nextMultiOffset = multi_offset; + modified = true; + } + + modified + } } /// Generate new, empty WAL segment, with correct block headers at the first diff --git a/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs index 496458b2e4..750affc94e 100644 --- a/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs +++ b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs @@ -202,6 +202,53 @@ pub fn test_update_next_xid() { assert_eq!(checkpoint.nextXid.value, 2048); } +#[test] +pub fn test_update_next_multixid() { + let checkpoint_buf = [0u8; std::mem::size_of::()]; + let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap(); + + // simple case + checkpoint.nextMulti = 20; + checkpoint.nextMultiOffset = 20; + checkpoint.update_next_multixid(1000, 2000); + assert_eq!(checkpoint.nextMulti, 1000); + assert_eq!(checkpoint.nextMultiOffset, 2000); + + // No change + checkpoint.update_next_multixid(500, 900); + assert_eq!(checkpoint.nextMulti, 1000); + assert_eq!(checkpoint.nextMultiOffset, 2000); + + // Close to wraparound, but not wrapped around yet + checkpoint.nextMulti = 0xffff0000; + checkpoint.nextMultiOffset = 0xfffe0000; + checkpoint.update_next_multixid(0xffff00ff, 0xfffe00ff); + assert_eq!(checkpoint.nextMulti, 0xffff00ff); + assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff); + + // Wraparound + checkpoint.update_next_multixid(1, 900); + assert_eq!(checkpoint.nextMulti, 1); + assert_eq!(checkpoint.nextMultiOffset, 900); + + // Wraparound nextMulti to 0. + // + // It's a bit surprising that nextMulti can be 0, because that's a special value + // (InvalidMultiXactId). However, that's how Postgres does it at multi-xid wraparound: + // nextMulti wraps around to 0, but then when the next multi-xid is assigned, it skips + // the 0 and the next multi-xid actually assigned is 1. + checkpoint.nextMulti = 0xffff0000; + checkpoint.nextMultiOffset = 0xfffe0000; + checkpoint.update_next_multixid(0, 0xfffe00ff); + assert_eq!(checkpoint.nextMulti, 0); + assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff); + + // Wraparound nextMultiOffset to 0 + checkpoint.update_next_multixid(0, 0); + assert_eq!(checkpoint.nextMulti, 0); + assert_eq!(checkpoint.nextMultiOffset, 0); +} + #[test] pub fn test_encode_logical_message() { let expected = [ diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index f5aca6dfb3..9cd7ffa042 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -476,7 +476,7 @@ static STANDBY_HORIZON: Lazy = Lazy::new(|| { static RESIDENT_PHYSICAL_SIZE: Lazy = Lazy::new(|| { register_uint_gauge_vec!( "pageserver_resident_physical_size", - "The size of the layer files present in the pageserver's filesystem.", + "The size of the layer files present in the pageserver's filesystem, for attached locations.", &["tenant_id", "shard_id", "timeline_id"] ) .expect("failed to define a metric") @@ -1691,6 +1691,15 @@ pub(crate) static SECONDARY_MODE: Lazy = Lazy::new(|| { } }); +pub(crate) static SECONDARY_RESIDENT_PHYSICAL_SIZE: Lazy = Lazy::new(|| { + register_uint_gauge_vec!( + "pageserver_secondary_resident_physical_size", + "The size of the layer files present in the pageserver's filesystem, for secondary locations.", + &["tenant_id", "shard_id"] + ) + .expect("failed to define a metric") +}); + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum RemoteOpKind { Upload, diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index af6840f525..a233d11c4a 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -23,6 +23,8 @@ use super::{ storage_layer::LayerName, }; +use crate::metrics::SECONDARY_RESIDENT_PHYSICAL_SIZE; +use metrics::UIntGauge; use pageserver_api::{ models, shard::{ShardIdentity, TenantShardId}, @@ -99,6 +101,17 @@ pub(crate) struct SecondaryTenant { // Public state indicating overall progress of downloads relative to the last heatmap seen pub(crate) progress: std::sync::Mutex, + + // Sum of layer sizes on local disk + pub(super) resident_size_metric: UIntGauge, +} + +impl Drop for SecondaryTenant { + fn drop(&mut self) { + let tenant_id = self.tenant_shard_id.tenant_id.to_string(); + let shard_id = format!("{}", self.tenant_shard_id.shard_slug()); + let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]); + } } impl SecondaryTenant { @@ -108,6 +121,12 @@ impl SecondaryTenant { tenant_conf: TenantConfOpt, config: &SecondaryLocationConfig, ) -> Arc { + let tenant_id = tenant_shard_id.tenant_id.to_string(); + let shard_id = format!("{}", tenant_shard_id.shard_slug()); + let resident_size_metric = SECONDARY_RESIDENT_PHYSICAL_SIZE + .get_metric_with_label_values(&[&tenant_id, &shard_id]) + .unwrap(); + Arc::new(Self { tenant_shard_id, // todo: shall we make this a descendent of the @@ -123,6 +142,8 @@ impl SecondaryTenant { detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())), progress: std::sync::Mutex::default(), + + resident_size_metric, }) } @@ -211,16 +232,12 @@ impl SecondaryTenant { // have to 100% match what is on disk, because it's a best-effort warming // of the cache. let mut detail = this.detail.lock().unwrap(); - if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) { - let removed = timeline_detail.on_disk_layers.remove(&name); - - // We might race with removal of the same layer during downloads, if it was removed - // from the heatmap. If we see that the OnDiskState is gone, then no need to - // do a physical deletion or store in evicted_at. - if let Some(removed) = removed { - removed.remove_blocking(); - timeline_detail.evicted_at.insert(name, now); - } + if let Some(removed) = + detail.evict_layer(name, &timeline_id, now, &this.resident_size_metric) + { + // We might race with removal of the same layer during downloads, so finding the layer we + // were trying to remove is optional. Only issue the disk I/O to remove it if we found it. + removed.remove_blocking(); } }) .await diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index f6f30641db..27439d4f03 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -46,6 +46,7 @@ use crate::tenant::{ use camino::Utf8PathBuf; use chrono::format::{DelayedFormat, StrftimeItems}; use futures::Future; +use metrics::UIntGauge; use pageserver_api::models::SecondaryProgress; use pageserver_api::shard::TenantShardId; use remote_storage::{DownloadError, Etag, GenericRemoteStorage}; @@ -131,16 +132,66 @@ impl OnDiskState { .or_else(fs_ext::ignore_not_found) .fatal_err("Deleting secondary layer") } + + pub(crate) fn file_size(&self) -> u64 { + self.metadata.file_size + } } #[derive(Debug, Clone, Default)] pub(super) struct SecondaryDetailTimeline { - pub(super) on_disk_layers: HashMap, + on_disk_layers: HashMap, /// We remember when layers were evicted, to prevent re-downloading them. pub(super) evicted_at: HashMap, } +impl SecondaryDetailTimeline { + pub(super) fn remove_layer( + &mut self, + name: &LayerName, + resident_metric: &UIntGauge, + ) -> Option { + let removed = self.on_disk_layers.remove(name); + if let Some(removed) = &removed { + resident_metric.sub(removed.file_size()); + } + removed + } + + /// `local_path` + fn touch_layer( + &mut self, + conf: &'static PageServerConf, + tenant_shard_id: &TenantShardId, + timeline_id: &TimelineId, + touched: &HeatMapLayer, + resident_metric: &UIntGauge, + local_path: F, + ) where + F: FnOnce() -> Utf8PathBuf, + { + use std::collections::hash_map::Entry; + match self.on_disk_layers.entry(touched.name.clone()) { + Entry::Occupied(mut v) => { + v.get_mut().access_time = touched.access_time; + } + Entry::Vacant(e) => { + e.insert(OnDiskState::new( + conf, + tenant_shard_id, + timeline_id, + touched.name.clone(), + touched.metadata.clone(), + touched.access_time, + local_path(), + )); + resident_metric.add(touched.metadata.file_size); + } + } + } +} + // Aspects of a heatmap that we remember after downloading it #[derive(Clone, Debug)] struct DownloadSummary { @@ -158,7 +209,7 @@ pub(super) struct SecondaryDetail { last_download: Option, next_download: Option, - pub(super) timelines: HashMap, + timelines: HashMap, } /// Helper for logging SystemTime @@ -191,6 +242,38 @@ impl SecondaryDetail { } } + pub(super) fn evict_layer( + &mut self, + name: LayerName, + timeline_id: &TimelineId, + now: SystemTime, + resident_metric: &UIntGauge, + ) -> Option { + let timeline = self.timelines.get_mut(timeline_id)?; + let removed = timeline.remove_layer(&name, resident_metric); + if removed.is_some() { + timeline.evicted_at.insert(name, now); + } + removed + } + + pub(super) fn remove_timeline( + &mut self, + timeline_id: &TimelineId, + resident_metric: &UIntGauge, + ) { + let removed = self.timelines.remove(timeline_id); + if let Some(removed) = removed { + resident_metric.sub( + removed + .on_disk_layers + .values() + .map(|l| l.metadata.file_size) + .sum(), + ); + } + } + /// Additionally returns the total number of layers, used for more stable relative access time /// based eviction. pub(super) fn get_layers_for_eviction( @@ -601,8 +684,13 @@ impl<'a> TenantDownloader<'a> { Some(t) => t, None => { // We have no existing state: need to scan local disk for layers first. - let timeline_state = - init_timeline_state(self.conf, tenant_shard_id, timeline).await; + let timeline_state = init_timeline_state( + self.conf, + tenant_shard_id, + timeline, + &self.secondary_state.resident_size_metric, + ) + .await; // Re-acquire detail lock now that we're done with async load from local FS self.secondary_state @@ -671,6 +759,25 @@ impl<'a> TenantDownloader<'a> { .await?; } + // Metrics consistency check in testing builds + if cfg!(feature = "testing") { + let detail = self.secondary_state.detail.lock().unwrap(); + let resident_size = detail + .timelines + .values() + .map(|tl| { + tl.on_disk_layers + .values() + .map(|v| v.metadata.file_size) + .sum::() + }) + .sum::(); + assert_eq!( + resident_size, + self.secondary_state.resident_size_metric.get() + ); + } + // Only update last_etag after a full successful download: this way will not skip // the next download, even if the heatmap's actual etag is unchanged. self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary { @@ -783,7 +890,7 @@ impl<'a> TenantDownloader<'a> { for delete_timeline in &delete_timelines { // We haven't removed from disk yet, but optimistically remove from in-memory state: if removal // from disk fails that will be a fatal error. - detail.timelines.remove(delete_timeline); + detail.remove_timeline(delete_timeline, &self.secondary_state.resident_size_metric); } } @@ -801,7 +908,7 @@ impl<'a> TenantDownloader<'a> { let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else { continue; }; - timeline_state.on_disk_layers.remove(&layer_name); + timeline_state.remove_layer(&layer_name, &self.secondary_state.resident_size_metric); } for timeline_id in delete_timelines { @@ -1000,33 +1107,24 @@ impl<'a> TenantDownloader<'a> { let timeline_detail = detail.timelines.entry(timeline_id).or_default(); tracing::info!("Wrote timeline_detail for {} touched layers", touched.len()); - - for t in touched { - use std::collections::hash_map::Entry; - match timeline_detail.on_disk_layers.entry(t.name.clone()) { - Entry::Occupied(mut v) => { - v.get_mut().access_time = t.access_time; - } - Entry::Vacant(e) => { - let local_path = local_layer_path( + touched.into_iter().for_each(|t| { + timeline_detail.touch_layer( + self.conf, + tenant_shard_id, + &timeline_id, + &t, + &self.secondary_state.resident_size_metric, + || { + local_layer_path( self.conf, tenant_shard_id, &timeline_id, &t.name, &t.metadata.generation, - ); - e.insert(OnDiskState::new( - self.conf, - tenant_shard_id, - &timeline_id, - t.name, - t.metadata.clone(), - t.access_time, - local_path, - )); - } - } - } + ) + }, + ) + }); } result @@ -1135,6 +1233,7 @@ async fn init_timeline_state( conf: &'static PageServerConf, tenant_shard_id: &TenantShardId, heatmap: &HeatMapTimeline, + resident_metric: &UIntGauge, ) -> SecondaryDetailTimeline { let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id); let mut detail = SecondaryDetailTimeline::default(); @@ -1210,17 +1309,13 @@ async fn init_timeline_state( } else { // We expect the access time to be initialized immediately afterwards, when // the latest heatmap is applied to the state. - detail.on_disk_layers.insert( - name.clone(), - OnDiskState::new( - conf, - tenant_shard_id, - &heatmap.timeline_id, - name, - remote_meta.metadata.clone(), - remote_meta.access_time, - file_path, - ), + detail.touch_layer( + conf, + tenant_shard_id, + &heatmap.timeline_id, + remote_meta, + resident_metric, + || file_path, ); } } diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 4f26f2f6d1..07c90385e6 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -343,7 +343,33 @@ impl WalIngest { xlog_checkpoint.oldestActiveXid, self.checkpoint.oldestActiveXid ); - self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid; + + // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`, + // because at shutdown, all in-progress transactions will implicitly + // end. Postgres startup code knows that, and allows hot standby to start + // immediately from a shutdown checkpoint. + // + // In Neon, Postgres hot standby startup always behaves as if starting from + // an online checkpoint. It needs a valid `oldestActiveXid` value, so + // instead of overwriting self.checkpoint.oldestActiveXid with + // InvalidTransactionid from the checkpoint WAL record, update it to a + // proper value, knowing that there are no in-progress transactions at this + // point, except for prepared transactions. + // + // See also the neon code changes in the InitWalRecovery() function. + if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID + && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN + { + let mut oldest_active_xid = self.checkpoint.nextXid.value as u32; + for xid in modification.tline.list_twophase_files(lsn, ctx).await? { + if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 { + oldest_active_xid = xid; + } + } + self.checkpoint.oldestActiveXid = oldest_active_xid; + } else { + self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid; + } // Write a new checkpoint key-value pair on every checkpoint record, even // if nothing really changed. Not strictly required, but it seems nice to @@ -375,6 +401,7 @@ impl WalIngest { if info == pg_constants::XLOG_RUNNING_XACTS { let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf); self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid; + self.checkpoint_modified = true; } } pg_constants::RM_REPLORIGIN_ID => { @@ -1277,13 +1304,10 @@ impl WalIngest { xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db ); - // Here we treat oldestXid and oldestXidDB - // differently from postgres redo routines. - // In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid - // until checkpoint happens and updates the value. - // Here we can use the most recent value. - // It's just an optimization, though and can be deleted. - // TODO Figure out if there will be any issues with replica. + // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is + // truncated, but a checkpoint record with the updated values isn't written until + // later. In Neon, a server can start at any LSN, not just on a checkpoint record, + // so we keep the oldestXid and oldestXidDB up-to-date. self.checkpoint.oldestXid = xlrec.oldest_xid; self.checkpoint.oldestXidDB = xlrec.oldest_xid_db; self.checkpoint_modified = true; @@ -1384,14 +1408,31 @@ impl WalIngest { // Note: The multixact members can wrap around, even within one WAL record. offset = offset.wrapping_add(n_this_page as u32); } - if xlrec.mid >= self.checkpoint.nextMulti { - self.checkpoint.nextMulti = xlrec.mid + 1; - self.checkpoint_modified = true; - } - if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset { - self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers; + let next_offset = offset; + assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset); + + // Update next-multi-xid and next-offset + // + // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to + // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that + // read it, like GetNewMultiXactId(). This is different from how nextXid is + // incremented! nextXid skips over < FirstNormalTransactionId when the the value + // is stored, so it's never 0 in a checkpoint. + // + // I don't know why it's done that way, it seems less error-prone to skip over 0 + // when the value is stored rather than when it's read. But let's do it the same + // way here. + let next_multi_xid = xlrec.mid.wrapping_add(1); + + if self + .checkpoint + .update_next_multixid(next_multi_xid, next_offset) + { self.checkpoint_modified = true; } + + // Also update the next-xid with the highest member. According to the comments in + // multixact_redo(), this shouldn't be necessary, but let's do the same here. let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| { if let Some(max_xid) = acc { if mbr.xid.wrapping_sub(max_xid) as i32 > 0 { diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index b6b2db7e71..e4968bdf89 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -12,6 +12,8 @@ #include "fmgr.h" #include "miscadmin.h" +#include "access/subtrans.h" +#include "access/twophase.h" #include "access/xact.h" #include "access/xlog.h" #include "storage/buf_internals.h" @@ -22,10 +24,12 @@ #include "replication/logical.h" #include "replication/slot.h" #include "replication/walsender.h" +#include "storage/proc.h" #include "storage/procsignal.h" #include "tcop/tcopprot.h" #include "funcapi.h" #include "access/htup_details.h" +#include "utils/builtins.h" #include "utils/pg_lsn.h" #include "utils/guc.h" #include "utils/wait_event.h" @@ -266,6 +270,293 @@ LogicalSlotsMonitorMain(Datum main_arg) } } +/* + * XXX: These private to procarray.c, but we need them here. + */ +#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts) +#define TOTAL_MAX_CACHED_SUBXIDS \ + ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS) + +/* + * Restore running-xact information by scanning the CLOG at startup. + * + * In PostgreSQL, a standby always has to wait for a running-xacts WAL record + * to arrive before it can start accepting queries. Furthermore, if there are + * transactions with too many subxids (> 64) open to fit in the in-memory + * subxids cache, the running-xacts record will be marked as "suboverflowed", + * and the standby will need to also wait for the currently in-progress + * transactions to finish. + * + * That's not great in PostgreSQL, because a hot standby does not necessary + * open up for queries immediately as you might expect. But it's worse in + * Neon: A standby in Neon doesn't need to start WAL replay from a checkpoint + * record; it can start at any LSN. Postgres arranges things so that there is + * a running-xacts record soon after every checkpoint record, but when you + * start from an arbitrary LSN, that doesn't help. If the primary is idle, or + * not running at all, it might never write a new running-xacts record, + * leaving the replica in a limbo where it can never start accepting queries. + * + * To mitigate that, we have an additional mechanism to find the running-xacts + * information: we scan the CLOG, making note of any XIDs not marked as + * committed or aborted. They are added to the Postgres known-assigned XIDs + * array by calling ProcArrayApplyRecoveryInfo() in the caller of this + * function. + * + * There is one big limitation with that mechanism: The size of the + * known-assigned XIDs is limited, so if there are a lot of in-progress XIDs, + * we have to give up. Furthermore, we don't know how many of the in-progress + * XIDs are subtransactions, and if we use up all the space in the + * known-assigned XIDs array for subtransactions, we might run out of space in + * the array later during WAL replay, causing the replica to shut down with + * "ERROR: too many KnownAssignedXids". The safe # of XIDs that we can add to + * the known-assigned array without risking that error later is very low, + * merely PGPROC_MAX_CACHED_SUBXIDS == 64, so we take our chances and use up + * to half of the known-assigned XIDs array for the subtransactions, even + * though that risks getting the error later. + * + * Note: It's OK if the recovered list of XIDs includes some transactions that + * have crashed in the primary, and hence will never commit. They will be seen + * as in-progress, until we see a new next running-acts record with an + * oldestActiveXid that invalidates them. That's how the known-assigned XIDs + * array always works. + * + * If scraping the CLOG doesn't succeed for some reason, like the subxid + * overflow, Postgres will fall back to waiting for a running-xacts record + * like usual. + * + * Returns true if a complete list of in-progress XIDs was scraped. + */ +static bool +RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *nxids) +{ + TransactionId from; + TransactionId till; + int max_xcnt; + TransactionId *prepared_xids = NULL; + int n_prepared_xids; + TransactionId *restored_xids = NULL; + int n_restored_xids; + int next_prepared_idx; + + Assert(*xids == NULL); + + /* + * If the checkpoint doesn't have a valid oldestActiveXid, bail out. We + * don't know where to start the scan. + * + * This shouldn't happen, because the pageserver always maintains a valid + * oldestActiveXid nowadays. Except when starting at an old point in time + * that was ingested before the pageserver was taught to do that. + */ + if (!TransactionIdIsValid(checkpoint->oldestActiveXid)) + { + elog(LOG, "cannot restore running-xacts from CLOG because oldestActiveXid is not set"); + goto fail; + } + + /* + * We will scan the CLOG starting from the oldest active XID. + * + * In some corner cases, the oldestActiveXid from the last checkpoint + * might already have been truncated from the CLOG. That is, + * oldestActiveXid might be older than oldestXid. That's possible because + * oldestActiveXid is only updated at checkpoints. After the last + * checkpoint, the oldest transaction might have committed, and the CLOG + * might also have been already truncated. So if oldestActiveXid is older + * than oldestXid, start at oldestXid instead. (Otherwise we'd try to + * access CLOG segments that have already been truncated away.) + */ + from = TransactionIdPrecedes(checkpoint->oldestXid, checkpoint->oldestActiveXid) + ? checkpoint->oldestActiveXid : checkpoint->oldestXid; + till = XidFromFullTransactionId(checkpoint->nextXid); + + /* + * To avoid "too many KnownAssignedXids" error later during replay, we + * limit number of collected transactions. This is a tradeoff: if we are + * willing to consume more of the KnownAssignedXids space for the XIDs + * now, that allows us to start up, but we might run out of space later. + * + * The size of the KnownAssignedXids array is TOTAL_MAX_CACHED_SUBXIDS, + * which is (PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS). In + * PostgreSQL, that's always enough because the primary will always write + * an XLOG_XACT_ASSIGNMENT record if a transaction has more than + * PGPROC_MAX_CACHED_SUBXIDS subtransactions. Seeing that record allows + * the standby to mark the XIDs in pg_subtrans and removing them from the + * KnowingAssignedXids array. + * + * Here, we don't know which XIDs belong to subtransactions that have + * already been WAL-logged with an XLOG_XACT_ASSIGNMENT record. If we + * wanted to be totally safe and avoid the possibility of getting a "too + * many KnownAssignedXids" error later, we would have to limit ourselves + * to PGPROC_MAX_CACHED_SUBXIDS, which is not much. And that includes top + * transaction IDs too, because we cannot distinguish between top + * transaction IDs and subtransactions here. + * + * Somewhat arbitrarily, we use up to half of KnownAssignedXids. That + * strikes a sensible balance between being useful, and risking a "too + * many KnownAssignedXids" error later. + */ + max_xcnt = TOTAL_MAX_CACHED_SUBXIDS / 2; + + /* + * Collect XIDs of prepared transactions in an array. This includes only + * their top-level XIDs. We assume that StandbyRecoverPreparedTransactions + * has already been called, so we can find all the sub-transactions in + * pg_subtrans. + */ + PrescanPreparedTransactions(&prepared_xids, &n_prepared_xids); + qsort(prepared_xids, n_prepared_xids, sizeof(TransactionId), xidLogicalComparator); + + /* + * Scan the CLOG, collecting in-progress XIDs into 'restored_xids'. + */ + elog(DEBUG1, "scanning CLOG between %u and %u for in-progress XIDs", from, till); + restored_xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId)); + n_restored_xids = 0; + next_prepared_idx = 0; + for (TransactionId xid = from; xid != till;) + { + XLogRecPtr xidlsn; + XidStatus xidstatus; + + xidstatus = TransactionIdGetStatus(xid, &xidlsn); + + /* + * "Merge" the prepared transactions into the restored_xids array as + * we go. The prepared transactions array is sorted. This is mostly + * a sanity check to ensure that all the prpeared transactions are + * seen as in-progress. (There is a check after the loop that we didn't + * miss any.) + */ + if (next_prepared_idx < n_prepared_xids && xid == prepared_xids[next_prepared_idx]) + { + /* + * This is a top-level transaction ID of a prepared transaction. + * Include it in the array. + */ + + /* sanity check */ + if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS) + { + elog(LOG, "prepared transaction %u has unexpected status %X, cannot restore running-xacts from CLOG", + xid, xidstatus); + Assert(false); + goto fail; + } + + elog(DEBUG1, "XID %u: was next prepared xact (%d / %d)", xid, next_prepared_idx, n_prepared_xids); + next_prepared_idx++; + } + else if (xidstatus == TRANSACTION_STATUS_COMMITTED) + { + elog(DEBUG1, "XID %u: was committed", xid); + goto skip; + } + else if (xidstatus == TRANSACTION_STATUS_ABORTED) + { + elog(DEBUG1, "XID %u: was aborted", xid); + goto skip; + } + else if (xidstatus == TRANSACTION_STATUS_IN_PROGRESS) + { + /* + * In-progress transactions are included in the array. + * + * Except subtransactions of the prepared transactions. They are + * already set in pg_subtrans, and hence don't need to be tracked + * in the known-assigned XIDs array. + */ + if (n_prepared_xids > 0) + { + TransactionId parent = SubTransGetParent(xid); + + if (TransactionIdIsValid(parent)) + { + /* + * This is a subtransaction belonging to a prepared + * transaction. + * + * Sanity check that it is in the prepared XIDs array. It + * should be, because StandbyRecoverPreparedTransactions + * populated pg_subtrans, and no other XID should be set + * in it yet. (This also relies on the fact that + * StandbyRecoverPreparedTransactions sets the parent of + * each subxid to point directly to the top-level XID, + * rather than restoring the original subtransaction + * hierarchy.) + */ + if (bsearch(&parent, prepared_xids, next_prepared_idx, + sizeof(TransactionId), xidLogicalComparator) == NULL) + { + elog(LOG, "sub-XID %u has unexpected parent %u, cannot restore running-xacts from CLOG", + xid, parent); + Assert(false); + goto fail; + } + elog(DEBUG1, "XID %u: was a subtransaction of prepared xid %u", xid, parent); + goto skip; + } + } + + /* include it in the array */ + elog(DEBUG1, "XID %u: is in progress", xid); + } + else + { + /* + * SUB_COMMITTED is a transient state used at commit. We don't + * expect to see that here. + */ + elog(LOG, "XID %u has unexpected status %X in pg_xact, cannot restore running-xacts from CLOG", + xid, xidstatus); + Assert(false); + goto fail; + } + + if (n_restored_xids >= max_xcnt) + { + /* + * Overflowed. We won't be able to install the RunningTransactions + * snapshot. + */ + elog(LOG, "too many running xacts to restore from the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u", + checkpoint->oldestXid, checkpoint->oldestActiveXid, + XidFromFullTransactionId(checkpoint->nextXid)); + goto fail; + } + + restored_xids[n_restored_xids++] = xid; + + skip: + TransactionIdAdvance(xid); + continue; + } + + /* sanity check */ + if (next_prepared_idx != n_prepared_xids) + { + elog(LOG, "prepared transaction ID %u was not visited in the CLOG scan, cannot restore running-xacts from CLOG", + prepared_xids[next_prepared_idx]); + Assert(false); + goto fail; + } + + elog(LOG, "restored %d running xacts by scanning the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u", + n_restored_xids, checkpoint->oldestXid, checkpoint->oldestActiveXid, XidFromFullTransactionId(checkpoint->nextXid)); + *nxids = n_restored_xids; + *xids = restored_xids; + return true; + + fail: + *nxids = 0; + *xids = NULL; + if (restored_xids) + pfree(restored_xids); + if (prepared_xids) + pfree(prepared_xids); + return false; +} + void _PG_init(void) { @@ -288,6 +579,8 @@ _PG_init(void) pg_init_extension_server(); + restore_running_xacts_callback = RestoreRunningXactsFromClog; + /* * Important: This must happen after other parts of the extension are * loaded, otherwise any settings to GUCs that were set before the diff --git a/pgxn/neon_test_utils/Makefile b/pgxn/neon_test_utils/Makefile index 1ee87357e5..1371272439 100644 --- a/pgxn/neon_test_utils/Makefile +++ b/pgxn/neon_test_utils/Makefile @@ -7,7 +7,7 @@ OBJS = \ neontest.o EXTENSION = neon_test_utils -DATA = neon_test_utils--1.1.sql +DATA = neon_test_utils--1.2.sql PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging" PG_CONFIG = pg_config diff --git a/pgxn/neon_test_utils/neon_test_utils--1.1.sql b/pgxn/neon_test_utils/neon_test_utils--1.2.sql similarity index 96% rename from pgxn/neon_test_utils/neon_test_utils--1.1.sql rename to pgxn/neon_test_utils/neon_test_utils--1.2.sql index 534784f319..f84a24ec8d 100644 --- a/pgxn/neon_test_utils/neon_test_utils--1.1.sql +++ b/pgxn/neon_test_utils/neon_test_utils--1.2.sql @@ -41,7 +41,7 @@ RETURNS bytea AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex' LANGUAGE C PARALLEL UNSAFE; -CREATE FUNCTION neon_xlogflush(lsn pg_lsn) +CREATE FUNCTION neon_xlogflush(lsn pg_lsn DEFAULT NULL) RETURNS VOID AS 'MODULE_PATHNAME', 'neon_xlogflush' LANGUAGE C PARALLEL UNSAFE; diff --git a/pgxn/neon_test_utils/neon_test_utils.control b/pgxn/neon_test_utils/neon_test_utils.control index 5f6d640835..c7b9191ddc 100644 --- a/pgxn/neon_test_utils/neon_test_utils.control +++ b/pgxn/neon_test_utils/neon_test_utils.control @@ -1,6 +1,6 @@ # neon_test_utils extension comment = 'helpers for neon testing and debugging' -default_version = '1.1' +default_version = '1.2' module_pathname = '$libdir/neon_test_utils' relocatable = true trusted = true diff --git a/pgxn/neon_test_utils/neontest.c b/pgxn/neon_test_utils/neontest.c index 47f245fbf1..944936d395 100644 --- a/pgxn/neon_test_utils/neontest.c +++ b/pgxn/neon_test_utils/neontest.c @@ -15,6 +15,7 @@ #include "access/relation.h" #include "access/xact.h" #include "access/xlog.h" +#include "access/xlog_internal.h" #include "catalog/namespace.h" #include "fmgr.h" #include "funcapi.h" @@ -444,11 +445,46 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS) /* * Directly calls XLogFlush(lsn) to flush WAL buffers. + * + * If 'lsn' is not specified (is NULL), flush all generated WAL. */ Datum neon_xlogflush(PG_FUNCTION_ARGS) { - XLogRecPtr lsn = PG_GETARG_LSN(0); + XLogRecPtr lsn; + + if (RecoveryInProgress()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is in progress"), + errhint("cannot flush WAL during recovery."))); + + if (!PG_ARGISNULL(0)) + lsn = PG_GETARG_LSN(0); + else + { + lsn = GetXLogInsertRecPtr(); + + /*--- + * The LSN returned by GetXLogInsertRecPtr() is the position where the + * next inserted record would begin. If the last record ended just at + * the page boundary, the next record will begin after the page header + * on the next page, and that's what GetXLogInsertRecPtr().returns, + * but the page header has not been written yet. If we tried to flush + * it, XLogFlush() would throw an error: + * + * ERROR : xlog flush request %X/%X is not satisfied --- flushed only to %X/%X + * + * To avoid that, if the insert position points to just after the page + * header, back off to page boundary. + */ + if (lsn % XLOG_BLCKSZ == SizeOfXLogShortPHD && + XLogSegmentOffset(lsn, wal_segment_size) > XLOG_BLCKSZ) + lsn -= SizeOfXLogShortPHD; + else if (lsn % XLOG_BLCKSZ == SizeOfXLogLongPHD && + XLogSegmentOffset(lsn, wal_segment_size) < XLOG_BLCKSZ) + lsn -= SizeOfXLogLongPHD; + } XLogFlush(lsn); PG_RETURN_VOID(); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 4911917bf4..e1c8514351 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3491,7 +3491,6 @@ class Endpoint(PgProtocol, LogUtils): ): super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres") self.env = env - self.running = False self.branch_name: Optional[str] = None # dubious self.endpoint_id: Optional[str] = None # dubious, see asserts below self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA @@ -3857,7 +3856,9 @@ class EndpointFactory: return self - def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]): + def new_replica( + self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None + ): branch_name = origin.branch_name assert origin in self.endpoints assert branch_name is not None diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 60535b7592..b75a480a63 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -198,7 +198,7 @@ def wait_for_last_record_lsn( lsn: Lsn, ) -> Lsn: """waits for pageserver to catch up to a certain lsn, returns the last observed lsn.""" - for i in range(100): + for i in range(1000): current_lsn = last_record_lsn(pageserver_http, tenant, timeline) if current_lsn >= lsn: return current_lsn diff --git a/test_runner/regress/test_next_xid.py b/test_runner/regress/test_next_xid.py index b9e7e642b5..51e847135e 100644 --- a/test_runner/regress/test_next_xid.py +++ b/test_runner/regress/test_next_xid.py @@ -7,6 +7,7 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, PgBin, + VanillaPostgres, import_timeline_from_vanilla_postgres, wait_for_wal_insert_lsn, ) @@ -182,3 +183,275 @@ def test_import_at_2bil( cur = conn.cursor() cur.execute("SELECT count(*) from t") assert cur.fetchone() == (10000 + 1 + 1,) + + +# Constants and macros copied from PostgreSQL multixact.c and headers. These are needed to +# calculate the SLRU segments that a particular multixid or multixid-offsets falls into. +BLCKSZ = 8192 +MULTIXACT_OFFSETS_PER_PAGE = int(BLCKSZ / 4) +SLRU_PAGES_PER_SEGMENT = int(32) +MXACT_MEMBER_BITS_PER_XACT = 8 +MXACT_MEMBER_FLAGS_PER_BYTE = 1 +MULTIXACT_FLAGBYTES_PER_GROUP = 4 +MULTIXACT_MEMBERS_PER_MEMBERGROUP = MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE +MULTIXACT_MEMBERGROUP_SIZE = 4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP +MULTIXACT_MEMBERGROUPS_PER_PAGE = int(BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE) +MULTIXACT_MEMBERS_PER_PAGE = MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP + + +def MultiXactIdToOffsetSegment(xid: int): + return int(xid / (SLRU_PAGES_PER_SEGMENT * MULTIXACT_OFFSETS_PER_PAGE)) + + +def MXOffsetToMemberSegment(off: int): + return int(off / (SLRU_PAGES_PER_SEGMENT * MULTIXACT_MEMBERS_PER_PAGE)) + + +def advance_multixid_to( + pg_bin: PgBin, vanilla_pg: VanillaPostgres, next_multi_xid: int, next_multi_offset: int +): + """ + Use pg_resetwal to advance the nextMulti and nextMultiOffset values in a stand-alone + Postgres cluster. This is useful to get close to wraparound or some other interesting + value, without having to burn a lot of time consuming the (multi-)XIDs one by one. + + The new values should be higher than the old ones, in a wraparound-aware sense. + + On entry, the server should be running. It will be shut down and restarted. + """ + + # Read old values from the last checkpoint. We will pass the old oldestMultiXid value + # back to pg_resetwal, there's no option to leave it alone. + with vanilla_pg.connect() as conn: + with conn.cursor() as cur: + # Make sure the oldest-multi-xid value in the control file is up-to-date + cur.execute("checkpoint") + cur.execute("select oldest_multi_xid, next_multixact_id from pg_control_checkpoint()") + rec = cur.fetchone() + assert rec is not None + (ckpt_oldest_multi_xid, ckpt_next_multi_xid) = rec + log.info(f"oldestMultiXid was {ckpt_oldest_multi_xid}, nextMultiXid was {ckpt_next_multi_xid}") + log.info(f"Resetting to {next_multi_xid}") + + # Use pg_resetwal to reset the next multiXid and multiOffset to given values. + vanilla_pg.stop() + pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal") + cmd = [ + pg_resetwal_path, + f"--multixact-ids={next_multi_xid},{ckpt_oldest_multi_xid}", + f"--multixact-offset={next_multi_offset}", + "-D", + str(vanilla_pg.pgdatadir), + ] + pg_bin.run_capture(cmd) + + # Because we skip over a lot of values, Postgres hasn't created the SLRU segments for + # the new values yet. Create them manually, to allow Postgres to start up. + # + # This leaves "gaps" in the SLRU where segments between old value and new value are + # missing. That's OK for our purposes. Autovacuum will print some warnings about the + # missing segments, but will clean it up by truncating the SLRUs up to the new value, + # closing the gap. + segname = "%04X" % MultiXactIdToOffsetSegment(next_multi_xid) + log.info(f"Creating dummy segment pg_multixact/offsets/{segname}") + with open(vanilla_pg.pgdatadir / "pg_multixact" / "offsets" / segname, "w") as of: + of.write("\0" * SLRU_PAGES_PER_SEGMENT * BLCKSZ) + of.flush() + + segname = "%04X" % MXOffsetToMemberSegment(next_multi_offset) + log.info(f"Creating dummy segment pg_multixact/members/{segname}") + with open(vanilla_pg.pgdatadir / "pg_multixact" / "members" / segname, "w") as of: + of.write("\0" * SLRU_PAGES_PER_SEGMENT * BLCKSZ) + of.flush() + + # Start Postgres again and wait until autovacuum has processed all the databases + # + # This allows truncating the SLRUs, fixing the gaps with missing segments. + vanilla_pg.start() + with vanilla_pg.connect().cursor() as cur: + for _ in range(1000): + datminmxid = int( + query_scalar(cur, "select min(datminmxid::text::int8) from pg_database") + ) + log.info(f"datminmxid {datminmxid}") + if next_multi_xid - datminmxid < 1_000_000: # not wraparound-aware! + break + time.sleep(0.5) + + +def test_multixid_wraparound_import( + neon_env_builder: NeonEnvBuilder, + test_output_dir: Path, + pg_bin: PgBin, + vanilla_pg, +): + """ + Test that the wraparound of the "next-multi-xid" counter is handled correctly in + pageserver, And multi-offsets as well + """ + env = neon_env_builder.init_start() + + # In order to to test multixid wraparound, we need to first advance the counter to + # within spitting distance of the wraparound, that is 2^32 multi-XIDs. We could simply + # run a workload that consumes a lot of multi-XIDs until we approach that, but that + # takes a very long time. So we cheat. + # + # Our strategy is to create a vanilla Postgres cluster, and use pg_resetwal to + # directly set the multi-xid counter a higher value. However, we cannot directly set + # it to just before 2^32 (~ 4 billion), because that would make the exisitng + # 'relminmxid' values to look like they're in the future. It's not clear how the + # system would behave in that situation. So instead, we bump it up ~ 1 billion + # multi-XIDs at a time, and let autovacuum to process all the relations and update + # 'relminmxid' between each run. + # + # XXX: For the multi-offsets, most of the bump is done in the last call. This is + # because advancing it ~ 1 billion at a time hit a pathological case in the + # MultiXactMemberFreezeThreshold() function, causing autovacuum not trigger multixid + # freezing. See + # https://www.postgresql.org/message-id/85fb354c-f89f-4d47-b3a2-3cbd461c90a3%40iki.fi + # Multi-offsets don't have the same wraparound problems at 2 billion mark as + # multi-xids do, so one big jump is fine. + vanilla_pg.configure( + [ + "log_autovacuum_min_duration = 0", + # Perform anti-wraparound vacuuming aggressively + "autovacuum_naptime='1 s'", + "autovacuum_freeze_max_age = 1000000", + "autovacuum_multixact_freeze_max_age = 1000000", + ], + ) + vanilla_pg.start() + advance_multixid_to(pg_bin, vanilla_pg, 0x40000000, 0x10000000) + advance_multixid_to(pg_bin, vanilla_pg, 0x80000000, 0x20000000) + advance_multixid_to(pg_bin, vanilla_pg, 0xC0000000, 0x30000000) + advance_multixid_to(pg_bin, vanilla_pg, 0xFFFFFF00, 0xFFFFFF00) + + vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser") + vanilla_pg.safe_psql("create table tt as select g as id from generate_series(1, 10) g") + vanilla_pg.safe_psql("CHECKPOINT") + + # Import the cluster to the pageserver + tenant_id = TenantId.generate() + env.pageserver.tenant_create(tenant_id) + timeline_id = TimelineId.generate() + import_timeline_from_vanilla_postgres( + test_output_dir, + env, + pg_bin, + tenant_id, + timeline_id, + "imported_multixid_wraparound_test", + vanilla_pg.connstr(), + ) + vanilla_pg.stop() + + endpoint = env.endpoints.create_start( + "imported_multixid_wraparound_test", + tenant_id=tenant_id, + config_lines=[ + "log_autovacuum_min_duration = 0", + "autovacuum_naptime='5 s'", + "autovacuum=off", + ], + ) + conn = endpoint.connect() + cur = conn.cursor() + assert query_scalar(cur, "select count(*) from tt") == 10 # sanity check + + # Install extension containing function needed for test + cur.execute("CREATE EXTENSION neon_test_utils") + + # Consume a lot of XIDs, just to advance the XIDs to different range than the + # multi-xids. That avoids confusion while debugging + cur.execute("select test_consume_xids(100000)") + cur.execute("select pg_switch_wal()") + cur.execute("checkpoint") + + # Use subtransactions so that each row in 'tt' is stamped with different XID. Leave + # the transaction open. + cur.execute("BEGIN") + cur.execute( + """ +do $$ +declare + idvar int; +begin + for idvar in select id from tt loop + begin + update tt set id = idvar where id = idvar; + exception when others then + raise 'didn''t expect an error: %', sqlerrm; + end; + end loop; +end; +$$; +""" + ) + + # In a different transaction, acquire a FOR KEY SHARE lock on each row. This generates + # a new multixid for each row, with the previous xmax and this transaction's XID as the + # members. + # + # Repeat this until the multi-xid counter wraps around. + conn3 = endpoint.connect() + cur3 = conn3.cursor() + next_multixact_id_before_restart = 0 + observed_before_wraparound = False + while True: + cur3.execute("BEGIN") + cur3.execute("SELECT * FROM tt FOR KEY SHARE") + + # Get the xmax of one of the rows we locked. It should be a multi-xid. It might + # not be the latest one, but close enough. + row_xmax = int(query_scalar(cur3, "SELECT xmax FROM tt LIMIT 1")) + cur3.execute("COMMIT") + log.info(f"observed a row with xmax {row_xmax}") + + # High value means not wrapped around yet + if row_xmax >= 0xFFFFFF00: + observed_before_wraparound = True + continue + + # xmax should not be a regular XID. (We bumped up the regular XID range earlier + # to around 100000 and above.) + assert row_xmax < 100 + + # xmax values < FirstNormalTransactionId (== 3) could be special XID values, or + # multixid values after wraparound. We don't know for sure which, so keep going to + # be sure we see value that's unambiguously a wrapped-around multixid + if row_xmax < 3: + continue + + next_multixact_id_before_restart = row_xmax + log.info( + f"next_multixact_id is now at {next_multixact_id_before_restart} or a little higher" + ) + break + + # We should have observed the state before wraparound + assert observed_before_wraparound + + cur.execute("COMMIT") + + # Wait until pageserver has received all the data, and restart the endpoint + wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id) + endpoint.stop(mode="immediate") # 'immediate' to avoid writing shutdown checkpoint + endpoint.start() + + # Check that the next-multixid value wrapped around correctly + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("select next_multixact_id from pg_control_checkpoint()") + next_multixact_id_after_restart = int( + query_scalar(cur, "select next_multixact_id from pg_control_checkpoint()") + ) + log.info(f"next_multixact_id after restart: {next_multixact_id_after_restart}") + assert next_multixact_id_after_restart >= next_multixact_id_before_restart + + # The multi-offset should wrap around as well + cur.execute("select next_multi_offset from pg_control_checkpoint()") + next_multi_offset_after_restart = int( + query_scalar(cur, "select next_multi_offset from pg_control_checkpoint()") + ) + log.info(f"next_multi_offset after restart: {next_multi_offset_after_restart}") + assert next_multi_offset_after_restart < 100000 diff --git a/test_runner/regress/test_replica_start.py b/test_runner/regress/test_replica_start.py new file mode 100644 index 0000000000..17d476a8a6 --- /dev/null +++ b/test_runner/regress/test_replica_start.py @@ -0,0 +1,646 @@ +""" +In PostgreSQL, a standby always has to wait for a running-xacts WAL record to +arrive before it can start accepting queries. Furthermore, if there are +transactions with too many subxids (> 64) open to fit in the in-memory subxids +cache, the running-xacts record will be marked as "suboverflowed", and the +standby will need to also wait for the currently in-progress transactions to +finish. + +In Neon, we have an additional mechanism that scans the CLOG at server startup +to determine the list of running transactions, so that the standby can start up +immediately without waiting for the running-xacts record, but that mechanism +only works if the # of active (sub-)transactions is reasonably small. Otherwise +it falls back to waiting. Furthermore, it's somewhat optimistic in using up the +known-assigned XIDs array: if too many transactions with subxids are started in +the primary later, the replay in the replica will crash with "too many +KnownAssignedXids" error. + +This module contains tests for those various cases at standby startup: starting +from shutdown checkpoint, using the CLOG scanning mechanism, waiting for +running-xacts record and for in-progress transactions to finish etc. +""" + +import threading +from contextlib import closing + +import psycopg2 +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup +from fixtures.pg_version import PgVersion +from fixtures.utils import query_scalar, wait_until + +CREATE_SUBXACTS_FUNC = """ +create or replace function create_subxacts(n integer) returns void as $$ +declare + i integer; +begin + for i in 1..n loop + begin + insert into t (payload) values (0); + exception + when others then + raise exception 'caught something: %', sqlerrm; + end; + end loop; +end; $$ language plpgsql +""" + + +def test_replica_start_scan_clog(neon_simple_env: NeonEnv): + """ + Test the CLOG-scanning mechanism at hot standby startup. There is one + transaction active in the primary when the standby is started. The primary + is killed before it has a chance to write a running-xacts record. The + CLOG-scanning at neon startup allows the standby to start up anyway. + + See the module docstring for background. + """ + + # Initialize the primary, a test table, and a helper function to create lots + # of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("CREATE EXTENSION neon_test_utils") + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + primary_cur.execute("select pg_switch_wal()") + + # Start a transaction in the primary. Leave the transaction open. + # + # The transaction has some subtransactions, but not too many to cause the + # CLOG-scanning mechanism to give up. + primary_cur.execute("begin") + primary_cur.execute("select create_subxacts(50)") + + # Wait for the WAL to be flushed, but then immediately kill the primary, + # before it has a chance to generate a running-xacts record. + primary_cur.execute("select neon_xlogflush()") + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + primary.stop(mode="immediate") + + # Create a replica. It should start up normally, thanks to the CLOG-scanning + # mechanism. + secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + + # The transaction did not commit, so it should not be visible in the secondary + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (0,) + + +def test_replica_start_scan_clog_crashed_xids(neon_simple_env: NeonEnv): + """ + Test the CLOG-scanning mechanism at hot standby startup, after + leaving behind crashed transactions. + + See the module docstring for background. + """ + + # Initialize the primary, a test table, and a helper function to create lots + # of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + primary_cur.execute("select pg_switch_wal()") + + # Consume a lot of XIDs, then kill Postgres without giving it a + # chance to write abort records for them. + primary_cur.execute("begin") + primary_cur.execute("select create_subxacts(100000)") + primary.stop(mode="immediate") + + # Restart the primary. Do some light work, and shut it down cleanly + primary.start() + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("insert into t (payload) values (0)") + primary.stop(mode="fast") + + # Create a replica. It should start up normally, thanks to the CLOG-scanning + # mechanism. (Restarting the primary writes a checkpoint and/or running-xacts + # record, which allows the standby to know that the crashed XIDs are aborted) + secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (1,) + + +def test_replica_start_at_running_xacts(neon_simple_env: NeonEnv, pg_version): + """ + Test that starting a replica works right after the primary has + created a running-xacts record. This may seem like a trivial case, + but during development, we had a bug that was triggered by having + oldestActiveXid == nextXid. Starting right after a running-xacts + record is one way to test that case. + + See the module docstring for background. + """ + env = neon_simple_env + + if env.pg_version == PgVersion.V14 or env.pg_version == PgVersion.V15: + pytest.skip("pg_log_standby_snapshot() function is available only in PG16") + + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + + primary_cur.execute("CREATE EXTENSION neon_test_utils") + primary_cur.execute("select pg_log_standby_snapshot()") + primary_cur.execute("select neon_xlogflush()") + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + + secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select 123") + assert secondary_cur.fetchone() == (123,) + + +def test_replica_start_wait_subxids_finish(neon_simple_env: NeonEnv): + """ + Test replica startup when there are a lot of (sub)transactions active in the + primary. That's too many for the CLOG-scanning mechanism to handle, so the + replica has to wait for the large transaction to finish before it starts to + accept queries. + + After replica startup, test MVCC with transactions that were in-progress + when the replica was started. + + See the module docstring for background. + """ + + # Initialize the primary, a test table, and a helper function to create + # lots of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + + # Start a transaction with 100000 subtransactions, and leave it open. That's + # too many to fit in the "known-assigned XIDs array" in the replica, and + # also too many to fit in the subxid caches so the running-xacts record will + # also overflow. + primary_cur.execute("begin") + primary_cur.execute("select create_subxacts(100000)") + + # Start another, smaller transaction in the primary. We'll come back to this + # later. + primary_conn2 = primary.connect() + primary_cur2 = primary_conn2.cursor() + primary_cur2.execute("begin") + primary_cur2.execute("insert into t (payload) values (0)") + + # Create a replica. but before that, wait for the wal to be flushed to + # safekeepers, so that the replica is started at a point where the large + # transaction is already active. (The whole transaction might not be flushed + # yet, but that's OK.) + # + # Start it in a separate thread, so that we can do other stuff while it's + # blocked waiting for the startup to finish. + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + secondary = env.endpoints.new_replica(origin=primary, endpoint_id="secondary") + start_secondary_thread = threading.Thread(target=secondary.start) + start_secondary_thread.start() + + # Verify that the replica has otherwise started up, but cannot start + # accepting queries yet. + log.info("Waiting 5 s to verify that the secondary does not start") + start_secondary_thread.join(5) + assert secondary.log_contains("consistent recovery state reached") + assert secondary.log_contains("started streaming WAL from primary") + # The "redo starts" message is printed when the first WAL record is + # received. It might or might not be present in the log depending on how + # far exactly the WAL was flushed when the replica was started, and whether + # background activity caused any more WAL records to be flushed on the + # primary afterwards. + # + # assert secondary.log_contains("redo # starts") + + # should not be open for connections yet + assert start_secondary_thread.is_alive() + assert not secondary.is_running() + assert not secondary.log_contains("database system is ready to accept read-only connections") + + # Commit the large transaction in the primary. + # + # Within the next 15 s, the primary should write a new running-xacts record + # to the WAL which shows the transaction as completed. Once the replica + # replays that record, it will start accepting queries. + primary_cur.execute("commit") + start_secondary_thread.join() + + # Verify that the large transaction is correctly visible in the secondary + # (but not the second, small transaction, which is still in-progress!) + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (100000,) + + # Perform some more MVCC testing using the second transaction that was + # started in the primary before the replica was created + primary_cur2.execute("select create_subxacts(10000)") + + # The second transaction still hasn't committed + wait_replica_caughtup(primary, secondary) + secondary_cur.execute("BEGIN ISOLATION LEVEL REPEATABLE READ") + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (100000,) + + # Commit the second transaction in the primary + primary_cur2.execute("commit") + + # Should still be invisible to the old snapshot + wait_replica_caughtup(primary, secondary) + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (100000,) + + # Commit the REPEATABLE READ transaction in the replica. Both + # primary transactions should now be visible to a new snapshot. + secondary_cur.execute("commit") + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (110001,) + + +def test_replica_too_many_known_assigned_xids(neon_simple_env: NeonEnv): + """ + The CLOG-scanning mechanism fills the known-assigned XIDs array + optimistically at standby startup, betting that it can still fit + upcoming transactions replayed later from the WAL in the + array. This test tests what happens when that bet fails and the + known-assigned XID array fills up after the standby has already + been started. The WAL redo will fail with an error: + + FATAL: too many KnownAssignedXids + CONTEXT: WAL redo at 0/1895CB0 for neon/INSERT: off: 25, flags: 0x08; blkref #0: rel 1663/5/16385, blk 64 + + which causes the standby to shut down. + + See the module docstring for background. + """ + + # Initialize the primary, a test table, and a helper function to create lots + # of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("CREATE EXTENSION neon_test_utils") + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + + # Determine how many connections we can use + primary_cur.execute("show max_connections") + max_connections = int(primary_cur.fetchall()[0][0]) + primary_cur.execute("show superuser_reserved_connections") + superuser_reserved_connections = int(primary_cur.fetchall()[0][0]) + n_connections = max_connections - superuser_reserved_connections + n_subxids = 200 + + # Start one top transaction in primary, with lots of subtransactions. This + # uses up much of the known-assigned XIDs space in the standby, but doesn't + # cause it to overflow. + large_p_conn = primary.connect() + large_p_cur = large_p_conn.cursor() + large_p_cur.execute("begin") + large_p_cur.execute(f"select create_subxacts({max_connections} * 30)") + + with closing(primary.connect()) as small_p_conn: + with small_p_conn.cursor() as small_p_cur: + small_p_cur.execute("select create_subxacts(1)") + + # Create a replica at this LSN + primary_cur.execute("select neon_xlogflush()") + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + + # The transaction in primary has not committed yet. + wait_replica_caughtup(primary, secondary) + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (1,) + + # Start max number of top transactions in primary, with a lot of + # subtransactions each. We add the subtransactions to each top transaction + # in a round-robin fashion, instead of adding a lot of subtransactions to + # one top transaction at a time. This way, we will have the max number of + # subtransactions in the in-memory subxid cache of each top transaction, + # until they all overflow. + # + # Currently, PGPROC_MAX_CACHED_SUBXIDS == 64, so this will overflow the all + # the subxid caches after creating 64 subxids in each top transaction. The + # point just before the caches have overflowed is the most interesting point + # in time, but we'll keep going beyond that, to ensure that this test is + # robust even if PGPROC_MAX_CACHED_SUBXIDS changes. + p_curs = [] + for _ in range(0, n_connections): + p_cur = primary.connect().cursor() + p_cur.execute("begin") + p_curs.append(p_cur) + + for _subxid in range(0, n_subxids): + for i in range(0, n_connections): + p_curs[i].execute("select create_subxacts(1)") + + # Commit all the transactions in the primary + for i in range(0, n_connections): + p_curs[i].execute("commit") + large_p_cur.execute("commit") + + # Wait until the replica crashes with "too many KnownAssignedXids" error. + def check_replica_crashed(): + try: + secondary.connect() + except psycopg2.Error: + # Once the connection fails, return success + return None + raise RuntimeError("connection succeeded") + + wait_until(20, 0.5, check_replica_crashed) + assert secondary.log_contains("too many KnownAssignedXids") + + # Replica is crashed, so ignore stop result + secondary.check_stop_result = False + + +def test_replica_start_repro_visibility_bug(neon_simple_env: NeonEnv): + """ + Before PR #7288, a hot standby in neon incorrectly started up + immediately, before it had received a running-xacts record. That + led to visibility bugs if there were active transactions in the + primary. This test reproduces the incorrect query results and + incorrectly set hint bits, before that was fixed. + """ + env = neon_simple_env + + primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary") + p_cur = primary.connect().cursor() + + p_cur.execute("begin") + p_cur.execute("create table t(pk integer primary key, payload integer)") + p_cur.execute("insert into t values (generate_series(1,100000), 0)") + + secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") + wait_replica_caughtup(primary, secondary) + s_cur = secondary.connect().cursor() + + # Set hint bits for pg_class tuples. If primary's transaction is + # not marked as in-progress in MVCC snapshot, then XMIN_INVALID + # hint bit will be set for table's 't' tuple, making it invisible + # even after the commit record is replayed later. + s_cur.execute("select * from pg_class") + + p_cur.execute("commit") + wait_replica_caughtup(primary, secondary) + s_cur.execute("select * from t where pk = 1") + assert s_cur.fetchone() == (1, 0) + + +@pytest.mark.parametrize("shutdown", [True, False]) +def test_replica_start_with_prepared_xacts(neon_simple_env: NeonEnv, shutdown: bool): + """ + Test the CLOG-scanning mechanism at hot standby startup in the presence of + prepared transactions. + + This test is run in two variants: one where the primary server is shut down + before starting the secondary, or not. + """ + + # Initialize the primary, a test table, and a helper function to create lots + # of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start( + branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"] + ) + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("CREATE EXTENSION neon_test_utils") + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute("create table t1(pk integer primary key)") + primary_cur.execute("create table t2(pk integer primary key)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + + # Prepare a transaction for two-phase commit + primary_cur.execute("begin") + primary_cur.execute("insert into t1 values (1)") + primary_cur.execute("prepare transaction 't1'") + + # Prepare another transaction for two-phase commit, with a subtransaction + primary_cur.execute("begin") + primary_cur.execute("insert into t2 values (2)") + primary_cur.execute("savepoint sp") + primary_cur.execute("insert into t2 values (3)") + primary_cur.execute("prepare transaction 't2'") + + # Start a transaction in the primary. Leave the transaction open. + # + # The transaction has some subtransactions, but not too many to cause the + # CLOG-scanning mechanism to give up. + primary_cur.execute("begin") + primary_cur.execute("select create_subxacts(50)") + + # Wait for the WAL to be flushed + primary_cur.execute("select neon_xlogflush()") + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + + if shutdown: + primary.stop(mode="fast") + + # Create a replica. It should start up normally, thanks to the CLOG-scanning + # mechanism. + secondary = env.endpoints.new_replica_start( + origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"] + ) + + # The transaction did not commit, so it should not be visible in the secondary + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (0,) + secondary_cur.execute("select count(*) from t1") + assert secondary_cur.fetchone() == (0,) + secondary_cur.execute("select count(*) from t2") + assert secondary_cur.fetchone() == (0,) + + if shutdown: + primary.start() + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + else: + primary_cur.execute("commit") + primary_cur.execute("commit prepared 't1'") + primary_cur.execute("commit prepared 't2'") + + wait_replica_caughtup(primary, secondary) + + secondary_cur.execute("select count(*) from t") + if shutdown: + assert secondary_cur.fetchone() == (0,) + else: + assert secondary_cur.fetchone() == (50,) + secondary_cur.execute("select * from t1") + assert secondary_cur.fetchall() == [(1,)] + secondary_cur.execute("select * from t2") + assert secondary_cur.fetchall() == [(2,), (3,)] + + +def test_replica_start_with_prepared_xacts_with_subxacts(neon_simple_env: NeonEnv): + """ + Test the CLOG-scanning mechanism at hot standby startup in the presence of + prepared transactions, with subtransactions. + """ + + # Initialize the primary, a test table, and a helper function to create lots + # of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start( + branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"] + ) + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + + # Install extension containing function needed for test + primary_cur.execute("CREATE EXTENSION neon_test_utils") + + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + + # Advance nextXid close to the beginning of the next pg_subtrans segment (2^16 XIDs) + # + # This is interesting, because it tests that pg_subtrans is initialized correctly + # at standby startup. (We had a bug where it didn't at one point during development.) + while True: + xid = int(query_scalar(primary_cur, "SELECT txid_current()")) + log.info(f"xid now {xid}") + # Consume 500 transactions at a time until we get close + if xid < 65535 - 600: + primary_cur.execute("select test_consume_xids(500);") + else: + break + primary_cur.execute("checkpoint") + + # Prepare a transaction for two-phase commit + primary_cur.execute("begin") + primary_cur.execute("select create_subxacts(1000)") + primary_cur.execute("prepare transaction 't1'") + + # Wait for the WAL to be flushed, and stop the primary + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + primary.stop(mode="fast") + + # Create a replica. It should start up normally, thanks to the CLOG-scanning + # mechanism. + secondary = env.endpoints.new_replica_start( + origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"] + ) + + # The transaction did not commit, so it should not be visible in the secondary + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (0,) + + primary.start() + + # Open a lot of subtransactions in the primary, causing the subxids cache to overflow + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("select create_subxacts(100000)") + + wait_replica_caughtup(primary, secondary) + + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (100000,) + + primary_cur.execute("commit prepared 't1'") + + wait_replica_caughtup(primary, secondary) + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (101000,) + + +def test_replica_start_with_prepared_xacts_with_many_subxacts(neon_simple_env: NeonEnv): + """ + Test the CLOG-scanning mechanism at hot standby startup in the presence of + prepared transactions, with lots of subtransactions. + + Like test_replica_start_with_prepared_xacts_with_subxacts, but with more + subxacts, to test that the prepared transaction's subxids don't consume + space in the known-assigned XIDs array. (They are set in pg_subtrans + instead) + """ + + # Initialize the primary, a test table, and a helper function to create lots + # of subtransactions. + env = neon_simple_env + primary = env.endpoints.create_start( + branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"] + ) + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + + # Install extension containing function needed for test + primary_cur.execute("CREATE EXTENSION neon_test_utils") + + primary_cur.execute("create table t(pk serial primary key, payload integer)") + primary_cur.execute(CREATE_SUBXACTS_FUNC) + + # Prepare a transaction for two-phase commit, with lots of subxids + primary_cur.execute("begin") + primary_cur.execute("select create_subxacts(50000)") + + # to make things a bit more varied, intersperse a few other XIDs in between + # the prepared transaction's sub-XIDs + with primary.connect().cursor() as primary_cur2: + primary_cur2.execute("insert into t (payload) values (123)") + primary_cur2.execute("begin; insert into t (payload) values (-1); rollback") + + primary_cur.execute("select create_subxacts(50000)") + primary_cur.execute("prepare transaction 't1'") + + # Wait for the WAL to be flushed + wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline) + + primary.stop(mode="fast") + + # Create a replica. It should start up normally, thanks to the CLOG-scanning + # mechanism. + secondary = env.endpoints.new_replica_start( + origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"] + ) + + # The transaction did not commit, so it should not be visible in the secondary + secondary_conn = secondary.connect() + secondary_cur = secondary_conn.cursor() + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (1,) + + primary.start() + + # Open a lot of subtransactions in the primary, causing the subxids cache to overflow + primary_conn = primary.connect() + primary_cur = primary_conn.cursor() + primary_cur.execute("select create_subxacts(100000)") + + wait_replica_caughtup(primary, secondary) + + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (100001,) + + primary_cur.execute("commit prepared 't1'") + + wait_replica_caughtup(primary, secondary) + secondary_cur.execute("select count(*) from t") + assert secondary_cur.fetchone() == (200001,) diff --git a/test_runner/regress/test_replication_start.py b/test_runner/regress/test_replication_start.py deleted file mode 100644 index 2360745990..0000000000 --- a/test_runner/regress/test_replication_start.py +++ /dev/null @@ -1,32 +0,0 @@ -import pytest -from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup - - -@pytest.mark.xfail -def test_replication_start(neon_simple_env: NeonEnv): - env = neon_simple_env - - with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary: - with primary.connect() as p_con: - with p_con.cursor() as p_cur: - p_cur.execute("begin") - p_cur.execute("create table t(pk integer primary key, payload integer)") - p_cur.execute("insert into t values (generate_series(1,100000), 0)") - p_cur.execute("select txid_current()") - xid = p_cur.fetchall()[0][0] - log.info(f"Master transaction {xid}") - with env.endpoints.new_replica_start( - origin=primary, endpoint_id="secondary" - ) as secondary: - wait_replica_caughtup(primary, secondary) - with secondary.connect() as s_con: - with s_con.cursor() as s_cur: - # Enforce setting hint bits for pg_class tuples. - # If master's transaction is not marked as in-progress in MVCC snapshot, - # then XMIN_INVALID hint bit will be set for table's 't' tuple makeing it invisible. - s_cur.execute("select * from pg_class") - p_cur.execute("commit") - wait_replica_caughtup(primary, secondary) - s_cur.execute("select * from t where pk = 1") - assert s_cur.fetchone() == (1, 0) diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 223dd92595..ad73770c44 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 223dd925959f8124711dd3d867dc8ba6629d52c0 +Subproject commit ad73770c446ea361f43e4f0404798b7e5e7a62d8 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index f54d7373eb..4874c8e52e 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit f54d7373eb0de5a54bce2becdb1c801026c7edff +Subproject commit 4874c8e52ed349a9f8290bbdcd91eb92677a5d24 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index e06bebc753..b810fdfcbb 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit e06bebc75306b583e758b52c95946d41109239b2 +Subproject commit b810fdfcbb59afea7ea7bbe0cf94eaccb55a2ea2 diff --git a/vendor/revisions.json b/vendor/revisions.json index 574e371934..da49ff19c3 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,5 @@ { - "v16": ["16.3", "e06bebc75306b583e758b52c95946d41109239b2"], - "v15": ["15.7", "f54d7373eb0de5a54bce2becdb1c801026c7edff"], - "v14": ["14.12", "223dd925959f8124711dd3d867dc8ba6629d52c0"] + "v16": ["16.3", "b810fdfcbb59afea7ea7bbe0cf94eaccb55a2ea2"], + "v15": ["15.7", "4874c8e52ed349a9f8290bbdcd91eb92677a5d24"], + "v14": ["14.12", "ad73770c446ea361f43e4f0404798b7e5e7a62d8"] }