diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs index 6abfea82e0..153ba5a5be 100644 --- a/compute_tools/src/lsn_lease.rs +++ b/compute_tools/src/lsn_lease.rs @@ -28,7 +28,10 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc) { let compute = compute.clone(); let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn); + + let runtime = tokio::runtime::Handle::current(); thread::spawn(move || { + let _rt_guard = runtime.enter(); let _entered = span.entered(); if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) { // TODO: might need stronger error feedback than logging an warning. diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index f4088ab264..22a454e422 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -423,11 +423,20 @@ communicator_new_get_lfc_state(size_t max_entries) struct FileCacheIterator iter; FileCacheState *fcs; uint8 *bitmap; + uint64_t num_pages_used; + size_t n_entries; + size_t state_size; + size_t n_pages; - /* TODO: Max(max_entries, ) */ - size_t n_entries = max_entries; - size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries, 1); - size_t n_pages = 0; + /* + * Calculate the size of the returned state. + * + * FIXME: if the LFC is very large, this can exceed 1 GB, resulting in a + * palloc error. + */ + num_pages_used = bcomm_cache_get_num_pages_used(my_bs); + n_entries = Min(num_pages_used, max_entries); + state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries, 1); fcs = (FileCacheState *) palloc0(state_size); SET_VARSIZE(fcs, state_size); @@ -437,6 +446,7 @@ communicator_new_get_lfc_state(size_t max_entries) bitmap = FILE_CACHE_STATE_BITMAP(fcs); bcomm_cache_iterate_begin(my_bs, &iter); + n_pages = 0; while (n_pages < max_entries && bcomm_cache_iterate_next(my_bs, &iter)) { BufferTag tag; @@ -455,6 +465,7 @@ communicator_new_get_lfc_state(size_t max_entries) BITMAP_SET(bitmap, i); } fcs->n_pages = n_pages; + fcs->n_chunks = n_pages; return fcs; } diff --git a/pgxn/neon/lfc_prewarm.c b/pgxn/neon/lfc_prewarm.c index 5c23d52f3c..5e3ce1e64e 100644 --- a/pgxn/neon/lfc_prewarm.c +++ b/pgxn/neon/lfc_prewarm.c @@ -440,121 +440,131 @@ lfc_prewarm_with_async_requests(FileCacheState *fcs) n_entries = Min(fcs->n_chunks, lfc_prewarm_limit); Assert(n_entries != 0); - LWLockAcquire(prewarm_lock, LW_EXCLUSIVE); - - /* Do not prewarm more entries than LFC limit */ - /* FIXME */ -#if 0 - if (prewarm_ctl->limit <= prewarm_ctl->size) + PG_TRY(); { - elog(LOG, "LFC: skip prewarm because LFC is already filled"); - LWLockRelease(prewarm_lock); - return; - } + LWLockAcquire(prewarm_lock, LW_EXCLUSIVE); + + /* Do not prewarm more entries than LFC limit */ + /* FIXME */ +#if 0 + if (prewarm_ctl->limit <= prewarm_ctl->size) + { + elog(LOG, "LFC: skip prewarm because LFC is already filled"); + LWLockRelease(prewarm_lock); + return; + } #endif - if (prewarm_ctl->prewarm_active) - { - LWLockRelease(prewarm_lock); - elog(ERROR, "LFC: skip prewarm because another prewarm is still active"); - } - prewarm_ctl->n_prewarm_entries = n_entries; - prewarm_ctl->n_prewarm_workers = -1; - prewarm_ctl->prewarm_active = true; - prewarm_ctl->prewarm_canceled = false; - - /* Calculate total number of pages to be prewarmed */ - prewarm_ctl->total_prewarm_pages = fcs->n_pages; - - LWLockRelease(prewarm_lock); - - elog(LOG, "LFC: start prewarming"); - lfc_do_prewarm = true; - lfc_prewarm_cancel = false; - - bitmap = FILE_CACHE_STATE_BITMAP(fcs); - - blocks_per_chunk = 1 << 
fcs->chunk_size_log; - - bitno = 0; - for (uint32 chunkno = 0; chunkno < fcs->n_chunks; chunkno++) - { - BufferTag *chunk_tag = &fcs->chunks[chunkno]; - BlockNumber request_startblkno = InvalidBlockNumber; - BlockNumber request_endblkno; - - if (!BufferTagIsValid(chunk_tag)) - elog(ERROR, "LFC: Invalid buffer tag: %u", chunk_tag->blockNum); - - if (lfc_prewarm_cancel) + if (prewarm_ctl->prewarm_active) { - prewarm_ctl->prewarm_canceled = true; - break; + LWLockRelease(prewarm_lock); + elog(ERROR, "LFC: skip prewarm because another prewarm is still active"); } + prewarm_ctl->n_prewarm_entries = n_entries; + prewarm_ctl->n_prewarm_workers = -1; + prewarm_ctl->prewarm_active = true; + prewarm_ctl->prewarm_canceled = false; - /* take next chunk */ - for (int j = 0; j < blocks_per_chunk; j++) + /* Calculate total number of pages to be prewarmed */ + prewarm_ctl->total_prewarm_pages = fcs->n_pages; + + LWLockRelease(prewarm_lock); + + elog(LOG, "LFC: start prewarming"); + lfc_do_prewarm = true; + lfc_prewarm_cancel = false; + + bitmap = FILE_CACHE_STATE_BITMAP(fcs); + + blocks_per_chunk = 1 << fcs->chunk_size_log; + + bitno = 0; + for (uint32 chunkno = 0; chunkno < fcs->n_chunks; chunkno++) { - BlockNumber blkno = chunk_tag->blockNum + j; + BufferTag *chunk_tag = &fcs->chunks[chunkno]; + BlockNumber request_startblkno = InvalidBlockNumber; + BlockNumber request_endblkno; - if (BITMAP_ISSET(bitmap, bitno)) + if (!BufferTagIsValid(chunk_tag)) + elog(ERROR, "LFC: Invalid buffer tag: %u", chunk_tag->blockNum); + + if (lfc_prewarm_cancel) { - if (request_startblkno != InvalidBlockNumber) + prewarm_ctl->prewarm_canceled = true; + break; + } + + /* take next chunk */ + for (int j = 0; j < blocks_per_chunk; j++) + { + BlockNumber blkno = chunk_tag->blockNum + j; + + if (BITMAP_ISSET(bitmap, bitno)) { - if (request_endblkno == blkno) + if (request_startblkno != InvalidBlockNumber) { - /* append this block to the request */ - request_endblkno++; + if (request_endblkno == blkno) + { + /* append this block to the request */ + request_endblkno++; + } + else + { + /* flush this request, and start new one */ + communicator_new_prefetch_register_bufferv( + BufTagGetNRelFileInfo(*chunk_tag), + chunk_tag->forkNum, + request_startblkno, + request_endblkno - request_startblkno + ); + request_startblkno = blkno; + request_endblkno = blkno + 1; + } } else { - /* flush this request, and start new one */ - communicator_new_prefetch_register_bufferv( - BufTagGetNRelFileInfo(*chunk_tag), - chunk_tag->forkNum, - request_startblkno, - request_endblkno - request_startblkno - ); + /* flush this request, if any, and start new one */ + if (request_startblkno != InvalidBlockNumber) + { + communicator_new_prefetch_register_bufferv( + BufTagGetNRelFileInfo(*chunk_tag), + chunk_tag->forkNum, + request_startblkno, + request_endblkno - request_startblkno + ); + } request_startblkno = blkno; request_endblkno = blkno + 1; } + prewarm_ctl->prewarmed_pages += 1; } - else - { - /* flush this request, if any, and start new one */ - if (request_startblkno != InvalidBlockNumber) - { - communicator_new_prefetch_register_bufferv( - BufTagGetNRelFileInfo(*chunk_tag), - chunk_tag->forkNum, - request_startblkno, - request_endblkno - request_startblkno - ); - } - request_startblkno = blkno; - request_endblkno = blkno + 1; - } - prewarm_ctl->prewarmed_pages += 1; + bitno++; } - bitno++; + + /* flush this request */ + communicator_new_prefetch_register_bufferv( + BufTagGetNRelFileInfo(*chunk_tag), + chunk_tag->forkNum, + request_startblkno, + 
request_endblkno - request_startblkno + ); + request_startblkno = request_endblkno = InvalidBlockNumber; } - /* flush this request */ - communicator_new_prefetch_register_bufferv( - BufTagGetNRelFileInfo(*chunk_tag), - chunk_tag->forkNum, - request_startblkno, - request_endblkno - request_startblkno - ); - request_startblkno = request_endblkno = InvalidBlockNumber; + elog(LOG, "LFC: complete prewarming: loaded %lu pages", (unsigned long) prewarm_ctl->prewarmed_pages); + prewarm_ctl->completed = GetCurrentTimestamp(); + + LWLockAcquire(prewarm_lock, LW_EXCLUSIVE); + prewarm_ctl->prewarm_active = false; + LWLockRelease(prewarm_lock); } - - elog(LOG, "LFC: complete prewarming: loaded %lu pages", (unsigned long) prewarm_ctl->prewarmed_pages); - prewarm_ctl->completed = GetCurrentTimestamp(); - - LWLockAcquire(prewarm_lock, LW_EXCLUSIVE); - prewarm_ctl->prewarm_active = false; - LWLockRelease(prewarm_lock); + PG_CATCH(); + { + elog(LOG, "LFC: cancel prewarm"); + prewarm_ctl->prewarm_canceled = true; + prewarm_ctl->prewarm_active = false; + } + PG_END_TRY(); } PG_FUNCTION_INFO_V1(get_local_cache_state); @@ -623,7 +633,7 @@ get_prewarm_info(PG_FUNCTION_ARGS) { total_pages = prewarm_ctl->total_prewarm_pages; prewarmed_pages = prewarm_ctl->prewarmed_pages; - skipped_pages = prewarm_ctl->prewarmed_pages; + skipped_pages = prewarm_ctl->skipped_pages; active_workers = 1; } else diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py index 011c6896bd..40f8d13352 100644 --- a/test_runner/regress/test_branching.py +++ b/test_runner/regress/test_branching.py @@ -157,6 +157,7 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE [ ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*", ".*page_service_conn_main.*: query handler for 'basebackup .* ERROR: Not found: Timeline", + ".*request failed with Unavailable: Timeline .* is not active", ] ) ps_http = env.pageserver.http_client() @@ -194,7 +195,10 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE env.neon_cli.mappings_map_branch(initial_branch, env.initial_tenant, env.initial_timeline) - with pytest.raises(RuntimeError, match="ERROR: Not found: Timeline"): + with pytest.raises( + RuntimeError, + match=f"Timeline {env.initial_tenant}/{env.initial_timeline} is not active", + ): env.endpoints.create_start( initial_branch, tenant_id=env.initial_tenant, basebackup_request_tries=2 ) diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index a96f18177c..e6f1bf62a1 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -101,20 +101,37 @@ def check_prewarmed_contains( @pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +@pytest.mark.parametrize("grpc", [True, False]) @pytest.mark.parametrize("method", METHOD_VALUES, ids=METHOD_IDS) -def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): +def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod, grpc: bool): """ Test we can offload endpoint's LFC cache to endpoint storage. Test we can prewarm endpoint with LFC cache loaded from endpoint storage. """ env = neon_simple_env n_records = 1000000 + + # The `neon.file_cache_prewarm_limit` GUC sets the max number of *chunks* to + # load. So the number of *pages* loaded depends on the chunk size. 
With the
+    # new communicator, the new LFC implementation doesn't do chunking, so the
+    # limit is the number of pages. With the old implementation the limit is the
+    # number of chunks, and the default chunk size is 1 MB, i.e. 128 pages.
+    #
+    # Therefore with the old implementation, 1000 chunks equal 128000 pages, if
+    # all the chunks are fully dense. In practice they are sparse, but should
+    # amount to > 10000 pages anyway. (We have an assertion below that at least
+    # 10000 LFC pages are in use after prewarming.)
+    if grpc:
+        prewarm_limit = 15000
+    else:
+        prewarm_limit = 1000
+
     cfg = [
         "autovacuum = off",
         "shared_buffers=1MB",
         "neon.max_file_cache_size=1GB",
         "neon.file_cache_size_limit=1GB",
-        "neon.file_cache_prewarm_limit=1000",
+        f"neon.file_cache_prewarm_limit={prewarm_limit}",
     ]
 
     if method == PrewarmMethod.AUTOPREWARM:
@@ -123,9 +140,10 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
             config_lines=cfg,
             autoprewarm=True,
             offload_lfc_interval_seconds=AUTOOFFLOAD_INTERVAL_SECS,
+            grpc=grpc,
         )
     else:
-        endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg)
+        endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg, grpc=grpc)
 
     pg_conn = endpoint.connect()
     pg_cur = pg_conn.cursor()
@@ -162,7 +180,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
         log.info(f"Used LFC size: {lfc_used_pages}")
         pg_cur.execute("select * from neon.get_prewarm_info()")
         total, prewarmed, skipped, _ = pg_cur.fetchall()[0]
-        assert lfc_used_pages > 10000
+        assert lfc_used_pages >= 10000
         assert total > 0
         assert prewarmed > 0
         assert total == prewarmed + skipped
@@ -186,7 +204,7 @@ def test_lfc_prewarm_cancel(neon_simple_env: NeonEnv):
         "shared_buffers=1MB",
         "neon.max_file_cache_size=1GB",
         "neon.file_cache_size_limit=1GB",
-        "neon.file_cache_prewarm_limit=1000",
+        "neon.file_cache_prewarm_limit=2000000",
     ]
     endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg)
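
A rough sanity check of the prewarm_limit arithmetic described in the test comment above. This is a sketch only: the 8 kB page size and 1 MB chunk size are the defaults the comment assumes rather than values queried from a running server, and the constant names below are made up for illustration.

# Arithmetic behind the neon.file_cache_prewarm_limit values picked in
# test_lfc_prewarm. Assumes 8 kB pages and, for the old LFC implementation,
# 1 MB chunks (so 128 pages per chunk); both sizes are assumptions taken from
# the test comment, not read from the server.
PAGE_SIZE = 8 * 1024
CHUNK_SIZE = 1024 * 1024
PAGES_PER_CHUNK = CHUNK_SIZE // PAGE_SIZE  # 128

old_limit_chunks = 1000                               # old LFC: limit counts chunks
old_limit_pages = old_limit_chunks * PAGES_PER_CHUNK  # 128000 pages if chunks are dense
new_limit_pages = 15000                               # new LFC: limit counts pages directly

# Both limits comfortably clear the 10000-page lower bound the test asserts
# after prewarming, even when the old-style chunks are fairly sparse.
assert old_limit_pages >= 10000
assert new_limit_pages >= 10000
print(old_limit_pages, new_limit_pages)  # 128000 15000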