mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
Merge remote-tracking branch 'origin/communicator-rewrite' into quantumish/lfc-resize-static-shmem
This commit is contained in:
@@ -28,7 +28,10 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
|
||||
let compute = compute.clone();
|
||||
|
||||
let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
|
||||
|
||||
let runtime = tokio::runtime::Handle::current();
|
||||
thread::spawn(move || {
|
||||
let _rt_guard = runtime.enter();
|
||||
let _entered = span.entered();
|
||||
if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
|
||||
// TODO: might need stronger error feedback than logging an warning.
|
||||
|
||||
@@ -423,11 +423,20 @@ communicator_new_get_lfc_state(size_t max_entries)
|
||||
struct FileCacheIterator iter;
|
||||
FileCacheState *fcs;
|
||||
uint8 *bitmap;
|
||||
uint64_t num_pages_used;
|
||||
size_t n_entries;
|
||||
size_t state_size;
|
||||
size_t n_pages;
|
||||
|
||||
/* TODO: Max(max_entries, <current # of entries in cache>) */
|
||||
size_t n_entries = max_entries;
|
||||
size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries, 1);
|
||||
size_t n_pages = 0;
|
||||
/*
|
||||
* Calculate the size of the returned state.
|
||||
*
|
||||
* FIXME: if the LFC is very large, this can exceed 1 GB, resulting in a
|
||||
* palloc error.
|
||||
*/
|
||||
num_pages_used = bcomm_cache_get_num_pages_used(my_bs);
|
||||
n_entries = Min(num_pages_used, max_entries);
|
||||
state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries, 1);
|
||||
|
||||
fcs = (FileCacheState *) palloc0(state_size);
|
||||
SET_VARSIZE(fcs, state_size);
|
||||
@@ -437,6 +446,7 @@ communicator_new_get_lfc_state(size_t max_entries)
|
||||
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
|
||||
|
||||
bcomm_cache_iterate_begin(my_bs, &iter);
|
||||
n_pages = 0;
|
||||
while (n_pages < max_entries && bcomm_cache_iterate_next(my_bs, &iter))
|
||||
{
|
||||
BufferTag tag;
|
||||
@@ -455,6 +465,7 @@ communicator_new_get_lfc_state(size_t max_entries)
|
||||
BITMAP_SET(bitmap, i);
|
||||
}
|
||||
fcs->n_pages = n_pages;
|
||||
fcs->n_chunks = n_pages;
|
||||
|
||||
return fcs;
|
||||
}
|
||||
|
||||
@@ -440,121 +440,131 @@ lfc_prewarm_with_async_requests(FileCacheState *fcs)
|
||||
n_entries = Min(fcs->n_chunks, lfc_prewarm_limit);
|
||||
Assert(n_entries != 0);
|
||||
|
||||
LWLockAcquire(prewarm_lock, LW_EXCLUSIVE);
|
||||
|
||||
/* Do not prewarm more entries than LFC limit */
|
||||
/* FIXME */
|
||||
#if 0
|
||||
if (prewarm_ctl->limit <= prewarm_ctl->size)
|
||||
PG_TRY();
|
||||
{
|
||||
elog(LOG, "LFC: skip prewarm because LFC is already filled");
|
||||
LWLockRelease(prewarm_lock);
|
||||
return;
|
||||
}
|
||||
LWLockAcquire(prewarm_lock, LW_EXCLUSIVE);
|
||||
|
||||
/* Do not prewarm more entries than LFC limit */
|
||||
/* FIXME */
|
||||
#if 0
|
||||
if (prewarm_ctl->limit <= prewarm_ctl->size)
|
||||
{
|
||||
elog(LOG, "LFC: skip prewarm because LFC is already filled");
|
||||
LWLockRelease(prewarm_lock);
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (prewarm_ctl->prewarm_active)
|
||||
{
|
||||
LWLockRelease(prewarm_lock);
|
||||
elog(ERROR, "LFC: skip prewarm because another prewarm is still active");
|
||||
}
|
||||
prewarm_ctl->n_prewarm_entries = n_entries;
|
||||
prewarm_ctl->n_prewarm_workers = -1;
|
||||
prewarm_ctl->prewarm_active = true;
|
||||
prewarm_ctl->prewarm_canceled = false;
|
||||
|
||||
/* Calculate total number of pages to be prewarmed */
|
||||
prewarm_ctl->total_prewarm_pages = fcs->n_pages;
|
||||
|
||||
LWLockRelease(prewarm_lock);
|
||||
|
||||
elog(LOG, "LFC: start prewarming");
|
||||
lfc_do_prewarm = true;
|
||||
lfc_prewarm_cancel = false;
|
||||
|
||||
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
|
||||
|
||||
blocks_per_chunk = 1 << fcs->chunk_size_log;
|
||||
|
||||
bitno = 0;
|
||||
for (uint32 chunkno = 0; chunkno < fcs->n_chunks; chunkno++)
|
||||
{
|
||||
BufferTag *chunk_tag = &fcs->chunks[chunkno];
|
||||
BlockNumber request_startblkno = InvalidBlockNumber;
|
||||
BlockNumber request_endblkno;
|
||||
|
||||
if (!BufferTagIsValid(chunk_tag))
|
||||
elog(ERROR, "LFC: Invalid buffer tag: %u", chunk_tag->blockNum);
|
||||
|
||||
if (lfc_prewarm_cancel)
|
||||
if (prewarm_ctl->prewarm_active)
|
||||
{
|
||||
prewarm_ctl->prewarm_canceled = true;
|
||||
break;
|
||||
LWLockRelease(prewarm_lock);
|
||||
elog(ERROR, "LFC: skip prewarm because another prewarm is still active");
|
||||
}
|
||||
prewarm_ctl->n_prewarm_entries = n_entries;
|
||||
prewarm_ctl->n_prewarm_workers = -1;
|
||||
prewarm_ctl->prewarm_active = true;
|
||||
prewarm_ctl->prewarm_canceled = false;
|
||||
|
||||
/* take next chunk */
|
||||
for (int j = 0; j < blocks_per_chunk; j++)
|
||||
/* Calculate total number of pages to be prewarmed */
|
||||
prewarm_ctl->total_prewarm_pages = fcs->n_pages;
|
||||
|
||||
LWLockRelease(prewarm_lock);
|
||||
|
||||
elog(LOG, "LFC: start prewarming");
|
||||
lfc_do_prewarm = true;
|
||||
lfc_prewarm_cancel = false;
|
||||
|
||||
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
|
||||
|
||||
blocks_per_chunk = 1 << fcs->chunk_size_log;
|
||||
|
||||
bitno = 0;
|
||||
for (uint32 chunkno = 0; chunkno < fcs->n_chunks; chunkno++)
|
||||
{
|
||||
BlockNumber blkno = chunk_tag->blockNum + j;
|
||||
BufferTag *chunk_tag = &fcs->chunks[chunkno];
|
||||
BlockNumber request_startblkno = InvalidBlockNumber;
|
||||
BlockNumber request_endblkno;
|
||||
|
||||
if (BITMAP_ISSET(bitmap, bitno))
|
||||
if (!BufferTagIsValid(chunk_tag))
|
||||
elog(ERROR, "LFC: Invalid buffer tag: %u", chunk_tag->blockNum);
|
||||
|
||||
if (lfc_prewarm_cancel)
|
||||
{
|
||||
if (request_startblkno != InvalidBlockNumber)
|
||||
prewarm_ctl->prewarm_canceled = true;
|
||||
break;
|
||||
}
|
||||
|
||||
/* take next chunk */
|
||||
for (int j = 0; j < blocks_per_chunk; j++)
|
||||
{
|
||||
BlockNumber blkno = chunk_tag->blockNum + j;
|
||||
|
||||
if (BITMAP_ISSET(bitmap, bitno))
|
||||
{
|
||||
if (request_endblkno == blkno)
|
||||
if (request_startblkno != InvalidBlockNumber)
|
||||
{
|
||||
/* append this block to the request */
|
||||
request_endblkno++;
|
||||
if (request_endblkno == blkno)
|
||||
{
|
||||
/* append this block to the request */
|
||||
request_endblkno++;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* flush this request, and start new one */
|
||||
communicator_new_prefetch_register_bufferv(
|
||||
BufTagGetNRelFileInfo(*chunk_tag),
|
||||
chunk_tag->forkNum,
|
||||
request_startblkno,
|
||||
request_endblkno - request_startblkno
|
||||
);
|
||||
request_startblkno = blkno;
|
||||
request_endblkno = blkno + 1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* flush this request, and start new one */
|
||||
communicator_new_prefetch_register_bufferv(
|
||||
BufTagGetNRelFileInfo(*chunk_tag),
|
||||
chunk_tag->forkNum,
|
||||
request_startblkno,
|
||||
request_endblkno - request_startblkno
|
||||
);
|
||||
/* flush this request, if any, and start new one */
|
||||
if (request_startblkno != InvalidBlockNumber)
|
||||
{
|
||||
communicator_new_prefetch_register_bufferv(
|
||||
BufTagGetNRelFileInfo(*chunk_tag),
|
||||
chunk_tag->forkNum,
|
||||
request_startblkno,
|
||||
request_endblkno - request_startblkno
|
||||
);
|
||||
}
|
||||
request_startblkno = blkno;
|
||||
request_endblkno = blkno + 1;
|
||||
}
|
||||
prewarm_ctl->prewarmed_pages += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* flush this request, if any, and start new one */
|
||||
if (request_startblkno != InvalidBlockNumber)
|
||||
{
|
||||
communicator_new_prefetch_register_bufferv(
|
||||
BufTagGetNRelFileInfo(*chunk_tag),
|
||||
chunk_tag->forkNum,
|
||||
request_startblkno,
|
||||
request_endblkno - request_startblkno
|
||||
);
|
||||
}
|
||||
request_startblkno = blkno;
|
||||
request_endblkno = blkno + 1;
|
||||
}
|
||||
prewarm_ctl->prewarmed_pages += 1;
|
||||
bitno++;
|
||||
}
|
||||
bitno++;
|
||||
|
||||
/* flush this request */
|
||||
communicator_new_prefetch_register_bufferv(
|
||||
BufTagGetNRelFileInfo(*chunk_tag),
|
||||
chunk_tag->forkNum,
|
||||
request_startblkno,
|
||||
request_endblkno - request_startblkno
|
||||
);
|
||||
request_startblkno = request_endblkno = InvalidBlockNumber;
|
||||
}
|
||||
|
||||
/* flush this request */
|
||||
communicator_new_prefetch_register_bufferv(
|
||||
BufTagGetNRelFileInfo(*chunk_tag),
|
||||
chunk_tag->forkNum,
|
||||
request_startblkno,
|
||||
request_endblkno - request_startblkno
|
||||
);
|
||||
request_startblkno = request_endblkno = InvalidBlockNumber;
|
||||
elog(LOG, "LFC: complete prewarming: loaded %lu pages", (unsigned long) prewarm_ctl->prewarmed_pages);
|
||||
prewarm_ctl->completed = GetCurrentTimestamp();
|
||||
|
||||
LWLockAcquire(prewarm_lock, LW_EXCLUSIVE);
|
||||
prewarm_ctl->prewarm_active = false;
|
||||
LWLockRelease(prewarm_lock);
|
||||
}
|
||||
|
||||
elog(LOG, "LFC: complete prewarming: loaded %lu pages", (unsigned long) prewarm_ctl->prewarmed_pages);
|
||||
prewarm_ctl->completed = GetCurrentTimestamp();
|
||||
|
||||
LWLockAcquire(prewarm_lock, LW_EXCLUSIVE);
|
||||
prewarm_ctl->prewarm_active = false;
|
||||
LWLockRelease(prewarm_lock);
|
||||
PG_CATCH();
|
||||
{
|
||||
elog(LOG, "LFC: cancel prewarm");
|
||||
prewarm_ctl->prewarm_canceled = true;
|
||||
prewarm_ctl->prewarm_active = false;
|
||||
}
|
||||
PG_END_TRY();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(get_local_cache_state);
|
||||
@@ -623,7 +633,7 @@ get_prewarm_info(PG_FUNCTION_ARGS)
|
||||
{
|
||||
total_pages = prewarm_ctl->total_prewarm_pages;
|
||||
prewarmed_pages = prewarm_ctl->prewarmed_pages;
|
||||
skipped_pages = prewarm_ctl->prewarmed_pages;
|
||||
skipped_pages = prewarm_ctl->skipped_pages;
|
||||
active_workers = 1;
|
||||
}
|
||||
else
|
||||
|
||||
@@ -157,6 +157,7 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
|
||||
[
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
|
||||
".*page_service_conn_main.*: query handler for 'basebackup .* ERROR: Not found: Timeline",
|
||||
".*request failed with Unavailable: Timeline .* is not active",
|
||||
]
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
@@ -194,7 +195,10 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
|
||||
|
||||
env.neon_cli.mappings_map_branch(initial_branch, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
with pytest.raises(RuntimeError, match="ERROR: Not found: Timeline"):
|
||||
with pytest.raises(
|
||||
RuntimeError,
|
||||
match=f"Timeline {env.initial_tenant}/{env.initial_timeline} is not active",
|
||||
):
|
||||
env.endpoints.create_start(
|
||||
initial_branch, tenant_id=env.initial_tenant, basebackup_request_tries=2
|
||||
)
|
||||
|
||||
@@ -101,20 +101,37 @@ def check_prewarmed_contains(
|
||||
|
||||
|
||||
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
|
||||
@pytest.mark.parametrize("grpc", [True, False])
|
||||
@pytest.mark.parametrize("method", METHOD_VALUES, ids=METHOD_IDS)
|
||||
def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
|
||||
def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod, grpc: bool):
|
||||
"""
|
||||
Test we can offload endpoint's LFC cache to endpoint storage.
|
||||
Test we can prewarm endpoint with LFC cache loaded from endpoint storage.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
n_records = 1000000
|
||||
|
||||
# The `neon.file_cache_prewarm_limit` GUC sets the max number of *chunks* to
|
||||
# load. So the number of *pages* loaded depends on the chunk size. With the
|
||||
# new communicator, the new LFC implementation doesn't do chunking so the
|
||||
# limit is the number of pages, while with the old implementation, the
|
||||
# default chunk size 1 MB chunks.
|
||||
#
|
||||
# Therefore with the old implementation, 1000 chunks equals 128000 pages, if
|
||||
# all the chunks are fully dense. In practice they are sparse, but should
|
||||
# amount to > 10000 pages anyway. (We have an assertion below that at least
|
||||
# 10000 LFC pages are in use after prewarming)
|
||||
if grpc:
|
||||
prewarm_limit = 15000
|
||||
else:
|
||||
prewarm_limit = 1000
|
||||
|
||||
cfg = [
|
||||
"autovacuum = off",
|
||||
"shared_buffers=1MB",
|
||||
"neon.max_file_cache_size=1GB",
|
||||
"neon.file_cache_size_limit=1GB",
|
||||
"neon.file_cache_prewarm_limit=1000",
|
||||
f"neon.file_cache_prewarm_limit={prewarm_limit}",
|
||||
]
|
||||
|
||||
if method == PrewarmMethod.AUTOPREWARM:
|
||||
@@ -123,9 +140,10 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
|
||||
config_lines=cfg,
|
||||
autoprewarm=True,
|
||||
offload_lfc_interval_seconds=AUTOOFFLOAD_INTERVAL_SECS,
|
||||
grpc=grpc,
|
||||
)
|
||||
else:
|
||||
endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg)
|
||||
endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg, grpc=grpc)
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
pg_cur = pg_conn.cursor()
|
||||
@@ -162,7 +180,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
|
||||
log.info(f"Used LFC size: {lfc_used_pages}")
|
||||
pg_cur.execute("select * from neon.get_prewarm_info()")
|
||||
total, prewarmed, skipped, _ = pg_cur.fetchall()[0]
|
||||
assert lfc_used_pages > 10000
|
||||
assert lfc_used_pages >= 10000
|
||||
assert total > 0
|
||||
assert prewarmed > 0
|
||||
assert total == prewarmed + skipped
|
||||
@@ -186,7 +204,7 @@ def test_lfc_prewarm_cancel(neon_simple_env: NeonEnv):
|
||||
"shared_buffers=1MB",
|
||||
"neon.max_file_cache_size=1GB",
|
||||
"neon.file_cache_size_limit=1GB",
|
||||
"neon.file_cache_prewarm_limit=1000",
|
||||
"neon.file_cache_prewarm_limit=2000000",
|
||||
]
|
||||
endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user