mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
## Problem

We are currently supporting two read paths. No bueno.

## Summary of changes

High level: use the vectored read path to serve get-page requests, gated by the `get_impl` config.

Low level:
1. Add a pageserver config, `get_impl`, to specify which read path to use when serving get-page requests.
2. Fix base cached image handling for the vectored read path. This was subtly broken: previously we would not mark keys that went past their cached LSN as complete. This is a self-standing change which could be its own PR, but I've included it here because writing separate tests for it is tricky.
3. Fork get-page to use either the legacy or the vectored implementation.
4. Validate the use of the vectored read path when serving get-page requests against the legacy implementation, controlled by the `validate_vectored_get` pageserver config.
5. Use the vectored read path to serve get-page requests in tests (with validation).

## Note

Since the vectored read path does not go through the page cache to read buffers, this change also amounts to a removal of the buffer page cache. The materialized page cache is still used.
80 lines
2.7 KiB
Python
80 lines
2.7 KiB
Python
import os
|
|
import random
|
|
import threading
|
|
import time
|
|
from typing import List
|
|
|
|
from fixtures.neon_fixtures import DEFAULT_BRANCH_NAME, NeonEnvBuilder
|
|
from fixtures.utils import query_scalar
|
|
|
|
|
|
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: str):
    """Exercise the local file cache (LFC) while its backing directory is renamed away.

    Concurrent updater threads hammer a table while the cache directory is
    moved out from under the running endpoint. At the end, the sum of all row
    counters must equal the number of updates performed (plus the initial 1 on
    each row), proving no updates were lost despite the rename/unlink.

    Args:
        neon_env_builder: fixture used to build and start the Neon environment.
        build_type: build flavor of the binaries under test ("debug"/"release").
    """
    if build_type == "debug":
        # Disable vectored read path cross validation since it makes the test time out.
        neon_env_builder.pageserver_config_override = "validate_vectored_get=false"

    env = neon_env_builder.init_start()

    cache_dir = os.path.join(env.repo_dir, "file_cache")
    os.mkdir(cache_dir)

    env.neon_cli.create_branch("empty", ancestor_branch_name=DEFAULT_BRANCH_NAME)
    env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")

    # Small shared_buffers forces traffic through the file cache; the size
    # limit is well below the max so the cache churns during the test.
    endpoint = env.endpoints.create_start(
        "test_local_file_cache_unlink",
        config_lines=[
            "shared_buffers='1MB'",
            f"neon.file_cache_path='{cache_dir}/file.cache'",
            "neon.max_file_cache_size='64MB'",
            "neon.file_cache_size_limit='10MB'",
        ],
    )

    cur = endpoint.connect().cursor()

    n_rows = 100000
    n_threads = 20
    n_updates_per_thread = 10000
    n_updates_per_connection = 1000
    n_total_updates = n_threads * n_updates_per_thread

    # Low fillfactor spreads the rows over many pages, increasing cache traffic.
    cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
    cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")

    # Start threads that will perform random UPDATEs. Each UPDATE
    # increments the counter on the row, so that we can check at the
    # end that the sum of all the counters match the number of updates
    # performed (plus the initial 1 on each row).
    #
    # Furthermore, each thread will reconnect between every 1000 updates.
    def run_updates():
        n_updates_performed = 0
        conn = endpoint.connect()
        cur = conn.cursor()
        for _ in range(n_updates_per_thread):
            # Renamed from `id` to avoid shadowing the builtin.
            row_id = random.randint(1, n_rows)
            cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {row_id}")
            n_updates_performed += 1
            if n_updates_performed % n_updates_per_connection == 0:
                cur.close()
                conn.close()
                conn = endpoint.connect()
                cur = conn.cursor()
        # Close the last connection instead of leaking it; the original code
        # relied on daemon-thread teardown to drop it.
        cur.close()
        conn.close()

    threads: List[threading.Thread] = []
    for _i in range(n_threads):
        thread = threading.Thread(target=run_updates, args=(), daemon=True)
        thread.start()
        threads.append(thread)

    # Let the workers warm the cache for a bit, then move the cache file's
    # directory away while the endpoint is still running and updating.
    time.sleep(5)

    new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
    os.rename(cache_dir, new_cache_dir)

    for thread in threads:
        thread.join()

    # Every row started at 1, and each update added 1 somewhere.
    assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows