Compute/LFC: Apply limits consistently (#10449)

Otherwise we might hit ERRORs in otherwise safe situations (such as user
queries), which isn't a great user experience.

## Problem

https://github.com/neondatabase/neon/pull/10376

## Summary of changes

Instead of accepting internal errors as acceptable, we ensure we don't
exceed our allocated usage.
This commit is contained in:
Matthias van de Meent
2025-01-20 19:29:21 +01:00
committed by GitHub
parent 72130d7d6c
commit e781cf6dd8
2 changed files with 139 additions and 42 deletions

View File

@@ -911,57 +911,85 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (entry->access_count++ == 0)
dlist_delete(&entry->list_node);
}
else
/*-----------
* If the chunk wasn't already in the LFC then we have these
* options, in order of preference:
*
* Unless there is no space available, we can:
* 1. Use an entry from the `holes` list, and
* 2. Create a new entry.
* We can always, regardless of space in the LFC:
* 3. evict an entry from LRU, and
* 4. ignore the write operation (the least favorite option)
*/
else if (lfc_ctl->used < lfc_ctl->limit)
{
/*
* We have two choices if all cache pages are pinned (i.e. used in IO
* operations):
*
* 1) Wait until some of this operation is completed and pages is
* unpinned.
*
* 2) Allocate one more chunk, so that specified cache size is more
* recommendation than hard limit.
*
* As far as probability of such event (that all pages are pinned) is
* considered to be very very small: there are should be very large
* number of concurrent IO operations and them are limited by
* max_connections, we prefer not to complicate code and use second
* approach.
*/
if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru));
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1;
}
CriticalAssert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
neon_log(DEBUG2, "Swap file cache page");
}
else if (!dlist_is_empty(&lfc_ctl->holes))
if (!dlist_is_empty(&lfc_ctl->holes))
{
/* We can reuse a hole that was left behind when the LFC was shrunk previously */
FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->holes));
uint32 offset = hole->offset;
bool hole_found;
hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &hole_found);
FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node,
dlist_pop_head_node(&lfc_ctl->holes));
uint32 offset = hole->offset;
bool hole_found;
hash_search_with_hash_value(lfc_hash, &hole->key,
hole->hash, HASH_REMOVE, &hole_found);
CriticalAssert(hole_found);
lfc_ctl->used += 1;
entry->offset = offset; /* reuse the hole */
entry->offset = offset; /* reuse the hole */
}
else
{
lfc_ctl->used += 1;
entry->offset = lfc_ctl->size++; /* allocate new chunk at end
* of file */
entry->offset = lfc_ctl->size++;/* allocate new chunk at end
* of file */
}
}
/*
* We've already used up all allocated LFC entries.
*
* If we can clear an entry from the LRU, do that.
* If we can't (e.g. because all other slots are being accessed)
* then we will remove this entry from the hash and continue
* on to the next chunk, as we may not exceed the limit.
*/
else if (!dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node,
dlist_pop_head_node(&lfc_ctl->lru));
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1;
}
CriticalAssert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search_with_hash_value(lfc_hash, &victim->key,
victim->hash, HASH_REMOVE, NULL);
neon_log(DEBUG2, "Swap file cache page");
}
else
{
/* Can't add this chunk - we don't have the space for it */
hash_search_with_hash_value(lfc_hash, &entry->key, hash,
HASH_REMOVE, NULL);
/*
* We can't process this chunk due to lack of space in LFC,
* so skip to the next one
*/
LWLockRelease(lfc_lock);
blkno += blocks_in_chunk;
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
continue;
}
if (!found)
{
entry->access_count = 1;
entry->hash = hash;
memset(entry->bitmap, 0, sizeof entry->bitmap);

View File

@@ -7,9 +7,78 @@ import threading
import time
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.utils import USE_LFC, query_scalar
"""
Test whether LFC doesn't error out when the LRU is empty, but the LFC is
already at its maximum size.
If we don't handle this safely, we might allocate more hash entries than
otherwise considered safe, thus causing ERRORs in hash_search(HASH_ENTER) once
we hit lfc->used >= lfc->limit.
"""
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_local_file_cache_all_pinned(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"neon.max_file_cache_size='1MB'",
"neon.file_cache_size_limit='1MB'",
],
)
top_cur = endpoint.connect().cursor()
stop = threading.Event()
n_rows = 10000
n_threads = 5
n_updates_per_connection = 1000
top_cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
top_cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")
# Start threads that will perform random UPDATEs. Each UPDATE
# increments the counter on the row, so that we can check at the
# end that the sum of all the counters match the number of updates
# performed (plus the initial 1 on each row).
#
# Furthermore, each thread will reconnect between every 1000 updates.
def run_updates(n_updates_performed_q: queue.Queue[int]):
n_updates_performed = 0
conn = endpoint.connect()
cur = conn.cursor()
while not stop.is_set():
id = random.randint(1, n_rows)
cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
n_updates_performed += 1
if n_updates_performed % n_updates_per_connection == 0:
cur.close()
conn.close()
conn = endpoint.connect()
cur = conn.cursor()
n_updates_performed_q.put(n_updates_performed)
n_updates_performed_q: queue.Queue[int] = queue.Queue()
threads: list[threading.Thread] = []
for _i in range(n_threads):
thread = threading.Thread(target=run_updates, args=(n_updates_performed_q,), daemon=True)
thread.start()
threads.append(thread)
time.sleep(15)
stop.set()
n_updates_performed = 0
for thread in threads:
thread.join()
n_updates_performed += n_updates_performed_q.get()
assert query_scalar(top_cur, "SELECT SUM(n) FROM lfctest") == n_rows + n_updates_performed
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):