From cca517fb942cf8b7afb70f11410ca446fb15eb40 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Mon, 23 Dec 2024 11:36:47 +0200 Subject: [PATCH] Add test for prewarm under workload --- test_runner/regress/test_lfc_prewarm.py | 75 +++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index b71f2a6efa..a27f1423e5 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -1,3 +1,5 @@ +import random +import threading import time import pytest @@ -53,3 +55,76 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv): assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2 assert prewarm_info[1] > 0 + + +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv): + env = neon_simple_env + n_records = 1000000 + n_threads = 4 + + endpoint = env.endpoints.create_start( + branch_name="main", + config_lines=[ + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000000", + ], + ) + conn = endpoint.connect() + cur = conn.cursor() + cur.execute("create extension neon version '1.6'") + cur.execute( + "create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 128))" + ) + cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))") + cur.execute("select get_local_cache_state()") + lfc_state = cur.fetchall()[0][0] + + running = True + + def workload(): + conn = endpoint.connect() + cur = conn.cursor() + n_transfers = 0 + while running: + src = random.randint(1, n_records) + dst = random.randint(1, n_records) + cur.execute("update accounts set balance=balance-100 where id=%s", (src,)) + cur.execute("update accounts set balance=balance+100 where id=%s", (dst,)) + n_transfers += 1 + log.info(f"Number of transfers: {n_transfers}") + + def prewarm(): + conn = endpoint.connect() + cur = conn.cursor() + n_prewarms = 0 + while running: + cur.execute("alter system set neon.file_cache_size_limit='1MB'") + cur.execute("select pg_reload_conf()") + cur.execute("alter system set neon.file_cache_size_limit='1GB'") + cur.execute("select pg_reload_conf()") + cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) + n_prewarms += 1 + log.info(f"Number of prewarms: {n_prewarms}") + + workload_threads = [] + for _ in range(n_threads): + t = threading.Thread(target=workload) + workload_threads.append(t) + t.start() + + prewarm_thread = threading.Thread(target=prewarm) + prewarm_thread.start() + + time.sleep(100) + + running = False + for t in workload_threads: + t.join() + prewarm_thread.join() + + cur.execute("select sum(balance) from accounts") + total_balance = cur.fetchall()[0][0] + assert total_balance == 0