This introduces a new 'neon_tenant: NeonTestTenant' fixture, which can be used instead of a NeonEnv. All tests using 'neon_tenant' share the same NeonEnv. NeonTestTenant provides many of the same members as NeonEnv, like 'create_branch', 'endpoints' etc., but not the ones that deal with storage, nor create_tenant. I converted some existing tests to use the new fixture. More could be converted if we add more features to NeonTestTenant:

- Many tests grab the pageserver HTTP client and call GC / compact functions on the pageserver. These are per-tenant operations, so they could safely be done on a shared pageserver too.
- Some tests use failpoints to add delays or pauses to various operations in the pageserver. If the failpoints were tenant- or timeline-scoped, they could be used on a shared pageserver too.
- Some tests deliberately introduce errors in the pageserver logs. They set allowlists for those error messages, because any unexpected errors in the logs cause the test to fail. If the allowlist was tenant- or timeline-scoped, we could allow those tests to share the env.
- Some tests use helper functions like check_restored_datadir_content or fork_at_current_lsn, which take a NeonEnv as argument. They could use NeonTestTenant instead (or become methods of NeonTestTenant).
- Some tests want to use extra tenant config for the initial tenant. We could expand the neon_tenant fixture to allow that. (Perhaps introduce a NeonTestTenantBuilder which allows setting config before creating the initial tenant.)
- Some tests create multiple tenants. That could be allowed in a shared env, as long as we track which tenants belong to the test.

See https://github.com/neondatabase/neon/issues/9193
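For illustration, here is a minimal sketch of how such a fixture pair could be wired up with pytest. The session-scoped 'session_env_builder' fixture, the create_tenant call, and the NeonTestTenant constructor signature are assumptions of this sketch, not the actual fixture code:

import pytest

from fixtures.neon_tenant import NeonTestTenant

@pytest.fixture(scope="session")
def shared_neon_env(session_env_builder):
    # One NeonEnv (pageserver, safekeepers, storage broker) shared by all
    # tests that request a NeonTestTenant. 'session_env_builder' is a
    # hypothetical session-scoped NeonEnvBuilder.
    yield session_env_builder.init_start()

@pytest.fixture
def neon_tenant(shared_neon_env):
    # Each test gets a fresh tenant in the shared env, so per-tenant members
    # like 'create_branch' and 'endpoints' stay isolated between tests.
    tenant_id, timeline_id = shared_neon_env.create_tenant()
    return NeonTestTenant(shared_neon_env, tenant_id, timeline_id)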
81 lines
2.6 KiB
Python
import os
import queue
import random
import threading
import time
from pathlib import Path
from typing import List

from fixtures.neon_tenant import NeonTestTenant
from fixtures.utils import query_scalar


def test_local_file_cache_unlink(neon_tenant: NeonTestTenant, test_output_dir: Path):
    cache_dir = test_output_dir / Path("file_cache")
    os.mkdir(cache_dir)
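
    # Start an endpoint with a tiny shared_buffers and the local file cache
    # capped at 10MB, well below the size of the table created below, so the
    # workload continuously reads and writes the cache file.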
    endpoint = neon_tenant.endpoints.create_start(
        "main",
        config_lines=[
            "shared_buffers='1MB'",
            f"neon.file_cache_path='{cache_dir}/file.cache'",
            "neon.max_file_cache_size='64MB'",
            "neon.file_cache_size_limit='10MB'",
        ],
    )

    cur = endpoint.connect().cursor()

    stop = threading.Event()
    n_rows = 100000
    n_threads = 20
    n_updates_per_connection = 1000

    cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
    cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")

    # Start threads that will perform random UPDATEs. Each UPDATE
    # increments the counter on the row, so that we can check at the
    # end that the sum of all the counters matches the number of updates
    # performed (plus the initial 1 on each row).
    #
    # Furthermore, each thread will reconnect after every 1000 updates.
    def run_updates(n_updates_performed_q: queue.Queue[int]):
        n_updates_performed = 0
        conn = endpoint.connect()
        cur = conn.cursor()
        while not stop.is_set():
            id = random.randint(1, n_rows)
            cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
            n_updates_performed += 1
            if n_updates_performed % n_updates_per_connection == 0:
                cur.close()
                conn.close()
                conn = endpoint.connect()
                cur = conn.cursor()
        n_updates_performed_q.put(n_updates_performed)

    n_updates_performed_q: queue.Queue[int] = queue.Queue()
    threads: List[threading.Thread] = []
    for _i in range(n_threads):
        thread = threading.Thread(target=run_updates, args=(n_updates_performed_q,), daemon=True)
        thread.start()
        threads.append(thread)
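
    # Let the updaters run for a while with the cache file in its original
    # location.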
    time.sleep(5)

    # Rename the cache directory away while the cache file is in active use.
    # From the endpoint's point of view the file at the configured path is
    # effectively unlinked, which is what we're actually testing.
    new_cache_dir = test_output_dir / Path("file_cache_new")
    os.rename(cache_dir, new_cache_dir)
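
    # Keep the workload running against the unlinked cache file for a while;
    # the endpoint should continue to work normally.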
    time.sleep(10)

    stop.set()

    n_updates_performed = 0
    for thread in threads:
        thread.join()
        n_updates_performed += n_updates_performed_q.get()

    assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_rows + n_updates_performed