pageserver: fix N^2 I/O when processing relation drops in transaction abort (#9507)

## Problem

We have some known N^2 behaviors when it comes to large relation counts,
due to the monolithic encoding and full rewrites of of RelDirectory each
time a relation is added. Ordinarily our backpressure mechanisms give
"slow but steady" performance when creating/dropping/truncating
relations. However, in the case of a transaction abort, it is possible
for a single WAL record to drop an unbounded number of relations. The
results in an unavailable compute, as when it sends one of these
records, it can stall the pageserver's ingest for many minutes, even
though the compute only sent a small amount of WAL.

Closes https://github.com/neondatabase/neon/issues/9505

## Summary of changes

- Rewrite relation-dropping code to do one read/modify/write cycle of
RelDirectory, instead of doing it separately for each relation in a
loop.
- Add a test for the bug scenario encountered:
`test_tx_abort_with_many_relations`

The test has ~40s runtime on my workstation. About 1 second of that is
the part where we wait for ingest to catch up after a rollback, the rest
is the slowness of creating and truncating a large number of relations.


---------

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
This commit is contained in:
John Spray
2024-10-25 15:09:02 +01:00
committed by GitHub
parent 2090e928d1
commit 8297f7a181
3 changed files with 146 additions and 42 deletions

View File

@@ -1506,35 +1506,42 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
/// Drop a relation.
pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
/// Drop some relations
pub(crate) async fn put_rel_drops(
&mut self,
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
for ((spc_node, db_node), rel_tags) in drop_relations {
let dir_key = rel_dir_to_key(spc_node, db_node);
let buf = self.get(dir_key, ctx).await?;
let mut dir = RelDirectory::des(&buf)?;
// Remove it from the directory entry
let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let buf = self.get(dir_key, ctx).await?;
let mut dir = RelDirectory::des(&buf)?;
let mut dirty = false;
for rel_tag in rel_tags {
if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
dirty = true;
self.pending_directory_entries
.push((DirectoryKind::Rel, dir.rels.len()));
// update logical size
let size_key = rel_size_to_key(rel_tag);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
self.pending_nblocks -= old_size as i64;
if dir.rels.remove(&(rel.relnode, rel.forknum)) {
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
} else {
warn!("dropped rel {} did not exist in rel directory", rel);
// Remove entry from relation size cache
self.tline.remove_cached_rel_size(&rel_tag);
// Delete size entry, as well as all blocks
self.delete(rel_key_range(rel_tag));
}
}
if dirty {
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
self.pending_directory_entries
.push((DirectoryKind::Rel, dir.rels.len()));
}
}
// update logical size
let size_key = rel_size_to_key(rel);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
self.pending_nblocks -= old_size as i64;
// Remove enty from relation size cache
self.tline.remove_cached_rel_size(&rel);
// Delete size entry, as well as all blocks
self.delete(rel_key_range(rel));
Ok(())
}

View File

@@ -21,6 +21,7 @@
//! redo Postgres process, but some records it can handle directly with
//! bespoken Rust code.
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;
@@ -1620,6 +1621,12 @@ impl WalIngest {
},
)?;
// Group relations to drop by dbNode. This map will contain all relations that _might_
// exist, we will reduce it to which ones really exist later. This map can be huge if
// the transaction touches a huge number of relations (there is no bound on this in
// postgres).
let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
for xnode in &parsed.xnodes {
for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
let rel = RelTag {
@@ -1628,15 +1635,16 @@ impl WalIngest {
dbnode: xnode.dbnode,
relnode: xnode.relnode,
};
if modification
.tline
.get_rel_exists(rel, Version::Modified(modification), ctx)
.await?
{
self.put_rel_drop(modification, rel, ctx).await?;
}
drop_relations
.entry((xnode.spcnode, xnode.dbnode))
.or_default()
.push(rel);
}
}
// Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
modification.put_rel_drops(drop_relations, ctx).await?;
if origin_id != 0 {
modification
.set_replorigin(origin_id, parsed.origin_lsn)
@@ -2346,16 +2354,6 @@ impl WalIngest {
Ok(())
}
async fn put_rel_drop(
&mut self,
modification: &mut DatadirModification<'_>,
rel: RelTag,
ctx: &RequestContext,
) -> Result<()> {
modification.put_rel_drop(rel, ctx).await?;
Ok(())
}
async fn handle_rel_extend(
&mut self,
modification: &mut DatadirModification<'_>,
@@ -2869,7 +2867,9 @@ mod tests {
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
let mut rel_drops = HashMap::new();
rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
m.put_rel_drops(rel_drops, &ctx).await?;
m.commit(&ctx).await?;
// Check that rel is not visible anymore

View File

@@ -3,10 +3,13 @@
#
from __future__ import annotations
import os
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import TYPE_CHECKING, cast
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
@@ -324,3 +327,97 @@ def test_sql_regress(
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
post_checks(env, test_output_dir, DBNAME, endpoint)
@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
def test_tx_abort_with_many_relations(
neon_env_builder: NeonEnvBuilder,
):
"""
This is not a pg_regress test as such, but perhaps it should be -- this test exercises postgres
behavior when aborting a transaction with lots of relations.
Reproducer for https://github.com/neondatabase/neon/issues/9505
"""
env = neon_env_builder.init_start()
ep = env.endpoints.create_start(
"main",
tenant_id=env.initial_tenant,
config_lines=[
"shared_buffers=1000MB",
"max_locks_per_transaction=16384",
],
)
# How many relations: this number is tuned to be long enough to take tens of seconds
# if the rollback code path is buggy, tripping the test's timeout.
n = 4000
def create():
# Create many relations
log.info(f"Creating {n} relations...")
ep.safe_psql_many(
[
"BEGIN",
f"""DO $$
DECLARE
i INT;
table_name TEXT;
BEGIN
FOR i IN 1..{n} LOOP
table_name := 'table_' || i;
EXECUTE 'CREATE TABLE IF NOT EXISTS ' || table_name || ' (id SERIAL PRIMARY KEY, data TEXT)';
END LOOP;
END $$;
""",
"COMMIT",
]
)
def truncate():
# Truncate relations, then roll back the transaction containing the truncations
log.info(f"Truncating {n} relations...")
ep.safe_psql_many(
[
"BEGIN",
f"""DO $$
DECLARE
i INT;
table_name TEXT;
BEGIN
FOR i IN 1..{n} LOOP
table_name := 'table_' || i;
EXECUTE 'TRUNCATE ' || table_name ;
END LOOP;
END $$;
""",
]
)
def rollback_and_wait():
log.info(f"Rolling back after truncating {n} relations...")
ep.safe_psql("ROLLBACK")
# Restart the endpoint: this ensures that we can read back what we just wrote, i.e. pageserver
# ingest has caught up.
ep.stop()
log.info(f"Starting endpoint after truncating {n} relations...")
ep.start()
log.info(f"Started endpoint after truncating {n} relations...")
# Actual create & truncate phases may be slow, these involves lots of WAL records. We do not
# apply a special timeout, they are expected to complete within general test timeout
create()
truncate()
# Run in a thread because the failure case is to take pathologically long time, and we don't want
# to block the test executor on that.
with ThreadPoolExecutor(max_workers=1) as exec:
try:
# Rollback phase should be fast: this is one WAL record that we should process efficiently
fut = exec.submit(rollback_and_wait)
fut.result(timeout=5)
except:
exec.shutdown(wait=False, cancel_futures=True)
raise