diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index feea6c6f03..e870cecc58 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -29,7 +29,8 @@ use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::time::{Duration, Instant}; use std::{env, fs}; -use tokio::spawn; +use tokio::task::JoinHandle; +use tokio::{spawn, time}; use tracing::{Instrument, debug, error, info, instrument, warn}; use url::Url; use utils::id::{TenantId, TimelineId}; @@ -107,6 +108,8 @@ pub struct ComputeNodeParams { pub installed_extensions_collection_interval: Arc, } +type TaskHandle = Mutex>>; + /// Compute node info shared across several `compute_ctl` threads. pub struct ComputeNode { pub params: ComputeNodeParams, @@ -129,7 +132,8 @@ pub struct ComputeNode { pub compute_ctl_config: ComputeCtlConfig, /// Handle to the extension stats collection task - extension_stats_task: Mutex>>, + extension_stats_task: TaskHandle, + lfc_offload_task: TaskHandle, } // store some metrics about download size that might impact startup time @@ -368,7 +372,7 @@ fn maybe_cgexec(cmd: &str) -> Command { struct PostgresHandle { postgres: std::process::Child, - log_collector: tokio::task::JoinHandle>, + log_collector: JoinHandle>, } impl PostgresHandle { @@ -382,7 +386,7 @@ struct StartVmMonitorResult { #[cfg(target_os = "linux")] token: tokio_util::sync::CancellationToken, #[cfg(target_os = "linux")] - vm_monitor: Option>>, + vm_monitor: Option>>, } impl ComputeNode { @@ -433,6 +437,7 @@ impl ComputeNode { ext_download_progress: RwLock::new(HashMap::new()), compute_ctl_config: config.compute_ctl_config, extension_stats_task: Mutex::new(None), + lfc_offload_task: Mutex::new(None), }) } @@ -520,8 +525,8 @@ impl ComputeNode { None }; - // Terminate the extension stats collection task this.terminate_extension_stats_task(); + this.terminate_lfc_offload_task(); // Terminate the vm_monitor so it releases the file watcher on // /sys/fs/cgroup/neon-postgres. @@ -851,12 +856,15 @@ impl ComputeNode { // Log metrics so that we can search for slow operations in logs info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished"); - // Spawn the extension stats background task self.spawn_extension_stats_task(); if pspec.spec.autoprewarm { + info!("autoprewarming on startup as requested"); self.prewarm_lfc(None); } + if let Some(seconds) = pspec.spec.offload_lfc_interval_seconds { + self.spawn_lfc_offload_task(Duration::from_secs(seconds.into())); + }; Ok(()) } @@ -2357,10 +2365,7 @@ LIMIT 100", } pub fn spawn_extension_stats_task(&self) { - // Cancel any existing task - if let Some(handle) = self.extension_stats_task.lock().unwrap().take() { - handle.abort(); - } + self.terminate_extension_stats_task(); let conf = self.tokio_conn_conf.clone(); let atomic_interval = self.params.installed_extensions_collection_interval.clone(); @@ -2396,8 +2401,30 @@ LIMIT 100", } fn terminate_extension_stats_task(&self) { - if let Some(handle) = self.extension_stats_task.lock().unwrap().take() { - handle.abort(); + if let Some(h) = self.extension_stats_task.lock().unwrap().take() { + h.abort() + } + } + + pub fn spawn_lfc_offload_task(self: &Arc, interval: Duration) { + self.terminate_lfc_offload_task(); + let secs = interval.as_secs(); + info!("spawning lfc offload worker with {secs}s interval"); + let this = self.clone(); + let handle = spawn(async move { + let mut interval = time::interval(interval); + interval.tick().await; // returns immediately + loop { + interval.tick().await; + this.offload_lfc_async().await; + } + }); + *self.lfc_offload_task.lock().unwrap() = Some(handle); + } + + fn terminate_lfc_offload_task(&self) { + if let Some(h) = self.lfc_offload_task.lock().unwrap().take() { + h.abort() } } diff --git a/compute_tools/src/compute_prewarm.rs b/compute_tools/src/compute_prewarm.rs index 1c7a7bef60..4190580e5e 100644 --- a/compute_tools/src/compute_prewarm.rs +++ b/compute_tools/src/compute_prewarm.rs @@ -5,6 +5,7 @@ use compute_api::responses::LfcOffloadState; use compute_api::responses::LfcPrewarmState; use http::StatusCode; use reqwest::Client; +use std::mem::replace; use std::sync::Arc; use tokio::{io::AsyncReadExt, spawn}; use tracing::{error, info}; @@ -88,17 +89,15 @@ impl ComputeNode { self.state.lock().unwrap().lfc_offload_state.clone() } - /// Returns false if there is a prewarm request ongoing, true otherwise + /// If there is a prewarm request ongoing, return false, true otherwise pub fn prewarm_lfc(self: &Arc, from_endpoint: Option) -> bool { - crate::metrics::LFC_PREWARM_REQUESTS.inc(); { let state = &mut self.state.lock().unwrap().lfc_prewarm_state; - if let LfcPrewarmState::Prewarming = - std::mem::replace(state, LfcPrewarmState::Prewarming) - { + if let LfcPrewarmState::Prewarming = replace(state, LfcPrewarmState::Prewarming) { return false; } } + crate::metrics::LFC_PREWARMS.inc(); let cloned = self.clone(); spawn(async move { @@ -152,32 +151,41 @@ impl ComputeNode { .map(|_| ()) } - /// Returns false if there is an offload request ongoing, true otherwise + /// If offload request is ongoing, return false, true otherwise pub fn offload_lfc(self: &Arc) -> bool { - crate::metrics::LFC_OFFLOAD_REQUESTS.inc(); { let state = &mut self.state.lock().unwrap().lfc_offload_state; - if let LfcOffloadState::Offloading = - std::mem::replace(state, LfcOffloadState::Offloading) - { + if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading { return false; } } - let cloned = self.clone(); - spawn(async move { - let Err(err) = cloned.offload_lfc_impl().await else { - cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed; - return; - }; - error!(%err); - cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed { - error: err.to_string(), - }; - }); + spawn(async move { cloned.offload_lfc_with_state_update().await }); true } + pub async fn offload_lfc_async(self: &Arc) { + { + let state = &mut self.state.lock().unwrap().lfc_offload_state; + if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading { + return; + } + } + self.offload_lfc_with_state_update().await + } + + async fn offload_lfc_with_state_update(&self) { + crate::metrics::LFC_OFFLOADS.inc(); + let Err(err) = self.offload_lfc_impl().await else { + self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed; + return; + }; + error!(%err); + self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed { + error: err.to_string(), + }; + } + async fn offload_lfc_impl(&self) -> Result<()> { let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?; info!(%url, "requesting LFC state from postgres"); diff --git a/compute_tools/src/metrics.rs b/compute_tools/src/metrics.rs index 90326b2074..8f81675c49 100644 --- a/compute_tools/src/metrics.rs +++ b/compute_tools/src/metrics.rs @@ -97,20 +97,18 @@ pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy> = Lazy:: .expect("failed to define a metric") }); -/// Needed as neon.file_cache_prewarm_batch == 0 doesn't mean we never tried to prewarm. -/// On the other hand, LFC_PREWARMED_PAGES is excessive as we can GET /lfc/prewarm -pub(crate) static LFC_PREWARM_REQUESTS: Lazy = Lazy::new(|| { +pub(crate) static LFC_PREWARMS: Lazy = Lazy::new(|| { register_int_counter!( - "compute_ctl_lfc_prewarm_requests_total", - "Total number of LFC prewarm requests made by compute_ctl", + "compute_ctl_lfc_prewarms_total", + "Total number of LFC prewarms requested by compute_ctl or autoprewarm option", ) .expect("failed to define a metric") }); -pub(crate) static LFC_OFFLOAD_REQUESTS: Lazy = Lazy::new(|| { +pub(crate) static LFC_OFFLOADS: Lazy = Lazy::new(|| { register_int_counter!( - "compute_ctl_lfc_offload_requests_total", - "Total number of LFC offload requests made by compute_ctl", + "compute_ctl_lfc_offloads_total", + "Total number of LFC offloads requested by compute_ctl or lfc_offload_period_seconds option", ) .expect("failed to define a metric") }); @@ -124,7 +122,7 @@ pub fn collect() -> Vec { metrics.extend(AUDIT_LOG_DIR_SIZE.collect()); metrics.extend(PG_CURR_DOWNTIME_MS.collect()); metrics.extend(PG_TOTAL_DOWNTIME_MS.collect()); - metrics.extend(LFC_PREWARM_REQUESTS.collect()); - metrics.extend(LFC_OFFLOAD_REQUESTS.collect()); + metrics.extend(LFC_PREWARMS.collect()); + metrics.extend(LFC_OFFLOADS.collect()); metrics } diff --git a/compute_tools/tests/pg_helpers_tests.rs b/compute_tools/tests/pg_helpers_tests.rs index 2b865c75d0..fae59082c6 100644 --- a/compute_tools/tests/pg_helpers_tests.rs +++ b/compute_tools/tests/pg_helpers_tests.rs @@ -31,6 +31,7 @@ mod pg_helpers_tests { wal_level = logical hot_standby = on autoprewarm = off +offload_lfc_interval_seconds = 20 neon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501' wal_log_hints = on log_connections = on diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index c75d76260a..6021933d6a 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -675,6 +675,16 @@ struct EndpointStartCmdArgs { #[arg(default_value = "90s")] start_timeout: Duration, + #[clap( + long, + help = "Download LFC cache from endpoint storage on endpoint startup", + default_value = "false" + )] + autoprewarm: bool, + + #[clap(long, help = "Upload LFC cache to endpoint storage periodically")] + offload_lfc_interval_seconds: Option, + #[clap( long, help = "Run in development mode, skipping VM-specific operations like process termination", @@ -1585,22 +1595,24 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res let endpoint_storage_token = env.generate_auth_token(&claims)?; let endpoint_storage_addr = env.endpoint_storage.listen_addr.to_string(); + let args = control_plane::endpoint::EndpointStartArgs { + auth_token, + endpoint_storage_token, + endpoint_storage_addr, + safekeepers_generation, + safekeepers, + pageservers, + remote_ext_base_url: remote_ext_base_url.clone(), + shard_stripe_size: stripe_size.0 as usize, + create_test_user: args.create_test_user, + start_timeout: args.start_timeout, + autoprewarm: args.autoprewarm, + offload_lfc_interval_seconds: args.offload_lfc_interval_seconds, + dev: args.dev, + }; + println!("Starting existing endpoint {endpoint_id}..."); - endpoint - .start( - &auth_token, - endpoint_storage_token, - endpoint_storage_addr, - safekeepers_generation, - safekeepers, - pageservers, - remote_ext_base_url.as_ref(), - stripe_size.0 as usize, - args.create_test_user, - args.start_timeout, - args.dev, - ) - .await?; + endpoint.start(args).await?; } EndpointCmd::Reconfigure(args) => { let endpoint_id = &args.endpoint_id; diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 424101b9a4..74ab15dc97 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -373,6 +373,22 @@ impl std::fmt::Display for EndpointTerminateMode { } } +pub struct EndpointStartArgs { + pub auth_token: Option, + pub endpoint_storage_token: String, + pub endpoint_storage_addr: String, + pub safekeepers_generation: Option, + pub safekeepers: Vec, + pub pageservers: Vec<(PageserverProtocol, Host, u16)>, + pub remote_ext_base_url: Option, + pub shard_stripe_size: usize, + pub create_test_user: bool, + pub start_timeout: Duration, + pub autoprewarm: bool, + pub offload_lfc_interval_seconds: Option, + pub dev: bool, +} + impl Endpoint { fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result { if !entry.file_type()?.is_dir() { @@ -677,21 +693,7 @@ impl Endpoint { }) } - #[allow(clippy::too_many_arguments)] - pub async fn start( - &self, - auth_token: &Option, - endpoint_storage_token: String, - endpoint_storage_addr: String, - safekeepers_generation: Option, - safekeepers: Vec, - pageservers: Vec<(PageserverProtocol, Host, u16)>, - remote_ext_base_url: Option<&String>, - shard_stripe_size: usize, - create_test_user: bool, - start_timeout: Duration, - dev: bool, - ) -> Result<()> { + pub async fn start(&self, args: EndpointStartArgs) -> Result<()> { if self.status() == EndpointStatus::Running { anyhow::bail!("The endpoint is already running"); } @@ -704,10 +706,10 @@ impl Endpoint { std::fs::remove_dir_all(self.pgdata())?; } - let pageserver_connstring = Self::build_pageserver_connstr(&pageservers); + let pageserver_connstring = Self::build_pageserver_connstr(&args.pageservers); assert!(!pageserver_connstring.is_empty()); - let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?; + let safekeeper_connstrings = self.build_safekeepers_connstrs(args.safekeepers)?; // check for file remote_extensions_spec.json // if it is present, read it and pass to compute_ctl @@ -735,7 +737,7 @@ impl Endpoint { cluster_id: None, // project ID: not used name: None, // project name: not used state: None, - roles: if create_test_user { + roles: if args.create_test_user { vec![Role { name: PgIdent::from_str("test").unwrap(), encrypted_password: None, @@ -744,7 +746,7 @@ impl Endpoint { } else { Vec::new() }, - databases: if create_test_user { + databases: if args.create_test_user { vec![Database { name: PgIdent::from_str("neondb").unwrap(), owner: PgIdent::from_str("test").unwrap(), @@ -766,20 +768,21 @@ impl Endpoint { endpoint_id: Some(self.endpoint_id.clone()), mode: self.mode, pageserver_connstring: Some(pageserver_connstring), - safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()), + safekeepers_generation: args.safekeepers_generation.map(|g| g.into_inner()), safekeeper_connstrings, - storage_auth_token: auth_token.clone(), + storage_auth_token: args.auth_token.clone(), remote_extensions, pgbouncer_settings: None, - shard_stripe_size: Some(shard_stripe_size), + shard_stripe_size: Some(args.shard_stripe_size), local_proxy_config: None, reconfigure_concurrency: self.reconfigure_concurrency, drop_subscriptions_before_start: self.drop_subscriptions_before_start, audit_log_level: ComputeAudit::Disabled, logs_export_host: None::, - endpoint_storage_addr: Some(endpoint_storage_addr), - endpoint_storage_token: Some(endpoint_storage_token), - autoprewarm: false, + endpoint_storage_addr: Some(args.endpoint_storage_addr), + endpoint_storage_token: Some(args.endpoint_storage_token), + autoprewarm: args.autoprewarm, + offload_lfc_interval_seconds: args.offload_lfc_interval_seconds, suspend_timeout_seconds: -1, // Only used in neon_local. }; @@ -791,7 +794,7 @@ impl Endpoint { debug!("spec.cluster {:?}", spec.cluster); // fill missing fields again - if create_test_user { + if args.create_test_user { spec.cluster.roles.push(Role { name: PgIdent::from_str("test").unwrap(), encrypted_password: None, @@ -826,7 +829,7 @@ impl Endpoint { // Launch compute_ctl let conn_str = self.connstr("cloud_admin", "postgres"); println!("Starting postgres node at '{conn_str}'"); - if create_test_user { + if args.create_test_user { let conn_str = self.connstr("test", "neondb"); println!("Also at '{conn_str}'"); } @@ -858,11 +861,11 @@ impl Endpoint { .stderr(logfile.try_clone()?) .stdout(logfile); - if let Some(remote_ext_base_url) = remote_ext_base_url { - cmd.args(["--remote-ext-base-url", remote_ext_base_url]); + if let Some(remote_ext_base_url) = args.remote_ext_base_url { + cmd.args(["--remote-ext-base-url", &remote_ext_base_url]); } - if dev { + if args.dev { cmd.arg("--dev"); } @@ -894,10 +897,11 @@ impl Endpoint { Ok(state) => { match state.status { ComputeStatus::Init => { - if Instant::now().duration_since(start_at) > start_timeout { + let timeout = args.start_timeout; + if Instant::now().duration_since(start_at) > timeout { bail!( "compute startup timed out {:?}; still in Init state", - start_timeout + timeout ); } // keep retrying @@ -925,9 +929,10 @@ impl Endpoint { } } Err(e) => { - if Instant::now().duration_since(start_at) > start_timeout { + if Instant::now().duration_since(start_at) > args.start_timeout { return Err(e).context(format!( - "timed out {start_timeout:?} waiting to connect to compute_ctl HTTP", + "timed out {:?} waiting to connect to compute_ctl HTTP", + args.start_timeout )); } } diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index 5cad849e3d..a54411b06a 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -58,7 +58,7 @@ pub enum LfcPrewarmState { }, } -#[derive(Serialize, Default, Debug, Clone)] +#[derive(Serialize, Default, Debug, Clone, PartialEq)] #[serde(tag = "status", rename_all = "snake_case")] pub enum LfcOffloadState { #[default] diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 6b2caa9d3a..60311aa3e6 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -181,10 +181,14 @@ pub struct ComputeSpec { /// JWT for authorizing requests to endpoint storage service pub endpoint_storage_token: Option, - /// Download LFC state from endpoint_storage and pass it to Postgres on startup #[serde(default)] + /// Download LFC state from endpoint storage and pass it to Postgres on compute startup pub autoprewarm: bool, + #[serde(default)] + /// Upload LFC state to endpoint storage periodically. Default value (None) means "don't upload" + pub offload_lfc_interval_seconds: Option, + /// Suspend timeout in seconds. /// /// We use this value to derive other values, such as the installed extensions metric. diff --git a/libs/compute_api/tests/cluster_spec.json b/libs/compute_api/tests/cluster_spec.json index 94d7f1e081..86ab8c6e32 100644 --- a/libs/compute_api/tests/cluster_spec.json +++ b/libs/compute_api/tests/cluster_spec.json @@ -90,6 +90,11 @@ "value": "off", "vartype": "bool" }, + { + "name": "offload_lfc_interval_seconds", + "value": "20", + "vartype": "integer" + }, { "name": "neon.safekeepers", "value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501", diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index f5be544439..294c52321b 100644 --- a/test_runner/fixtures/endpoint/http.py +++ b/test_runner/fixtures/endpoint/http.py @@ -57,6 +57,8 @@ class EndpointHttpClient(requests.Session): self.auth = BearerAuth(jwt) self.mount("http://", HTTPAdapter()) + self.prewarm_url = f"http://localhost:{external_port}/lfc/prewarm" + self.offload_url = f"http://localhost:{external_port}/lfc/offload" def dbs_and_roles(self): res = self.get(f"http://localhost:{self.external_port}/dbs_and_roles", auth=self.auth) @@ -64,33 +66,39 @@ class EndpointHttpClient(requests.Session): return res.json() def prewarm_lfc_status(self) -> dict[str, str]: - res = self.get(f"http://localhost:{self.external_port}/lfc/prewarm") + res = self.get(self.prewarm_url) res.raise_for_status() json: dict[str, str] = res.json() return json def prewarm_lfc(self, from_endpoint_id: str | None = None): - url: str = f"http://localhost:{self.external_port}/lfc/prewarm" params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict() - self.post(url, params=params).raise_for_status() + self.post(self.prewarm_url, params=params).raise_for_status() + self.prewarm_lfc_wait() + def prewarm_lfc_wait(self): def prewarmed(): json = self.prewarm_lfc_status() status, err = json["status"], json.get("error") - assert status == "completed", f"{status}, error {err}" + assert status == "completed", f"{status}, {err=}" wait_until(prewarmed, timeout=60) - def offload_lfc(self): - url = f"http://localhost:{self.external_port}/lfc/offload" - self.post(url).raise_for_status() + def offload_lfc_status(self) -> dict[str, str]: + res = self.get(self.offload_url) + res.raise_for_status() + json: dict[str, str] = res.json() + return json + def offload_lfc(self): + self.post(self.offload_url).raise_for_status() + self.offload_lfc_wait() + + def offload_lfc_wait(self): def offloaded(): - res = self.get(url) - res.raise_for_status() - json = res.json() + json = self.offload_lfc_status() status, err = json["status"], json.get("error") - assert status == "completed", f"{status}, error {err}" + assert status == "completed", f"{status}, {err=}" wait_until(offloaded) diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py index 1b09e5bdd0..1abd3396e4 100644 --- a/test_runner/fixtures/neon_cli.py +++ b/test_runner/fixtures/neon_cli.py @@ -568,6 +568,8 @@ class NeonLocalCli(AbstractNeonCli): timeout: str | None = None, env: dict[str, str] | None = None, dev: bool = False, + autoprewarm: bool = False, + offload_lfc_interval_seconds: int | None = None, ) -> subprocess.CompletedProcess[str]: args = [ "endpoint", @@ -593,6 +595,10 @@ class NeonLocalCli(AbstractNeonCli): args.extend(["--create-test-user"]) if timeout is not None: args.extend(["--start-timeout", str(timeout)]) + if autoprewarm: + args.extend(["--autoprewarm"]) + if offload_lfc_interval_seconds is not None: + args.extend(["--offload-lfc-interval-seconds", str(offload_lfc_interval_seconds)]) if dev: args.extend(["--dev"]) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 52ff977162..f2ec022666 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4362,6 +4362,8 @@ class Endpoint(PgProtocol, LogUtils): basebackup_request_tries: int | None = None, timeout: str | None = None, env: dict[str, str] | None = None, + autoprewarm: bool = False, + offload_lfc_interval_seconds: int | None = None, ) -> Self: """ Start the Postgres instance. @@ -4386,6 +4388,8 @@ class Endpoint(PgProtocol, LogUtils): basebackup_request_tries=basebackup_request_tries, timeout=timeout, env=env, + autoprewarm=autoprewarm, + offload_lfc_interval_seconds=offload_lfc_interval_seconds, ) self._running.release(1) self.log_config_value("shared_buffers") @@ -4601,6 +4605,8 @@ class Endpoint(PgProtocol, LogUtils): pageserver_id: int | None = None, allow_multiple: bool = False, basebackup_request_tries: int | None = None, + autoprewarm: bool = False, + offload_lfc_interval_seconds: int | None = None, ) -> Self: """ Create an endpoint, apply config, and start Postgres. @@ -4621,6 +4627,8 @@ class Endpoint(PgProtocol, LogUtils): pageserver_id=pageserver_id, allow_multiple=allow_multiple, basebackup_request_tries=basebackup_request_tries, + autoprewarm=autoprewarm, + offload_lfc_interval_seconds=offload_lfc_interval_seconds, ) return self @@ -4705,6 +4713,8 @@ class EndpointFactory: remote_ext_base_url: str | None = None, pageserver_id: int | None = None, basebackup_request_tries: int | None = None, + autoprewarm: bool = False, + offload_lfc_interval_seconds: int | None = None, ) -> Endpoint: ep = Endpoint( self.env, @@ -4726,6 +4736,8 @@ class EndpointFactory: remote_ext_base_url=remote_ext_base_url, pageserver_id=pageserver_id, basebackup_request_tries=basebackup_request_tries, + autoprewarm=autoprewarm, + offload_lfc_interval_seconds=offload_lfc_interval_seconds, ) def create( diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index e1058cd644..1fa1ead034 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -1,34 +1,38 @@ import random import threading -import time -from enum import Enum +from enum import StrEnum +from time import sleep +from typing import Any import pytest from fixtures.endpoint.http import EndpointHttpClient from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv -from fixtures.utils import USE_LFC +from fixtures.utils import USE_LFC, wait_until from prometheus_client.parser import text_string_to_metric_families as prom_parse_impl +from psycopg2.extensions import cursor as Cursor -class LfcQueryMethod(Enum): - COMPUTE_CTL = False - POSTGRES = True +class PrewarmMethod(StrEnum): + POSTGRES = "postgres" + COMPUTE_CTL = "compute-ctl" + AUTOPREWARM = "autoprewarm" -PREWARM_LABEL = "compute_ctl_lfc_prewarm_requests_total" -OFFLOAD_LABEL = "compute_ctl_lfc_offload_requests_total" -QUERY_OPTIONS = LfcQueryMethod.POSTGRES, LfcQueryMethod.COMPUTE_CTL +PREWARM_LABEL = "compute_ctl_lfc_prewarms_total" +OFFLOAD_LABEL = "compute_ctl_lfc_offloads_total" +METHOD_VALUES = [e for e in PrewarmMethod] +METHOD_IDS = [e.value for e in PrewarmMethod] -def check_pinned_entries(cur): +def check_pinned_entries(cur: Cursor): # some LFC buffer can be temporary locked by autovacuum or background writer for _ in range(10): cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_chunks_pinned'") n_pinned = cur.fetchall()[0][0] if n_pinned == 0: break - time.sleep(1) + sleep(1) assert n_pinned == 0 @@ -41,21 +45,68 @@ def prom_parse(client: EndpointHttpClient) -> dict[str, float]: } +def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor) -> Any: + if method == PrewarmMethod.AUTOPREWARM: + client.offload_lfc_wait() + elif method == PrewarmMethod.COMPUTE_CTL: + status = client.prewarm_lfc_status() + assert status["status"] == "not_prewarmed" + assert "error" not in status + client.offload_lfc() + assert client.prewarm_lfc_status()["status"] == "not_prewarmed" + assert prom_parse(client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0} + elif method == PrewarmMethod.POSTGRES: + cur.execute("select get_local_cache_state()") + return cur.fetchall()[0][0] + else: + raise AssertionError(f"{method} not in PrewarmMethod") + + +def prewarm_endpoint( + method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor, lfc_state: str | None +): + if method == PrewarmMethod.AUTOPREWARM: + client.prewarm_lfc_wait() + elif method == PrewarmMethod.COMPUTE_CTL: + client.prewarm_lfc() + elif method == PrewarmMethod.POSTGRES: + cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) + + +def check_prewarmed( + method: PrewarmMethod, client: EndpointHttpClient, desired_status: dict[str, str | int] +): + if method == PrewarmMethod.AUTOPREWARM: + assert client.prewarm_lfc_status() == desired_status + assert prom_parse(client)[PREWARM_LABEL] == 1 + elif method == PrewarmMethod.COMPUTE_CTL: + assert client.prewarm_lfc_status() == desired_status + assert prom_parse(client) == {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1} + + @pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") -@pytest.mark.parametrize("query", QUERY_OPTIONS, ids=["postgres", "compute-ctl"]) -def test_lfc_prewarm(neon_simple_env: NeonEnv, query: LfcQueryMethod): +@pytest.mark.parametrize("method", METHOD_VALUES, ids=METHOD_IDS) +def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): env = neon_simple_env n_records = 1000000 - endpoint = env.endpoints.create_start( - branch_name="main", - config_lines=[ - "autovacuum = off", - "shared_buffers=1MB", - "neon.max_file_cache_size=1GB", - "neon.file_cache_size_limit=1GB", - "neon.file_cache_prewarm_limit=1000", - ], - ) + cfg = [ + "autovacuum = off", + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000", + ] + offload_secs = 2 + + if method == PrewarmMethod.AUTOPREWARM: + endpoint = env.endpoints.create_start( + branch_name="main", + config_lines=cfg, + autoprewarm=True, + offload_lfc_interval_seconds=offload_secs, + ) + else: + endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg) pg_conn = endpoint.connect() pg_cur = pg_conn.cursor() @@ -69,31 +120,21 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, query: LfcQueryMethod): lfc_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))") log.info(f"Inserted {n_records} rows") - http_client = endpoint.http_client() - if query is LfcQueryMethod.COMPUTE_CTL: - status = http_client.prewarm_lfc_status() - assert status["status"] == "not_prewarmed" - assert "error" not in status - http_client.offload_lfc() - assert http_client.prewarm_lfc_status()["status"] == "not_prewarmed" - assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0} - else: - pg_cur.execute("select get_local_cache_state()") - lfc_state = pg_cur.fetchall()[0][0] + client = endpoint.http_client() + lfc_state = offload_lfc(method, client, pg_cur) endpoint.stop() - endpoint.start() + if method == PrewarmMethod.AUTOPREWARM: + endpoint.start(autoprewarm=True, offload_lfc_interval_seconds=offload_secs) + else: + endpoint.start() pg_conn = endpoint.connect() pg_cur = pg_conn.cursor() lfc_conn = endpoint.connect(dbname="lfc") lfc_cur = lfc_conn.cursor() - - if query is LfcQueryMethod.COMPUTE_CTL: - http_client.prewarm_lfc() - else: - pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) + prewarm_endpoint(method, client, pg_cur, lfc_state) pg_cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'") lfc_used_pages = pg_cur.fetchall()[0][0] @@ -111,33 +152,32 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, query: LfcQueryMethod): and prewarm_info[1] > 0 and prewarm_info[0] == prewarm_info[1] + prewarm_info[2] ) - lfc_cur.execute("select sum(pk) from t") assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2 check_pinned_entries(pg_cur) - desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped} - if query is LfcQueryMethod.COMPUTE_CTL: - assert http_client.prewarm_lfc_status() == desired - assert prom_parse(http_client) == {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1} + check_prewarmed(method, client, desired) + + +# autoprewarm isn't needed as we prewarm manually +WORKLOAD_VALUES = METHOD_VALUES[:-1] +WORKLOAD_IDS = METHOD_IDS[:-1] @pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") -@pytest.mark.parametrize("query", QUERY_OPTIONS, ids=["postgres", "compute-ctl"]) -def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMethod): +@pytest.mark.parametrize("method", WORKLOAD_VALUES, ids=WORKLOAD_IDS) +def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMethod): env = neon_simple_env n_records = 10000 n_threads = 4 - endpoint = env.endpoints.create_start( - branch_name="main", - config_lines=[ - "shared_buffers=1MB", - "neon.max_file_cache_size=1GB", - "neon.file_cache_size_limit=1GB", - "neon.file_cache_prewarm_limit=1000000", - ], - ) + cfg = [ + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000000", + ] + endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg) pg_conn = endpoint.connect() pg_cur = pg_conn.cursor() @@ -154,12 +194,7 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet log.info(f"Inserted {n_records} rows") http_client = endpoint.http_client() - if query is LfcQueryMethod.COMPUTE_CTL: - http_client.offload_lfc() - else: - pg_cur.execute("select get_local_cache_state()") - lfc_state = pg_cur.fetchall()[0][0] - + lfc_state = offload_lfc(method, http_client, pg_cur) running = True n_prewarms = 0 @@ -170,8 +205,8 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet while running: src = random.randint(1, n_records) dst = random.randint(1, n_records) - lfc_cur.execute("update accounts set balance=balance-100 where id=%s", (src,)) - lfc_cur.execute("update accounts set balance=balance+100 where id=%s", (dst,)) + lfc_cur.execute(f"update accounts set balance=balance-100 where id={src}") + lfc_cur.execute(f"update accounts set balance=balance+100 where id={dst}") n_transfers += 1 log.info(f"Number of transfers: {n_transfers}") @@ -183,13 +218,7 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet pg_cur.execute("select pg_reload_conf()") pg_cur.execute("alter system set neon.file_cache_size_limit='1GB'") pg_cur.execute("select pg_reload_conf()") - - if query is LfcQueryMethod.COMPUTE_CTL: - # Same thing as prewarm_lfc(), testing other method - http_client.prewarm_lfc(endpoint.endpoint_id) - else: - pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,)) - + prewarm_endpoint(method, http_client, pg_cur, lfc_state) nonlocal n_prewarms n_prewarms += 1 log.info(f"Number of prewarms: {n_prewarms}") @@ -203,7 +232,10 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet prewarm_thread = threading.Thread(target=prewarm) prewarm_thread.start() - time.sleep(20) + def prewarmed(): + assert n_prewarms > 5 + + wait_until(prewarmed) running = False for t in workload_threads: @@ -215,5 +247,5 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet assert total_balance == 0 check_pinned_entries(pg_cur) - if query is LfcQueryMethod.COMPUTE_CTL: + if method != PrewarmMethod.POSTGRES: assert prom_parse(http_client) == {OFFLOAD_LABEL: 1, PREWARM_LABEL: n_prewarms}