diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 73eb42bb30..9cb53f46d1 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -343,6 +343,23 @@ pub(super) async fn handle_walreceiver_connection( modification.commit(&ctx).await?; uncommitted_records = 0; filtered_records = 0; + + // + // We should check checkpoint distance after appending each ingest_batch_size bytes because otherwise + // layer size can become much larger than `checkpoint_distance`. + // It can append because wal-sender is sending WAL using 125kb chucks and some WAL records can cause writing large + // amount of data to key-value storage. So performing this check only after processing + // all WAL records in the chunk, can cause huge L0 layer files. + // + timeline + .check_checkpoint_distance() + .await + .with_context(|| { + format!( + "Failed to check checkpoint distance for timeline {}", + timeline.timeline_id + ) + })?; } } diff --git a/test_runner/regress/test_layer_bloating.py b/test_runner/regress/test_layer_bloating.py new file mode 100644 index 0000000000..70b115ad61 --- /dev/null +++ b/test_runner/regress/test_layer_bloating.py @@ -0,0 +1,66 @@ +import os +import time + +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnv, + logical_replication_sync, +) +from fixtures.pg_version import PgVersion + + +def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg): + env = neon_simple_env + + if env.pg_version != PgVersion.V16: + pytest.skip("pg_log_standby_snapshot() function is available only in PG16") + + timeline = env.neon_cli.create_branch("test_logical_replication", "empty") + endpoint = env.endpoints.create_start( + "test_logical_replication", config_lines=["log_statement=all"] + ) + + log.info("postgres is running on 'test_logical_replication' branch") + pg_conn = endpoint.connect() + cur = pg_conn.cursor() + + # create table... + cur.execute("create table t(pk integer primary key)") + cur.execute("create publication pub1 for table t") + # Create slot to hold WAL + cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')") + + # now start subscriber + vanilla_pg.start() + vanilla_pg.safe_psql("create table t(pk integer primary key)") + + connstr = endpoint.connstr().replace("'", "''") + log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}") + vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") + + cur.execute( + """create or replace function create_snapshots(n integer) returns void as $$ + declare + i integer; + begin + for i in 1..n loop + perform pg_log_standby_snapshot(); + end loop; + end; $$ language plpgsql""" + ) + cur.execute("set statement_timeout=0") + cur.execute("select create_snapshots(10000)") + # Wait logical replication to sync + logical_replication_sync(vanilla_pg, endpoint) + time.sleep(10) + + # Check layer file sizes + timeline_path = "{}/tenants/{}/timelines/{}/".format( + env.pageserver.workdir, env.initial_tenant, timeline + ) + log.info(f"Check {timeline_path}") + for filename in os.listdir(timeline_path): + if filename.startswith("00000"): + log.info(f"layer {filename} size is {os.path.getsize(timeline_path + filename)}") + assert os.path.getsize(timeline_path + filename) < 512_000_000