diff --git a/compute_tools/src/compute_prewarm.rs b/compute_tools/src/compute_prewarm.rs
index 4190580e5e..3f6f9a7ecc 100644
--- a/compute_tools/src/compute_prewarm.rs
+++ b/compute_tools/src/compute_prewarm.rs
@@ -105,7 +105,8 @@ impl ComputeNode {
                 cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
                 return;
             };
-            error!(%err);
+            crate::metrics::LFC_PREWARM_ERRORS.inc();
+            error!(%err, "prewarming lfc");
             cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
                 error: err.to_string(),
             };
@@ -180,7 +181,8 @@ impl ComputeNode {
             self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
             return;
         };
-        error!(%err);
+        crate::metrics::LFC_OFFLOAD_ERRORS.inc();
+        error!(%err, "offloading lfc");
         self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
             error: err.to_string(),
         };
diff --git a/compute_tools/src/metrics.rs b/compute_tools/src/metrics.rs
index 8f81675c49..91dedbb42a 100644
--- a/compute_tools/src/metrics.rs
+++ b/compute_tools/src/metrics.rs
@@ -105,6 +105,14 @@ pub(crate) static LFC_PREWARMS: Lazy<IntCounter> = Lazy::new(|| {
     .expect("failed to define a metric")
 });
 
+pub(crate) static LFC_PREWARM_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "compute_ctl_lfc_prewarm_errors_total",
+        "Total number of failed LFC prewarms, requested by compute_ctl or the autoprewarm option",
+    )
+    .expect("failed to define a metric")
+});
+
 pub(crate) static LFC_OFFLOADS: Lazy<IntCounter> = Lazy::new(|| {
     register_int_counter!(
         "compute_ctl_lfc_offloads_total",
@@ -113,6 +121,14 @@ pub(crate) static LFC_OFFLOADS: Lazy<IntCounter> = Lazy::new(|| {
     .expect("failed to define a metric")
 });
 
+pub(crate) static LFC_OFFLOAD_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "compute_ctl_lfc_offload_errors_total",
+        "Total number of failed LFC offloads, requested by compute_ctl or the lfc_offload_period_seconds option",
+    )
+    .expect("failed to define a metric")
+});
+
 pub fn collect() -> Vec<MetricFamily> {
     let mut metrics = COMPUTE_CTL_UP.collect();
     metrics.extend(INSTALLED_EXTENSIONS.collect());
@@ -123,6 +139,8 @@ pub fn collect() -> Vec<MetricFamily> {
     metrics.extend(PG_CURR_DOWNTIME_MS.collect());
     metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
     metrics.extend(LFC_PREWARMS.collect());
+    metrics.extend(LFC_PREWARM_ERRORS.collect());
     metrics.extend(LFC_OFFLOADS.collect());
+    metrics.extend(LFC_OFFLOAD_ERRORS.collect());
     metrics
 }
diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py
index 1fa1ead034..ae36bbda79 100644
--- a/test_runner/regress/test_lfc_prewarm.py
+++ b/test_runner/regress/test_lfc_prewarm.py
@@ -1,7 +1,6 @@
 import random
 import threading
 from enum import StrEnum
-from time import sleep
 from typing import Any
 
 import pytest
@@ -20,28 +19,32 @@ class PrewarmMethod(StrEnum):
 
 
 PREWARM_LABEL = "compute_ctl_lfc_prewarms_total"
+PREWARM_ERR_LABEL = "compute_ctl_lfc_prewarm_errors_total"
 OFFLOAD_LABEL = "compute_ctl_lfc_offloads_total"
+OFFLOAD_ERR_LABEL = "compute_ctl_lfc_offload_errors_total"
 METHOD_VALUES = [e for e in PrewarmMethod]
 METHOD_IDS = [e.value for e in PrewarmMethod]
 
 
 def check_pinned_entries(cur: Cursor):
-    # some LFC buffer can be temporary locked by autovacuum or background writer
-    for _ in range(10):
+    """
+    Wait until no LFC buffers are pinned (some may be pinned briefly by autovacuum or the background writer)
+    """
+
+    def none_pinned():
         cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_chunks_pinned'")
-        n_pinned = cur.fetchall()[0][0]
-        if n_pinned == 0:
-            break
-        sleep(1)
-    assert n_pinned == 0
+        assert cur.fetchall()[0][0] == 0
+
+    wait_until(none_pinned)
 
 
 def prom_parse(client: EndpointHttpClient) -> dict[str, float]:
+    labels = PREWARM_LABEL, OFFLOAD_LABEL, PREWARM_ERR_LABEL, OFFLOAD_ERR_LABEL
     return {
-        sample.name: sample.value
+        sample.name: int(sample.value)
         for family in prom_parse_impl(client.metrics())
         for sample in family.samples
-        if sample.name in (PREWARM_LABEL, OFFLOAD_LABEL)
+        if sample.name in labels
     }
@@ -54,7 +57,9 @@ def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor)
         assert "error" not in status
         client.offload_lfc()
         assert client.prewarm_lfc_status()["status"] == "not_prewarmed"
-        assert prom_parse(client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0}
+        parsed = prom_parse(client)
+        desired = {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0, OFFLOAD_ERR_LABEL: 0, PREWARM_ERR_LABEL: 0}
+        assert parsed == desired, f"{parsed=} != {desired=}"
     elif method == PrewarmMethod.POSTGRES:
         cur.execute("select get_local_cache_state()")
         return cur.fetchall()[0][0]
@@ -81,12 +86,17 @@ def check_prewarmed(
         assert prom_parse(client)[PREWARM_LABEL] == 1
     elif method == PrewarmMethod.COMPUTE_CTL:
         assert client.prewarm_lfc_status() == desired_status
-        assert prom_parse(client) == {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1}
+        desired = {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1, PREWARM_ERR_LABEL: 0, OFFLOAD_ERR_LABEL: 0}
+        assert prom_parse(client) == desired
 
 
 @pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
 @pytest.mark.parametrize("method", METHOD_VALUES, ids=METHOD_IDS)
 def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
+    """
+    Test that we can offload an endpoint's LFC state to endpoint storage,
+    then prewarm the endpoint from that offloaded state.
+    """
     env = neon_simple_env
     n_records = 1000000
     cfg = [
@@ -140,18 +150,15 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
     lfc_used_pages = pg_cur.fetchall()[0][0]
     log.info(f"Used LFC size: {lfc_used_pages}")
     pg_cur.execute("select * from get_prewarm_info()")
-    prewarm_info = pg_cur.fetchall()[0]
-    log.info(f"Prewarm info: {prewarm_info}")
-    total, prewarmed, skipped, _ = prewarm_info
+    total, prewarmed, skipped, _ = pg_cur.fetchall()[0]
+    log.info(f"Prewarm info: {total=} {prewarmed=} {skipped=}")
     progress = (prewarmed + skipped) * 100 // total
     log.info(f"Prewarm progress: {progress}%")
 
-    assert lfc_used_pages > 10000
-    assert (
-        prewarm_info[0] > 0
-        and prewarm_info[1] > 0
-        and prewarm_info[0] == prewarm_info[1] + prewarm_info[2]
-    )
+    assert total > 0
+    assert prewarmed > 0
+    assert total == prewarmed + skipped
+
     lfc_cur.execute("select sum(pk) from t")
     assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
@@ -168,6 +175,9 @@ WORKLOAD_IDS = METHOD_IDS[:-1]
 @pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
 @pytest.mark.parametrize("method", WORKLOAD_VALUES, ids=WORKLOAD_IDS)
 def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMethod):
+    """
+    Test prewarming an endpoint continuously while a write-heavy workload runs in parallel.
+    """
     env = neon_simple_env
     n_records = 10000
     n_threads = 4
@@ -247,5 +257,12 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet
     assert total_balance == 0
 
     check_pinned_entries(pg_cur)
-    if method != PrewarmMethod.POSTGRES:
-        assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: n_prewarms}
+    if method == PrewarmMethod.POSTGRES:
+        return
+    desired = {
+        OFFLOAD_LABEL: 1,
+        PREWARM_LABEL: n_prewarms,
+        OFFLOAD_ERR_LABEL: 0,
+        PREWARM_ERR_LABEL: 0,
+    }
+    assert prom_parse(http_client) == desired
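
For reference, the counter pattern this patch adds can be exercised in isolation. The following is a minimal sketch, assuming the stock `prometheus` and `once_cell` crates behind the `Lazy`/`register_int_counter!` calls above (compute_tools may pull these in through a re-exporting wrapper crate); the `EXAMPLE_ERRORS` counter and its metric name are hypothetical and not part of the patch:

    use once_cell::sync::Lazy;
    use prometheus::{IntCounter, register_int_counter};

    // Hypothetical counter, registered once on first use, mirroring the
    // LFC_PREWARM_ERRORS / LFC_OFFLOAD_ERRORS pattern above.
    static EXAMPLE_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
        register_int_counter!("example_errors_total", "Total number of example errors")
            .expect("failed to define a metric")
    });

    fn main() {
        // Bump the counter on a failure path, as compute_prewarm.rs now does
        // right before recording the Failed state.
        EXAMPLE_ERRORS.inc();
        assert_eq!(EXAMPLE_ERRORS.get(), 1);

        // register_int_counter! uses the default registry, so the counter is
        // included when metrics are gathered for the scrape endpoint.
        let families = prometheus::gather();
        assert!(families.iter().any(|f| f.get_name() == "example_errors_total"));
    }

Since `compute_ctl_lfc_prewarm_errors_total` and `compute_ctl_lfc_offload_errors_total` are monotonic counters, a dashboard would typically watch their rate rather than the raw value, e.g. alerting when `rate(compute_ctl_lfc_prewarm_errors_total[5m]) > 0`.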