diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 941a21806f..3ae946c10e 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -2450,14 +2450,31 @@ LIMIT 100",
     pub fn spawn_lfc_offload_task(self: &Arc<Self>, interval: Duration) {
         self.terminate_lfc_offload_task();
         let secs = interval.as_secs();
-        info!("spawning lfc offload worker with {secs}s interval");
         let this = self.clone();
+
+        info!("spawning LFC offload worker with {secs}s interval");
         let handle = spawn(async move {
             let mut interval = time::interval(interval);
             interval.tick().await; // returns immediately
             loop {
                 interval.tick().await;
-                this.offload_lfc_async().await;
+
+                let prewarm_state = this.state.lock().unwrap().lfc_prewarm_state.clone();
+                // Do not offload the LFC state if we are currently prewarming or if any issue
+                // occurred. Otherwise, we might overwrite the LFC state in endpoint storage with
+                // an incomplete one. Imagine the following situation:
+                // 1. The endpoint starts with `autoprewarm: true`
+                // 2. While prewarming is still in progress, we upload the new, incomplete state
+                // 3. Compute gets interrupted and restarts
+                // 4. We start again and prewarm with the state from 2. instead of the previous complete state
+                if matches!(
+                    prewarm_state,
+                    LfcPrewarmState::Completed
+                        | LfcPrewarmState::NotPrewarmed
+                        | LfcPrewarmState::Skipped
+                ) {
+                    this.offload_lfc_async().await;
+                }
             }
         });
         *self.lfc_offload_task.lock().unwrap() = Some(handle);
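The guard added above allows offloading only once prewarming has reached a safe, terminal state. As an illustration only, here is a minimal sketch of that predicate plus a unit test; `can_offload_lfc` and the test name are hypothetical and not part of this change, and the sketch assumes `LfcPrewarmState` from `compute_api::responses` is in scope.

use compute_api::responses::LfcPrewarmState;

// Mirrors the matches! guard in spawn_lfc_offload_task: offload only when
// prewarming is neither still running nor failed.
fn can_offload_lfc(state: &LfcPrewarmState) -> bool {
    matches!(
        state,
        LfcPrewarmState::Completed | LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Skipped
    )
}

#[test]
fn offload_guard_skips_prewarming_and_failed() {
    assert!(can_offload_lfc(&LfcPrewarmState::Completed));
    assert!(can_offload_lfc(&LfcPrewarmState::NotPrewarmed));
    assert!(can_offload_lfc(&LfcPrewarmState::Skipped));
    assert!(!can_offload_lfc(&LfcPrewarmState::Prewarming));
    assert!(!can_offload_lfc(&LfcPrewarmState::Failed { error: "boom".into() }));
}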
diff --git a/compute_tools/src/compute_prewarm.rs b/compute_tools/src/compute_prewarm.rs
index d014a5bb72..07b4a596cc 100644
--- a/compute_tools/src/compute_prewarm.rs
+++ b/compute_tools/src/compute_prewarm.rs
@@ -89,7 +89,7 @@ impl ComputeNode {
         self.state.lock().unwrap().lfc_offload_state.clone()
     }

-    /// If there is a prewarm request ongoing, return false, true otherwise
+    /// If there is a prewarm request ongoing, return `false`; otherwise return `true`.
     pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
         {
             let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
@@ -101,15 +101,25 @@ impl ComputeNode {

         let cloned = self.clone();
         spawn(async move {
-            let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
-                cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
-                return;
-            };
-            crate::metrics::LFC_PREWARM_ERRORS.inc();
-            error!(%err, "prewarming lfc");
-            cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
-                error: err.to_string(),
+            let state = match cloned.prewarm_impl(from_endpoint).await {
+                Ok(true) => LfcPrewarmState::Completed,
+                Ok(false) => {
+                    info!(
+                        "skipping LFC prewarm because LFC state is not found in endpoint storage"
+                    );
+                    LfcPrewarmState::Skipped
+                }
+                Err(err) => {
+                    crate::metrics::LFC_PREWARM_ERRORS.inc();
+                    error!(%err, "could not prewarm LFC");
+
+                    LfcPrewarmState::Failed {
+                        error: err.to_string(),
+                    }
+                }
             };
+
+            cloned.state.lock().unwrap().lfc_prewarm_state = state;
         });
         true
     }
@@ -120,15 +130,21 @@ impl ComputeNode {
         EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
     }

-    async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
+    /// Request the LFC state from endpoint storage and load the corresponding pages into Postgres.
+    /// Returns `Ok(false)` if the LFC state is not found in endpoint storage.
+    async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
         let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
-        info!(%url, "requesting LFC state from endpoint storage");
+        info!(%url, "requesting LFC state from endpoint storage");

         let request = Client::new().get(&url).bearer_auth(token);
         let res = request.send().await.context("querying endpoint storage")?;
         let status = res.status();
-        if status != StatusCode::OK {
-            bail!("{status} querying endpoint storage")
+        match status {
+            StatusCode::OK => (),
+            StatusCode::NOT_FOUND => {
+                return Ok(false);
+            }
+            _ => bail!("{status} querying endpoint storage"),
         }

         let mut uncompressed = Vec::new();
@@ -141,7 +157,8 @@ impl ComputeNode {
             .await
             .context("decoding LFC state")?;
         let uncompressed_len = uncompressed.len();
-        info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres");
+
+        info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into Postgres");

         ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
             .await
@@ -149,7 +166,9 @@ impl ComputeNode {
             .query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
             .await
             .context("loading LFC state into postgres")
-            .map(|_| ())
+            .map(|_| ())?;
+
+        Ok(true)
     }

     /// If offload request is ongoing, return false, true otherwise
@@ -177,12 +196,14 @@ impl ComputeNode {

     async fn offload_lfc_with_state_update(&self) {
         crate::metrics::LFC_OFFLOADS.inc();
+
         let Err(err) = self.offload_lfc_impl().await else {
             self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
             return;
         };
+
         crate::metrics::LFC_OFFLOAD_ERRORS.inc();
-        error!(%err, "offloading lfc");
+        error!(%err, "could not offload LFC state to endpoint storage");
         self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
             error: err.to_string(),
         };
@@ -190,7 +211,7 @@ impl ComputeNode {

     async fn offload_lfc_impl(&self) -> Result<()> {
         let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
-        info!(%url, "requesting LFC state from postgres");
+        info!(%url, "requesting LFC state from Postgres");

         let mut compressed = Vec::new();
         ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
@@ -205,13 +226,17 @@ impl ComputeNode {
             .read_to_end(&mut compressed)
             .await
             .context("compressing LFC state")?;
+
         let compressed_len = compressed.len();
         info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");

         let request = Client::new().put(url).bearer_auth(token).body(compressed);
         match request.send().await {
             Ok(res) if res.status() == StatusCode::OK => Ok(()),
-            Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()),
+            Ok(res) => bail!(
+                "Request to endpoint storage failed with status: {}",
+                res.status()
+            ),
             Err(err) => Err(err).context("writing to endpoint storage"),
         }
     }
diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml
index 93a357e160..3cf5ea7c51 100644
--- a/compute_tools/src/http/openapi_spec.yaml
+++ b/compute_tools/src/http/openapi_spec.yaml
@@ -613,11 +613,11 @@ components:
         - skipped
       properties:
         status:
-          description: Lfc prewarm status
-          enum: [not_prewarmed, prewarming, completed, failed]
+          description: LFC prewarm status
+          enum: [not_prewarmed, prewarming, completed, failed, skipped]
           type: string
         error:
-          description: Lfc prewarm error, if any
+          description: LFC prewarm error, if any
           type: string
         total:
           description: Total pages processed
@@ -635,11 +635,11 @@ components:
         - status
       properties:
         status:
-          description: Lfc offload status
+          description: LFC offload status
           enum: [not_offloaded, offloading, completed, failed]
           type: string
         error:
-          description: Lfc offload error, if any
+          description: LFC offload error, if any
           type: string

     PromoteState:
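The `skipped` value added to the spec above corresponds to the new `LfcPrewarmState::Skipped` variant introduced below. A minimal sketch of the resulting wire format, assuming `serde_json` is available; the assertions follow from the `#[serde(tag = "status", rename_all = "snake_case")]` attribute and are illustrative, not part of this change.

use compute_api::responses::LfcPrewarmState;

fn main() {
    // Unit variants carry only the tag field.
    assert_eq!(
        serde_json::to_string(&LfcPrewarmState::Skipped).unwrap(),
        r#"{"status":"skipped"}"#
    );
    // Struct variants serialize their fields next to the tag.
    let failed = LfcPrewarmState::Failed {
        error: "500 Internal Server Error querying endpoint storage".into(),
    };
    assert_eq!(
        serde_json::to_string(&failed).unwrap(),
        r#"{"status":"failed","error":"500 Internal Server Error querying endpoint storage"}"#
    );
}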
diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs
index 2fe233214a..5b8fc49750 100644
--- a/libs/compute_api/src/responses.rs
+++ b/libs/compute_api/src/responses.rs
@@ -46,16 +46,33 @@ pub struct ExtensionInstallResponse {
     pub version: ExtVersion,
 }

+/// Status of the LFC prewarm process. The same state machine is reused for
+/// both autoprewarm (prewarming after compute/Postgres starts, using the
+/// previously stored LFC state) and explicit prewarming via the API.
 #[derive(Serialize, Default, Debug, Clone, PartialEq)]
 #[serde(tag = "status", rename_all = "snake_case")]
 pub enum LfcPrewarmState {
+    /// Default value when compute boots up.
     #[default]
     NotPrewarmed,
+    /// Prewarming thread is active and loading pages into LFC.
     Prewarming,
+    /// We found the requested LFC state in the endpoint storage and
+    /// completed prewarming successfully.
     Completed,
-    Failed {
-        error: String,
-    },
+    /// An unexpected error happened during prewarming. Note that a `404 Not Found`
+    /// response from the endpoint storage is explicitly excluded here
+    /// because it can normally happen on the first compute start,
+    /// since the LFC state is not available yet.
+    Failed { error: String },
+    /// We tried to fetch the corresponding LFC state from the endpoint storage,
+    /// but received `404 Not Found`. This should normally happen only during the
+    /// first endpoint start after creation with `autoprewarm: true`.
+    ///
+    /// During an orchestrated prewarm via the API, when the caller explicitly
+    /// provides the LFC state key to prewarm from, it is the caller's responsibility
+    /// to treat this status as an error.
+    Skipped,
 }

 impl Display for LfcPrewarmState {
@@ -64,6 +81,7 @@ impl Display for LfcPrewarmState {
             LfcPrewarmState::NotPrewarmed => f.write_str("NotPrewarmed"),
             LfcPrewarmState::Prewarming => f.write_str("Prewarming"),
             LfcPrewarmState::Completed => f.write_str("Completed"),
+            LfcPrewarmState::Skipped => f.write_str("Skipped"),
             LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
         }
     }
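The caller-side rule from the `Skipped` doc comment can be made concrete with a small sketch. `PrewarmOutcome` and `handle_prewarm_state` are hypothetical names used only for illustration; the actual caller-side types are not part of this diff.

use compute_api::responses::LfcPrewarmState;

// Hypothetical caller-side classification of the reported prewarm status.
enum PrewarmOutcome {
    Done,
    InProgress,
    Error(String),
}

fn handle_prewarm_state(explicit_source: bool, state: &LfcPrewarmState) -> PrewarmOutcome {
    match state {
        LfcPrewarmState::Completed => PrewarmOutcome::Done,
        LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => PrewarmOutcome::InProgress,
        // First start with `autoprewarm: true`: there is nothing to prewarm from yet,
        // so a missing LFC state is expected and not an error.
        LfcPrewarmState::Skipped if !explicit_source => PrewarmOutcome::Done,
        // The caller explicitly named an LFC state to prewarm from, but it was not
        // found in endpoint storage: surface this as an error.
        LfcPrewarmState::Skipped => {
            PrewarmOutcome::Error("LFC state not found for the requested source".to_string())
        }
        LfcPrewarmState::Failed { error } => PrewarmOutcome::Error(error.clone()),
    }
}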