storcon: signal LSN wait to pageserver during live migration (#10452)
## Problem

We've seen the ingest connection manager get stuck shortly after a migration.

## Summary of changes

A speculative mitigation is to use the same mechanism as get page requests for kicking LSN ingest. The connection manager monitors LSN waits and queries the broker if no updates are received for the timeline.

Closes https://github.com/neondatabase/neon/issues/10351
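To make the flow concrete before the diff: the storage controller POSTs per-timeline target LSNs to the pageserver's new `/v1/tenant/:tenant_shard_id/wait_lsn` endpoint and branches on the status code (`200 OK` = all listed timelines have ingested up to their targets, `202 Accepted` = the wait timed out, `404 Not Found` = none of the listed timelines exist on that shard). The sketch below is a minimal, hypothetical standalone caller for that contract; the request body type and its JSON encoding are simplified stand-ins, not the real `TenantWaitLsnRequest` from `pageserver_api::models` or the `mgmt_api`/`PageserverClient` code added further down.

```rust
// A hedged sketch, not the real client: the body type, field names, and JSON
// encoding below are assumptions made for illustration only.
use std::collections::HashMap;

use reqwest::StatusCode;
use serde::Serialize;

#[derive(Serialize)]
struct WaitLsnBody {
    // One entry per timeline: timeline id -> target LSN, kept as plain strings
    // here; the real request flattens a HashMap<TimelineId, Lsn>.
    #[serde(flatten)]
    timelines: HashMap<String, String>,
    // How long the pageserver should wait before answering 202 Accepted.
    // (The real type carries a Duration; a plain seconds count keeps the
    // sketch self-contained.)
    timeout_secs: u64,
}

/// Ask the pageserver to ingest up to the given LSNs and report how far it got:
/// OK = caught up, ACCEPTED = still behind after the timeout, NOT_FOUND = none
/// of the listed timelines are attached on this shard.
async fn wait_for_ingest(
    base_url: &str,
    tenant_shard_id: &str,
    timelines: HashMap<String, String>,
) -> anyhow::Result<StatusCode> {
    let url = format!("{base_url}/v1/tenant/{tenant_shard_id}/wait_lsn");
    let body = WaitLsnBody {
        timelines,
        timeout_secs: 10,
    };

    let status = reqwest::Client::new()
        .post(url)
        .json(&body)
        .send()
        .await?
        .status();

    match status {
        StatusCode::OK | StatusCode::ACCEPTED | StatusCode::NOT_FOUND => Ok(status),
        other => anyhow::bail!("unexpected status from wait_lsn: {other}"),
    }
}
```

In the reconciler change at the end of the diff, this call is additionally wrapped in a retry loop and followed by polling `get_lsns`, so `ACCEPTED` and `NOT_FOUND` degrade to the pre-existing LSN polling behaviour instead of failing the migration.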
@@ -1021,6 +1021,13 @@ pub struct TenantConfigPatchRequest {
     pub config: TenantConfigPatch, // as we have a flattened field, we should reject all unknown fields in it
 }
 
+#[derive(Serialize, Deserialize, Debug)]
+pub struct TenantWaitLsnRequest {
+    #[serde(flatten)]
+    pub timelines: HashMap<TimelineId, Lsn>,
+    pub timeout: Duration,
+}
+
 /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
 #[derive(Serialize, Deserialize, Clone)]
 #[serde(tag = "slug", content = "data", rename_all = "snake_case")]

@@ -763,4 +763,19 @@ impl Client {
             .await
             .map_err(Error::ReceiveBody)
     }
+
+    pub async fn wait_lsn(
+        &self,
+        tenant_shard_id: TenantShardId,
+        request: TenantWaitLsnRequest,
+    ) -> Result<StatusCode> {
+        let uri = format!(
+            "{}/v1/tenant/{tenant_shard_id}/wait_lsn",
+            self.mgmt_api_endpoint,
+        );
+
+        self.request_noerror(Method::POST, uri, request)
+            .await
+            .map(|resp| resp.status())
+    }
 }

@@ -10,6 +10,7 @@ use std::time::Duration;
 
 use anyhow::{anyhow, Context, Result};
 use enumset::EnumSet;
+use futures::future::join_all;
 use futures::StreamExt;
 use futures::TryFutureExt;
 use humantime::format_rfc3339;

@@ -40,6 +41,7 @@ use pageserver_api::models::TenantShardSplitRequest;
 use pageserver_api::models::TenantShardSplitResponse;
 use pageserver_api::models::TenantSorting;
 use pageserver_api::models::TenantState;
+use pageserver_api::models::TenantWaitLsnRequest;
 use pageserver_api::models::TimelineArchivalConfigRequest;
 use pageserver_api::models::TimelineCreateRequestMode;
 use pageserver_api::models::TimelineCreateRequestModeImportPgdata;

@@ -95,6 +97,8 @@ use crate::tenant::timeline::CompactOptions;
 use crate::tenant::timeline::CompactRequest;
 use crate::tenant::timeline::CompactionError;
 use crate::tenant::timeline::Timeline;
+use crate::tenant::timeline::WaitLsnTimeout;
+use crate::tenant::timeline::WaitLsnWaiter;
 use crate::tenant::GetTimelineError;
 use crate::tenant::OffloadedTimeline;
 use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};

@@ -2790,6 +2794,63 @@ async fn secondary_download_handler(
     json_response(status, progress)
 }
 
+async fn wait_lsn_handler(
+    mut request: Request<Body>,
+    cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    let wait_lsn_request: TenantWaitLsnRequest = json_request(&mut request).await?;
+
+    let state = get_state(&request);
+    let tenant = state
+        .tenant_manager
+        .get_attached_tenant_shard(tenant_shard_id)?;
+
+    let mut wait_futures = Vec::default();
+    for timeline in tenant.list_timelines() {
+        let Some(lsn) = wait_lsn_request.timelines.get(&timeline.timeline_id) else {
+            continue;
+        };
+
+        let fut = {
+            let timeline = timeline.clone();
+            let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
+            async move {
+                timeline
+                    .wait_lsn(
+                        *lsn,
+                        WaitLsnWaiter::HttpEndpoint,
+                        WaitLsnTimeout::Custom(wait_lsn_request.timeout),
+                        &ctx,
+                    )
+                    .await
+            }
+        };
+        wait_futures.push(fut);
+    }
+
+    if wait_futures.is_empty() {
+        return json_response(StatusCode::NOT_FOUND, ());
+    }
+
+    let all_done = tokio::select! {
+        results = join_all(wait_futures) => {
+            results.iter().all(|res| res.is_ok())
+        },
+        _ = cancel.cancelled() => {
+            return Err(ApiError::Cancelled);
+        }
+    };
+
+    let status = if all_done {
+        StatusCode::OK
+    } else {
+        StatusCode::ACCEPTED
+    };
+
+    json_response(status, ())
+}
+
 async fn secondary_status_handler(
     request: Request<Body>,
     _cancel: CancellationToken,

@@ -3577,6 +3638,9 @@ pub fn make_router(
         .post("/v1/tenant/:tenant_shard_id/secondary/download", |r| {
             api_handler(r, secondary_download_handler)
         })
+        .post("/v1/tenant/:tenant_shard_id/wait_lsn", |r| {
+            api_handler(r, wait_lsn_handler)
+        })
         .put("/v1/tenant/:tenant_shard_id/break", |r| {
             testing_api_handler("set tenant state to broken", r, handle_tenant_break)
         })

@@ -1708,6 +1708,7 @@ impl PageServerHandler {
             .wait_lsn(
                 not_modified_since,
                 crate::tenant::timeline::WaitLsnWaiter::PageService,
+                timeline::WaitLsnTimeout::Default,
                 ctx,
             )
             .await?;

@@ -2044,6 +2045,7 @@ impl PageServerHandler {
             .wait_lsn(
                 lsn,
                 crate::tenant::timeline::WaitLsnWaiter::PageService,
+                crate::tenant::timeline::WaitLsnTimeout::Default,
                 ctx,
             )
             .await?;

@@ -2560,7 +2560,12 @@ impl Tenant {
             // sizes etc. and that would get confused if the previous page versions
             // are not in the repository yet.
             ancestor_timeline
-                .wait_lsn(*lsn, timeline::WaitLsnWaiter::Tenant, ctx)
+                .wait_lsn(
+                    *lsn,
+                    timeline::WaitLsnWaiter::Tenant,
+                    timeline::WaitLsnTimeout::Default,
+                    ctx,
+                )
                 .await
                 .map_err(|e| match e {
                     e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState { .. }) => {

@@ -1643,6 +1643,7 @@ impl TenantManager {
                 .wait_lsn(
                     *target_lsn,
                     crate::tenant::timeline::WaitLsnWaiter::Tenant,
+                    crate::tenant::timeline::WaitLsnTimeout::Default,
                     ctx,
                 )
                 .await

@@ -901,10 +901,17 @@ impl From<GetReadyAncestorError> for PageReconstructError {
     }
 }
 
+pub(crate) enum WaitLsnTimeout {
+    Custom(Duration),
+    // Use the [`PageServerConf::wait_lsn_timeout`] default
+    Default,
+}
+
 pub(crate) enum WaitLsnWaiter<'a> {
     Timeline(&'a Timeline),
     Tenant,
     PageService,
+    HttpEndpoint,
 }
 
 /// Argument to [`Timeline::shutdown`].

@@ -1301,6 +1308,7 @@ impl Timeline {
         &self,
         lsn: Lsn,
         who_is_waiting: WaitLsnWaiter<'_>,
+        timeout: WaitLsnTimeout,
         ctx: &RequestContext, /* Prepare for use by cancellation */
     ) -> Result<(), WaitLsnError> {
         let state = self.current_state();

@@ -1317,7 +1325,7 @@ impl Timeline {
             | TaskKind::WalReceiverConnectionPoller => {
                 let is_myself = match who_is_waiting {
                     WaitLsnWaiter::Timeline(waiter) => Weak::ptr_eq(&waiter.myself, &self.myself),
-                    WaitLsnWaiter::Tenant | WaitLsnWaiter::PageService => unreachable!("tenant or page_service context are not expected to have task kind {:?}", ctx.task_kind()),
+                    WaitLsnWaiter::Tenant | WaitLsnWaiter::PageService | WaitLsnWaiter::HttpEndpoint => unreachable!("tenant or page_service context are not expected to have task kind {:?}", ctx.task_kind()),
                 };
                 if is_myself {
                     if let Err(current) = self.last_record_lsn.would_wait_for(lsn) {

@@ -1333,13 +1341,14 @@ impl Timeline {
             }
         }
 
+        let timeout = match timeout {
+            WaitLsnTimeout::Custom(t) => t,
+            WaitLsnTimeout::Default => self.conf.wait_lsn_timeout,
+        };
+
         let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
 
-        match self
-            .last_record_lsn
-            .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
-            .await
-        {
+        match self.last_record_lsn.wait_for_timeout(lsn, timeout).await {
             Ok(()) => Ok(()),
             Err(e) => {
                 use utils::seqwait::SeqWaitError::*;

@@ -3590,7 +3599,12 @@ impl Timeline {
             }
         }
         ancestor
-            .wait_lsn(self.ancestor_lsn, WaitLsnWaiter::Timeline(self), ctx)
+            .wait_lsn(
+                self.ancestor_lsn,
+                WaitLsnWaiter::Timeline(self),
+                WaitLsnTimeout::Default,
+                ctx,
+            )
             .await
             .map_err(|e| match e {
                 e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),

@@ -274,7 +274,7 @@ pub(super) async fn connection_manager_loop_step(
         };
 
         last_discovery_ts = Some(std::time::Instant::now());
-        debug!("No active connection and no candidates, sending discovery request to the broker");
+        info!("No active connection and no candidates, sending discovery request to the broker");
 
         // Cancellation safety: we want to send a message to the broker, but publish_one()
        // function can get cancelled by the other select! arm. This is absolutely fine, because

@@ -2,8 +2,9 @@ use pageserver_api::{
     models::{
         detach_ancestor::AncestorDetached, LocationConfig, LocationConfigListResponse,
         PageserverUtilization, SecondaryProgress, TenantScanRemoteStorageResponse,
-        TenantShardSplitRequest, TenantShardSplitResponse, TimelineArchivalConfigRequest,
-        TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest, TopTenantShardsResponse,
+        TenantShardSplitRequest, TenantShardSplitResponse, TenantWaitLsnRequest,
+        TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest,
+        TopTenantShardsResponse,
     },
     shard::TenantShardId,
 };

@@ -299,4 +300,17 @@ impl PageserverClient {
             self.inner.top_tenant_shards(request).await
         )
     }
+
+    pub(crate) async fn wait_lsn(
+        &self,
+        tenant_shard_id: TenantShardId,
+        request: TenantWaitLsnRequest,
+    ) -> Result<StatusCode> {
+        measured_request!(
+            "wait_lsn",
+            crate::metrics::Method::Post,
+            &self.node_id_label,
+            self.inner.wait_lsn(tenant_shard_id, request).await
+        )
+    }
 }

@@ -3,7 +3,7 @@ use crate::persistence::Persistence;
 use crate::{compute_hook, service};
 use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
 use pageserver_api::models::{
-    LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
+    LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
 };
 use pageserver_api::shard::{ShardIdentity, TenantShardId};
 use pageserver_client::mgmt_api;

@@ -348,6 +348,32 @@ impl Reconciler {
         Ok(())
     }
 
+    async fn wait_lsn(
+        &self,
+        node: &Node,
+        tenant_shard_id: TenantShardId,
+        timelines: HashMap<TimelineId, Lsn>,
+    ) -> Result<StatusCode, ReconcileError> {
+        const TIMEOUT: Duration = Duration::from_secs(10);
+
+        let client = PageserverClient::new(
+            node.get_id(),
+            node.base_url(),
+            self.service_config.jwt_token.as_deref(),
+        );
+
+        client
+            .wait_lsn(
+                tenant_shard_id,
+                TenantWaitLsnRequest {
+                    timelines,
+                    timeout: TIMEOUT,
+                },
+            )
+            .await
+            .map_err(|e| e.into())
+    }
+
     async fn get_lsns(
         &self,
         tenant_shard_id: TenantShardId,

@@ -461,6 +487,39 @@ impl Reconciler {
         node: &Node,
         baseline: HashMap<TimelineId, Lsn>,
     ) -> anyhow::Result<()> {
+        // Signal to the pageserver that it should ingest up to the baseline LSNs.
+        loop {
+            match self.wait_lsn(node, tenant_shard_id, baseline.clone()).await {
+                Ok(StatusCode::OK) => {
+                    // Everything is caught up
+                    return Ok(());
+                }
+                Ok(StatusCode::ACCEPTED) => {
+                    // Some timelines are not caught up yet.
+                    // They'll be polled below.
+                    break;
+                }
+                Ok(StatusCode::NOT_FOUND) => {
+                    // None of the timelines are present on the pageserver.
+                    // This is correct if they've all been deleted, but
+                    // let let the polling loop below cross check.
+                    break;
+                }
+                Ok(status_code) => {
+                    tracing::warn!(
+                        "Unexpected status code ({status_code}) returned by wait_lsn endpoint"
+                    );
+                    break;
+                }
+                Err(e) => {
+                    tracing::info!("🕑 Can't trigger LSN wait on {node} yet, waiting ({e})",);
+                    tokio::time::sleep(Duration::from_millis(500)).await;
+                    continue;
+                }
+            }
+        }
+
         // Poll the LSNs until they catch up
         loop {
             let latest = match self.get_lsns(tenant_shard_id, node).await {
                 Ok(l) => l,