From 53a05e8ccbb8b17a5eec07d96c0a1182cf717ffd Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Thu, 17 Jul 2025 23:43:43 +0200 Subject: [PATCH] fix(compute_ctl): Only offload LFC state if no prewarming is in progress (#12645) ## Problem We currently offload LFC state unconditionally, which can cause problems. Imagine a situation: 1. Endpoint started with `autoprewarm: true`. 2. While prewarming is not completed, we upload the new incomplete state. 3. Compute gets interrupted and restarts. 4. We start again and try to prewarm with the state from 2. instead of the previous complete state. During the orchestrated prewarming, it's probably not a big issue, but it's still better to do not interfere with the prewarm process. ## Summary of changes Do not offload LFC state if we are currently prewarming or any issue occurred. While on it, also introduce `Skipped` LFC prewarm status, which is used when the corresponding LFC state is not present in the endpoint storage. It's primarily needed to distinguish the first compute start for particular endpoint, as it's completely valid to do not have LFC state yet. --- compute_tools/src/compute.rs | 21 +++++++- compute_tools/src/compute_prewarm.rs | 61 +++++++++++++++++------- compute_tools/src/http/openapi_spec.yaml | 10 ++-- libs/compute_api/src/responses.rs | 24 ++++++++-- 4 files changed, 88 insertions(+), 28 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 941a21806f..3ae946c10e 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -2450,14 +2450,31 @@ LIMIT 100", pub fn spawn_lfc_offload_task(self: &Arc, interval: Duration) { self.terminate_lfc_offload_task(); let secs = interval.as_secs(); - info!("spawning lfc offload worker with {secs}s interval"); let this = self.clone(); + + info!("spawning LFC offload worker with {secs}s interval"); let handle = spawn(async move { let mut interval = time::interval(interval); interval.tick().await; // returns immediately loop { interval.tick().await; - this.offload_lfc_async().await; + + let prewarm_state = this.state.lock().unwrap().lfc_prewarm_state.clone(); + // Do not offload LFC state if we are currently prewarming or any issue occurred. + // If we'd do that, we might override the LFC state in endpoint storage with some + // incomplete state. Imagine a situation: + // 1. Endpoint started with `autoprewarm: true` + // 2. While prewarming is not completed, we upload the new incomplete state + // 3. Compute gets interrupted and restarts + // 4. We start again and try to prewarm with the state from 2. instead of the previous complete state + if matches!( + prewarm_state, + LfcPrewarmState::Completed + | LfcPrewarmState::NotPrewarmed + | LfcPrewarmState::Skipped + ) { + this.offload_lfc_async().await; + } } }); *self.lfc_offload_task.lock().unwrap() = Some(handle); diff --git a/compute_tools/src/compute_prewarm.rs b/compute_tools/src/compute_prewarm.rs index d014a5bb72..07b4a596cc 100644 --- a/compute_tools/src/compute_prewarm.rs +++ b/compute_tools/src/compute_prewarm.rs @@ -89,7 +89,7 @@ impl ComputeNode { self.state.lock().unwrap().lfc_offload_state.clone() } - /// If there is a prewarm request ongoing, return false, true otherwise + /// If there is a prewarm request ongoing, return `false`, `true` otherwise. pub fn prewarm_lfc(self: &Arc, from_endpoint: Option) -> bool { { let state = &mut self.state.lock().unwrap().lfc_prewarm_state; @@ -101,15 +101,25 @@ impl ComputeNode { let cloned = self.clone(); spawn(async move { - let Err(err) = cloned.prewarm_impl(from_endpoint).await else { - cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed; - return; - }; - crate::metrics::LFC_PREWARM_ERRORS.inc(); - error!(%err, "prewarming lfc"); - cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed { - error: err.to_string(), + let state = match cloned.prewarm_impl(from_endpoint).await { + Ok(true) => LfcPrewarmState::Completed, + Ok(false) => { + info!( + "skipping LFC prewarm because LFC state is not found in endpoint storage" + ); + LfcPrewarmState::Skipped + } + Err(err) => { + crate::metrics::LFC_PREWARM_ERRORS.inc(); + error!(%err, "could not prewarm LFC"); + + LfcPrewarmState::Failed { + error: err.to_string(), + } + } }; + + cloned.state.lock().unwrap().lfc_prewarm_state = state; }); true } @@ -120,15 +130,21 @@ impl ComputeNode { EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint) } - async fn prewarm_impl(&self, from_endpoint: Option) -> Result<()> { + /// Request LFC state from endpoint storage and load corresponding pages into Postgres. + /// Returns a result with `false` if the LFC state is not found in endpoint storage. + async fn prewarm_impl(&self, from_endpoint: Option) -> Result { let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?; - info!(%url, "requesting LFC state from endpoint storage"); + info!(%url, "requesting LFC state from endpoint storage"); let request = Client::new().get(&url).bearer_auth(token); let res = request.send().await.context("querying endpoint storage")?; let status = res.status(); - if status != StatusCode::OK { - bail!("{status} querying endpoint storage") + match status { + StatusCode::OK => (), + StatusCode::NOT_FOUND => { + return Ok(false); + } + _ => bail!("{status} querying endpoint storage"), } let mut uncompressed = Vec::new(); @@ -141,7 +157,8 @@ impl ComputeNode { .await .context("decoding LFC state")?; let uncompressed_len = uncompressed.len(); - info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres"); + + info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into Postgres"); ComputeNode::get_maintenance_client(&self.tokio_conn_conf) .await @@ -149,7 +166,9 @@ impl ComputeNode { .query_one("select neon.prewarm_local_cache($1)", &[&uncompressed]) .await .context("loading LFC state into postgres") - .map(|_| ()) + .map(|_| ())?; + + Ok(true) } /// If offload request is ongoing, return false, true otherwise @@ -177,12 +196,14 @@ impl ComputeNode { async fn offload_lfc_with_state_update(&self) { crate::metrics::LFC_OFFLOADS.inc(); + let Err(err) = self.offload_lfc_impl().await else { self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed; return; }; + crate::metrics::LFC_OFFLOAD_ERRORS.inc(); - error!(%err, "offloading lfc"); + error!(%err, "could not offload LFC state to endpoint storage"); self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed { error: err.to_string(), }; @@ -190,7 +211,7 @@ impl ComputeNode { async fn offload_lfc_impl(&self) -> Result<()> { let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?; - info!(%url, "requesting LFC state from postgres"); + info!(%url, "requesting LFC state from Postgres"); let mut compressed = Vec::new(); ComputeNode::get_maintenance_client(&self.tokio_conn_conf) @@ -205,13 +226,17 @@ impl ComputeNode { .read_to_end(&mut compressed) .await .context("compressing LFC state")?; + let compressed_len = compressed.len(); info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage"); let request = Client::new().put(url).bearer_auth(token).body(compressed); match request.send().await { Ok(res) if res.status() == StatusCode::OK => Ok(()), - Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()), + Ok(res) => bail!( + "Request to endpoint storage failed with status: {}", + res.status() + ), Err(err) => Err(err).context("writing to endpoint storage"), } } diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index 93a357e160..3cf5ea7c51 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -613,11 +613,11 @@ components: - skipped properties: status: - description: Lfc prewarm status - enum: [not_prewarmed, prewarming, completed, failed] + description: LFC prewarm status + enum: [not_prewarmed, prewarming, completed, failed, skipped] type: string error: - description: Lfc prewarm error, if any + description: LFC prewarm error, if any type: string total: description: Total pages processed @@ -635,11 +635,11 @@ components: - status properties: status: - description: Lfc offload status + description: LFC offload status enum: [not_offloaded, offloading, completed, failed] type: string error: - description: Lfc offload error, if any + description: LFC offload error, if any type: string PromoteState: diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index 2fe233214a..5b8fc49750 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -46,16 +46,33 @@ pub struct ExtensionInstallResponse { pub version: ExtVersion, } +/// Status of the LFC prewarm process. The same state machine is reused for +/// both autoprewarm (prewarm after compute/Postgres start using the previously +/// stored LFC state) and explicit prewarming via API. #[derive(Serialize, Default, Debug, Clone, PartialEq)] #[serde(tag = "status", rename_all = "snake_case")] pub enum LfcPrewarmState { + /// Default value when compute boots up. #[default] NotPrewarmed, + /// Prewarming thread is active and loading pages into LFC. Prewarming, + /// We found requested LFC state in the endpoint storage and + /// completed prewarming successfully. Completed, - Failed { - error: String, - }, + /// Unexpected error happened during prewarming. Note, `Not Found 404` + /// response from the endpoint storage is explicitly excluded here + /// because it can normally happen on the first compute start, + /// since LFC state is not available yet. + Failed { error: String }, + /// We tried to fetch the corresponding LFC state from the endpoint storage, + /// but received `Not Found 404`. This should normally happen only during the + /// first endpoint start after creation with `autoprewarm: true`. + /// + /// During the orchestrated prewarm via API, when a caller explicitly + /// provides the LFC state key to prewarm from, it's the caller responsibility + /// to handle this status as an error state in this case. + Skipped, } impl Display for LfcPrewarmState { @@ -64,6 +81,7 @@ impl Display for LfcPrewarmState { LfcPrewarmState::NotPrewarmed => f.write_str("NotPrewarmed"), LfcPrewarmState::Prewarming => f.write_str("Prewarming"), LfcPrewarmState::Completed => f.write_str("Completed"), + LfcPrewarmState::Skipped => f.write_str("Skipped"), LfcPrewarmState::Failed { error } => write!(f, "Error({error})"), } }