mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 20:10:38 +00:00
endpoint_storage compute_ctl integration (#11550)
Add `/lfc/(prewarm|offload)` routes to `compute_ctl` which interact with endpoint storage. Add `prewarm_lfc_on_startup` spec option which, if enabled, downloads LFC prewarm data on compute startup. Resolves: https://github.com/neondatabase/cloud/issues/26343
This commit is contained in:
@@ -10,6 +10,7 @@ from requests.auth import AuthBase
|
||||
from typing_extensions import override
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from requests import PreparedRequest
|
||||
@@ -62,6 +63,35 @@ class EndpointHttpClient(requests.Session):
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
def prewarm_lfc_status(self) -> dict[str, str]:
    """
    Query compute_ctl's `/lfc/prewarm` route and return the decoded JSON body.

    Raises `requests.HTTPError` (via `raise_for_status`) on a non-2xx response.

    NOTE(review): the return annotation says all values are strings, but some
    callers appear to compare the result against numeric fields -- confirm.
    """
    endpoint_url = f"http://localhost:{self.external_port}/lfc/prewarm"
    response = self.get(endpoint_url)
    response.raise_for_status()
    payload: dict[str, str] = response.json()
    return payload
|
||||
|
||||
def prewarm_lfc(self):
    """
    Ask compute_ctl to prewarm the LFC, then block until it reports completion.

    POSTs to `/lfc/prewarm` and afterwards polls `prewarm_lfc_status()` through
    `wait_until` until the reported status is "completed".
    """
    trigger = self.post(f"http://localhost:{self.external_port}/lfc/prewarm")
    trigger.raise_for_status()

    def prewarmed():
        # Raises AssertionError while the prewarm has not completed so that
        # wait_until keeps retrying; the message carries the last status/error.
        report = self.prewarm_lfc_status()
        status = report["status"]
        err = report.get("error")
        assert status == "completed", f"{status}, error {err}"

    wait_until(prewarmed)
|
||||
|
||||
def offload_lfc(self):
    """
    Ask compute_ctl to offload the LFC state, then block until it completes.

    POSTs to `/lfc/offload` and afterwards polls the same URL with GET through
    `wait_until` until the reported status is "completed".
    """
    url = f"http://localhost:{self.external_port}/lfc/offload"
    trigger = self.post(url)
    trigger.raise_for_status()

    def offloaded():
        # Raises (HTTP error or AssertionError) while the offload is not done
        # so that wait_until keeps retrying.
        response = self.get(url)
        response.raise_for_status()
        report = response.json()
        status = report["status"]
        err = report.get("error")
        assert status == "completed", f"{status}, error {err}"

    wait_until(offloaded)
|
||||
|
||||
def database_schema(self, database: str):
|
||||
res = self.get(
|
||||
f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}",
|
||||
|
||||
@@ -1185,7 +1185,9 @@ class NeonEnv:
|
||||
"broker": {},
|
||||
"safekeepers": [],
|
||||
"pageservers": [],
|
||||
"endpoint_storage": {"port": self.port_distributor.get_port()},
|
||||
"endpoint_storage": {
|
||||
"listen_addr": f"127.0.0.1:{self.port_distributor.get_port()}",
|
||||
},
|
||||
"generate_local_ssl_certs": self.generate_local_ssl_certs,
|
||||
}
|
||||
|
||||
|
||||
@@ -4,10 +4,12 @@ import pytest
|
||||
from aiohttp import ClientSession
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.utils import run_only_on_default_postgres
|
||||
from jwcrypto import jwk, jwt
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@run_only_on_default_postgres("test doesn't use postgres")
|
||||
async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Inserts, retrieves, and deletes test file using a JWT token
|
||||
@@ -35,7 +37,6 @@ async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv)
|
||||
key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key"
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
log.info(f"cache key url {key}")
|
||||
log.info(f"token {token}")
|
||||
|
||||
async with ClientSession(headers=headers) as session:
|
||||
async with session.get(key) as res:
|
||||
|
||||
@@ -1,11 +1,24 @@
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from enum import Enum
|
||||
|
||||
import pytest
|
||||
from fixtures.endpoint.http import EndpointHttpClient
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.utils import USE_LFC
|
||||
from prometheus_client.parser import text_string_to_metric_families as prom_parse_impl
|
||||
|
||||
|
||||
class LfcQueryMethod(Enum):
    """
    Selects how a test drives LFC offload/prewarm: through compute_ctl's HTTP
    API or through SQL functions executed directly in Postgres.
    """

    # Use the compute_ctl HTTP client (offload_lfc / prewarm_lfc).
    COMPUTE_CTL = False
    # Use SQL: get_local_cache_state() / prewarm_local_cache().
    POSTGRES = True
|
||||
|
||||
|
||||
# Names of the compute_ctl metric samples that prom_parse() extracts.
PREWARM_LABEL = "compute_ctl_lfc_prewarm_requests_total"
OFFLOAD_LABEL = "compute_ctl_lfc_offload_requests_total"

# Order matters: it must line up with the `ids` list passed to
# pytest.mark.parametrize ("postgres" first, then "compute-ctl").
QUERY_OPTIONS = (LfcQueryMethod.POSTGRES, LfcQueryMethod.COMPUTE_CTL)
|
||||
|
||||
|
||||
def check_pinned_entries(cur):
|
||||
@@ -19,11 +32,20 @@ def check_pinned_entries(cur):
|
||||
assert n_pinned == 0
|
||||
|
||||
|
||||
def prom_parse(client: EndpointHttpClient) -> dict[str, float]:
    """
    Fetch the endpoint's metrics text and return only the LFC prewarm/offload
    samples, keyed by sample name.

    If several samples share a name, the last one parsed wins.
    """
    wanted = (PREWARM_LABEL, OFFLOAD_LABEL)
    counters: dict[str, float] = {}
    for family in prom_parse_impl(client.metrics()):
        for sample in family.samples:
            if sample.name in wanted:
                counters[sample.name] = sample.value
    return counters
|
||||
|
||||
|
||||
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
|
||||
def test_lfc_prewarm(neon_simple_env: NeonEnv):
|
||||
@pytest.mark.parametrize("query", QUERY_OPTIONS, ids=["postgres", "compute-ctl"])
|
||||
def test_lfc_prewarm(neon_simple_env: NeonEnv, query: LfcQueryMethod):
|
||||
env = neon_simple_env
|
||||
n_records = 1000000
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
config_lines=[
|
||||
@@ -34,30 +56,57 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv):
|
||||
"neon.file_cache_prewarm_limit=1000",
|
||||
],
|
||||
)
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("create extension neon version '1.6'")
|
||||
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
|
||||
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
|
||||
cur.execute("select get_local_cache_state()")
|
||||
lfc_state = cur.fetchall()[0][0]
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
pg_cur = pg_conn.cursor()
|
||||
pg_cur.execute("create extension neon version '1.6'")
|
||||
pg_cur.execute("create database lfc")
|
||||
|
||||
lfc_conn = endpoint.connect(dbname="lfc")
|
||||
lfc_cur = lfc_conn.cursor()
|
||||
log.info(f"Inserting {n_records} rows")
|
||||
lfc_cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
|
||||
lfc_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
|
||||
log.info(f"Inserted {n_records} rows")
|
||||
|
||||
http_client = endpoint.http_client()
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
status = http_client.prewarm_lfc_status()
|
||||
assert status["status"] == "not_prewarmed"
|
||||
assert "error" not in status
|
||||
http_client.offload_lfc()
|
||||
assert http_client.prewarm_lfc_status()["status"] == "not_prewarmed"
|
||||
assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0}
|
||||
else:
|
||||
pg_cur.execute("select get_local_cache_state()")
|
||||
lfc_state = pg_cur.fetchall()[0][0]
|
||||
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version
|
||||
cur.execute("alter extension neon update to '1.6'")
|
||||
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
|
||||
# wait until compute_ctl completes downgrade of extension to default version
|
||||
time.sleep(1)
|
||||
pg_conn = endpoint.connect()
|
||||
pg_cur = pg_conn.cursor()
|
||||
pg_cur.execute("alter extension neon update to '1.6'")
|
||||
|
||||
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
|
||||
lfc_used_pages = cur.fetchall()[0][0]
|
||||
lfc_conn = endpoint.connect(dbname="lfc")
|
||||
lfc_cur = lfc_conn.cursor()
|
||||
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
http_client.prewarm_lfc()
|
||||
else:
|
||||
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
|
||||
|
||||
pg_cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
|
||||
lfc_used_pages = pg_cur.fetchall()[0][0]
|
||||
log.info(f"Used LFC size: {lfc_used_pages}")
|
||||
cur.execute("select * from get_prewarm_info()")
|
||||
prewarm_info = cur.fetchall()[0]
|
||||
pg_cur.execute("select * from get_prewarm_info()")
|
||||
prewarm_info = pg_cur.fetchall()[0]
|
||||
log.info(f"Prewarm info: {prewarm_info}")
|
||||
log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%")
|
||||
total, prewarmed, skipped, _ = prewarm_info
|
||||
progress = (prewarmed + skipped) * 100 // total
|
||||
log.info(f"Prewarm progress: {progress}%")
|
||||
|
||||
assert lfc_used_pages > 10000
|
||||
assert (
|
||||
@@ -66,18 +115,23 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv):
|
||||
and prewarm_info[0] == prewarm_info[1] + prewarm_info[2]
|
||||
)
|
||||
|
||||
cur.execute("select sum(pk) from t")
|
||||
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
|
||||
lfc_cur.execute("select sum(pk) from t")
|
||||
assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
|
||||
|
||||
check_pinned_entries(cur)
|
||||
check_pinned_entries(pg_cur)
|
||||
|
||||
desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped}
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
assert http_client.prewarm_lfc_status() == desired
|
||||
assert prom_parse(http_client) == {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1}
|
||||
|
||||
|
||||
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
|
||||
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
|
||||
@pytest.mark.parametrize("query", QUERY_OPTIONS, ids=["postgres", "compute-ctl"])
|
||||
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMethod):
|
||||
env = neon_simple_env
|
||||
n_records = 10000
|
||||
n_threads = 4
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
config_lines=[
|
||||
@@ -87,40 +141,58 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
|
||||
"neon.file_cache_prewarm_limit=1000000",
|
||||
],
|
||||
)
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("create extension neon version '1.6'")
|
||||
cur.execute(
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
pg_cur = pg_conn.cursor()
|
||||
pg_cur.execute("create extension neon version '1.6'")
|
||||
pg_cur.execute("CREATE DATABASE lfc")
|
||||
|
||||
lfc_conn = endpoint.connect(dbname="lfc")
|
||||
lfc_cur = lfc_conn.cursor()
|
||||
lfc_cur.execute(
|
||||
"create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 1000)) with (fillfactor=10)"
|
||||
)
|
||||
cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
|
||||
cur.execute("select get_local_cache_state()")
|
||||
lfc_state = cur.fetchall()[0][0]
|
||||
log.info(f"Inserting {n_records} rows")
|
||||
lfc_cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
|
||||
log.info(f"Inserted {n_records} rows")
|
||||
|
||||
http_client = endpoint.http_client()
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
http_client.offload_lfc()
|
||||
else:
|
||||
pg_cur.execute("select get_local_cache_state()")
|
||||
lfc_state = pg_cur.fetchall()[0][0]
|
||||
|
||||
running = True
|
||||
n_prewarms = 0
|
||||
|
||||
def workload():
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
lfc_conn = endpoint.connect(dbname="lfc")
|
||||
lfc_cur = lfc_conn.cursor()
|
||||
n_transfers = 0
|
||||
while running:
|
||||
src = random.randint(1, n_records)
|
||||
dst = random.randint(1, n_records)
|
||||
cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
|
||||
cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
|
||||
lfc_cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
|
||||
lfc_cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
|
||||
n_transfers += 1
|
||||
log.info(f"Number of transfers: {n_transfers}")
|
||||
|
||||
def prewarm():
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
n_prewarms = 0
|
||||
pg_conn = endpoint.connect()
|
||||
pg_cur = pg_conn.cursor()
|
||||
while running:
|
||||
cur.execute("alter system set neon.file_cache_size_limit='1MB'")
|
||||
cur.execute("select pg_reload_conf()")
|
||||
cur.execute("alter system set neon.file_cache_size_limit='1GB'")
|
||||
cur.execute("select pg_reload_conf()")
|
||||
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
|
||||
pg_cur.execute("alter system set neon.file_cache_size_limit='1MB'")
|
||||
pg_cur.execute("select pg_reload_conf()")
|
||||
pg_cur.execute("alter system set neon.file_cache_size_limit='1GB'")
|
||||
pg_cur.execute("select pg_reload_conf()")
|
||||
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
http_client.prewarm_lfc()
|
||||
else:
|
||||
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
|
||||
|
||||
nonlocal n_prewarms
|
||||
n_prewarms += 1
|
||||
log.info(f"Number of prewarms: {n_prewarms}")
|
||||
|
||||
@@ -140,8 +212,10 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
|
||||
t.join()
|
||||
prewarm_thread.join()
|
||||
|
||||
cur.execute("select sum(balance) from accounts")
|
||||
total_balance = cur.fetchall()[0][0]
|
||||
lfc_cur.execute("select sum(balance) from accounts")
|
||||
total_balance = lfc_cur.fetchall()[0][0]
|
||||
assert total_balance == 0
|
||||
|
||||
check_pinned_entries(cur)
|
||||
check_pinned_entries(pg_cur)
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: n_prewarms}
|
||||
|
||||
Reference in New Issue
Block a user