diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index f2a11e65c1..19233a28cc 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -1506,35 +1506,46 @@ impl<'a> DatadirModification<'a> {
         Ok(())
     }
 
-    /// Drop a relation.
-    pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
-        anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
+    /// Drop some relations.
+    ///
+    /// `drop_relations` maps `(spcnode, dbnode)` to the relation forks to drop under that
+    /// directory. Each relation directory is read once; forks that are not present in it are
+    /// skipped, and the directory is only rewritten if at least one fork was removed.
+    pub(crate) async fn put_rel_drops(
+        &mut self,
+        drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        for ((spc_node, db_node), rel_tags) in drop_relations {
+            let dir_key = rel_dir_to_key(spc_node, db_node);
+            let buf = self.get(dir_key, ctx).await?;
+            let mut dir = RelDirectory::des(&buf)?;
 
-        // Remove it from the directory entry
-        let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
-        let buf = self.get(dir_key, ctx).await?;
-        let mut dir = RelDirectory::des(&buf)?;
+            let mut dirty = false;
+            for rel_tag in rel_tags {
+                if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
+                    dirty = true;
 
-        self.pending_directory_entries
-            .push((DirectoryKind::Rel, dir.rels.len()));
+                    // update logical size
+                    let size_key = rel_size_to_key(rel_tag);
+                    let old_size = self.get(size_key, ctx).await?.get_u32_le();
+                    self.pending_nblocks -= old_size as i64;
 
-        if dir.rels.remove(&(rel.relnode, rel.forknum)) {
-            self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
-        } else {
-            warn!("dropped rel {} did not exist in rel directory", rel);
+                    // Remove entry from relation size cache
+                    self.tline.remove_cached_rel_size(&rel_tag);
+
+                    // Delete size entry, as well as all blocks
+                    self.delete(rel_key_range(rel_tag));
+                }
+            }
+
+            if dirty {
+                self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
+                self.pending_directory_entries
+                    .push((DirectoryKind::Rel, dir.rels.len()));
+            }
         }
 
-        // update logical size
-        let size_key = rel_size_to_key(rel);
-        let old_size = self.get(size_key, ctx).await?.get_u32_le();
-        self.pending_nblocks -= old_size as i64;
-
-        // Remove enty from relation size cache
-        self.tline.remove_cached_rel_size(&rel);
-
-        // Delete size entry, as well as all blocks
-        self.delete(rel_key_range(rel));
-
         Ok(())
     }
 
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index d81552ac77..9e43e10801 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -21,6 +21,7 @@
 //! redo Postgres process, but some records it can handle directly with
 //! bespoken Rust code.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::OnceLock;
 use std::time::Duration;
@@ -1620,6 +1621,12 @@ impl WalIngest {
             },
         )?;
 
+        // Group relations to drop by dbNode. This map will contain all relations that _might_
+        // exist, we will reduce it to which ones really exist later. This map can be huge if
+        // the transaction touches a huge number of relations (there is no bound on this in
+        // postgres).
+        let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
+
         for xnode in &parsed.xnodes {
             for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
                 let rel = RelTag {
@@ -1628,15 +1635,16 @@ impl WalIngest {
                     dbnode: xnode.dbnode,
                     relnode: xnode.relnode,
                 };
-                if modification
-                    .tline
-                    .get_rel_exists(rel, Version::Modified(modification), ctx)
-                    .await?
-                {
-                    self.put_rel_drop(modification, rel, ctx).await?;
-                }
+                drop_relations
+                    .entry((xnode.spcnode, xnode.dbnode))
+                    .or_default()
+                    .push(rel);
             }
         }
+
+        // Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
+        modification.put_rel_drops(drop_relations, ctx).await?;
+
         if origin_id != 0 {
             modification
                 .set_replorigin(origin_id, parsed.origin_lsn)
@@ -2346,16 +2354,6 @@ impl WalIngest {
         Ok(())
     }
 
-    async fn put_rel_drop(
-        &mut self,
-        modification: &mut DatadirModification<'_>,
-        rel: RelTag,
-        ctx: &RequestContext,
-    ) -> Result<()> {
-        modification.put_rel_drop(rel, ctx).await?;
-        Ok(())
-    }
-
     async fn handle_rel_extend(
         &mut self,
         modification: &mut DatadirModification<'_>,
@@ -2869,7 +2867,9 @@ mod tests {
 
         // Drop rel
         let mut m = tline.begin_modification(Lsn(0x30));
-        walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
+        let mut rel_drops = HashMap::new();
+        rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
+        m.put_rel_drops(rel_drops, &ctx).await?;
         m.commit(&ctx).await?;
 
         // Check that rel is not visible anymore
diff --git a/test_runner/regress/test_pg_regress.py b/test_runner/regress/test_pg_regress.py
index 45ce5b1c5b..b97fccddf5 100644
--- a/test_runner/regress/test_pg_regress.py
+++ b/test_runner/regress/test_pg_regress.py
@@ -3,10 +3,13 @@
 #
 from __future__ import annotations
 
+import os
+from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
 from typing import TYPE_CHECKING, cast
 
 import pytest
+from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     Endpoint,
     NeonEnv,
@@ -324,3 +327,97 @@ def test_sql_regress(
     pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
 
     post_checks(env, test_output_dir, DBNAME, endpoint)
+
+
+@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
+def test_tx_abort_with_many_relations(
+    neon_env_builder: NeonEnvBuilder,
+):
+    """
+    This is not a pg_regress test as such, but perhaps it should be -- this test exercises postgres
+    behavior when aborting a transaction with lots of relations.
+
+    Reproducer for https://github.com/neondatabase/neon/issues/9505
+    """
+
+    env = neon_env_builder.init_start()
+    ep = env.endpoints.create_start(
+        "main",
+        tenant_id=env.initial_tenant,
+        config_lines=[
+            "shared_buffers=1000MB",
+            "max_locks_per_transaction=16384",
+        ],
+    )
+
+    # How many relations: this number is tuned to be long enough to take tens of seconds
+    # if the rollback code path is buggy, tripping the test's timeout.
+    n = 4000
+
+    def create():
+        # Create many relations
+        log.info(f"Creating {n} relations...")
+        ep.safe_psql_many(
+            [
+                "BEGIN",
+                f"""DO $$
+                DECLARE
+                    i INT;
+                    table_name TEXT;
+                BEGIN
+                    FOR i IN 1..{n} LOOP
+                        table_name := 'table_' || i;
+                        EXECUTE 'CREATE TABLE IF NOT EXISTS ' || table_name || ' (id SERIAL PRIMARY KEY, data TEXT)';
+                    END LOOP;
+                END $$;
+                """,
+                "COMMIT",
+            ]
+        )
+
+    def truncate():
+        # Truncate relations, then roll back the transaction containing the truncations
+        log.info(f"Truncating {n} relations...")
+        ep.safe_psql_many(
+            [
+                "BEGIN",
+                f"""DO $$
+                DECLARE
+                    i INT;
+                    table_name TEXT;
+                BEGIN
+                    FOR i IN 1..{n} LOOP
+                        table_name := 'table_' || i;
+                        EXECUTE 'TRUNCATE ' || table_name ;
+                    END LOOP;
+                END $$;
+                """,
+            ]
+        )
+
+    def rollback_and_wait():
+        log.info(f"Rolling back after truncating {n} relations...")
+        ep.safe_psql("ROLLBACK")
+
+        # Restart the endpoint: this ensures that we can read back what we just wrote, i.e. pageserver
+        # ingest has caught up.
+        ep.stop()
+        log.info(f"Starting endpoint after truncating {n} relations...")
+        ep.start()
+        log.info(f"Started endpoint after truncating {n} relations...")
+
+    # Actual create & truncate phases may be slow, these involve lots of WAL records. We do not
+    # apply a special timeout, they are expected to complete within general test timeout
+    create()
+    truncate()
+
+    # Run in a thread because the failure case is to take pathologically long time, and we don't want
+    # to block the test executor on that. Deliberately not a `with` block: its __exit__ calls
+    # shutdown(wait=True), which would wait for the stuck worker even after the timeout fired.
+    executor = ThreadPoolExecutor(max_workers=1)
+    try:
+        # Rollback phase should be fast: this is one WAL record that we should process efficiently
+        fut = executor.submit(rollback_and_wait)
+        fut.result(timeout=5)
+    finally:
+        executor.shutdown(wait=False, cancel_futures=True)