diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index dcc233c7c4..16f89ae13b 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1021,6 +1021,13 @@ pub struct TenantConfigPatchRequest { pub config: TenantConfigPatch, // as we have a flattened field, we should reject all unknown fields in it } +#[derive(Serialize, Deserialize, Debug)] +pub struct TenantWaitLsnRequest { + #[serde(flatten)] + pub timelines: HashMap, + pub timeout: Duration, +} + /// See [`TenantState::attachment_status`] and the OpenAPI docs for context. #[derive(Serialize, Deserialize, Clone)] #[serde(tag = "slug", content = "data", rename_all = "snake_case")] diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 4e9b11879d..0359bfcd0b 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -763,4 +763,19 @@ impl Client { .await .map_err(Error::ReceiveBody) } + + pub async fn wait_lsn( + &self, + tenant_shard_id: TenantShardId, + request: TenantWaitLsnRequest, + ) -> Result { + let uri = format!( + "{}/v1/tenant/{tenant_shard_id}/wait_lsn", + self.mgmt_api_endpoint, + ); + + self.request_noerror(Method::POST, uri, request) + .await + .map(|resp| resp.status()) + } } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 5452719bcd..0f3e9fdab6 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -10,6 +10,7 @@ use std::time::Duration; use anyhow::{anyhow, Context, Result}; use enumset::EnumSet; +use futures::future::join_all; use futures::StreamExt; use futures::TryFutureExt; use humantime::format_rfc3339; @@ -40,6 +41,7 @@ use pageserver_api::models::TenantShardSplitRequest; use pageserver_api::models::TenantShardSplitResponse; use pageserver_api::models::TenantSorting; use pageserver_api::models::TenantState; +use pageserver_api::models::TenantWaitLsnRequest; use 
pageserver_api::models::TimelineArchivalConfigRequest; use pageserver_api::models::TimelineCreateRequestMode; use pageserver_api::models::TimelineCreateRequestModeImportPgdata; @@ -95,6 +97,8 @@ use crate::tenant::timeline::CompactOptions; use crate::tenant::timeline::CompactRequest; use crate::tenant::timeline::CompactionError; use crate::tenant::timeline::Timeline; +use crate::tenant::timeline::WaitLsnTimeout; +use crate::tenant::timeline::WaitLsnWaiter; use crate::tenant::GetTimelineError; use crate::tenant::OffloadedTimeline; use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError}; @@ -2790,6 +2794,63 @@ async fn secondary_download_handler( json_response(status, progress) } +async fn wait_lsn_handler( + mut request: Request, + cancel: CancellationToken, +) -> Result, ApiError> { + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + let wait_lsn_request: TenantWaitLsnRequest = json_request(&mut request).await?; + + let state = get_state(&request); + let tenant = state + .tenant_manager + .get_attached_tenant_shard(tenant_shard_id)?; + + let mut wait_futures = Vec::default(); + for timeline in tenant.list_timelines() { + let Some(lsn) = wait_lsn_request.timelines.get(&timeline.timeline_id) else { + continue; + }; + + let fut = { + let timeline = timeline.clone(); + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error); + async move { + timeline + .wait_lsn( + *lsn, + WaitLsnWaiter::HttpEndpoint, + WaitLsnTimeout::Custom(wait_lsn_request.timeout), + &ctx, + ) + .await + } + }; + wait_futures.push(fut); + } + + if wait_futures.is_empty() { + return json_response(StatusCode::NOT_FOUND, ()); + } + + let all_done = tokio::select! 
{ + results = join_all(wait_futures) => { + results.iter().all(|res| res.is_ok()) + }, + _ = cancel.cancelled() => { + return Err(ApiError::Cancelled); + } + }; + + let status = if all_done { + StatusCode::OK + } else { + StatusCode::ACCEPTED + }; + + json_response(status, ()) +} + async fn secondary_status_handler( request: Request, _cancel: CancellationToken, @@ -3577,6 +3638,9 @@ pub fn make_router( .post("/v1/tenant/:tenant_shard_id/secondary/download", |r| { api_handler(r, secondary_download_handler) }) + .post("/v1/tenant/:tenant_shard_id/wait_lsn", |r| { + api_handler(r, wait_lsn_handler) + }) .put("/v1/tenant/:tenant_shard_id/break", |r| { testing_api_handler("set tenant state to broken", r, handle_tenant_break) }) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index e5063b7fc2..e103338c7c 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1708,6 +1708,7 @@ impl PageServerHandler { .wait_lsn( not_modified_since, crate::tenant::timeline::WaitLsnWaiter::PageService, + timeline::WaitLsnTimeout::Default, ctx, ) .await?; @@ -2044,6 +2045,7 @@ impl PageServerHandler { .wait_lsn( lsn, crate::tenant::timeline::WaitLsnWaiter::PageService, + crate::tenant::timeline::WaitLsnTimeout::Default, ctx, ) .await?; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 4385fe9a9b..4361fa3d66 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2560,7 +2560,12 @@ impl Tenant { // sizes etc. and that would get confused if the previous page versions // are not in the repository yet. ancestor_timeline - .wait_lsn(*lsn, timeline::WaitLsnWaiter::Tenant, ctx) + .wait_lsn( + *lsn, + timeline::WaitLsnWaiter::Tenant, + timeline::WaitLsnTimeout::Default, + ctx, + ) .await .map_err(|e| match e { e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState { .. 
/// Selects the timeout used by [`Timeline::wait_lsn`].
#[derive(Debug, Clone, Copy)]
pub(crate) enum WaitLsnTimeout {
    /// Wait for at most the given duration.
    Custom(Duration),
    /// Use the [`PageServerConf::wait_lsn_timeout`] default.
    Default,
}
crate::metrics::WAIT_LSN_TIME.start_timer(); - match self - .last_record_lsn - .wait_for_timeout(lsn, self.conf.wait_lsn_timeout) - .await - { + match self.last_record_lsn.wait_for_timeout(lsn, timeout).await { Ok(()) => Ok(()), Err(e) => { use utils::seqwait::SeqWaitError::*; @@ -3590,7 +3599,12 @@ impl Timeline { } } ancestor - .wait_lsn(self.ancestor_lsn, WaitLsnWaiter::Timeline(self), ctx) + .wait_lsn( + self.ancestor_lsn, + WaitLsnWaiter::Timeline(self), + WaitLsnTimeout::Default, + ctx, + ) .await .map_err(|e| match e { e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e), diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index d8ea32cd45..65f9d39078 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -274,7 +274,7 @@ pub(super) async fn connection_manager_loop_step( }; last_discovery_ts = Some(std::time::Instant::now()); - debug!("No active connection and no candidates, sending discovery request to the broker"); + info!("No active connection and no candidates, sending discovery request to the broker"); // Cancellation safety: we want to send a message to the broker, but publish_one() // function can get cancelled by the other select! arm. 
This is absolutely fine, because diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs index b19cbc4fa3..141ff6f720 100644 --- a/storage_controller/src/pageserver_client.rs +++ b/storage_controller/src/pageserver_client.rs @@ -2,8 +2,9 @@ use pageserver_api::{ models::{ detach_ancestor::AncestorDetached, LocationConfig, LocationConfigListResponse, PageserverUtilization, SecondaryProgress, TenantScanRemoteStorageResponse, - TenantShardSplitRequest, TenantShardSplitResponse, TimelineArchivalConfigRequest, - TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest, TopTenantShardsResponse, + TenantShardSplitRequest, TenantShardSplitResponse, TenantWaitLsnRequest, + TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest, + TopTenantShardsResponse, }, shard::TenantShardId, }; @@ -299,4 +300,17 @@ impl PageserverClient { self.inner.top_tenant_shards(request).await ) } + + pub(crate) async fn wait_lsn( + &self, + tenant_shard_id: TenantShardId, + request: TenantWaitLsnRequest, + ) -> Result { + measured_request!( + "wait_lsn", + crate::metrics::Method::Post, + &self.node_id_label, + self.inner.wait_lsn(tenant_shard_id, request).await + ) + } } diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs index adced3b77d..03db947263 100644 --- a/storage_controller/src/reconciler.rs +++ b/storage_controller/src/reconciler.rs @@ -3,7 +3,7 @@ use crate::persistence::Persistence; use crate::{compute_hook, service}; use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy}; use pageserver_api::models::{ - LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, + LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest, }; use pageserver_api::shard::{ShardIdentity, TenantShardId}; use pageserver_client::mgmt_api; @@ -348,6 +348,32 @@ impl Reconciler { Ok(()) } + async fn wait_lsn( + &self, + 
node: &Node, + tenant_shard_id: TenantShardId, + timelines: HashMap, + ) -> Result { + const TIMEOUT: Duration = Duration::from_secs(10); + + let client = PageserverClient::new( + node.get_id(), + node.base_url(), + self.service_config.jwt_token.as_deref(), + ); + + client + .wait_lsn( + tenant_shard_id, + TenantWaitLsnRequest { + timelines, + timeout: TIMEOUT, + }, + ) + .await + .map_err(|e| e.into()) + } + async fn get_lsns( &self, tenant_shard_id: TenantShardId, @@ -461,6 +487,39 @@ impl Reconciler { node: &Node, baseline: HashMap, ) -> anyhow::Result<()> { + // Signal to the pageserver that it should ingest up to the baseline LSNs. + loop { + match self.wait_lsn(node, tenant_shard_id, baseline.clone()).await { + Ok(StatusCode::OK) => { + // Everything is caught up + return Ok(()); + } + Ok(StatusCode::ACCEPTED) => { + // Some timelines are not caught up yet. + // They'll be polled below. + break; + } + Ok(StatusCode::NOT_FOUND) => { + // None of the timelines are present on the pageserver. + // This is correct if they've all been deleted, but + // let let the polling loop below cross check. + break; + } + Ok(status_code) => { + tracing::warn!( + "Unexpected status code ({status_code}) returned by wait_lsn endpoint" + ); + break; + } + Err(e) => { + tracing::info!("🕑 Can't trigger LSN wait on {node} yet, waiting ({e})",); + tokio::time::sleep(Duration::from_millis(500)).await; + continue; + } + } + } + + // Poll the LSNs until they catch up loop { let latest = match self.get_lsns(tenant_shard_id, node).await { Ok(l) => l,