From 57ae9cd07f97445ac55a3f42ac44b28f5b51e184 Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 30 Nov 2023 14:22:07 +0000 Subject: [PATCH] pageserver: add `flush_ms` and document `/location_config` API (#5860) - During migration of tenants, it is useful for callers to `/location_conf` to flush a tenant's layers while transitioning to AttachedStale: this optimization reduces the redundant WAL replay work that the tenant's new attached pageserver will have to do. Test coverage for this will come as part of the larger tests for live migration in #5745 #5842 - Flushing is controlled with `flush_ms` query parameter: it is the caller's job to decide how long they want to wait for a flush to complete. If flush is not complete within the time limit, the pageserver proceeds to succeed anyway: flushing is only an optimization. - Add swagger definitions for all this: the location_config API is the primary interface for driving tenant migration as described in docs/rfcs/028-pageserver-migration.md, and will eventually replace the various /attach /detach /load /ignore APIs. --------- Co-authored-by: Joonas Koivunen --- control_plane/src/pageserver.rs | 26 +++--- control_plane/src/tenant_migration.rs | 10 +-- pageserver/src/http/openapi_spec.yml | 118 ++++++++++++++++++++++++++ pageserver/src/http/routes.rs | 4 +- pageserver/src/tenant.rs | 64 ++++++++++++++ pageserver/src/tenant/mgr.rs | 48 +++++++++-- 6 files changed, 246 insertions(+), 24 deletions(-) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 237df48543..0a55c90e8f 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -11,6 +11,7 @@ use std::io::{BufReader, Write}; use std::num::NonZeroU64; use std::path::PathBuf; use std::process::{Child, Command}; +use std::time::Duration; use std::{io, result}; use anyhow::{bail, Context}; @@ -522,19 +523,24 @@ impl PageServerNode { &self, tenant_id: TenantId, config: LocationConfig, + flush_ms: Option, ) -> anyhow::Result<()> { let req_body = TenantLocationConfigRequest { tenant_id, config }; - self.http_request( - Method::PUT, - format!( - "{}/tenant/{}/location_config", - self.http_base_url, tenant_id - ), - )? - .json(&req_body) - .send()? - .error_from_body()?; + let path = format!( + "{}/tenant/{}/location_config", + self.http_base_url, tenant_id + ); + let path = if let Some(flush_ms) = flush_ms { + format!("{}?flush_ms={}", path, flush_ms.as_millis()) + } else { + path + }; + + self.http_request(Method::PUT, path)? + .json(&req_body) + .send()? + .error_from_body()?; Ok(()) } diff --git a/control_plane/src/tenant_migration.rs b/control_plane/src/tenant_migration.rs index 42780db85c..c0c44e279f 100644 --- a/control_plane/src/tenant_migration.rs +++ b/control_plane/src/tenant_migration.rs @@ -117,7 +117,7 @@ pub fn migrate_tenant( println!("🔁 Already attached to {origin_ps_id}, freshening..."); let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?; let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None); - dest_ps.location_config(tenant_id, dest_conf)?; + dest_ps.location_config(tenant_id, dest_conf, None)?; println!("✅ Migration complete"); return Ok(()); } @@ -126,7 +126,7 @@ pub fn migrate_tenant( let stale_conf = build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None); - origin_ps.location_config(tenant_id, stale_conf)?; + origin_ps.location_config(tenant_id, stale_conf, Some(Duration::from_secs(10)))?; baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?); } @@ -135,7 +135,7 @@ pub fn migrate_tenant( let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None); println!("🔁 Attaching to pageserver {}", dest_ps.conf.id); - dest_ps.location_config(tenant_id, dest_conf)?; + dest_ps.location_config(tenant_id, dest_conf, None)?; if let Some(baseline) = baseline_lsns { println!("🕑 Waiting for LSN to catch up..."); @@ -181,7 +181,7 @@ pub fn migrate_tenant( "💤 Switching to secondary mode on pageserver {}", other_ps.conf.id ); - other_ps.location_config(tenant_id, secondary_conf)?; + other_ps.location_config(tenant_id, secondary_conf, None)?; } println!( @@ -189,7 +189,7 @@ pub fn migrate_tenant( dest_ps.conf.id ); let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None); - dest_ps.location_config(tenant_id, dest_conf)?; + dest_ps.location_config(tenant_id, dest_conf, None)?; println!("✅ Migration complete"); diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 4d455243f0..2e418f4d8f 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -624,6 +624,99 @@ paths: $ref: "#/components/schemas/ServiceUnavailableError" + /v1/tenant/{tenant_id}/location_config: + parameters: + - name: tenant_id + in: path + required: true + schema: + type: string + format: hex + - name: flush_ms + in: query + required: false + schema: + type: integer + put: + description: | + Configures a _tenant location_, that is how a particular pageserver handles + a particular tenant. This includes _attached_ tenants, i.e. those ingesting WAL + and page service requests, and _secondary_ tenants, i.e. those which are just keeping + a warm cache in anticipation of transitioning to attached state in the future. + + This is a declarative, idempotent API: there are not separate endpoints + for different tenant location configurations. Rather, this single endpoint accepts + a description of the desired location configuration, and makes whatever changes + are required to reach that state. + + In imperative terms, this API is used to attach and detach tenants, and + to transition tenants to and from secondary mode. + + This is a synchronous API: there is no 202 response. State transitions should always + be fast (milliseconds), with the exception of requests setting `flush_ms`, in which case + the caller controls the runtime of the request. + + In some state transitions, it makes sense to flush dirty data to remote storage: this includes transitions + to AttachedStale and Detached. Flushing is never necessary for correctness, but is an + important optimization when doing migrations. The `flush_ms` parameter controls whether + flushing should be attempted, and how much time is allowed for flushing. If the time limit expires, + the requested transition will continue without waiting for any outstanding data to flush. Callers + should use a duration which is substantially less than their HTTP client's request + timeout. It is safe to supply flush_ms irrespective of the request body: in state transitions + where flushing doesn't make sense, the server will ignore it. + + It is safe to retry requests, but if one receives a 409 or 503 response, it is not + useful to retry aggressively: there is probably an existing request still ongoing. + requestBody: + required: false + content: + application/json: + schema: + $ref: "#/components/schemas/TenantLocationConfigRequest" + responses: + "200": + description: Tenant is now in requested state + "503": + description: Tenant's state cannot be changed right now. Wait a few seconds and retry. + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "401": + description: Unauthorized Error + content: + application/json: + schema: + $ref: "#/components/schemas/UnauthorizedError" + "403": + description: Forbidden Error + content: + application/json: + schema: + $ref: "#/components/schemas/ForbiddenError" + "409": + description: | + The tenant is already known to Pageserver in some way, + and hence this `/attach` call has been rejected. + + Some examples of how this can happen: + - tenant was created on this pageserver + - tenant attachment was started by an earlier call to `/attach`. + + Callers should poll the tenant status's `attachment_status` field, + like for status 202. See the longer description for `POST /attach` + for details. + content: + application/json: + schema: + $ref: "#/components/schemas/ConflictError" + "500": + description: Generic operation error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + /v1/tenant/{tenant_id}/detach: parameters: - name: tenant_id @@ -1274,6 +1367,31 @@ components: tenant_id: type: string format: hex + TenantLocationConfigRequest: + type: object + required: + - tenant_id + properties: + tenant_id: + type: string + format: hex + mode: + type: string + enum: ["AttachedSingle", "AttachedMulti", "AttachedStale", "Secondary", "Detached"] + description: Mode of functionality that this pageserver will run in for this tenant. + generation: + type: integer + description: Attachment generation number, mandatory when `mode` is an attached state + secondary_conf: + $ref: '#/components/schemas/SecondaryConfig' + tenant_conf: + $ref: '#/components/schemas/TenantConfig' + SecondaryConfig: + type: object + properties: + warm: + type: boolean + description: Whether to poll remote storage for layers to download. If false, secondary locations don't download anything. TenantConfig: type: object properties: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 9cb411c95c..37159be95c 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use anyhow::{anyhow, Context, Result}; use enumset::EnumSet; @@ -1158,6 +1159,7 @@ async fn put_tenant_location_config_handler( let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let request_data: TenantLocationConfigRequest = json_request(&mut request).await?; + let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis); check_permission(&request, Some(tenant_shard_id.tenant_id))?; let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); @@ -1190,7 +1192,7 @@ async fn put_tenant_location_config_handler( state .tenant_manager - .upsert_location(tenant_shard_id, location_conf, &ctx) + .upsert_location(tenant_shard_id, location_conf, flush, &ctx) .await // TODO: badrequest assumes the caller was asking for something unreasonable, but in // principle we might have hit something like concurrent API calls to the same tenant, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 2a63f193e3..08066a612d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -15,7 +15,9 @@ use anyhow::{bail, Context}; use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use enumset::EnumSet; +use futures::stream::FuturesUnordered; use futures::FutureExt; +use futures::StreamExt; use pageserver_api::models::TimelineState; use pageserver_api::shard::TenantShardId; use remote_storage::DownloadError; @@ -32,6 +34,7 @@ use utils::completion; use utils::crashsafe::path_with_suffix_extension; use utils::fs_ext; use utils::sync::gate::Gate; +use utils::sync::gate::GateGuard; use self::config::AttachedLocationConfig; use self::config::AttachmentMode; @@ -51,6 +54,7 @@ use self::timeline::TimelineResources; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::deletion_queue::DeletionQueueClient; +use crate::deletion_queue::DeletionQueueError; use crate::import_datadir; use crate::is_uninit_mark; use crate::metrics::TENANT_ACTIVATION; @@ -3282,6 +3286,66 @@ impl Tenant { pub fn cached_synthetic_size(&self) -> u64 { self.cached_synthetic_tenant_size.load(Ordering::Relaxed) } + + /// Flush any in-progress layers, schedule uploads, and wait for uploads to complete. + /// + /// This function can take a long time: callers should wrap it in a timeout if calling + /// from an external API handler. + /// + /// Cancel-safety: cancelling this function may leave I/O running, but such I/O is + /// still bounded by tenant/timeline shutdown. + #[tracing::instrument(skip_all)] + pub(crate) async fn flush_remote(&self) -> anyhow::Result<()> { + let timelines = self.timelines.lock().unwrap().clone(); + + async fn flush_timeline(_gate: GateGuard, timeline: Arc) -> anyhow::Result<()> { + tracing::info!(timeline_id=%timeline.timeline_id, "Flushing..."); + timeline.freeze_and_flush().await?; + tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads..."); + if let Some(client) = &timeline.remote_client { + client.wait_completion().await?; + } + + Ok(()) + } + + // We do not use a JoinSet for these tasks, because we don't want them to be + // aborted when this function's future is cancelled: they should stay alive + // holding their GateGuard until they complete, to ensure their I/Os complete + // before Timeline shutdown completes. + let mut results = FuturesUnordered::new(); + + for (_timeline_id, timeline) in timelines { + // Run each timeline's flush in a task holding the timeline's gate: this + // means that if this function's future is cancelled, the Timeline shutdown + // will still wait for any I/O in here to complete. + let gate = match timeline.gate.enter() { + Ok(g) => g, + Err(_) => continue, + }; + let jh = tokio::task::spawn(async move { flush_timeline(gate, timeline).await }); + results.push(jh); + } + + while let Some(r) = results.next().await { + if let Err(e) = r { + if !e.is_cancelled() && !e.is_panic() { + tracing::error!("unexpected join error: {e:?}"); + } + } + } + + // The flushes we did above were just writes, but the Tenant might have had + // pending deletions as well from recent compaction/gc: we want to flush those + // as well. This requires flushing the global delete queue. This is cheap + // because it's typically a no-op. + match self.deletion_queue_client.flush_execute().await { + Ok(_) => {} + Err(DeletionQueueError::ShuttingDown) => {} + } + + Ok(()) + } } fn remove_timeline_and_uninit_mark( diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index e94d29327e..f34d62ba53 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -29,7 +29,9 @@ use crate::control_plane_client::{ use crate::deletion_queue::DeletionQueueClient; use crate::metrics::TENANT_MANAGER as METRICS; use crate::task_mgr::{self, TaskKind}; -use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt}; +use crate::tenant::config::{ + AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, TenantConfOpt, +}; use crate::tenant::delete::DeleteTenantFlow; use crate::tenant::span::debug_assert_current_span_has_tenant_id; use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState}; @@ -754,6 +756,7 @@ pub(crate) async fn create_tenant( ctx: &RequestContext, ) -> Result, TenantMapInsertError> { let location_conf = LocationConf::attached_single(tenant_conf, generation); + info!("Creating tenant at location {location_conf:?}"); let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?; @@ -861,6 +864,7 @@ impl TenantManager { &self, tenant_shard_id: TenantShardId, new_location_config: LocationConf, + flush: Option, ctx: &RequestContext, ) -> Result<(), anyhow::Error> { debug_assert_current_span_has_tenant_id(); @@ -869,7 +873,7 @@ impl TenantManager { // Special case fast-path for updates to Tenant: if our upsert is only updating configuration, // then we do not need to set the slot to InProgress, we can just call into the // existng tenant. - { + let modify_tenant = { let locked = self.tenants.read().unwrap(); let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?; @@ -880,22 +884,50 @@ impl TenantManager { // take our fast path and just provide the updated configuration // to the tenant. tenant.set_new_location_config(AttachedTenantConf::try_from( - new_location_config, + new_location_config.clone(), )?); - // Persist the new config in the background, to avoid holding up any - // locks while we do so. - // TODO - - return Ok(()); + Some(tenant.clone()) } else { // Different generations, fall through to general case + None } } _ => { // Not an Attached->Attached transition, fall through to general case + None } } + }; + + // Fast-path continued: having dropped out of the self.tenants lock, do the async + // phase of waiting for flush, before returning. + if let Some(tenant) = modify_tenant { + // Transition to AttachedStale means we may well hold a valid generation + // still, and have been requested to go stale as part of a migration. If + // the caller set `flush`, then flush to remote storage. + if let LocationMode::Attached(AttachedLocationConfig { + generation: _, + attach_mode: AttachmentMode::Stale, + }) = &new_location_config.mode + { + if let Some(flush_timeout) = flush { + match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await { + Ok(Err(e)) => { + return Err(e); + } + Ok(Ok(_)) => return Ok(()), + Err(_) => { + tracing::warn!( + timeout_ms = flush_timeout.as_millis(), + "Timed out waiting for flush to remote storage, proceeding anyway." + ) + } + } + } + } + + return Ok(()); } // General case for upserts to TenantsMap, excluding the case above: we will substitute an