Files
neon/test_runner/regress/test_layer_bloating.py
Konstantin Knizhnik f3d7d23805 Some small WAL records can write a lot of data to KV storage, so perform checkpoint check more frequently (#6639)
## Problem

See
https://neondb.slack.com/archives/C04DGM6SMTM/p1707149618314539?thread_ts=1707081520.140049&cid=C04DGM6SMTM

## Summary of changes


Perform checkpoint check after processing `ingest_batch_size` (default
100) WAL records.

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-02-07 08:47:19 +02:00

67 lines
2.3 KiB
Python

import os
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
logical_replication_sync,
)
from fixtures.pg_version import PgVersion
def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
if env.pg_version != PgVersion.V16:
pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
timeline = env.neon_cli.create_branch("test_logical_replication", "empty")
endpoint = env.endpoints.create_start(
"test_logical_replication", config_lines=["log_statement=all"]
)
log.info("postgres is running on 'test_logical_replication' branch")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# create table...
cur.execute("create table t(pk integer primary key)")
cur.execute("create publication pub1 for table t")
# Create slot to hold WAL
cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")
# now start subscriber
vanilla_pg.start()
vanilla_pg.safe_psql("create table t(pk integer primary key)")
connstr = endpoint.connstr().replace("'", "''")
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
cur.execute(
"""create or replace function create_snapshots(n integer) returns void as $$
declare
i integer;
begin
for i in 1..n loop
perform pg_log_standby_snapshot();
end loop;
end; $$ language plpgsql"""
)
cur.execute("set statement_timeout=0")
cur.execute("select create_snapshots(10000)")
# Wait logical replication to sync
logical_replication_sync(vanilla_pg, endpoint)
time.sleep(10)
# Check layer file sizes
timeline_path = "{}/tenants/{}/timelines/{}/".format(
env.pageserver.workdir, env.initial_tenant, timeline
)
log.info(f"Check {timeline_path}")
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
log.info(f"layer {filename} size is {os.path.getsize(timeline_path + filename)}")
assert os.path.getsize(timeline_path + filename) < 512_000_000