mirror of https://github.com/neondatabase/neon.git
Merge branch 'main' into bayandin/replace-cachepot-with-sccache
@@ -114,6 +114,7 @@ runs:
export PLATFORM=${PLATFORM:-github-actions-selfhosted}
export POSTGRES_DISTRIB_DIR=${POSTGRES_DISTRIB_DIR:-/tmp/neon/pg_install}
export DEFAULT_PG_VERSION=${PG_VERSION#v}
export LD_LIBRARY_PATH=${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/lib

if [ "${BUILD_TYPE}" = "remote" ]; then
export REMOTE_ENV=1
.github/workflows/benchmarking.yml (vendored, 4 changes)
@@ -379,6 +379,10 @@ jobs:

- name: Add Postgres binaries to PATH
run: |
LD_LIBRARY_PATH="${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/lib"
export LD_LIBRARY_PATH
echo "LD_LIBRARY_PATH=${LD_LIBRARY_PATH}" >> $GITHUB_ENV

${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
@@ -82,6 +82,7 @@ jobs:
tags: neondatabase/build-tools:${{ inputs.image-tag }}-${{ matrix.arch }}

- name: Remove custom docker config directory
if: always()
run: |
rm -rf /tmp/.docker-custom
.github/workflows/build_and_test.yml (vendored, 7 changes)
@@ -345,6 +345,8 @@ jobs:

- name: Run cargo build
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests

- run: sccache --show-stats

@@ -395,6 +397,11 @@
env:
NEXTEST_RETRIES: 3
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
LD_LIBRARY_PATH=$(pwd)/pg_install/v16/lib
export LD_LIBRARY_PATH

#nextest does not yet support running doctests
cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
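A note on the exports repeated in these workflow hunks: PQ_LIB_DIR is read at build time by the pq-sys crate's build script (the Rust libpq bindings) to locate the freshly built libpq under pg_install/v16/lib, and LD_LIBRARY_PATH is presumably set so that the resulting binaries and the bundled Postgres tools can resolve those shared libraries at run time.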
.github/workflows/neon_extra_builds.yml (vendored, 7 changes)
@@ -232,12 +232,19 @@ jobs:

- name: Run cargo build
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
mold -run cargo build --locked $CARGO_FLAGS $CARGO_FEATURES --bins --tests -j$(nproc)

- name: Run cargo test
env:
NEXTEST_RETRIES: 3
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
LD_LIBRARY_PATH=$(pwd)/pg_install/v16/lib
export LD_LIBRARY_PATH

cargo nextest run $CARGO_FEATURES -j$(nproc)

# Run separate tests for real S3
@@ -41,12 +41,13 @@ ARG AWS_SECRET_ACCESS_KEY
COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server
COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib
COPY --chown=nonroot . .

# Show build caching stats to check if it was used in the end.
# Has to be part of the same RUN, since the sccache daemon is killed at the end of this RUN, losing the compilation stats.
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \
&& PQ_LIB_DIR=$(pwd)/pg_install/v16/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \
@@ -26,7 +26,6 @@ RUN set -e \
liblzma-dev \
libncurses5-dev \
libncursesw5-dev \
libpq-dev \
libreadline-dev \
libseccomp-dev \
libsqlite3-dev \
@@ -325,11 +325,16 @@ impl LocalEnv {
}
}

pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir(pg_version)?.join("bin"))
pub fn pg_dir(&self, pg_version: u32, dir_name: &str) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir(pg_version)?.join(dir_name))
}

pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
self.pg_dir(pg_version, "bin")
}

pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir(pg_version)?.join("lib"))
self.pg_dir(pg_version, "lib")
}

pub fn pageserver_bin(&self) -> PathBuf {
@@ -155,16 +155,16 @@ impl StorageController {
.expect("non-Unicode path")
}

/// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
/// Find the directory containing postgres subdirectories, such as `bin` and `lib`
///
/// This usually uses STORAGE_CONTROLLER_POSTGRES_VERSION of postgres, but will fall back
/// to other versions if that one isn't found. Some automated tests create circumstances
/// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
async fn get_pg_dir(&self, dir_name: &str) -> anyhow::Result<Utf8PathBuf> {
let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 15, 14];

for v in prefer_versions {
let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap();
let path = Utf8PathBuf::from_path_buf(self.env.pg_dir(v, dir_name)?).unwrap();
if tokio::fs::try_exists(&path).await? {
return Ok(path);
}
@@ -172,11 +172,20 @@ impl StorageController {

// Fall through
anyhow::bail!(
"Postgres binaries not found in {}",
self.env.pg_distrib_dir.display()
"Postgres directory '{}' not found in {}",
dir_name,
self.env.pg_distrib_dir.display(),
);
}

pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
self.get_pg_dir("bin").await
}

pub async fn get_pg_lib_dir(&self) -> anyhow::Result<Utf8PathBuf> {
self.get_pg_dir("lib").await
}

/// Readiness check for our postgres process
async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result<bool> {
let bin_path = pg_bin_dir.join("pg_isready");
@@ -229,12 +238,17 @@ impl StorageController {
.unwrap()
.join("storage_controller_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
let pg_lib_dir = self.get_pg_lib_dir().await?;
let pg_log_path = pg_data_path.join("postgres.log");

if !tokio::fs::try_exists(&pg_data_path).await? {
// Initialize empty database
let initdb_path = pg_bin_dir.join("initdb");
let mut child = Command::new(&initdb_path)
.envs(vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
])
.args(["-D", pg_data_path.as_ref()])
.spawn()
.expect("Failed to spawn initdb");
@@ -269,7 +283,10 @@ impl StorageController {
&self.env.base_data_dir,
pg_bin_dir.join("pg_ctl").as_std_path(),
db_start_args,
[],
vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
],
background_process::InitialPidFile::Create(self.postgres_pid_file()),
retry_timeout,
|| self.pg_isready(&pg_bin_dir),
@@ -324,7 +341,10 @@ impl StorageController {
&self.env.base_data_dir,
&self.env.storage_controller_bin(),
args,
[],
vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
],
background_process::InitialPidFile::Create(self.pid_file()),
retry_timeout,
|| async {
@@ -356,6 +356,28 @@ impl CheckPoint {
}
false
}

/// Advance next multi-XID/offset to those given in arguments.
///
/// It's important that this handles wraparound correctly. This should match the
/// MultiXactAdvanceNextMXact() logic in PostgreSQL's xlog_redo() function.
///
/// Returns 'true' if the Checkpoint was updated.
pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
let mut modified = false;

if multi_xid.wrapping_sub(self.nextMulti) as i32 > 0 {
self.nextMulti = multi_xid;
modified = true;
}

if multi_offset.wrapping_sub(self.nextMultiOffset) as i32 > 0 {
self.nextMultiOffset = multi_offset;
modified = true;
}

modified
}
}

/// Generate new, empty WAL segment, with correct block headers at the first
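The `wrapping_sub(...) as i32 > 0` test above is the standard circular comparison for 32-bit counters. A small standalone sketch (Python, purely illustrative, not part of the diff) of the same idea:

# Mimics Rust's `a.wrapping_sub(b) as i32 > 0`: is `a` ahead of `b` modulo 2^32?
def is_ahead(a: int, b: int) -> bool:
    diff = (a - b) & 0xFFFFFFFF  # 32-bit wrapping subtraction
    if diff >= 0x80000000:       # reinterpret the result as a signed i32
        diff -= 0x100000000
    return diff > 0

assert is_ahead(1000, 20)        # plain advance
assert not is_ahead(500, 1000)   # stale value: no update
assert is_ahead(1, 0xFFFF0000)   # wrapped around: 1 counts as newer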
@@ -202,6 +202,53 @@ pub fn test_update_next_xid() {
assert_eq!(checkpoint.nextXid.value, 2048);
}

#[test]
pub fn test_update_next_multixid() {
let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();

// simple case
checkpoint.nextMulti = 20;
checkpoint.nextMultiOffset = 20;
checkpoint.update_next_multixid(1000, 2000);
assert_eq!(checkpoint.nextMulti, 1000);
assert_eq!(checkpoint.nextMultiOffset, 2000);

// No change
checkpoint.update_next_multixid(500, 900);
assert_eq!(checkpoint.nextMulti, 1000);
assert_eq!(checkpoint.nextMultiOffset, 2000);

// Close to wraparound, but not wrapped around yet
checkpoint.nextMulti = 0xffff0000;
checkpoint.nextMultiOffset = 0xfffe0000;
checkpoint.update_next_multixid(0xffff00ff, 0xfffe00ff);
assert_eq!(checkpoint.nextMulti, 0xffff00ff);
assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);

// Wraparound
checkpoint.update_next_multixid(1, 900);
assert_eq!(checkpoint.nextMulti, 1);
assert_eq!(checkpoint.nextMultiOffset, 900);

// Wraparound nextMulti to 0.
//
// It's a bit surprising that nextMulti can be 0, because that's a special value
// (InvalidMultiXactId). However, that's how Postgres does it at multi-xid wraparound:
// nextMulti wraps around to 0, but then when the next multi-xid is assigned, it skips
// the 0 and the next multi-xid actually assigned is 1.
checkpoint.nextMulti = 0xffff0000;
checkpoint.nextMultiOffset = 0xfffe0000;
checkpoint.update_next_multixid(0, 0xfffe00ff);
assert_eq!(checkpoint.nextMulti, 0);
assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);

// Wraparound nextMultiOffset to 0
checkpoint.update_next_multixid(0, 0);
assert_eq!(checkpoint.nextMulti, 0);
assert_eq!(checkpoint.nextMultiOffset, 0);
}

#[test]
pub fn test_encode_logical_message() {
let expected = [
@@ -476,7 +476,7 @@ static STANDBY_HORIZON: Lazy<IntGaugeVec> = Lazy::new(|| {
static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_resident_physical_size",
"The size of the layer files present in the pageserver's filesystem.",
"The size of the layer files present in the pageserver's filesystem, for attached locations.",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
@@ -1691,6 +1691,15 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| {
}
});

pub(crate) static SECONDARY_RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_secondary_resident_physical_size",
"The size of the layer files present in the pageserver's filesystem, for secondary locations.",
&["tenant_id", "shard_id"]
)
.expect("failed to define a metric")
});

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteOpKind {
Upload,
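Taken together with the rest of this diff, the bookkeeping for this gauge is: touch_layer adds a layer's file size, remove_layer and remove_timeline subtract it, and SecondaryTenant's Drop impl removes the label pair, so the metric should always equal the total size of layer files held for the secondary location; a testing-only assertion added further down checks exactly that invariant after each download pass.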
@@ -23,6 +23,8 @@ use super::{
storage_layer::LayerName,
};

use crate::metrics::SECONDARY_RESIDENT_PHYSICAL_SIZE;
use metrics::UIntGauge;
use pageserver_api::{
models,
shard::{ShardIdentity, TenantShardId},
@@ -99,6 +101,17 @@ pub(crate) struct SecondaryTenant {

// Public state indicating overall progress of downloads relative to the last heatmap seen
pub(crate) progress: std::sync::Mutex<models::SecondaryProgress>,

// Sum of layer sizes on local disk
pub(super) resident_size_metric: UIntGauge,
}

impl Drop for SecondaryTenant {
fn drop(&mut self) {
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
}
}

impl SecondaryTenant {
@@ -108,6 +121,12 @@ impl SecondaryTenant {
tenant_conf: TenantConfOpt,
config: &SecondaryLocationConfig,
) -> Arc<Self> {
let tenant_id = tenant_shard_id.tenant_id.to_string();
let shard_id = format!("{}", tenant_shard_id.shard_slug());
let resident_size_metric = SECONDARY_RESIDENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id])
.unwrap();

Arc::new(Self {
tenant_shard_id,
// todo: shall we make this a descendent of the

@@ -123,6 +142,8 @@ impl SecondaryTenant {
detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),

progress: std::sync::Mutex::default(),

resident_size_metric,
})
}
@@ -211,16 +232,12 @@ impl SecondaryTenant {
// have to 100% match what is on disk, because it's a best-effort warming
// of the cache.
let mut detail = this.detail.lock().unwrap();
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
let removed = timeline_detail.on_disk_layers.remove(&name);

// We might race with removal of the same layer during downloads, if it was removed
// from the heatmap. If we see that the OnDiskState is gone, then no need to
// do a physical deletion or store in evicted_at.
if let Some(removed) = removed {
removed.remove_blocking();
timeline_detail.evicted_at.insert(name, now);
}
if let Some(removed) =
detail.evict_layer(name, &timeline_id, now, &this.resident_size_metric)
{
// We might race with removal of the same layer during downloads, so finding the layer we
// were trying to remove is optional. Only issue the disk I/O to remove it if we found it.
removed.remove_blocking();
}
})
.await
@@ -46,6 +46,7 @@ use crate::tenant::{
use camino::Utf8PathBuf;
use chrono::format::{DelayedFormat, StrftimeItems};
use futures::Future;
use metrics::UIntGauge;
use pageserver_api::models::SecondaryProgress;
use pageserver_api::shard::TenantShardId;
use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
@@ -131,16 +132,66 @@ impl OnDiskState {
.or_else(fs_ext::ignore_not_found)
.fatal_err("Deleting secondary layer")
}

pub(crate) fn file_size(&self) -> u64 {
self.metadata.file_size
}
}

#[derive(Debug, Clone, Default)]
pub(super) struct SecondaryDetailTimeline {
pub(super) on_disk_layers: HashMap<LayerName, OnDiskState>,
on_disk_layers: HashMap<LayerName, OnDiskState>,

/// We remember when layers were evicted, to prevent re-downloading them.
pub(super) evicted_at: HashMap<LayerName, SystemTime>,
}

impl SecondaryDetailTimeline {
pub(super) fn remove_layer(
&mut self,
name: &LayerName,
resident_metric: &UIntGauge,
) -> Option<OnDiskState> {
let removed = self.on_disk_layers.remove(name);
if let Some(removed) = &removed {
resident_metric.sub(removed.file_size());
}
removed
}

/// `local_path`
fn touch_layer<F>(
&mut self,
conf: &'static PageServerConf,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
touched: &HeatMapLayer,
resident_metric: &UIntGauge,
local_path: F,
) where
F: FnOnce() -> Utf8PathBuf,
{
use std::collections::hash_map::Entry;
match self.on_disk_layers.entry(touched.name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().access_time = touched.access_time;
}
Entry::Vacant(e) => {
e.insert(OnDiskState::new(
conf,
tenant_shard_id,
timeline_id,
touched.name.clone(),
touched.metadata.clone(),
touched.access_time,
local_path(),
));
resident_metric.add(touched.metadata.file_size);
}
}
}
}

// Aspects of a heatmap that we remember after downloading it
#[derive(Clone, Debug)]
struct DownloadSummary {
@@ -158,7 +209,7 @@ pub(super) struct SecondaryDetail {

last_download: Option<DownloadSummary>,
next_download: Option<Instant>,
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}

/// Helper for logging SystemTime
@@ -191,6 +242,38 @@ impl SecondaryDetail {
}
}

pub(super) fn evict_layer(
&mut self,
name: LayerName,
timeline_id: &TimelineId,
now: SystemTime,
resident_metric: &UIntGauge,
) -> Option<OnDiskState> {
let timeline = self.timelines.get_mut(timeline_id)?;
let removed = timeline.remove_layer(&name, resident_metric);
if removed.is_some() {
timeline.evicted_at.insert(name, now);
}
removed
}

pub(super) fn remove_timeline(
&mut self,
timeline_id: &TimelineId,
resident_metric: &UIntGauge,
) {
let removed = self.timelines.remove(timeline_id);
if let Some(removed) = removed {
resident_metric.sub(
removed
.on_disk_layers
.values()
.map(|l| l.metadata.file_size)
.sum(),
);
}
}

/// Additionally returns the total number of layers, used for more stable relative access time
/// based eviction.
pub(super) fn get_layers_for_eviction(
@@ -601,8 +684,13 @@ impl<'a> TenantDownloader<'a> {
Some(t) => t,
None => {
// We have no existing state: need to scan local disk for layers first.
let timeline_state =
init_timeline_state(self.conf, tenant_shard_id, timeline).await;
let timeline_state = init_timeline_state(
self.conf,
tenant_shard_id,
timeline,
&self.secondary_state.resident_size_metric,
)
.await;

// Re-acquire detail lock now that we're done with async load from local FS
self.secondary_state
@@ -671,6 +759,25 @@ impl<'a> TenantDownloader<'a> {
.await?;
}

// Metrics consistency check in testing builds
if cfg!(feature = "testing") {
let detail = self.secondary_state.detail.lock().unwrap();
let resident_size = detail
.timelines
.values()
.map(|tl| {
tl.on_disk_layers
.values()
.map(|v| v.metadata.file_size)
.sum::<u64>()
})
.sum::<u64>();
assert_eq!(
resident_size,
self.secondary_state.resident_size_metric.get()
);
}

// Only update last_etag after a full successful download: this way we will not skip
// the next download, even if the heatmap's actual etag is unchanged.
self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary {
@@ -783,7 +890,7 @@ impl<'a> TenantDownloader<'a> {
for delete_timeline in &delete_timelines {
// We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
// from disk fails that will be a fatal error.
detail.timelines.remove(delete_timeline);
detail.remove_timeline(delete_timeline, &self.secondary_state.resident_size_metric);
}
}
@@ -801,7 +908,7 @@ impl<'a> TenantDownloader<'a> {
let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else {
continue;
};
timeline_state.on_disk_layers.remove(&layer_name);
timeline_state.remove_layer(&layer_name, &self.secondary_state.resident_size_metric);
}

for timeline_id in delete_timelines {
@@ -1000,33 +1107,24 @@ impl<'a> TenantDownloader<'a> {
let timeline_detail = detail.timelines.entry(timeline_id).or_default();

tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());

for t in touched {
use std::collections::hash_map::Entry;
match timeline_detail.on_disk_layers.entry(t.name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().access_time = t.access_time;
}
Entry::Vacant(e) => {
let local_path = local_layer_path(
touched.into_iter().for_each(|t| {
timeline_detail.touch_layer(
self.conf,
tenant_shard_id,
&timeline_id,
&t,
&self.secondary_state.resident_size_metric,
|| {
local_layer_path(
self.conf,
tenant_shard_id,
&timeline_id,
&t.name,
&t.metadata.generation,
);
e.insert(OnDiskState::new(
self.conf,
tenant_shard_id,
&timeline_id,
t.name,
t.metadata.clone(),
t.access_time,
local_path,
));
}
}
}
)
},
)
});
}

result
@@ -1135,6 +1233,7 @@ async fn init_timeline_state(
conf: &'static PageServerConf,
tenant_shard_id: &TenantShardId,
heatmap: &HeatMapTimeline,
resident_metric: &UIntGauge,
) -> SecondaryDetailTimeline {
let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
let mut detail = SecondaryDetailTimeline::default();
@@ -1210,17 +1309,13 @@ async fn init_timeline_state(
} else {
// We expect the access time to be initialized immediately afterwards, when
// the latest heatmap is applied to the state.
detail.on_disk_layers.insert(
name.clone(),
OnDiskState::new(
conf,
tenant_shard_id,
&heatmap.timeline_id,
name,
remote_meta.metadata.clone(),
remote_meta.access_time,
file_path,
),
detail.touch_layer(
conf,
tenant_shard_id,
&heatmap.timeline_id,
remote_meta,
resident_metric,
|| file_path,
);
}
}
@@ -343,7 +343,33 @@ impl WalIngest {
xlog_checkpoint.oldestActiveXid,
self.checkpoint.oldestActiveXid
);
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;

// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionId`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionId from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
self.checkpoint.oldestActiveXid = oldest_active_xid;
} else {
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}

// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
@@ -375,6 +401,7 @@ impl WalIngest {
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
self.checkpoint_modified = true;
}
}
pg_constants::RM_REPLORIGIN_ID => {
@@ -1277,13 +1304,10 @@ impl WalIngest {
xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
);

// Here we treat oldestXid and oldestXidDB
// differently from postgres redo routines.
// In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
// until checkpoint happens and updates the value.
// Here we can use the most recent value.
// It's just an optimization, though and can be deleted.
// TODO Figure out if there will be any issues with replica.
// In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
// truncated, but a checkpoint record with the updated values isn't written until
// later. In Neon, a server can start at any LSN, not just on a checkpoint record,
// so we keep the oldestXid and oldestXidDB up-to-date.
self.checkpoint.oldestXid = xlrec.oldest_xid;
self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
self.checkpoint_modified = true;
@@ -1384,14 +1408,31 @@ impl WalIngest {
// Note: The multixact members can wrap around, even within one WAL record.
offset = offset.wrapping_add(n_this_page as u32);
}
if xlrec.mid >= self.checkpoint.nextMulti {
self.checkpoint.nextMulti = xlrec.mid + 1;
self.checkpoint_modified = true;
}
if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
let next_offset = offset;
assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);

// Update next-multi-xid and next-offset
//
// NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
// go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
// read it, like GetNewMultiXactId(). This is different from how nextXid is
// incremented! nextXid skips over < FirstNormalTransactionId when the value
// is stored, so it's never 0 in a checkpoint.
//
// I don't know why it's done that way, it seems less error-prone to skip over 0
// when the value is stored rather than when it's read. But let's do it the same
// way here.
let next_multi_xid = xlrec.mid.wrapping_add(1);

if self
.checkpoint
.update_next_multixid(next_multi_xid, next_offset)
{
self.checkpoint_modified = true;
}

// Also update the next-xid with the highest member. According to the comments in
// multixact_redo(), this shouldn't be necessary, but let's do the same here.
let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
if let Some(max_xid) = acc {
if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
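The apparent motivation for this hunk: the old comparisons `xlrec.mid >= self.checkpoint.nextMulti` and `xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset` were not wraparound-aware, and the unchecked `+` could overflow near 0xFFFFFFFF; the new code accumulates the end offset with wrapping_add while ingesting the members and delegates the circular comparison to the new CheckPoint::update_next_multixid helper tested above.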
pgxn/neon/neon.c (293 changes)
@@ -12,6 +12,8 @@
#include "fmgr.h"

#include "miscadmin.h"
#include "access/subtrans.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "storage/buf_internals.h"

@@ -22,10 +24,12 @@
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/guc.h"
#include "utils/wait_event.h"
@@ -266,6 +270,293 @@ LogicalSlotsMonitorMain(Datum main_arg)
}
}

/*
* XXX: These are private to procarray.c, but we need them here.
*/
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
#define TOTAL_MAX_CACHED_SUBXIDS \
((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)

/*
* Restore running-xact information by scanning the CLOG at startup.
*
* In PostgreSQL, a standby always has to wait for a running-xacts WAL record
* to arrive before it can start accepting queries. Furthermore, if there are
* transactions with too many subxids (> 64) open to fit in the in-memory
* subxids cache, the running-xacts record will be marked as "suboverflowed",
* and the standby will need to also wait for the currently in-progress
* transactions to finish.
*
* That's not great in PostgreSQL, because a hot standby does not necessarily
* open up for queries immediately as you might expect. But it's worse in
* Neon: A standby in Neon doesn't need to start WAL replay from a checkpoint
* record; it can start at any LSN. Postgres arranges things so that there is
* a running-xacts record soon after every checkpoint record, but when you
* start from an arbitrary LSN, that doesn't help. If the primary is idle, or
* not running at all, it might never write a new running-xacts record,
* leaving the replica in a limbo where it can never start accepting queries.
*
* To mitigate that, we have an additional mechanism to find the running-xacts
* information: we scan the CLOG, making note of any XIDs not marked as
* committed or aborted. They are added to the Postgres known-assigned XIDs
* array by calling ProcArrayApplyRecoveryInfo() in the caller of this
* function.
*
* There is one big limitation with that mechanism: The size of the
* known-assigned XIDs array is limited, so if there are a lot of in-progress
* XIDs, we have to give up. Furthermore, we don't know how many of the
* in-progress XIDs are subtransactions, and if we use up all the space in the
* known-assigned XIDs array for subtransactions, we might run out of space in
* the array later during WAL replay, causing the replica to shut down with
* "ERROR: too many KnownAssignedXids". The safe # of XIDs that we can add to
* the known-assigned array without risking that error later is very low,
* merely PGPROC_MAX_CACHED_SUBXIDS == 64, so we take our chances and use up
* to half of the known-assigned XIDs array for the subtransactions, even
* though that risks getting the error later.
*
* Note: It's OK if the recovered list of XIDs includes some transactions that
* have crashed in the primary, and hence will never commit. They will be seen
* as in-progress, until we see a new running-xacts record with an
* oldestActiveXid that invalidates them. That's how the known-assigned XIDs
* array always works.
*
* If scraping the CLOG doesn't succeed for some reason, like the subxid
* overflow, Postgres will fall back to waiting for a running-xacts record
* like usual.
*
* Returns true if a complete list of in-progress XIDs was scraped.
*/
static bool
RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *nxids)
{
TransactionId from;
TransactionId till;
int max_xcnt;
TransactionId *prepared_xids = NULL;
int n_prepared_xids;
TransactionId *restored_xids = NULL;
int n_restored_xids;
int next_prepared_idx;

Assert(*xids == NULL);

/*
* If the checkpoint doesn't have a valid oldestActiveXid, bail out. We
* don't know where to start the scan.
*
* This shouldn't happen, because the pageserver always maintains a valid
* oldestActiveXid nowadays. Except when starting at an old point in time
* that was ingested before the pageserver was taught to do that.
*/
if (!TransactionIdIsValid(checkpoint->oldestActiveXid))
{
elog(LOG, "cannot restore running-xacts from CLOG because oldestActiveXid is not set");
goto fail;
}

/*
* We will scan the CLOG starting from the oldest active XID.
*
* In some corner cases, the oldestActiveXid from the last checkpoint
* might already have been truncated from the CLOG. That is,
* oldestActiveXid might be older than oldestXid. That's possible because
* oldestActiveXid is only updated at checkpoints. After the last
* checkpoint, the oldest transaction might have committed, and the CLOG
* might also have been already truncated. So if oldestActiveXid is older
* than oldestXid, start at oldestXid instead. (Otherwise we'd try to
* access CLOG segments that have already been truncated away.)
*/
from = TransactionIdPrecedes(checkpoint->oldestXid, checkpoint->oldestActiveXid)
? checkpoint->oldestActiveXid : checkpoint->oldestXid;
till = XidFromFullTransactionId(checkpoint->nextXid);

/*
* To avoid the "too many KnownAssignedXids" error later during replay, we
* limit the number of collected transactions. This is a tradeoff: if we are
* willing to consume more of the KnownAssignedXids space for the XIDs
* now, that allows us to start up, but we might run out of space later.
*
* The size of the KnownAssignedXids array is TOTAL_MAX_CACHED_SUBXIDS,
* which is (PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS. In
* PostgreSQL, that's always enough because the primary will always write
* an XLOG_XACT_ASSIGNMENT record if a transaction has more than
* PGPROC_MAX_CACHED_SUBXIDS subtransactions. Seeing that record allows
* the standby to mark the XIDs in pg_subtrans and remove them from the
* KnownAssignedXids array.
*
* Here, we don't know which XIDs belong to subtransactions that have
* already been WAL-logged with an XLOG_XACT_ASSIGNMENT record. If we
* wanted to be totally safe and avoid the possibility of getting a "too
* many KnownAssignedXids" error later, we would have to limit ourselves
* to PGPROC_MAX_CACHED_SUBXIDS, which is not much. And that includes top
* transaction IDs too, because we cannot distinguish between top
* transaction IDs and subtransactions here.
*
* Somewhat arbitrarily, we use up to half of KnownAssignedXids. That
* strikes a sensible balance between being useful, and risking a "too
* many KnownAssignedXids" error later.
*/
max_xcnt = TOTAL_MAX_CACHED_SUBXIDS / 2;

/*
* Collect XIDs of prepared transactions in an array. This includes only
* their top-level XIDs. We assume that StandbyRecoverPreparedTransactions
* has already been called, so we can find all the sub-transactions in
* pg_subtrans.
*/
PrescanPreparedTransactions(&prepared_xids, &n_prepared_xids);
qsort(prepared_xids, n_prepared_xids, sizeof(TransactionId), xidLogicalComparator);

/*
* Scan the CLOG, collecting in-progress XIDs into 'restored_xids'.
*/
elog(DEBUG1, "scanning CLOG between %u and %u for in-progress XIDs", from, till);
restored_xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId));
n_restored_xids = 0;
next_prepared_idx = 0;
for (TransactionId xid = from; xid != till;)
{
XLogRecPtr xidlsn;
XidStatus xidstatus;

xidstatus = TransactionIdGetStatus(xid, &xidlsn);

/*
* "Merge" the prepared transactions into the restored_xids array as
* we go. The prepared transactions array is sorted. This is mostly
* a sanity check to ensure that all the prepared transactions are
* seen as in-progress. (There is a check after the loop that we didn't
* miss any.)
*/
if (next_prepared_idx < n_prepared_xids && xid == prepared_xids[next_prepared_idx])
{
/*
* This is a top-level transaction ID of a prepared transaction.
* Include it in the array.
*/

/* sanity check */
if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS)
{
elog(LOG, "prepared transaction %u has unexpected status %X, cannot restore running-xacts from CLOG",
xid, xidstatus);
Assert(false);
goto fail;
}

elog(DEBUG1, "XID %u: was next prepared xact (%d / %d)", xid, next_prepared_idx, n_prepared_xids);
next_prepared_idx++;
}
else if (xidstatus == TRANSACTION_STATUS_COMMITTED)
{
elog(DEBUG1, "XID %u: was committed", xid);
goto skip;
}
else if (xidstatus == TRANSACTION_STATUS_ABORTED)
{
elog(DEBUG1, "XID %u: was aborted", xid);
goto skip;
}
else if (xidstatus == TRANSACTION_STATUS_IN_PROGRESS)
{
/*
* In-progress transactions are included in the array.
*
* Except subtransactions of the prepared transactions. They are
* already set in pg_subtrans, and hence don't need to be tracked
* in the known-assigned XIDs array.
*/
if (n_prepared_xids > 0)
{
TransactionId parent = SubTransGetParent(xid);

if (TransactionIdIsValid(parent))
{
/*
* This is a subtransaction belonging to a prepared
* transaction.
*
* Sanity check that it is in the prepared XIDs array. It
* should be, because StandbyRecoverPreparedTransactions
* populated pg_subtrans, and no other XID should be set
* in it yet. (This also relies on the fact that
* StandbyRecoverPreparedTransactions sets the parent of
* each subxid to point directly to the top-level XID,
* rather than restoring the original subtransaction
* hierarchy.)
*/
if (bsearch(&parent, prepared_xids, next_prepared_idx,
sizeof(TransactionId), xidLogicalComparator) == NULL)
{
elog(LOG, "sub-XID %u has unexpected parent %u, cannot restore running-xacts from CLOG",
xid, parent);
Assert(false);
goto fail;
}
elog(DEBUG1, "XID %u: was a subtransaction of prepared xid %u", xid, parent);
goto skip;
}
}

/* include it in the array */
elog(DEBUG1, "XID %u: is in progress", xid);
}
else
{
/*
* SUB_COMMITTED is a transient state used at commit. We don't
* expect to see that here.
*/
elog(LOG, "XID %u has unexpected status %X in pg_xact, cannot restore running-xacts from CLOG",
xid, xidstatus);
Assert(false);
goto fail;
}

if (n_restored_xids >= max_xcnt)
{
/*
* Overflowed. We won't be able to install the RunningTransactions
* snapshot.
*/
elog(LOG, "too many running xacts to restore from the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
checkpoint->oldestXid, checkpoint->oldestActiveXid,
XidFromFullTransactionId(checkpoint->nextXid));
goto fail;
}

restored_xids[n_restored_xids++] = xid;

skip:
TransactionIdAdvance(xid);
continue;
}

/* sanity check */
if (next_prepared_idx != n_prepared_xids)
{
elog(LOG, "prepared transaction ID %u was not visited in the CLOG scan, cannot restore running-xacts from CLOG",
prepared_xids[next_prepared_idx]);
Assert(false);
goto fail;
}

elog(LOG, "restored %d running xacts by scanning the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
n_restored_xids, checkpoint->oldestXid, checkpoint->oldestActiveXid, XidFromFullTransactionId(checkpoint->nextXid));
*nxids = n_restored_xids;
*xids = restored_xids;
return true;

fail:
*nxids = 0;
*xids = NULL;
if (restored_xids)
pfree(restored_xids);
if (prepared_xids)
pfree(prepared_xids);
return false;
}

void
_PG_init(void)
{
@@ -288,6 +579,8 @@ _PG_init(void)

pg_init_extension_server();

restore_running_xacts_callback = RestoreRunningXactsFromClog;

/*
* Important: This must happen after other parts of the extension are
* loaded, otherwise any settings to GUCs that were set before the
@@ -7,7 +7,7 @@ OBJS = \
neontest.o

EXTENSION = neon_test_utils
DATA = neon_test_utils--1.1.sql
DATA = neon_test_utils--1.2.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"

PG_CONFIG = pg_config
@@ -41,7 +41,7 @@ RETURNS bytea
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
LANGUAGE C PARALLEL UNSAFE;

CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
CREATE FUNCTION neon_xlogflush(lsn pg_lsn DEFAULT NULL)
RETURNS VOID
AS 'MODULE_PATHNAME', 'neon_xlogflush'
LANGUAGE C PARALLEL UNSAFE;
@@ -1,6 +1,6 @@
# neon_test_utils extension
comment = 'helpers for neon testing and debugging'
default_version = '1.1'
default_version = '1.2'
module_pathname = '$libdir/neon_test_utils'
relocatable = true
trusted = true
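These neon_test_utils hunks move together: neon_xlogflush() gains a DEFAULT NULL argument, which changes the extension's SQL interface, so the install script becomes neon_test_utils--1.2.sql (the Makefile DATA change above) and default_version is bumped to match. With the default in place, a plain `SELECT neon_xlogflush();` flushes all WAL generated so far, as the C implementation below explains.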
@@ -15,6 +15,7 @@
#include "access/relation.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "catalog/namespace.h"
#include "fmgr.h"
#include "funcapi.h"
@@ -444,11 +445,46 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)

/*
* Directly calls XLogFlush(lsn) to flush WAL buffers.
*
* If 'lsn' is not specified (is NULL), flush all generated WAL.
*/
Datum
neon_xlogflush(PG_FUNCTION_ARGS)
{
XLogRecPtr lsn = PG_GETARG_LSN(0);
XLogRecPtr lsn;

if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is in progress"),
errhint("cannot flush WAL during recovery.")));

if (!PG_ARGISNULL(0))
lsn = PG_GETARG_LSN(0);
else
{
lsn = GetXLogInsertRecPtr();

/*---
* The LSN returned by GetXLogInsertRecPtr() is the position where the
* next inserted record would begin. If the last record ended just at
* the page boundary, the next record will begin after the page header
* on the next page, and that's what GetXLogInsertRecPtr() returns,
* but the page header has not been written yet. If we tried to flush
* it, XLogFlush() would throw an error:
*
* ERROR: xlog flush request %X/%X is not satisfied --- flushed only to %X/%X
*
* To avoid that, if the insert position points to just after the page
* header, back off to page boundary.
*/
if (lsn % XLOG_BLCKSZ == SizeOfXLogShortPHD &&
XLogSegmentOffset(lsn, wal_segment_size) > XLOG_BLCKSZ)
lsn -= SizeOfXLogShortPHD;
else if (lsn % XLOG_BLCKSZ == SizeOfXLogLongPHD &&
XLogSegmentOffset(lsn, wal_segment_size) < XLOG_BLCKSZ)
lsn -= SizeOfXLogLongPHD;
}

XLogFlush(lsn);
PG_RETURN_VOID();
@@ -3491,7 +3491,6 @@ class Endpoint(PgProtocol, LogUtils):
):
super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres")
self.env = env
self.running = False
self.branch_name: Optional[str] = None # dubious
self.endpoint_id: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
@@ -3857,7 +3856,9 @@ class EndpointFactory:

return self

def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]):
def new_replica(
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None
@@ -198,7 +198,7 @@ def wait_for_last_record_lsn(
lsn: Lsn,
) -> Lsn:
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
for i in range(100):
for i in range(1000):
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
return current_lsn
@@ -7,6 +7,7 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
VanillaPostgres,
import_timeline_from_vanilla_postgres,
wait_for_wal_insert_lsn,
)
@@ -182,3 +183,275 @@ def test_import_at_2bil(
cur = conn.cursor()
cur.execute("SELECT count(*) from t")
assert cur.fetchone() == (10000 + 1 + 1,)


# Constants and macros copied from PostgreSQL multixact.c and headers. These are needed to
# calculate the SLRU segments that a particular multixid or multixid-offsets falls into.
BLCKSZ = 8192
MULTIXACT_OFFSETS_PER_PAGE = int(BLCKSZ / 4)
SLRU_PAGES_PER_SEGMENT = int(32)
MXACT_MEMBER_BITS_PER_XACT = 8
MXACT_MEMBER_FLAGS_PER_BYTE = 1
MULTIXACT_FLAGBYTES_PER_GROUP = 4
MULTIXACT_MEMBERS_PER_MEMBERGROUP = MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE
MULTIXACT_MEMBERGROUP_SIZE = 4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP
MULTIXACT_MEMBERGROUPS_PER_PAGE = int(BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE)
MULTIXACT_MEMBERS_PER_PAGE = MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP


def MultiXactIdToOffsetSegment(xid: int):
return int(xid / (SLRU_PAGES_PER_SEGMENT * MULTIXACT_OFFSETS_PER_PAGE))


def MXOffsetToMemberSegment(off: int):
return int(off / (SLRU_PAGES_PER_SEGMENT * MULTIXACT_MEMBERS_PER_PAGE))
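A quick illustrative check of these helpers (not part of the test file; it reuses the definitions above): one offsets segment covers 32 pages * 2048 offsets = 65536 multixids, so the 0xFFFFFF00 value used by the test below falls in segment 0xFFFF:

# Illustrative only: one offsets segment covers 32 * 2048 = 65536 multixids.
assert MultiXactIdToOffsetSegment(0xFFFFFF00) == 0xFFFF
# advance_multixid_to() below renders the segment number as a file name,
# so the dummy file it creates would be pg_multixact/offsets/FFFF.
assert "%04X" % MultiXactIdToOffsetSegment(0xFFFFFF00) == "FFFF"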
def advance_multixid_to(
pg_bin: PgBin, vanilla_pg: VanillaPostgres, next_multi_xid: int, next_multi_offset: int
):
"""
Use pg_resetwal to advance the nextMulti and nextMultiOffset values in a stand-alone
Postgres cluster. This is useful to get close to wraparound or some other interesting
value, without having to burn a lot of time consuming the (multi-)XIDs one by one.

The new values should be higher than the old ones, in a wraparound-aware sense.

On entry, the server should be running. It will be shut down and restarted.
"""

# Read old values from the last checkpoint. We will pass the old oldestMultiXid value
# back to pg_resetwal, there's no option to leave it alone.
with vanilla_pg.connect() as conn:
with conn.cursor() as cur:
# Make sure the oldest-multi-xid value in the control file is up-to-date
cur.execute("checkpoint")
cur.execute("select oldest_multi_xid, next_multixact_id from pg_control_checkpoint()")
rec = cur.fetchone()
assert rec is not None
(ckpt_oldest_multi_xid, ckpt_next_multi_xid) = rec
log.info(f"oldestMultiXid was {ckpt_oldest_multi_xid}, nextMultiXid was {ckpt_next_multi_xid}")
log.info(f"Resetting to {next_multi_xid}")

# Use pg_resetwal to reset the next multiXid and multiOffset to given values.
vanilla_pg.stop()
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
cmd = [
pg_resetwal_path,
f"--multixact-ids={next_multi_xid},{ckpt_oldest_multi_xid}",
f"--multixact-offset={next_multi_offset}",
"-D",
str(vanilla_pg.pgdatadir),
]
pg_bin.run_capture(cmd)

# Because we skip over a lot of values, Postgres hasn't created the SLRU segments for
# the new values yet. Create them manually, to allow Postgres to start up.
#
# This leaves "gaps" in the SLRU where segments between old value and new value are
# missing. That's OK for our purposes. Autovacuum will print some warnings about the
# missing segments, but will clean it up by truncating the SLRUs up to the new value,
# closing the gap.
segname = "%04X" % MultiXactIdToOffsetSegment(next_multi_xid)
log.info(f"Creating dummy segment pg_multixact/offsets/{segname}")
with open(vanilla_pg.pgdatadir / "pg_multixact" / "offsets" / segname, "w") as of:
of.write("\0" * SLRU_PAGES_PER_SEGMENT * BLCKSZ)
of.flush()

segname = "%04X" % MXOffsetToMemberSegment(next_multi_offset)
log.info(f"Creating dummy segment pg_multixact/members/{segname}")
with open(vanilla_pg.pgdatadir / "pg_multixact" / "members" / segname, "w") as of:
of.write("\0" * SLRU_PAGES_PER_SEGMENT * BLCKSZ)
of.flush()

# Start Postgres again and wait until autovacuum has processed all the databases
#
# This allows truncating the SLRUs, fixing the gaps with missing segments.
vanilla_pg.start()
with vanilla_pg.connect().cursor() as cur:
for _ in range(1000):
datminmxid = int(
query_scalar(cur, "select min(datminmxid::text::int8) from pg_database")
)
log.info(f"datminmxid {datminmxid}")
if next_multi_xid - datminmxid < 1_000_000:  # not wraparound-aware!
break
time.sleep(0.5)
def test_multixid_wraparound_import(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
pg_bin: PgBin,
|
||||
vanilla_pg,
|
||||
):
|
||||
"""
|
||||
Test that the wraparound of the "next-multi-xid" counter is handled correctly in
|
||||
pageserver, And multi-offsets as well
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# In order to to test multixid wraparound, we need to first advance the counter to
|
||||
# within spitting distance of the wraparound, that is 2^32 multi-XIDs. We could simply
|
||||
# run a workload that consumes a lot of multi-XIDs until we approach that, but that
|
||||
# takes a very long time. So we cheat.
|
||||
#
|
||||
# Our strategy is to create a vanilla Postgres cluster, and use pg_resetwal to
|
||||
# directly set the multi-xid counter a higher value. However, we cannot directly set
|
||||
# it to just before 2^32 (~ 4 billion), because that would make the exisitng
|
||||
# 'relminmxid' values to look like they're in the future. It's not clear how the
|
||||
# system would behave in that situation. So instead, we bump it up ~ 1 billion
|
||||
# multi-XIDs at a time, and let autovacuum to process all the relations and update
|
||||
# 'relminmxid' between each run.
|
||||
#
|
||||
# XXX: For the multi-offsets, most of the bump is done in the last call. This is
|
||||
# because advancing it ~ 1 billion at a time hit a pathological case in the
|
||||
# MultiXactMemberFreezeThreshold() function, causing autovacuum not trigger multixid
|
||||
# freezing. See
|
||||
# https://www.postgresql.org/message-id/85fb354c-f89f-4d47-b3a2-3cbd461c90a3%40iki.fi
|
||||
# Multi-offsets don't have the same wraparound problems at 2 billion mark as
|
||||
# multi-xids do, so one big jump is fine.
|
||||
vanilla_pg.configure(
|
||||
[
|
||||
"log_autovacuum_min_duration = 0",
|
||||
# Perform anti-wraparound vacuuming aggressively
|
||||
"autovacuum_naptime='1 s'",
|
||||
"autovacuum_freeze_max_age = 1000000",
|
||||
"autovacuum_multixact_freeze_max_age = 1000000",
|
||||
],
|
||||
)
|
||||
vanilla_pg.start()
|
||||
advance_multixid_to(pg_bin, vanilla_pg, 0x40000000, 0x10000000)
|
||||
advance_multixid_to(pg_bin, vanilla_pg, 0x80000000, 0x20000000)
|
||||
advance_multixid_to(pg_bin, vanilla_pg, 0xC0000000, 0x30000000)
|
||||
advance_multixid_to(pg_bin, vanilla_pg, 0xFFFFFF00, 0xFFFFFF00)
|
||||
|
||||
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
|
||||
vanilla_pg.safe_psql("create table tt as select g as id from generate_series(1, 10) g")
|
||||
vanilla_pg.safe_psql("CHECKPOINT")
|
||||
|
||||
# Import the cluster to the pageserver
|
||||
tenant_id = TenantId.generate()
|
||||
env.pageserver.tenant_create(tenant_id)
|
||||
timeline_id = TimelineId.generate()
|
||||
import_timeline_from_vanilla_postgres(
|
||||
test_output_dir,
|
||||
env,
|
||||
pg_bin,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
"imported_multixid_wraparound_test",
|
||||
vanilla_pg.connstr(),
|
||||
)
|
||||
vanilla_pg.stop()
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"imported_multixid_wraparound_test",
|
||||
tenant_id=tenant_id,
|
||||
config_lines=[
|
||||
"log_autovacuum_min_duration = 0",
|
||||
"autovacuum_naptime='5 s'",
|
||||
"autovacuum=off",
|
||||
],
|
||||
)
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
assert query_scalar(cur, "select count(*) from tt") == 10 # sanity check
|
||||
|
||||
# Install extension containing function needed for test
|
||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
|
||||
# Consume a lot of XIDs, just to advance the XIDs to different range than the
|
||||
# multi-xids. That avoids confusion while debugging
|
||||
cur.execute("select test_consume_xids(100000)")
|
||||
cur.execute("select pg_switch_wal()")
|
||||
cur.execute("checkpoint")
|
||||
|
||||
# Use subtransactions so that each row in 'tt' is stamped with different XID. Leave
|
||||
# the transaction open.
|
||||
cur.execute("BEGIN")
|
||||
cur.execute(
|
||||
"""
|
||||
do $$
|
||||
declare
|
||||
idvar int;
|
||||
begin
|
||||
for idvar in select id from tt loop
|
||||
begin
|
||||
update tt set id = idvar where id = idvar;
|
||||
exception when others then
|
||||
raise 'didn''t expect an error: %', sqlerrm;
|
||||
end;
|
||||
end loop;
|
||||
end;
|
||||
$$;
|
||||
"""
|
||||
)
|
||||
|
||||
# In a different transaction, acquire a FOR KEY SHARE lock on each row. This generates
|
||||
# a new multixid for each row, with the previous xmax and this transaction's XID as the
|
||||
# members.
|
||||
#
|
||||
# Repeat this until the multi-xid counter wraps around.
    conn3 = endpoint.connect()
    cur3 = conn3.cursor()
    next_multixact_id_before_restart = 0
    observed_before_wraparound = False
    while True:
        cur3.execute("BEGIN")
        cur3.execute("SELECT * FROM tt FOR KEY SHARE")

        # Get the xmax of one of the rows we locked. It should be a multi-xid.
        # It might not be the latest one, but close enough.
        row_xmax = int(query_scalar(cur3, "SELECT xmax FROM tt LIMIT 1"))
        cur3.execute("COMMIT")
        log.info(f"observed a row with xmax {row_xmax}")

        # A high value means we have not wrapped around yet
        if row_xmax >= 0xFFFFFF00:
            observed_before_wraparound = True
            continue

        # xmax should not be a regular XID. (We bumped up the regular XID range
        # earlier, to around 100000 and above.)
        assert row_xmax < 100

        # xmax values < FirstNormalTransactionId (== 3) could be special XID
        # values, or multixid values after wraparound. We don't know for sure
        # which, so keep going until we see a value that's unambiguously a
        # wrapped-around multixid.
        if row_xmax < 3:
            continue

        next_multixact_id_before_restart = row_xmax
        log.info(
            f"next_multixact_id is now at {next_multixact_id_before_restart} or a little higher"
        )
        break

    # We should have observed the state before wraparound
    assert observed_before_wraparound

    cur.execute("COMMIT")

    # Wait until pageserver has received all the data, and restart the endpoint
    wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id)
    endpoint.stop(mode="immediate")  # 'immediate' to avoid writing shutdown checkpoint
    endpoint.start()

    # Check that the next-multixid value wrapped around correctly
    conn = endpoint.connect()
    cur = conn.cursor()
    next_multixact_id_after_restart = int(
        query_scalar(cur, "select next_multixact_id from pg_control_checkpoint()")
    )
    log.info(f"next_multixact_id after restart: {next_multixact_id_after_restart}")
    assert next_multixact_id_after_restart >= next_multixact_id_before_restart

    # The multi-offset should wrap around as well
    next_multi_offset_after_restart = int(
        query_scalar(cur, "select next_multi_offset from pg_control_checkpoint()")
    )
    log.info(f"next_multi_offset after restart: {next_multi_offset_after_restart}")
    assert next_multi_offset_after_restart < 100000
646
test_runner/regress/test_replica_start.py
Normal file
@@ -0,0 +1,646 @@
"""
In PostgreSQL, a standby always has to wait for a running-xacts WAL record to
arrive before it can start accepting queries. Furthermore, if there are
transactions with too many subxids (> 64) open to fit in the in-memory subxids
cache, the running-xacts record will be marked as "suboverflowed", and the
standby will also need to wait for the currently in-progress transactions to
finish.

In Neon, we have an additional mechanism that scans the CLOG at server startup
to determine the list of running transactions, so that the standby can start up
immediately without waiting for the running-xacts record, but that mechanism
only works if the number of active (sub-)transactions is reasonably small.
Otherwise it falls back to waiting. Furthermore, it's somewhat optimistic in
using up the known-assigned XIDs array: if too many transactions with subxids
are started in the primary later, the replay in the replica will crash with a
"too many KnownAssignedXids" error.

This module contains tests for those various cases at standby startup: starting
from a shutdown checkpoint, using the CLOG-scanning mechanism, waiting for the
running-xacts record and for in-progress transactions to finish, etc.
"""

import threading
from contextlib import closing

import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar, wait_until

CREATE_SUBXACTS_FUNC = """
create or replace function create_subxacts(n integer) returns void as $$
declare
    i integer;
begin
    for i in 1..n loop
        begin
            insert into t (payload) values (0);
        exception
            when others then
                raise exception 'caught something: %', sqlerrm;
        end;
    end loop;
end; $$ language plpgsql
"""


def test_replica_start_scan_clog(neon_simple_env: NeonEnv):
    """
    Test the CLOG-scanning mechanism at hot standby startup. There is one
    transaction active in the primary when the standby is started. The primary
    is killed before it has a chance to write a running-xacts record. The
    CLOG scanning at neon startup allows the standby to start up anyway.

    See the module docstring for background.
    """

    # Initialize the primary, a test table, and a helper function to create lots
    # of subtransactions.
    env = neon_simple_env
    primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
    primary_conn = primary.connect()
    primary_cur = primary_conn.cursor()
    primary_cur.execute("CREATE EXTENSION neon_test_utils")
    primary_cur.execute("create table t(pk serial primary key, payload integer)")
    primary_cur.execute(CREATE_SUBXACTS_FUNC)
    primary_cur.execute("select pg_switch_wal()")

    # Start a transaction in the primary. Leave the transaction open.
    #
    # The transaction has some subtransactions, but not too many to cause the
    # CLOG-scanning mechanism to give up.
    primary_cur.execute("begin")
    primary_cur.execute("select create_subxacts(50)")

    # Wait for the WAL to be flushed, but then immediately kill the primary,
    # before it has a chance to generate a running-xacts record.
    primary_cur.execute("select neon_xlogflush()")
    wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
    primary.stop(mode="immediate")

    # Create a replica. It should start up normally, thanks to the CLOG-scanning
    # mechanism.
    secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")

    # The transaction did not commit, so it should not be visible in the secondary
    secondary_conn = secondary.connect()
    secondary_cur = secondary_conn.cursor()
    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (0,)


def test_replica_start_scan_clog_crashed_xids(neon_simple_env: NeonEnv):
    """
    Test the CLOG-scanning mechanism at hot standby startup, after
    leaving behind crashed transactions.

    See the module docstring for background.
    """

    # Initialize the primary, a test table, and a helper function to create lots
    # of subtransactions.
    env = neon_simple_env
    primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
    primary_conn = primary.connect()
    primary_cur = primary_conn.cursor()
    primary_cur.execute("create table t(pk serial primary key, payload integer)")
    primary_cur.execute(CREATE_SUBXACTS_FUNC)
    primary_cur.execute("select pg_switch_wal()")

    # Consume a lot of XIDs, then kill Postgres without giving it a
    # chance to write abort records for them.
    primary_cur.execute("begin")
    primary_cur.execute("select create_subxacts(100000)")
    primary.stop(mode="immediate")

    # Restart the primary. Do some light work, and shut it down cleanly
    primary.start()
    primary_conn = primary.connect()
    primary_cur = primary_conn.cursor()
    primary_cur.execute("insert into t (payload) values (0)")
    primary.stop(mode="fast")

    # Create a replica. It should start up normally, thanks to the CLOG-scanning
    # mechanism. (Restarting the primary writes a checkpoint and/or running-xacts
    # record, which allows the standby to know that the crashed XIDs are aborted)
    secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")

    secondary_conn = secondary.connect()
    secondary_cur = secondary_conn.cursor()
    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (1,)


def test_replica_start_at_running_xacts(neon_simple_env: NeonEnv, pg_version: PgVersion):
    """
    Test that starting a replica works right after the primary has
    created a running-xacts record. This may seem like a trivial case,
    but during development, we had a bug that was triggered by having
    oldestActiveXid == nextXid. Starting right after a running-xacts
    record is one way to test that case.

    See the module docstring for background.
    """
    env = neon_simple_env

    if env.pg_version == PgVersion.V14 or env.pg_version == PgVersion.V15:
        pytest.skip("pg_log_standby_snapshot() function is available only in PG16")

    primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
    primary_conn = primary.connect()
    primary_cur = primary_conn.cursor()

    primary_cur.execute("CREATE EXTENSION neon_test_utils")
    primary_cur.execute("select pg_log_standby_snapshot()")
    primary_cur.execute("select neon_xlogflush()")
    wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)

    secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")

    secondary_conn = secondary.connect()
    secondary_cur = secondary_conn.cursor()
    secondary_cur.execute("select 123")
    assert secondary_cur.fetchone() == (123,)


def test_replica_start_wait_subxids_finish(neon_simple_env: NeonEnv):
    """
    Test replica startup when there are a lot of (sub)transactions active in the
    primary. That's too many for the CLOG-scanning mechanism to handle, so the
    replica has to wait for the large transaction to finish before it starts to
    accept queries.

    After replica startup, test MVCC with transactions that were in-progress
    when the replica was started.

    See the module docstring for background.
    """

    # Initialize the primary, a test table, and a helper function to create
    # lots of subtransactions.
    env = neon_simple_env
    primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
    primary_conn = primary.connect()
    primary_cur = primary_conn.cursor()
    primary_cur.execute("create table t(pk serial primary key, payload integer)")
    primary_cur.execute(CREATE_SUBXACTS_FUNC)

    # Start a transaction with 100000 subtransactions, and leave it open. That's
    # too many to fit in the "known-assigned XIDs array" in the replica, and
    # also too many to fit in the subxid caches, so the running-xacts record
    # will also overflow.
    primary_cur.execute("begin")
    primary_cur.execute("select create_subxacts(100000)")

    # Start another, smaller transaction in the primary. We'll come back to this
    # later.
    primary_conn2 = primary.connect()
    primary_cur2 = primary_conn2.cursor()
    primary_cur2.execute("begin")
    primary_cur2.execute("insert into t (payload) values (0)")

    # Create a replica. But before that, wait for the WAL to be flushed to the
    # safekeepers, so that the replica is started at a point where the large
    # transaction is already active. (The whole transaction might not be flushed
    # yet, but that's OK.)
    #
    # Start it in a separate thread, so that we can do other stuff while it's
    # blocked waiting for the startup to finish.
    wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
    secondary = env.endpoints.new_replica(origin=primary, endpoint_id="secondary")
    start_secondary_thread = threading.Thread(target=secondary.start)
    start_secondary_thread.start()

    # Verify that the replica has otherwise started up, but cannot start
    # accepting queries yet.
    log.info("Waiting 5 s to verify that the secondary does not start")
    start_secondary_thread.join(5)
    assert secondary.log_contains("consistent recovery state reached")
    assert secondary.log_contains("started streaming WAL from primary")
    # The "redo starts" message is printed when the first WAL record is
    # received. It might or might not be present in the log depending on how
    # far exactly the WAL was flushed when the replica was started, and whether
    # background activity caused any more WAL records to be flushed on the
    # primary afterwards.
    #
    # assert secondary.log_contains("redo starts")

    # Should not be open for connections yet
    assert start_secondary_thread.is_alive()
    assert not secondary.is_running()
    assert not secondary.log_contains("database system is ready to accept read-only connections")

    # Commit the large transaction in the primary.
    #
    # Within the next 15 s, the primary should write a new running-xacts record
    # to the WAL which shows the transaction as completed. Once the replica
    # replays that record, it will start accepting queries.
    primary_cur.execute("commit")
    start_secondary_thread.join()

    # Verify that the large transaction is correctly visible in the secondary
    # (but not the second, small transaction, which is still in-progress!)
    secondary_conn = secondary.connect()
    secondary_cur = secondary_conn.cursor()
    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (100000,)

    # Perform some more MVCC testing using the second transaction that was
    # started in the primary before the replica was created
    primary_cur2.execute("select create_subxacts(10000)")

    # The second transaction still hasn't committed
    wait_replica_caughtup(primary, secondary)
    secondary_cur.execute("BEGIN ISOLATION LEVEL REPEATABLE READ")
    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (100000,)

    # Commit the second transaction in the primary
    primary_cur2.execute("commit")

    # Its rows should still be invisible to the old snapshot
    wait_replica_caughtup(primary, secondary)
    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (100000,)

    # Commit the REPEATABLE READ transaction in the replica. Both
    # primary transactions should now be visible to a new snapshot.
    secondary_cur.execute("commit")
    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (110001,)


def test_replica_too_many_known_assigned_xids(neon_simple_env: NeonEnv):
    """
    The CLOG-scanning mechanism fills the known-assigned XIDs array
    optimistically at standby startup, betting that it can still fit
    upcoming transactions replayed later from the WAL in the
    array. This test checks what happens when that bet fails and the
    known-assigned XID array fills up after the standby has already
    been started. The WAL redo will fail with an error:

        FATAL: too many KnownAssignedXids
        CONTEXT: WAL redo at 0/1895CB0 for neon/INSERT: off: 25, flags: 0x08; blkref #0: rel 1663/5/16385, blk 64

    which causes the standby to shut down.

    See the module docstring for background.
    """

    # Initialize the primary, a test table, and a helper function to create lots
    # of subtransactions.
    env = neon_simple_env
    primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
    primary_conn = primary.connect()
    primary_cur = primary_conn.cursor()
    primary_cur.execute("CREATE EXTENSION neon_test_utils")
    primary_cur.execute("create table t(pk serial primary key, payload integer)")
    primary_cur.execute(CREATE_SUBXACTS_FUNC)

    # Determine how many connections we can use
    primary_cur.execute("show max_connections")
    max_connections = int(primary_cur.fetchall()[0][0])
    primary_cur.execute("show superuser_reserved_connections")
    superuser_reserved_connections = int(primary_cur.fetchall()[0][0])
    n_connections = max_connections - superuser_reserved_connections
    n_subxids = 200

    # Start one top transaction in the primary, with lots of subtransactions.
    # This uses up much of the known-assigned XIDs space in the standby, but
    # doesn't cause it to overflow.
    large_p_conn = primary.connect()
    large_p_cur = large_p_conn.cursor()
    large_p_cur.execute("begin")
    large_p_cur.execute(f"select create_subxacts({max_connections} * 30)")

    with closing(primary.connect()) as small_p_conn:
        with small_p_conn.cursor() as small_p_cur:
            small_p_cur.execute("select create_subxacts(1)")

    # Create a replica at this LSN
    primary_cur.execute("select neon_xlogflush()")
    wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
    secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
    secondary_conn = secondary.connect()
    secondary_cur = secondary_conn.cursor()

    # The transaction in the primary has not committed yet.
    wait_replica_caughtup(primary, secondary)
    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (1,)

    # Start the max number of top transactions in the primary, with a lot of
    # subtransactions each. We add the subtransactions to each top transaction
    # in a round-robin fashion, instead of adding a lot of subtransactions to
    # one top transaction at a time. This way, we will have the max number of
    # subtransactions in the in-memory subxid cache of each top transaction,
    # until they all overflow.
    #
    # Currently, PGPROC_MAX_CACHED_SUBXIDS == 64, so this will overflow all the
    # subxid caches after creating 64 subxids in each top transaction. The
    # point just before the caches have overflowed is the most interesting point
    # in time, but we'll keep going beyond that, to ensure that this test is
    # robust even if PGPROC_MAX_CACHED_SUBXIDS changes.
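    #
    # Rough arithmetic behind the overflow (a sketch based on the PostgreSQL
    # sources, not values this test depends on): the known-assigned XIDs array
    # holds (PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS entries, i.e.
    # roughly 65 * (max_connections + auxiliary and prepared-xact slots). With
    # max_connections = 100 that is on the order of 7000 slots, while the loops
    # below replay about n_connections * n_subxids (~19000) sub-XIDs on top of
    # the ~3000 consumed above, so the array is bound to fill up.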
    p_curs = []
    for _ in range(0, n_connections):
        p_cur = primary.connect().cursor()
        p_cur.execute("begin")
        p_curs.append(p_cur)

    for _subxid in range(0, n_subxids):
        for i in range(0, n_connections):
            p_curs[i].execute("select create_subxacts(1)")

    # Commit all the transactions in the primary
    for i in range(0, n_connections):
        p_curs[i].execute("commit")
    large_p_cur.execute("commit")

    # Wait until the replica crashes with "too many KnownAssignedXids" error.
    def check_replica_crashed():
        try:
            secondary.connect()
        except psycopg2.Error:
            # Once the connection fails, return success
            return None
        raise RuntimeError("connection succeeded")

    wait_until(20, 0.5, check_replica_crashed)
    assert secondary.log_contains("too many KnownAssignedXids")

    # Replica is crashed, so ignore stop result
    secondary.check_stop_result = False


def test_replica_start_repro_visibility_bug(neon_simple_env: NeonEnv):
    """
    Before PR #7288, a hot standby in neon incorrectly started up
    immediately, before it had received a running-xacts record. That
    led to visibility bugs if there were active transactions in the
    primary. This test reproduces the incorrect query results and
    incorrectly set hint bits, before that was fixed.
    """
    env = neon_simple_env

    primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
    p_cur = primary.connect().cursor()

    p_cur.execute("begin")
    p_cur.execute("create table t(pk integer primary key, payload integer)")
    p_cur.execute("insert into t values (generate_series(1,100000), 0)")

    secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
    wait_replica_caughtup(primary, secondary)
    s_cur = secondary.connect().cursor()

    # Set hint bits for pg_class tuples. If the primary's transaction is
    # not marked as in-progress in the MVCC snapshot, then the XMIN_INVALID
    # hint bit will be set for table 't's tuple, making it invisible
    # even after the commit record is replayed later.
    s_cur.execute("select * from pg_class")

    p_cur.execute("commit")
    wait_replica_caughtup(primary, secondary)
    s_cur.execute("select * from t where pk = 1")
    assert s_cur.fetchone() == (1, 0)


@pytest.mark.parametrize("shutdown", [True, False])
def test_replica_start_with_prepared_xacts(neon_simple_env: NeonEnv, shutdown: bool):
    """
    Test the CLOG-scanning mechanism at hot standby startup in the presence of
    prepared transactions.

    This test is run in two variants: one where the primary server is shut down
    before starting the secondary, and one where it is kept running.
    """

    # Initialize the primary, a test table, and a helper function to create lots
    # of subtransactions.
    env = neon_simple_env
    primary = env.endpoints.create_start(
        branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
    )
    primary_conn = primary.connect()
    primary_cur = primary_conn.cursor()
    primary_cur.execute("CREATE EXTENSION neon_test_utils")
    primary_cur.execute("create table t(pk serial primary key, payload integer)")
    primary_cur.execute("create table t1(pk integer primary key)")
    primary_cur.execute("create table t2(pk integer primary key)")
    primary_cur.execute(CREATE_SUBXACTS_FUNC)

    # Prepare a transaction for two-phase commit
    primary_cur.execute("begin")
    primary_cur.execute("insert into t1 values (1)")
    primary_cur.execute("prepare transaction 't1'")

    # Prepare another transaction for two-phase commit, with a subtransaction
    primary_cur.execute("begin")
    primary_cur.execute("insert into t2 values (2)")
    primary_cur.execute("savepoint sp")
    primary_cur.execute("insert into t2 values (3)")
    primary_cur.execute("prepare transaction 't2'")

    # Start a transaction in the primary. Leave the transaction open.
    #
    # The transaction has some subtransactions, but not too many to cause the
    # CLOG-scanning mechanism to give up.
    primary_cur.execute("begin")
    primary_cur.execute("select create_subxacts(50)")

    # Wait for the WAL to be flushed
    primary_cur.execute("select neon_xlogflush()")
    wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)

    if shutdown:
        primary.stop(mode="fast")

    # Create a replica. It should start up normally, thanks to the CLOG-scanning
    # mechanism.
    secondary = env.endpoints.new_replica_start(
        origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
    )

    # The transactions did not commit, so they should not be visible in the secondary
    secondary_conn = secondary.connect()
    secondary_cur = secondary_conn.cursor()
    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (0,)
    secondary_cur.execute("select count(*) from t1")
    assert secondary_cur.fetchone() == (0,)
    secondary_cur.execute("select count(*) from t2")
    assert secondary_cur.fetchone() == (0,)

    if shutdown:
        primary.start()
        primary_conn = primary.connect()
        primary_cur = primary_conn.cursor()
    else:
        primary_cur.execute("commit")
    primary_cur.execute("commit prepared 't1'")
    primary_cur.execute("commit prepared 't2'")

    wait_replica_caughtup(primary, secondary)

    secondary_cur.execute("select count(*) from t")
    if shutdown:
        assert secondary_cur.fetchone() == (0,)
    else:
        assert secondary_cur.fetchone() == (50,)
    secondary_cur.execute("select * from t1")
    assert secondary_cur.fetchall() == [(1,)]
    secondary_cur.execute("select * from t2")
    assert secondary_cur.fetchall() == [(2,), (3,)]


def test_replica_start_with_prepared_xacts_with_subxacts(neon_simple_env: NeonEnv):
    """
    Test the CLOG-scanning mechanism at hot standby startup in the presence of
    prepared transactions, with subtransactions.
    """

    # Initialize the primary, a test table, and a helper function to create lots
    # of subtransactions.
    env = neon_simple_env
    primary = env.endpoints.create_start(
        branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
    )
    primary_conn = primary.connect()
    primary_cur = primary_conn.cursor()

    # Install extension containing function needed for test
    primary_cur.execute("CREATE EXTENSION neon_test_utils")

    primary_cur.execute("create table t(pk serial primary key, payload integer)")
    primary_cur.execute(CREATE_SUBXACTS_FUNC)

    # Advance nextXid close to the beginning of the next pg_subtrans segment (2^16 XIDs)
    #
    # This is interesting, because it tests that pg_subtrans is initialized correctly
    # at standby startup. (We had a bug where it didn't at one point during development.)
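    #
    # (Where the 2^16 figure comes from, as a back-of-the-envelope sketch:
    # pg_subtrans stores one 4-byte parent XID per XID, so an 8 kB page covers
    # 8192 / 4 = 2048 XIDs, and a 32-page SLRU segment covers 32 * 2048 = 65536
    # XIDs.)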
    while True:
        xid = int(query_scalar(primary_cur, "SELECT txid_current()"))
        log.info(f"xid now {xid}")
        # Consume 500 transactions at a time until we get close
        if xid < 65535 - 600:
            primary_cur.execute("select test_consume_xids(500);")
        else:
            break
    primary_cur.execute("checkpoint")

    # Prepare a transaction for two-phase commit
    primary_cur.execute("begin")
    primary_cur.execute("select create_subxacts(1000)")
    primary_cur.execute("prepare transaction 't1'")

    # Wait for the WAL to be flushed, and stop the primary
    wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
    primary.stop(mode="fast")

    # Create a replica. It should start up normally, thanks to the CLOG-scanning
    # mechanism.
    secondary = env.endpoints.new_replica_start(
        origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
    )

    # The transaction did not commit, so it should not be visible in the secondary
    secondary_conn = secondary.connect()
    secondary_cur = secondary_conn.cursor()
    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (0,)

    primary.start()

    # Open a lot of subtransactions in the primary, causing the subxids cache to overflow
    primary_conn = primary.connect()
    primary_cur = primary_conn.cursor()
    primary_cur.execute("select create_subxacts(100000)")

    wait_replica_caughtup(primary, secondary)

    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (100000,)

    primary_cur.execute("commit prepared 't1'")

    wait_replica_caughtup(primary, secondary)
    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (101000,)


def test_replica_start_with_prepared_xacts_with_many_subxacts(neon_simple_env: NeonEnv):
    """
    Test the CLOG-scanning mechanism at hot standby startup in the presence of
    prepared transactions, with lots of subtransactions.

    Like test_replica_start_with_prepared_xacts_with_subxacts, but with more
    subxacts, to test that the prepared transaction's subxids don't consume
    space in the known-assigned XIDs array. (They are set in pg_subtrans
    instead.)
    """

    # Initialize the primary, a test table, and a helper function to create lots
    # of subtransactions.
    env = neon_simple_env
    primary = env.endpoints.create_start(
        branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
    )
    primary_conn = primary.connect()
    primary_cur = primary_conn.cursor()

    # Install extension containing function needed for test
    primary_cur.execute("CREATE EXTENSION neon_test_utils")

    primary_cur.execute("create table t(pk serial primary key, payload integer)")
    primary_cur.execute(CREATE_SUBXACTS_FUNC)

    # Prepare a transaction for two-phase commit, with lots of subxids
    primary_cur.execute("begin")
    primary_cur.execute("select create_subxacts(50000)")

    # To make things a bit more varied, intersperse a few other XIDs in between
    # the prepared transaction's sub-XIDs
    with primary.connect().cursor() as primary_cur2:
        primary_cur2.execute("insert into t (payload) values (123)")
        primary_cur2.execute("begin; insert into t (payload) values (-1); rollback")

    primary_cur.execute("select create_subxacts(50000)")
    primary_cur.execute("prepare transaction 't1'")

    # Wait for the WAL to be flushed
    wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)

    primary.stop(mode="fast")

    # Create a replica. It should start up normally, thanks to the CLOG-scanning
    # mechanism.
    secondary = env.endpoints.new_replica_start(
        origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
    )

    # The transaction did not commit, so it should not be visible in the secondary
    secondary_conn = secondary.connect()
    secondary_cur = secondary_conn.cursor()
    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (1,)

    primary.start()

    # Open a lot of subtransactions in the primary, causing the subxids cache to overflow
    primary_conn = primary.connect()
    primary_cur = primary_conn.cursor()
    primary_cur.execute("select create_subxacts(100000)")

    wait_replica_caughtup(primary, secondary)

    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (100001,)

    primary_cur.execute("commit prepared 't1'")

    wait_replica_caughtup(primary, secondary)
    secondary_cur.execute("select count(*) from t")
    assert secondary_cur.fetchone() == (200001,)
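

# Debugging sketch for the two-phase tests above, not used by them directly
# (cur is assumed to be any psycopg2 cursor): pg_prepared_xacts is a built-in
# view listing transactions that have been prepared for two-phase commit but
# not yet committed or rolled back.
def _list_prepared_xacts(cur):
    cur.execute("select gid, prepared, owner, database from pg_prepared_xacts")
    return cur.fetchall()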
@@ -1,32 +0,0 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup


@pytest.mark.xfail
def test_replication_start(neon_simple_env: NeonEnv):
    env = neon_simple_env

    with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary:
        with primary.connect() as p_con:
            with p_con.cursor() as p_cur:
                p_cur.execute("begin")
                p_cur.execute("create table t(pk integer primary key, payload integer)")
                p_cur.execute("insert into t values (generate_series(1,100000), 0)")
                p_cur.execute("select txid_current()")
                xid = p_cur.fetchall()[0][0]
                log.info(f"Master transaction {xid}")
                with env.endpoints.new_replica_start(
                    origin=primary, endpoint_id="secondary"
                ) as secondary:
                    wait_replica_caughtup(primary, secondary)
                    with secondary.connect() as s_con:
                        with s_con.cursor() as s_cur:
                            # Enforce setting hint bits for pg_class tuples.
                            # If master's transaction is not marked as in-progress in MVCC snapshot,
                            # then XMIN_INVALID hint bit will be set for table's 't' tuple, making it invisible.
                            s_cur.execute("select * from pg_class")
                            p_cur.execute("commit")
                            wait_replica_caughtup(primary, secondary)
                            s_cur.execute("select * from t where pk = 1")
                            assert s_cur.fetchone() == (1, 0)
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 223dd92595...ad73770c44
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: f54d7373eb...4874c8e52e
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: e06bebc753...b810fdfcbb
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
 {
-    "v16": ["16.3", "e06bebc75306b583e758b52c95946d41109239b2"],
-    "v15": ["15.7", "f54d7373eb0de5a54bce2becdb1c801026c7edff"],
-    "v14": ["14.12", "223dd925959f8124711dd3d867dc8ba6629d52c0"]
+    "v16": ["16.3", "b810fdfcbb59afea7ea7bbe0cf94eaccb55a2ea2"],
+    "v15": ["15.7", "4874c8e52ed349a9f8290bbdcd91eb92677a5d24"],
+    "v14": ["14.12", "ad73770c446ea361f43e4f0404798b7e5e7a62d8"]
 }