mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
Closes #2422. The APIs have been feature gated with the `testing_api!` macro so that they return 400s when support hasn't been compiled in.
90 lines
2.9 KiB
Python
90 lines
2.9 KiB
Python
import asyncio
|
|
import concurrent.futures
|
|
import random
|
|
|
|
from fixtures.log_helper import log
|
|
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres
|
|
from fixtures.types import TimelineId
|
|
from fixtures.utils import query_scalar
|
|
|
|
# Test configuration
|
|
#
|
|
# Create a table with {num_rows} rows, and perform {updates_to_perform} random
|
|
# UPDATEs on it, using {num_connections} separate connections.
|
|
num_connections = 10
|
|
num_rows = 100000
|
|
updates_to_perform = 10000
|
|
|
|
updates_performed = 0
|
|
|
|
|
|
# Run random UPDATEs on test table
|
|
async def update_table(pg: Postgres):
|
|
global updates_performed
|
|
pg_conn = await pg.connect_async()
|
|
|
|
while updates_performed < updates_to_perform:
|
|
updates_performed += 1
|
|
id = random.randrange(1, num_rows)
|
|
await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")
|
|
|
|
|
|
# Perform aggressive GC with 0 horizon
|
|
async def gc(env: NeonEnv, timeline: TimelineId):
|
|
pageserver_http = env.pageserver.http_client()
|
|
|
|
loop = asyncio.get_running_loop()
|
|
|
|
with concurrent.futures.ThreadPoolExecutor() as pool:
|
|
while updates_performed < updates_to_perform:
|
|
await loop.run_in_executor(
|
|
pool, lambda: pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
|
|
)
|
|
|
|
|
|
# At the same time, run UPDATEs and GC
|
|
async def update_and_gc(env: NeonEnv, pg: Postgres, timeline: TimelineId):
|
|
workers = []
|
|
for worker_id in range(num_connections):
|
|
workers.append(asyncio.create_task(update_table(pg)))
|
|
workers.append(asyncio.create_task(gc(env, timeline)))
|
|
|
|
# await all workers
|
|
await asyncio.gather(*workers)
|
|
|
|
|
|
#
|
|
# Aggressively force GC, while running queries.
|
|
#
|
|
# (repro for https://github.com/neondatabase/neon/issues/1047)
|
|
#
|
|
def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
|
|
|
|
# Disable pitr, because here we want to test branch creation after GC
|
|
neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
|
|
env = neon_env_builder.init_start()
|
|
env.neon_cli.create_branch("test_gc_aggressive", "main")
|
|
pg = env.postgres.create_start("test_gc_aggressive")
|
|
log.info("postgres is running on test_gc_aggressive branch")
|
|
|
|
with pg.cursor() as cur:
|
|
timeline = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
|
|
|
|
# Create table, and insert the first 100 rows
|
|
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
|
|
cur.execute(
|
|
f"""
|
|
INSERT INTO foo
|
|
SELECT g, 0, 'long string to consume some space' || g
|
|
FROM generate_series(1, {num_rows}) g
|
|
"""
|
|
)
|
|
cur.execute("CREATE INDEX ON foo(id)")
|
|
|
|
asyncio.run(update_and_gc(env, pg, timeline))
|
|
|
|
cur.execute("SELECT COUNT(*), SUM(counter) FROM foo")
|
|
r = cur.fetchone()
|
|
assert r is not None
|
|
assert r == (num_rows, updates_to_perform)
|