mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
endpoint_storage: add ?from_endpoint= to /lfc/prewarm (#12195)
Related: https://github.com/neondatabase/cloud/issues/24225 Add optional from_endpoint parameter to allow prewarming from other endpoint
This commit is contained in:
16
Cargo.lock
generated
16
Cargo.lock
generated
@@ -753,6 +753,7 @@ dependencies = [
|
|||||||
"axum",
|
"axum",
|
||||||
"axum-core",
|
"axum-core",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"form_urlencoded",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"headers",
|
"headers",
|
||||||
"http 1.1.0",
|
"http 1.1.0",
|
||||||
@@ -761,6 +762,8 @@ dependencies = [
|
|||||||
"mime",
|
"mime",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde_html_form",
|
||||||
|
"serde_path_to_error",
|
||||||
"tower 0.5.2",
|
"tower 0.5.2",
|
||||||
"tower-layer",
|
"tower-layer",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
@@ -6422,6 +6425,19 @@ dependencies = [
|
|||||||
"syn 2.0.100",
|
"syn 2.0.100",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_html_form"
|
||||||
|
version = "0.2.7"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4"
|
||||||
|
dependencies = [
|
||||||
|
"form_urlencoded",
|
||||||
|
"indexmap 2.9.0",
|
||||||
|
"itoa",
|
||||||
|
"ryu",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_json"
|
name = "serde_json"
|
||||||
version = "1.0.125"
|
version = "1.0.125"
|
||||||
|
|||||||
@@ -71,7 +71,7 @@ aws-credential-types = "1.2.0"
|
|||||||
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
|
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
|
||||||
aws-types = "1.3"
|
aws-types = "1.3"
|
||||||
axum = { version = "0.8.1", features = ["ws"] }
|
axum = { version = "0.8.1", features = ["ws"] }
|
||||||
axum-extra = { version = "0.10.0", features = ["typed-header"] }
|
axum-extra = { version = "0.10.0", features = ["typed-header", "query"] }
|
||||||
base64 = "0.13.0"
|
base64 = "0.13.0"
|
||||||
bincode = "1.3"
|
bincode = "1.3"
|
||||||
bindgen = "0.71"
|
bindgen = "0.71"
|
||||||
|
|||||||
@@ -785,7 +785,7 @@ impl ComputeNode {
|
|||||||
self.spawn_extension_stats_task();
|
self.spawn_extension_stats_task();
|
||||||
|
|
||||||
if pspec.spec.autoprewarm {
|
if pspec.spec.autoprewarm {
|
||||||
self.prewarm_lfc();
|
self.prewarm_lfc(None);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,11 +25,16 @@ struct EndpointStoragePair {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const KEY: &str = "lfc_state";
|
const KEY: &str = "lfc_state";
|
||||||
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
|
impl EndpointStoragePair {
|
||||||
type Error = anyhow::Error;
|
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
|
||||||
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
|
/// If not None, takes precedence over pspec.spec.endpoint_id
|
||||||
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
|
fn from_spec_and_endpoint(
|
||||||
bail!("pspec.endpoint_id missing")
|
pspec: &crate::compute::ParsedSpec,
|
||||||
|
endpoint_id: Option<String>,
|
||||||
|
) -> Result<Self> {
|
||||||
|
let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
|
||||||
|
let Some(ref endpoint_id) = endpoint_id else {
|
||||||
|
bail!("pspec.endpoint_id missing, other endpoint_id not provided")
|
||||||
};
|
};
|
||||||
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
|
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
|
||||||
bail!("pspec.endpoint_storage_addr missing")
|
bail!("pspec.endpoint_storage_addr missing")
|
||||||
@@ -84,7 +89,7 @@ impl ComputeNode {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns false if there is a prewarm request ongoing, true otherwise
|
/// Returns false if there is a prewarm request ongoing, true otherwise
|
||||||
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
|
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
|
||||||
crate::metrics::LFC_PREWARM_REQUESTS.inc();
|
crate::metrics::LFC_PREWARM_REQUESTS.inc();
|
||||||
{
|
{
|
||||||
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
|
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
|
||||||
@@ -97,7 +102,7 @@ impl ComputeNode {
|
|||||||
|
|
||||||
let cloned = self.clone();
|
let cloned = self.clone();
|
||||||
spawn(async move {
|
spawn(async move {
|
||||||
let Err(err) = cloned.prewarm_impl().await else {
|
let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
|
||||||
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
|
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
@@ -109,13 +114,14 @@ impl ComputeNode {
|
|||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
|
/// from_endpoint: None for endpoint managed by this compute_ctl
|
||||||
|
fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
|
||||||
let state = self.state.lock().unwrap();
|
let state = self.state.lock().unwrap();
|
||||||
state.pspec.as_ref().unwrap().try_into()
|
EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn prewarm_impl(&self) -> Result<()> {
|
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
|
||||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
|
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
|
||||||
info!(%url, "requesting LFC state from endpoint storage");
|
info!(%url, "requesting LFC state from endpoint storage");
|
||||||
|
|
||||||
let request = Client::new().get(&url).bearer_auth(token);
|
let request = Client::new().get(&url).bearer_auth(token);
|
||||||
@@ -173,7 +179,7 @@ impl ComputeNode {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn offload_lfc_impl(&self) -> Result<()> {
|
async fn offload_lfc_impl(&self) -> Result<()> {
|
||||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
|
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
|
||||||
info!(%url, "requesting LFC state from postgres");
|
info!(%url, "requesting LFC state from postgres");
|
||||||
|
|
||||||
let mut compressed = Vec::new();
|
let mut compressed = Vec::new();
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ use crate::compute_prewarm::LfcPrewarmStateWithProgress;
|
|||||||
use crate::http::JsonResponse;
|
use crate::http::JsonResponse;
|
||||||
use axum::response::{IntoResponse, Response};
|
use axum::response::{IntoResponse, Response};
|
||||||
use axum::{Json, http::StatusCode};
|
use axum::{Json, http::StatusCode};
|
||||||
|
use axum_extra::extract::OptionalQuery;
|
||||||
use compute_api::responses::LfcOffloadState;
|
use compute_api::responses::LfcOffloadState;
|
||||||
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
|
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
|
||||||
|
|
||||||
@@ -16,8 +17,16 @@ pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadS
|
|||||||
Json(compute.lfc_offload_state())
|
Json(compute.lfc_offload_state())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
|
#[derive(serde::Deserialize)]
|
||||||
if compute.prewarm_lfc() {
|
pub struct PrewarmQuery {
|
||||||
|
pub from_endpoint: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(in crate::http) async fn prewarm(
|
||||||
|
compute: Compute,
|
||||||
|
OptionalQuery(query): OptionalQuery<PrewarmQuery>,
|
||||||
|
) -> Response {
|
||||||
|
if compute.prewarm_lfc(query.map(|q| q.from_endpoint)) {
|
||||||
StatusCode::ACCEPTED.into_response()
|
StatusCode::ACCEPTED.into_response()
|
||||||
} else {
|
} else {
|
||||||
JsonResponse::error(
|
JsonResponse::error(
|
||||||
|
|||||||
@@ -69,8 +69,10 @@ class EndpointHttpClient(requests.Session):
|
|||||||
json: dict[str, str] = res.json()
|
json: dict[str, str] = res.json()
|
||||||
return json
|
return json
|
||||||
|
|
||||||
def prewarm_lfc(self):
|
def prewarm_lfc(self, from_endpoint_id: str | None = None):
|
||||||
self.post(f"http://localhost:{self.external_port}/lfc/prewarm").raise_for_status()
|
url: str = f"http://localhost:{self.external_port}/lfc/prewarm"
|
||||||
|
params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
|
||||||
|
self.post(url, params=params).raise_for_status()
|
||||||
|
|
||||||
def prewarmed():
|
def prewarmed():
|
||||||
json = self.prewarm_lfc_status()
|
json = self.prewarm_lfc_status()
|
||||||
|
|||||||
@@ -188,7 +188,8 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet
|
|||||||
pg_cur.execute("select pg_reload_conf()")
|
pg_cur.execute("select pg_reload_conf()")
|
||||||
|
|
||||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||||
http_client.prewarm_lfc()
|
# Same thing as prewarm_lfc(), testing other method
|
||||||
|
http_client.prewarm_lfc(endpoint.endpoint_id)
|
||||||
else:
|
else:
|
||||||
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
|
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user