test_local_file_cache_unlink: make runtime independent of getpage latency

This commit is contained in:
Christian Schwarz
2024-06-19 12:19:21 +02:00
parent 8602096fc4
commit 1f9a39fb2e

View File

@@ -1,4 +1,5 @@
import os
import queue
import random
import threading
import time
@@ -8,11 +9,7 @@ from fixtures.neon_fixtures import DEFAULT_BRANCH_NAME, NeonEnvBuilder
from fixtures.utils import query_scalar
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: str):
if build_type == "debug":
# Disable vectored read path cross validation since it makes the test time out.
neon_env_builder.pageserver_config_override = "validate_vectored_get=false"
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
cache_dir = os.path.join(env.repo_dir, "file_cache")
@@ -33,11 +30,10 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: s
cur = endpoint.connect().cursor()
stop = threading.Event()
n_rows = 100000
n_threads = 20
n_updates_per_thread = 10000
n_updates_per_connection = 1000
n_total_updates = n_threads * n_updates_per_thread
cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")
@@ -48,11 +44,11 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: s
# performed (plus the initial 1 on each row).
#
# Furthermore, each thread will reconnect between every 1000 updates.
def run_updates():
def run_updates(n_updates_performed_q: queue.Queue[int]):
n_updates_performed = 0
conn = endpoint.connect()
cur = conn.cursor()
for _ in range(n_updates_per_thread):
while not stop.is_set():
id = random.randint(1, n_rows)
cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
n_updates_performed += 1
@@ -61,19 +57,28 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: s
conn.close()
conn = endpoint.connect()
cur = conn.cursor()
n_updates_performed_q.put(n_updates_performed)
n_updates_performed_q: queue.Queue[int] = queue.Queue()
threads: List[threading.Thread] = []
for _i in range(n_threads):
thread = threading.Thread(target=run_updates, args=(), daemon=True)
thread = threading.Thread(target=run_updates, args=(n_updates_performed_q,), daemon=True)
thread.start()
threads.append(thread)
time.sleep(5)
# unlink, this is what we're actually testing
new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
os.rename(cache_dir, new_cache_dir)
time.sleep(10)
stop.set()
n_updates_performed = 0
for thread in threads:
thread.join()
n_updates_performed += n_updates_performed_q.get()
assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows
assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_rows + n_updates_performed