diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile
index 426b176af9..8bcc6bf924 100644
--- a/pgxn/neon/Makefile
+++ b/pgxn/neon/Makefile
@@ -36,6 +36,8 @@ DATA = \
 	neon--1.2--1.3.sql \
 	neon--1.3--1.4.sql \
 	neon--1.4--1.5.sql \
+	neon--1.5--1.6.sql \
+	neon--1.6--1.5.sql \
 	neon--1.5--1.4.sql \
 	neon--1.4--1.3.sql \
 	neon--1.3--1.2.sql \
diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c
index 924e0055c1..ecc55bb540 100644
--- a/pgxn/neon/file_cache.c
+++ b/pgxn/neon/file_cache.c
@@ -793,8 +793,10 @@ lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
 
 	for (uint32 i = 0; i < n_workers; i++)
 	{
-		while (true)
+		bool interrupted;
+		do
 		{
+			interrupted = false;
 			PG_TRY();
 			{
 				BgwHandleStatus status = WaitForBackgroundWorkerShutdown(bgw_handle[i]);
@@ -802,15 +804,16 @@ lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
 				{
 					elog(LOG, "LFC: Unexpected status of prewarm worker termination: %d", status);
 				}
-				break;
 			}
 			PG_CATCH();
 			{
 				elog(LOG, "LFC: cancel prewarm");
 				lfc_ctl->prewarm_canceled = true;
+				interrupted = true;
 			}
 			PG_END_TRY();
-		}
+		} while (interrupted);
+
 		if (!lfc_ctl->prewarm_workers[i].completed)
 		{
 			/* Background worker doesn't set completion time: it means that it was abnormally terminated */
@@ -2125,3 +2128,104 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
 	}
 	PG_RETURN_NULL();
 }
+
+PG_FUNCTION_INFO_V1(get_local_cache_state);
+
+/*
+ * SQL-callable: return the state produced by lfc_get_state() as a bytea.
+ * A NULL argument selects lfc_prewarm_limit as the entry limit; SQL NULL is
+ * returned when lfc_get_state() yields no state.
+ */
+Datum
+get_local_cache_state(PG_FUNCTION_ARGS)
+{
+	size_t max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
+	FileCacheState* fcs = lfc_get_state(max_entries);
+	if (fcs != NULL)
+		PG_RETURN_BYTEA_P((bytea*)fcs);
+	else
+		PG_RETURN_NULL();
+}
+
+PG_FUNCTION_INFO_V1(prewarm_local_cache);
+
+/*
+ * SQL-callable: hand a state blob (as returned by get_local_cache_state) to
+ * lfc_prewarm() together with the requested number of workers.
+ */
+Datum
+prewarm_local_cache(PG_FUNCTION_ARGS)
+{
+	/*
+	 * The blob is read through a FileCacheState pointer, so it needs a regular
+	 * 4-byte varlena header: use the fully detoasting accessor, because the
+	 * _PP variant may return a packed (short-header) datum.
+	 */
+	bytea* state = PG_GETARG_BYTEA_P(0);
+	uint32 n_workers = PG_GETARG_INT32(1);
+	FileCacheState* fcs = (FileCacheState*)state;
+
+	lfc_prewarm(fcs, n_workers);
+
+	/* Declared RETURNS void in SQL */
+	PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(get_prewarm_info);
+
+/*
+ * SQL-callable: report prewarm progress as a single
+ * (total_pages, prewarmed_pages, skipped_pages, active_workers) row.
+ * Returns SQL NULL when lfc_size_limit is 0, or when lfc_ctl is not set up
+ * or has no prewarm workers registered.
+ */
+Datum
+get_prewarm_info(PG_FUNCTION_ARGS)
+{
+	Datum		values[4];
+	bool		nulls[4];
+	TupleDesc	tupdesc;
+	uint32 prewarmed_pages = 0;
+	uint32 skipped_pages = 0;
+	uint32 active_workers = 0;
+	uint32 total_pages;
+	size_t n_workers;
+
+	if (lfc_size_limit == 0)
+		PG_RETURN_NULL();
+
+	LWLockAcquire(lfc_lock, LW_SHARED);
+	if (!lfc_ctl || lfc_ctl->n_prewarm_workers == 0)
+	{
+		LWLockRelease(lfc_lock);
+		PG_RETURN_NULL();
+	}
+	n_workers = lfc_ctl->n_prewarm_workers;
+	total_pages = lfc_ctl->total_prewarm_pages;
+	for (size_t i = 0; i < n_workers; i++)
+	{
+		PrewarmWorkerState* ws = &lfc_ctl->prewarm_workers[i];
+		prewarmed_pages += ws->prewarmed_pages;
+		skipped_pages += ws->skipped_pages;
+		/* No completion time recorded yet: the worker is still active */
+		active_workers += ws->completed == 0;
+	}
+	LWLockRelease(lfc_lock);
+
+	tupdesc = CreateTemplateTupleDesc(4);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_pages", INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "prewarmed_pages", INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "skipped_pages", INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "active_workers", INT4OID, -1, 0);
+	tupdesc = BlessTupleDesc(tupdesc);
+
+	MemSet(nulls, 0, sizeof(nulls));
+
+	values[0] = Int32GetDatum(total_pages);
+	values[1] = Int32GetDatum(prewarmed_pages);
+	values[2] = Int32GetDatum(skipped_pages);
+	values[3] = Int32GetDatum(active_workers);
+
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
+
diff --git a/pgxn/neon/neon--1.5--1.6.sql b/pgxn/neon/neon--1.5--1.6.sql
new file mode 100644
index 0000000000..c05f0f87aa
--- /dev/null
+++ b/pgxn/neon/neon--1.5--1.6.sql
@@ -0,0 +1,22 @@
+\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit
+
+CREATE FUNCTION get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer)
+RETURNS record
+AS 'MODULE_PATHNAME', 'get_prewarm_info'
+LANGUAGE C STRICT
+PARALLEL SAFE;
+
+CREATE FUNCTION get_local_cache_state(max_chunks integer default null)
+RETURNS bytea
+AS 'MODULE_PATHNAME', 'get_local_cache_state'
+LANGUAGE C
+PARALLEL UNSAFE;
+
+CREATE FUNCTION prewarm_local_cache(state bytea, n_workers integer default 1)
+RETURNS void
+AS 'MODULE_PATHNAME', 'prewarm_local_cache'
+LANGUAGE C STRICT
+PARALLEL UNSAFE;
+
+
+
diff --git a/pgxn/neon/neon--1.6--1.5.sql b/pgxn/neon/neon--1.6--1.5.sql
new file mode 100644
index 0000000000..57512980f5
--- /dev/null
+++ b/pgxn/neon/neon--1.6--1.5.sql
@@ -0,0 +1,8 @@
+DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer);
+
+DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
+
+-- DROP FUNCTION identifies a function by argument types only; a DEFAULT clause is a syntax error here.
+DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer);
+
+
diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py
new file mode 100644
index 0000000000..dd0ae1921d
--- /dev/null
+++ b/test_runner/regress/test_lfc_prewarm.py
@@ -0,0 +1,162 @@
+import random
+import threading
+import time
+
+import pytest
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnv
+from fixtures.utils import USE_LFC
+
+
+def check_pinned_entries(cur):
+    """Assert that no LFC chunks remain pinned.
+
+    Polls the file_cache_chunks_pinned statistic up to 10 times, one second
+    apart, before asserting that it reached zero.
+    """
+    # some LFC buffers can be temporarily locked by autovacuum or background writer
+    for _ in range(10):
+        cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_chunks_pinned'")
+        n_pinned = cur.fetchall()[0][0]
+        if n_pinned == 0:
+            break
+        time.sleep(1)
+    assert n_pinned == 0
+
+
+@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
+def test_lfc_prewarm(neon_simple_env: NeonEnv):
+    """Save the LFC state, restart the endpoint and prewarm from the saved state.
+
+    Checks the prewarm counters reported by get_prewarm_info(), the used LFC
+    size, and that the table contents are still read back correctly.
+    """
+    env = neon_simple_env
+    n_records = 1000000
+
+    endpoint = env.endpoints.create_start(
+        branch_name="main",
+        config_lines=[
+            "autovacuum = off",
+            "shared_buffers=1MB",
+            "neon.max_file_cache_size=1GB",
+            "neon.file_cache_size_limit=1GB",
+            "neon.file_cache_prewarm_limit=1000",
+        ],
+    )
+    conn = endpoint.connect()
+    cur = conn.cursor()
+    cur.execute("create extension neon version '1.6'")
+    cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
+    cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
+    cur.execute("select get_local_cache_state()")
+    lfc_state = cur.fetchall()[0][0]
+
+    endpoint.stop()
+    endpoint.start()
+
+    conn = endpoint.connect()
+    cur = conn.cursor()
+    time.sleep(1)  # wait until compute_ctl complete downgrade of extension to default version
+    cur.execute("alter extension neon update to '1.6'")
+    cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
+
+    cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
+    lfc_used_pages = cur.fetchall()[0][0]
+    log.info(f"Used LFC size: {lfc_used_pages}")
+    cur.execute("select * from get_prewarm_info()")
+    prewarm_info = cur.fetchall()[0]
+    log.info(f"Prewarm info: {prewarm_info}")
+    log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%")
+
+    assert lfc_used_pages > 10000
+    assert (
+        prewarm_info[0] > 0
+        and prewarm_info[1] > 0
+        and prewarm_info[0] == prewarm_info[1] + prewarm_info[2]
+    )
+
+    cur.execute("select sum(pk) from t")
+    assert cur.fetchall()[0][0] == n_records * (n_records + 1) // 2
+
+    check_pinned_entries(cur)
+
+
+@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
+def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
+    """Prewarm repeatedly while concurrent updates run and the LFC is resized.
+
+    The transfers are balanced, so the total balance must still be zero at
+    the end, and no LFC chunks may be left pinned.
+    """
+    env = neon_simple_env
+    n_records = 10000
+    n_threads = 4
+
+    endpoint = env.endpoints.create_start(
+        branch_name="main",
+        config_lines=[
+            "shared_buffers=1MB",
+            "neon.max_file_cache_size=1GB",
+            "neon.file_cache_size_limit=1GB",
+            "neon.file_cache_prewarm_limit=1000000",
+        ],
+    )
+    conn = endpoint.connect()
+    cur = conn.cursor()
+    cur.execute("create extension neon version '1.6'")
+    cur.execute(
+        "create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 1000)) with (fillfactor=10)"
+    )
+    cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
+    cur.execute("select get_local_cache_state()")
+    lfc_state = cur.fetchall()[0][0]
+
+    running = True
+
+    def workload():
+        conn = endpoint.connect()
+        cur = conn.cursor()
+        n_transfers = 0
+        while running:
+            src = random.randint(1, n_records)
+            dst = random.randint(1, n_records)
+            cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
+            cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
+            n_transfers += 1
+        log.info(f"Number of transfers: {n_transfers}")
+
+    def prewarm():
+        conn = endpoint.connect()
+        cur = conn.cursor()
+        n_prewarms = 0
+        while running:
+            cur.execute("alter system set neon.file_cache_size_limit='1MB'")
+            cur.execute("select pg_reload_conf()")
+            cur.execute("alter system set neon.file_cache_size_limit='1GB'")
+            cur.execute("select pg_reload_conf()")
+            cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
+            n_prewarms += 1
+        log.info(f"Number of prewarms: {n_prewarms}")
+
+    workload_threads = []
+    for _ in range(n_threads):
+        t = threading.Thread(target=workload)
+        workload_threads.append(t)
+        t.start()
+
+    prewarm_thread = threading.Thread(target=prewarm)
+    prewarm_thread.start()
+
+    time.sleep(20)
+
+    running = False
+    for t in workload_threads:
+        t.join()
+    prewarm_thread.join()
+
+    cur.execute("select sum(balance) from accounts")
+    total_balance = cur.fetchall()[0][0]
+    assert total_balance == 0
+
+    check_pinned_entries(cur)