diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 237df48543..0a55c90e8f 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -11,6 +11,7 @@ use std::io::{BufReader, Write}; use std::num::NonZeroU64; use std::path::PathBuf; use std::process::{Child, Command}; +use std::time::Duration; use std::{io, result}; use anyhow::{bail, Context}; @@ -522,19 +523,24 @@ impl PageServerNode { &self, tenant_id: TenantId, config: LocationConfig, + flush_ms: Option, ) -> anyhow::Result<()> { let req_body = TenantLocationConfigRequest { tenant_id, config }; - self.http_request( - Method::PUT, - format!( - "{}/tenant/{}/location_config", - self.http_base_url, tenant_id - ), - )? - .json(&req_body) - .send()? - .error_from_body()?; + let path = format!( + "{}/tenant/{}/location_config", + self.http_base_url, tenant_id + ); + let path = if let Some(flush_ms) = flush_ms { + format!("{}?flush_ms={}", path, flush_ms.as_millis()) + } else { + path + }; + + self.http_request(Method::PUT, path)? + .json(&req_body) + .send()? + .error_from_body()?; Ok(()) } diff --git a/control_plane/src/tenant_migration.rs b/control_plane/src/tenant_migration.rs index 42780db85c..c0c44e279f 100644 --- a/control_plane/src/tenant_migration.rs +++ b/control_plane/src/tenant_migration.rs @@ -117,7 +117,7 @@ pub fn migrate_tenant( println!("🔁 Already attached to {origin_ps_id}, freshening..."); let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?; let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None); - dest_ps.location_config(tenant_id, dest_conf)?; + dest_ps.location_config(tenant_id, dest_conf, None)?; println!("✅ Migration complete"); return Ok(()); } @@ -126,7 +126,7 @@ pub fn migrate_tenant( let stale_conf = build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None); - origin_ps.location_config(tenant_id, stale_conf)?; + origin_ps.location_config(tenant_id, stale_conf, Some(Duration::from_secs(10)))?; baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?); } @@ -135,7 +135,7 @@ pub fn migrate_tenant( let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None); println!("🔁 Attaching to pageserver {}", dest_ps.conf.id); - dest_ps.location_config(tenant_id, dest_conf)?; + dest_ps.location_config(tenant_id, dest_conf, None)?; if let Some(baseline) = baseline_lsns { println!("🕑 Waiting for LSN to catch up..."); @@ -181,7 +181,7 @@ pub fn migrate_tenant( "💤 Switching to secondary mode on pageserver {}", other_ps.conf.id ); - other_ps.location_config(tenant_id, secondary_conf)?; + other_ps.location_config(tenant_id, secondary_conf, None)?; } println!( @@ -189,7 +189,7 @@ pub fn migrate_tenant( dest_ps.conf.id ); let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None); - dest_ps.location_config(tenant_id, dest_conf)?; + dest_ps.location_config(tenant_id, dest_conf, None)?; println!("✅ Migration complete"); diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 4d455243f0..2e418f4d8f 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -624,6 +624,99 @@ paths: $ref: "#/components/schemas/ServiceUnavailableError" + /v1/tenant/{tenant_id}/location_config: + parameters: + - name: tenant_id + in: path + required: true + schema: + type: string + format: hex + - name: flush_ms + in: query + required: false + schema: + type: integer + put: + description: | + Configures a _tenant location_, that is how a particular pageserver handles + a particular tenant. This includes _attached_ tenants, i.e. those ingesting WAL + and page service requests, and _secondary_ tenants, i.e. those which are just keeping + a warm cache in anticipation of transitioning to attached state in the future. + + This is a declarative, idempotent API: there are not separate endpoints + for different tenant location configurations. Rather, this single endpoint accepts + a description of the desired location configuration, and makes whatever changes + are required to reach that state. + + In imperative terms, this API is used to attach and detach tenants, and + to transition tenants to and from secondary mode. + + This is a synchronous API: there is no 202 response. State transitions should always + be fast (milliseconds), with the exception of requests setting `flush_ms`, in which case + the caller controls the runtime of the request. + + In some state transitions, it makes sense to flush dirty data to remote storage: this includes transitions + to AttachedStale and Detached. Flushing is never necessary for correctness, but is an + important optimization when doing migrations. The `flush_ms` parameter controls whether + flushing should be attempted, and how much time is allowed for flushing. If the time limit expires, + the requested transition will continue without waiting for any outstanding data to flush. Callers + should use a duration which is substantially less than their HTTP client's request + timeout. It is safe to supply flush_ms irrespective of the request body: in state transitions + where flushing doesn't make sense, the server will ignore it. + + It is safe to retry requests, but if one receives a 409 or 503 response, it is not + useful to retry aggressively: there is probably an existing request still ongoing. + requestBody: + required: false + content: + application/json: + schema: + $ref: "#/components/schemas/TenantLocationConfigRequest" + responses: + "200": + description: Tenant is now in requested state + "503": + description: Tenant's state cannot be changed right now. Wait a few seconds and retry. + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "401": + description: Unauthorized Error + content: + application/json: + schema: + $ref: "#/components/schemas/UnauthorizedError" + "403": + description: Forbidden Error + content: + application/json: + schema: + $ref: "#/components/schemas/ForbiddenError" + "409": + description: | + The tenant is already known to Pageserver in some way, + and hence this `/attach` call has been rejected. + + Some examples of how this can happen: + - tenant was created on this pageserver + - tenant attachment was started by an earlier call to `/attach`. + + Callers should poll the tenant status's `attachment_status` field, + like for status 202. See the longer description for `POST /attach` + for details. + content: + application/json: + schema: + $ref: "#/components/schemas/ConflictError" + "500": + description: Generic operation error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + /v1/tenant/{tenant_id}/detach: parameters: - name: tenant_id @@ -1274,6 +1367,31 @@ components: tenant_id: type: string format: hex + TenantLocationConfigRequest: + type: object + required: + - tenant_id + properties: + tenant_id: + type: string + format: hex + mode: + type: string + enum: ["AttachedSingle", "AttachedMulti", "AttachedStale", "Secondary", "Detached"] + description: Mode of functionality that this pageserver will run in for this tenant. + generation: + type: integer + description: Attachment generation number, mandatory when `mode` is an attached state + secondary_conf: + $ref: '#/components/schemas/SecondaryConfig' + tenant_conf: + $ref: '#/components/schemas/TenantConfig' + SecondaryConfig: + type: object + properties: + warm: + type: boolean + description: Whether to poll remote storage for layers to download. If false, secondary locations don't download anything. TenantConfig: type: object properties: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 9cb411c95c..37159be95c 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use anyhow::{anyhow, Context, Result}; use enumset::EnumSet; @@ -1158,6 +1159,7 @@ async fn put_tenant_location_config_handler( let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let request_data: TenantLocationConfigRequest = json_request(&mut request).await?; + let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis); check_permission(&request, Some(tenant_shard_id.tenant_id))?; let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); @@ -1190,7 +1192,7 @@ async fn put_tenant_location_config_handler( state .tenant_manager - .upsert_location(tenant_shard_id, location_conf, &ctx) + .upsert_location(tenant_shard_id, location_conf, flush, &ctx) .await // TODO: badrequest assumes the caller was asking for something unreasonable, but in // principle we might have hit something like concurrent API calls to the same tenant, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 2a63f193e3..08066a612d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -15,7 +15,9 @@ use anyhow::{bail, Context}; use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use enumset::EnumSet; +use futures::stream::FuturesUnordered; use futures::FutureExt; +use futures::StreamExt; use pageserver_api::models::TimelineState; use pageserver_api::shard::TenantShardId; use remote_storage::DownloadError; @@ -32,6 +34,7 @@ use utils::completion; use utils::crashsafe::path_with_suffix_extension; use utils::fs_ext; use utils::sync::gate::Gate; +use utils::sync::gate::GateGuard; use self::config::AttachedLocationConfig; use self::config::AttachmentMode; @@ -51,6 +54,7 @@ use self::timeline::TimelineResources; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::deletion_queue::DeletionQueueClient; +use crate::deletion_queue::DeletionQueueError; use crate::import_datadir; use crate::is_uninit_mark; use crate::metrics::TENANT_ACTIVATION; @@ -3282,6 +3286,66 @@ impl Tenant { pub fn cached_synthetic_size(&self) -> u64 { self.cached_synthetic_tenant_size.load(Ordering::Relaxed) } + + /// Flush any in-progress layers, schedule uploads, and wait for uploads to complete. + /// + /// This function can take a long time: callers should wrap it in a timeout if calling + /// from an external API handler. + /// + /// Cancel-safety: cancelling this function may leave I/O running, but such I/O is + /// still bounded by tenant/timeline shutdown. + #[tracing::instrument(skip_all)] + pub(crate) async fn flush_remote(&self) -> anyhow::Result<()> { + let timelines = self.timelines.lock().unwrap().clone(); + + async fn flush_timeline(_gate: GateGuard, timeline: Arc) -> anyhow::Result<()> { + tracing::info!(timeline_id=%timeline.timeline_id, "Flushing..."); + timeline.freeze_and_flush().await?; + tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads..."); + if let Some(client) = &timeline.remote_client { + client.wait_completion().await?; + } + + Ok(()) + } + + // We do not use a JoinSet for these tasks, because we don't want them to be + // aborted when this function's future is cancelled: they should stay alive + // holding their GateGuard until they complete, to ensure their I/Os complete + // before Timeline shutdown completes. + let mut results = FuturesUnordered::new(); + + for (_timeline_id, timeline) in timelines { + // Run each timeline's flush in a task holding the timeline's gate: this + // means that if this function's future is cancelled, the Timeline shutdown + // will still wait for any I/O in here to complete. + let gate = match timeline.gate.enter() { + Ok(g) => g, + Err(_) => continue, + }; + let jh = tokio::task::spawn(async move { flush_timeline(gate, timeline).await }); + results.push(jh); + } + + while let Some(r) = results.next().await { + if let Err(e) = r { + if !e.is_cancelled() && !e.is_panic() { + tracing::error!("unexpected join error: {e:?}"); + } + } + } + + // The flushes we did above were just writes, but the Tenant might have had + // pending deletions as well from recent compaction/gc: we want to flush those + // as well. This requires flushing the global delete queue. This is cheap + // because it's typically a no-op. + match self.deletion_queue_client.flush_execute().await { + Ok(_) => {} + Err(DeletionQueueError::ShuttingDown) => {} + } + + Ok(()) + } } fn remove_timeline_and_uninit_mark( diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index e94d29327e..f34d62ba53 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -29,7 +29,9 @@ use crate::control_plane_client::{ use crate::deletion_queue::DeletionQueueClient; use crate::metrics::TENANT_MANAGER as METRICS; use crate::task_mgr::{self, TaskKind}; -use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt}; +use crate::tenant::config::{ + AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, TenantConfOpt, +}; use crate::tenant::delete::DeleteTenantFlow; use crate::tenant::span::debug_assert_current_span_has_tenant_id; use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState}; @@ -754,6 +756,7 @@ pub(crate) async fn create_tenant( ctx: &RequestContext, ) -> Result, TenantMapInsertError> { let location_conf = LocationConf::attached_single(tenant_conf, generation); + info!("Creating tenant at location {location_conf:?}"); let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?; @@ -861,6 +864,7 @@ impl TenantManager { &self, tenant_shard_id: TenantShardId, new_location_config: LocationConf, + flush: Option, ctx: &RequestContext, ) -> Result<(), anyhow::Error> { debug_assert_current_span_has_tenant_id(); @@ -869,7 +873,7 @@ impl TenantManager { // Special case fast-path for updates to Tenant: if our upsert is only updating configuration, // then we do not need to set the slot to InProgress, we can just call into the // existng tenant. - { + let modify_tenant = { let locked = self.tenants.read().unwrap(); let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?; @@ -880,22 +884,50 @@ impl TenantManager { // take our fast path and just provide the updated configuration // to the tenant. tenant.set_new_location_config(AttachedTenantConf::try_from( - new_location_config, + new_location_config.clone(), )?); - // Persist the new config in the background, to avoid holding up any - // locks while we do so. - // TODO - - return Ok(()); + Some(tenant.clone()) } else { // Different generations, fall through to general case + None } } _ => { // Not an Attached->Attached transition, fall through to general case + None } } + }; + + // Fast-path continued: having dropped out of the self.tenants lock, do the async + // phase of waiting for flush, before returning. + if let Some(tenant) = modify_tenant { + // Transition to AttachedStale means we may well hold a valid generation + // still, and have been requested to go stale as part of a migration. If + // the caller set `flush`, then flush to remote storage. + if let LocationMode::Attached(AttachedLocationConfig { + generation: _, + attach_mode: AttachmentMode::Stale, + }) = &new_location_config.mode + { + if let Some(flush_timeout) = flush { + match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await { + Ok(Err(e)) => { + return Err(e); + } + Ok(Ok(_)) => return Ok(()), + Err(_) => { + tracing::warn!( + timeout_ms = flush_timeout.as_millis(), + "Timed out waiting for flush to remote storage, proceeding anyway." + ) + } + } + } + } + + return Ok(()); } // General case for upserts to TenantsMap, excluding the case above: we will substitute an