diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 7d39badfdc..1ff863eaf6 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use anyhow::{anyhow, Context, Result}; use futures::TryFutureExt; @@ -1118,6 +1119,9 @@ async fn put_tenant_location_config_handler( _cancel: CancellationToken, ) -> Result, ApiError> { let request_data: TenantLocationConfigRequest = json_request(&mut request).await?; + + let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis); + let tenant_id = request_data.tenant_id; check_permission(&request, Some(tenant_id))?; @@ -1147,7 +1151,7 @@ async fn put_tenant_location_config_handler( state .tenant_manager - .upsert_location(tenant_id, location_conf, &ctx) + .upsert_location(tenant_id, location_conf, flush, &ctx) .await // TODO: badrequest assumes the caller was asking for something unreasonable, but in // principle we might have hit something like concurrent API calls to the same tenant, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0c23f2b1da..73eee590a6 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -63,6 +63,7 @@ use self::timeline::TimelineResources; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::deletion_queue::DeletionQueueClient; +use crate::deletion_queue::DeletionQueueError; use crate::import_datadir; use crate::is_uninit_mark; use crate::metrics::TENANT_ACTIVATION; @@ -3378,6 +3379,30 @@ impl Tenant { pub fn cached_synthetic_size(&self) -> u64 { self.cached_synthetic_tenant_size.load(Ordering::Relaxed) } + + /// Flush any in-progress layers, schedule uploads, and wait for uploads to complete. + /// + /// This function can take a long time: callers should wrap it in a timeout if calling + /// from an external API handler. + pub async fn flush_remote(&self) -> anyhow::Result<()> { + let timelines = self.timelines.lock().unwrap().clone(); + + for (timeline_id, timeline) in timelines { + tracing::info!(%timeline_id, "Flushing..."); + timeline.freeze_and_flush().await?; + tracing::info!(%timeline_id, "Waiting for uploads..."); + if let Some(client) = &timeline.remote_client { + client.wait_completion().await?; + } + } + + match self.deletion_queue_client.flush_execute().await { + Ok(_) => {} + Err(DeletionQueueError::ShuttingDown) => {} + } + + Ok(()) + } } fn remove_timeline_and_uninit_mark( diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index a137e38d14..379e526e7f 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -7,7 +7,7 @@ use std::borrow::Cow; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; -use std::time::Instant; +use std::time::Duration; use tokio::fs; use anyhow::Context; @@ -26,7 +26,9 @@ use crate::control_plane_client::{ }; use crate::deletion_queue::DeletionQueueClient; use crate::task_mgr::{self, TaskKind}; -use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt}; +use crate::tenant::config::{ + AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, TenantConfOpt, +}; use crate::tenant::delete::DeleteTenantFlow; use crate::tenant::{ create_tenant_files, AttachMarkerMode, AttachedTenantConf, CreateTenantFilesMode, Tenant, @@ -705,6 +707,7 @@ pub(crate) async fn create_tenant( let tenant_guard = tenant_map_acquire_slot(&tenant_id, Some(false))?; let location_conf = LocationConf::attached_single(tenant_conf, generation); + info!("Creating tenant at location {location_conf:?}"); // We're holding the tenants lock in write mode while doing local IO. // If this section ever becomes contentious, introduce a new `TenantState::Creating` @@ -776,6 +779,7 @@ impl TenantManager { &self, tenant_id: TenantId, new_location_config: LocationConf, + flush: Option, ctx: &RequestContext, ) -> Result<(), anyhow::Error> { info!("configuring tenant location {tenant_id} to state {new_location_config:?}"); @@ -783,7 +787,7 @@ impl TenantManager { // Special case fast-path for updates to Tenant: if our upsert is only updating configuration, // then we do not need to set the slot to InProgress, we can just call into the // existng tenant. - { + let modify_tenant = { let locked = self.tenants.read().unwrap(); let peek_slot = tenant_map_peek_slot(&locked, &tenant_id)?; match (&new_location_config.mode, peek_slot) { @@ -793,22 +797,58 @@ impl TenantManager { // take our fast path and just provide the updated configuration // to the tenant. tenant.set_new_location_config(AttachedTenantConf::try_from( - new_location_config, + new_location_config.clone(), )?); - // Persist the new config in the background, to avoid holding up any - // locks while we do so. - // TODO - - return Ok(()); + Some(tenant.clone()) } else { // Different generations, fall through to general case + None } } _ => { // Not an Attached->Attached transition, fall through to general case + None } } + }; + + // Fast-path continued: having dropped out of the self.tenants lock, do the async + // phase of waiting for flush, before returning. + if let Some(tenant) = modify_tenant { + // Transition to AttachedStale means we may well hold a valid generation + // still, and have been requested to go stale as part of a migration. If + // the caller set `flush`, then flush to remote storage. + if let LocationMode::Attached(AttachedLocationConfig { + generation: _, + attach_mode: AttachmentMode::Stale, + }) = &new_location_config.mode + { + if let Some(flush_timeout) = flush { + info!(timeout_ms = flush_timeout.as_millis(), "Flushing"); + match tokio::time::timeout( + flush_timeout, + tenant.flush_remote().instrument(info_span!("flush_remote")), + ) + .await + { + Ok(r) => { + if let Err(e) = r { + tracing::error!("Failed to flush to remote storage: {e}"); + return Err(e); + } + } + Err(_) => { + tracing::warn!( + timeout_ms = flush_timeout.as_millis(), + "Timed out waiting for flush to remote storage, proceeding anyway." + ) + } + } + } + } + + return Ok(()); } // General case for upserts to TenantsMap, excluding the case above: we will substitute an