neon/test_runner/batch_others/test_timeline_size.py
Thang Pham 7da47d8a0a Fix timeline physical size flaky tests (#2244)
Resolves #2212.

- use `wait_for_last_flush_lsn` in `test_timeline_physical_size_*` tests

## Context
We need to wait for the pageserver to catch up with the compute's last flush LSN because, at the time of the timeline physical size API call, there may still be running `LayerFlushThread` threads. These threads flush new layers to disk and hence update the physical size, which results in a mismatch between the physical size reported by the API and the actual physical size on disk.

### Note
The `LayerFlushThread` threads run **concurrently**, so it's possible that the above error still persists even with this patch. However, making the tests wait until all the WAL has been processed (though not necessarily flushed) before calculating the physical size should reduce the "flakiness" significantly.
2022-08-12 14:28:50 +07:00
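
For illustration, a minimal sketch of the pattern the affected tests now follow, using the `env`/`pg` fixtures and helpers from the test file below (exact arguments vary per test):

```python
# Make the pageserver catch up with the compute's last flush LSN and force a
# checkpoint before asserting anything about the timeline's physical size.
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
assert_physical_size(env, env.initial_tenant, new_timeline_id)
```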


from contextlib import closing
import random
from uuid import UUID
import re
import psycopg2.extras
import psycopg2.errors
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, assert_timeline_local, wait_for_last_flush_lsn
from fixtures.log_helper import log
import time
from fixtures.utils import get_timeline_dir_size


def test_timeline_size(neon_simple_env: NeonEnv):
    env = neon_simple_env
    new_timeline_id = env.neon_cli.create_branch('test_timeline_size', 'empty')

    client = env.pageserver.http_client()
    timeline_details = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
    assert timeline_details['local']['current_logical_size'] == timeline_details['local'][
        'current_logical_size_non_incremental']

    pgmain = env.postgres.create_start("test_timeline_size")
    log.info("postgres is running on 'test_timeline_size' branch")

    with closing(pgmain.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("SHOW neon.timeline_id")

            cur.execute("CREATE TABLE foo (t text)")
            cur.execute("""
                INSERT INTO foo
                    SELECT 'long string to consume some space' || g
                    FROM generate_series(1, 10) g
            """)

            res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
            local_details = res['local']
            assert local_details["current_logical_size"] == local_details[
                "current_logical_size_non_incremental"]

            cur.execute("TRUNCATE foo")

            res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
            local_details = res['local']
            assert local_details["current_logical_size"] == local_details[
                "current_logical_size_non_incremental"]


def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
    env = neon_simple_env
    new_timeline_id = env.neon_cli.create_branch('test_timeline_size_createdropdb', 'empty')

    client = env.pageserver.http_client()
    timeline_details = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
    assert timeline_details['local']['current_logical_size'] == timeline_details['local'][
        'current_logical_size_non_incremental']

    pgmain = env.postgres.create_start("test_timeline_size_createdropdb")
    log.info("postgres is running on 'test_timeline_size_createdropdb' branch")

    with closing(pgmain.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("SHOW neon.timeline_id")

            res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
            local_details = res['local']
            assert local_details["current_logical_size"] == local_details[
                "current_logical_size_non_incremental"]

            cur.execute('CREATE DATABASE foodb')
            with closing(pgmain.connect(dbname='foodb')) as conn:
                with conn.cursor() as cur2:
                    cur2.execute("CREATE TABLE foo (t text)")
                    cur2.execute("""
                        INSERT INTO foo
                            SELECT 'long string to consume some space' || g
                            FROM generate_series(1, 10) g
                    """)

                    res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
                    local_details = res['local']
                    assert local_details["current_logical_size"] == local_details[
                        "current_logical_size_non_incremental"]

            cur.execute('DROP DATABASE foodb')

            res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
            local_details = res['local']
            assert local_details["current_logical_size"] == local_details[
                "current_logical_size_non_incremental"]


# wait until received_lsn_lag is 0
def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60):
    started_at = time.time()

    received_lsn_lag = 1
    while received_lsn_lag > 0:
        elapsed = time.time() - started_at
        if elapsed > timeout:
            raise RuntimeError(
                "timed out waiting for pageserver to reach pg_current_wal_flush_lsn()")

        res = pgmain.safe_psql('''
            SELECT
                pg_size_pretty(pg_cluster_size()),
                pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag
            FROM backpressure_lsns();
        ''')[0]
        log.info(f"pg_cluster_size = {res[0]}, received_lsn_lag = {res[1]}")
        received_lsn_lag = res[1]

        time.sleep(polling_interval)


def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    new_timeline_id = env.neon_cli.create_branch('test_timeline_size_quota')

    client = env.pageserver.http_client()
    res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
    assert res['local']["current_logical_size"] == res['local'][
        "current_logical_size_non_incremental"]

    pgmain = env.postgres.create_start(
        "test_timeline_size_quota",
        # Set small limit for the test
        config_lines=['neon.max_cluster_size=30MB'])
    log.info("postgres is running on 'test_timeline_size_quota' branch")

    with closing(pgmain.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("CREATE EXTENSION neon")  # TODO move it to neon_fixtures?

            cur.execute("CREATE TABLE foo (t text)")

            wait_for_pageserver_catchup(pgmain)

            # Insert many rows. This query must fail because of space limit
            try:
                cur.execute('''
                    INSERT INTO foo
                        SELECT 'long string to consume some space' || g
                        FROM generate_series(1, 100000) g
                ''')

                wait_for_pageserver_catchup(pgmain)

                cur.execute('''
                    INSERT INTO foo
                        SELECT 'long string to consume some space' || g
                        FROM generate_series(1, 500000) g
                ''')

                # If we get here, the timeline size limit failed
                log.error("Query unexpectedly succeeded")
                assert False
            except psycopg2.errors.DiskFull as err:
                log.info(f"Query expectedly failed with: {err}")

            # drop table to free space
            cur.execute('DROP TABLE foo')

            wait_for_pageserver_catchup(pgmain)

            # create it again and insert some rows. This query must succeed
            cur.execute("CREATE TABLE foo (t text)")
            cur.execute('''
                INSERT INTO foo
                    SELECT 'long string to consume some space' || g
                    FROM generate_series(1, 10000) g
            ''')

            wait_for_pageserver_catchup(pgmain)

            cur.execute("SELECT * from pg_size_pretty(pg_cluster_size())")
            pg_cluster_size = cur.fetchone()
            log.info(f"pg_cluster_size = {pg_cluster_size}")


def test_timeline_physical_size_init(neon_simple_env: NeonEnv):
    env = neon_simple_env
    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_init')
    pg = env.postgres.create_start("test_timeline_physical_size_init")

    pg.safe_psql_many([
        "CREATE TABLE foo (t text)",
        """INSERT INTO foo
           SELECT 'long string to consume some space' || g
           FROM generate_series(1, 1000) g""",
    ])

    wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)

    # restart the pageserver to force calculating the timeline's initial physical size
    env.pageserver.stop()
    env.pageserver.start()

    assert_physical_size(env, env.initial_tenant, new_timeline_id)


def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
    env = neon_simple_env
    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_checkpoint')
    pg = env.postgres.create_start("test_timeline_physical_size_post_checkpoint")

    pg.safe_psql_many([
        "CREATE TABLE foo (t text)",
        """INSERT INTO foo
           SELECT 'long string to consume some space' || g
           FROM generate_series(1, 1000) g""",
    ])

    wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")

    assert_physical_size(env, env.initial_tenant, new_timeline_id)


def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder):
    # Disable background compaction: we don't want it to run after the `get_physical_size`
    # request and before we check the expected size on disk, which would make the assertion fail
    neon_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance=100000, compaction_period='10m'}"

    env = neon_env_builder.init_start()
    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_compaction')
    pg = env.postgres.create_start("test_timeline_physical_size_post_compaction")

    pg.safe_psql_many([
        "CREATE TABLE foo (t text)",
        """INSERT INTO foo
           SELECT 'long string to consume some space' || g
           FROM generate_series(1, 100000) g""",
    ])

    wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
    env.pageserver.safe_psql(f"compact {env.initial_tenant.hex} {new_timeline_id.hex}")

    assert_physical_size(env, env.initial_tenant, new_timeline_id)


def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
    # Disable background compaction and GC: we don't want them to run after the
    # `get_physical_size` request and before we check the expected size on disk,
    # which would make the assertion fail
    neon_env_builder.pageserver_config_override = \
        "tenant_config={checkpoint_distance=100000, compaction_period='10m', gc_period='10m', pitr_interval='1s'}"

    env = neon_env_builder.init_start()
    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_post_gc')
    pg = env.postgres.create_start("test_timeline_physical_size_post_gc")

    pg.safe_psql_many([
        "CREATE TABLE foo (t text)",
        """INSERT INTO foo
           SELECT 'long string to consume some space' || g
           FROM generate_series(1, 100000) g""",
    ])

    wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")

    pg.safe_psql("""
        INSERT INTO foo
            SELECT 'long string to consume some space' || g
            FROM generate_series(1, 100000) g
    """)

    wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")
    env.pageserver.safe_psql(f"do_gc {env.initial_tenant.hex} {new_timeline_id.hex} 0")

    assert_physical_size(env, env.initial_tenant, new_timeline_id)


def test_timeline_physical_size_metric(neon_simple_env: NeonEnv):
    env = neon_simple_env
    new_timeline_id = env.neon_cli.create_branch('test_timeline_physical_size_metric')
    pg = env.postgres.create_start("test_timeline_physical_size_metric")

    pg.safe_psql_many([
        "CREATE TABLE foo (t text)",
        """INSERT INTO foo
           SELECT 'long string to consume some space' || g
           FROM generate_series(1, 100000) g""",
    ])

    wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
    env.pageserver.safe_psql(f"checkpoint {env.initial_tenant.hex} {new_timeline_id.hex}")

    # get the metrics and parse the metric for the current timeline's physical size
    metrics = env.pageserver.http_client().get_metrics()
    matches = re.search(
        f'^pageserver_current_physical_size{{tenant_id="{env.initial_tenant.hex}",timeline_id="{new_timeline_id.hex}"}} (\\S+)$',
        metrics,
        re.MULTILINE)
    assert matches

    # assert that the metric matches the actual physical size on disk
    tl_physical_size_metric = int(matches.group(1))
    timeline_path = env.timeline_dir(env.initial_tenant, new_timeline_id)
    assert tl_physical_size_metric == get_timeline_dir_size(timeline_path)


def test_tenant_physical_size(neon_simple_env: NeonEnv):
    random.seed(100)

    env = neon_simple_env
    client = env.pageserver.http_client()
    tenant, timeline = env.neon_cli.create_tenant()

    def get_timeline_physical_size(timeline: UUID):
        res = client.timeline_detail(tenant, timeline)
        return res['local']['current_physical_size_non_incremental']

    timeline_total_size = get_timeline_physical_size(timeline)
    for i in range(10):
        n_rows = random.randint(100, 1000)

        timeline = env.neon_cli.create_branch(f"test_tenant_physical_size_{i}", tenant_id=tenant)
        pg = env.postgres.create_start(f"test_tenant_physical_size_{i}", tenant_id=tenant)

        pg.safe_psql_many([
            "CREATE TABLE foo (t text)",
            f"INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, {n_rows}) g",
        ])

        wait_for_last_flush_lsn(env, pg, tenant, timeline)
        env.pageserver.safe_psql(f"checkpoint {tenant.hex} {timeline.hex}")

        timeline_total_size += get_timeline_physical_size(timeline)

        pg.stop()

    tenant_physical_size = int(client.tenant_status(tenant_id=tenant)['current_physical_size'])
    assert tenant_physical_size == timeline_total_size


def assert_physical_size(env: NeonEnv, tenant_id: UUID, timeline_id: UUID):
    """Check that the current physical size returned from the timeline API
    matches the total physical size of the timeline on disk"""
    client = env.pageserver.http_client()
    res = assert_timeline_local(client, tenant_id, timeline_id)
    timeline_path = env.timeline_dir(tenant_id, timeline_id)

    assert res["local"]["current_physical_size"] == res["local"][
        "current_physical_size_non_incremental"]
    assert res["local"]["current_physical_size"] == get_timeline_dir_size(timeline_path)