pageserver: add flush_ms and document /location_config API (#5860)

- During migration of tenants, it is useful for callers to
`/location_conf` to flush a tenant's layers while transitioning to
AttachedStale: this optimization reduces the redundant WAL replay work
that the tenant's new attached pageserver will have to do. Test coverage
for this will come as part of the larger tests for live migration in
#5745 #5842
- Flushing is controlled with `flush_ms` query parameter: it is the
caller's job to decide how long they want to wait for a flush to
complete. If flush is not complete within the time limit, the pageserver
proceeds to succeed anyway: flushing is only an optimization.
- Add swagger definitions for all this: the location_config API is the
primary interface for driving tenant migration as described in
docs/rfcs/028-pageserver-migration.md, and will eventually replace the
various /attach /detach /load /ignore APIs.

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
This commit is contained in:
John Spray
2023-11-30 14:22:07 +00:00
committed by GitHub
parent 3bb1030f5d
commit 57ae9cd07f
6 changed files with 246 additions and 24 deletions

View File

@@ -11,6 +11,7 @@ use std::io::{BufReader, Write};
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::process::{Child, Command};
use std::time::Duration;
use std::{io, result};
use anyhow::{bail, Context};
@@ -522,19 +523,24 @@ impl PageServerNode {
&self,
tenant_id: TenantId,
config: LocationConfig,
flush_ms: Option<Duration>,
) -> anyhow::Result<()> {
let req_body = TenantLocationConfigRequest { tenant_id, config };
self.http_request(
Method::PUT,
format!(
"{}/tenant/{}/location_config",
self.http_base_url, tenant_id
),
)?
.json(&req_body)
.send()?
.error_from_body()?;
let path = format!(
"{}/tenant/{}/location_config",
self.http_base_url, tenant_id
);
let path = if let Some(flush_ms) = flush_ms {
format!("{}?flush_ms={}", path, flush_ms.as_millis())
} else {
path
};
self.http_request(Method::PUT, path)?
.json(&req_body)
.send()?
.error_from_body()?;
Ok(())
}

View File

@@ -117,7 +117,7 @@ pub fn migrate_tenant(
println!("🔁 Already attached to {origin_ps_id}, freshening...");
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
dest_ps.location_config(tenant_id, dest_conf)?;
dest_ps.location_config(tenant_id, dest_conf, None)?;
println!("✅ Migration complete");
return Ok(());
}
@@ -126,7 +126,7 @@ pub fn migrate_tenant(
let stale_conf =
build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None);
origin_ps.location_config(tenant_id, stale_conf)?;
origin_ps.location_config(tenant_id, stale_conf, Some(Duration::from_secs(10)))?;
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?);
}
@@ -135,7 +135,7 @@ pub fn migrate_tenant(
let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None);
println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
dest_ps.location_config(tenant_id, dest_conf)?;
dest_ps.location_config(tenant_id, dest_conf, None)?;
if let Some(baseline) = baseline_lsns {
println!("🕑 Waiting for LSN to catch up...");
@@ -181,7 +181,7 @@ pub fn migrate_tenant(
"💤 Switching to secondary mode on pageserver {}",
other_ps.conf.id
);
other_ps.location_config(tenant_id, secondary_conf)?;
other_ps.location_config(tenant_id, secondary_conf, None)?;
}
println!(
@@ -189,7 +189,7 @@ pub fn migrate_tenant(
dest_ps.conf.id
);
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
dest_ps.location_config(tenant_id, dest_conf)?;
dest_ps.location_config(tenant_id, dest_conf, None)?;
println!("✅ Migration complete");

View File

@@ -624,6 +624,99 @@ paths:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/location_config:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: flush_ms
in: query
required: false
schema:
type: integer
put:
description: |
Configures a _tenant location_, that is how a particular pageserver handles
a particular tenant. This includes _attached_ tenants, i.e. those ingesting WAL
and page service requests, and _secondary_ tenants, i.e. those which are just keeping
a warm cache in anticipation of transitioning to attached state in the future.
This is a declarative, idempotent API: there are not separate endpoints
for different tenant location configurations. Rather, this single endpoint accepts
a description of the desired location configuration, and makes whatever changes
are required to reach that state.
In imperative terms, this API is used to attach and detach tenants, and
to transition tenants to and from secondary mode.
This is a synchronous API: there is no 202 response. State transitions should always
be fast (milliseconds), with the exception of requests setting `flush_ms`, in which case
the caller controls the runtime of the request.
In some state transitions, it makes sense to flush dirty data to remote storage: this includes transitions
to AttachedStale and Detached. Flushing is never necessary for correctness, but is an
important optimization when doing migrations. The `flush_ms` parameter controls whether
flushing should be attempted, and how much time is allowed for flushing. If the time limit expires,
the requested transition will continue without waiting for any outstanding data to flush. Callers
should use a duration which is substantially less than their HTTP client's request
timeout. It is safe to supply flush_ms irrespective of the request body: in state transitions
where flushing doesn't make sense, the server will ignore it.
It is safe to retry requests, but if one receives a 409 or 503 response, it is not
useful to retry aggressively: there is probably an existing request still ongoing.
requestBody:
required: false
content:
application/json:
schema:
$ref: "#/components/schemas/TenantLocationConfigRequest"
responses:
"200":
description: Tenant is now in requested state
"503":
description: Tenant's state cannot be changed right now. Wait a few seconds and retry.
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"409":
description: |
The tenant is already known to Pageserver in some way,
and hence this `/attach` call has been rejected.
Some examples of how this can happen:
- tenant was created on this pageserver
- tenant attachment was started by an earlier call to `/attach`.
Callers should poll the tenant status's `attachment_status` field,
like for status 202. See the longer description for `POST /attach`
for details.
content:
application/json:
schema:
$ref: "#/components/schemas/ConflictError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/detach:
parameters:
- name: tenant_id
@@ -1274,6 +1367,31 @@ components:
tenant_id:
type: string
format: hex
TenantLocationConfigRequest:
type: object
required:
- tenant_id
properties:
tenant_id:
type: string
format: hex
mode:
type: string
enum: ["AttachedSingle", "AttachedMulti", "AttachedStale", "Secondary", "Detached"]
description: Mode of functionality that this pageserver will run in for this tenant.
generation:
type: integer
description: Attachment generation number, mandatory when `mode` is an attached state
secondary_conf:
$ref: '#/components/schemas/SecondaryConfig'
tenant_conf:
$ref: '#/components/schemas/TenantConfig'
SecondaryConfig:
type: object
properties:
warm:
type: boolean
description: Whether to poll remote storage for layers to download. If false, secondary locations don't download anything.
TenantConfig:
type: object
properties:

View File

@@ -4,6 +4,7 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Context, Result};
use enumset::EnumSet;
@@ -1158,6 +1159,7 @@ async fn put_tenant_location_config_handler(
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis);
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
@@ -1190,7 +1192,7 @@ async fn put_tenant_location_config_handler(
state
.tenant_manager
.upsert_location(tenant_shard_id, location_conf, &ctx)
.upsert_location(tenant_shard_id, location_conf, flush, &ctx)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,

View File

@@ -15,7 +15,9 @@ use anyhow::{bail, Context};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use pageserver_api::models::TimelineState;
use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;
@@ -32,6 +34,7 @@ use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::fs_ext;
use utils::sync::gate::Gate;
use utils::sync::gate::GateGuard;
use self::config::AttachedLocationConfig;
use self::config::AttachmentMode;
@@ -51,6 +54,7 @@ use self::timeline::TimelineResources;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::deletion_queue::DeletionQueueError;
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::metrics::TENANT_ACTIVATION;
@@ -3282,6 +3286,66 @@ impl Tenant {
pub fn cached_synthetic_size(&self) -> u64 {
self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
}
/// Flush any in-progress layers, schedule uploads, and wait for uploads to complete.
///
/// This function can take a long time: callers should wrap it in a timeout if calling
/// from an external API handler.
///
/// Cancel-safety: cancelling this function may leave I/O running, but such I/O is
/// still bounded by tenant/timeline shutdown.
#[tracing::instrument(skip_all)]
pub(crate) async fn flush_remote(&self) -> anyhow::Result<()> {
let timelines = self.timelines.lock().unwrap().clone();
async fn flush_timeline(_gate: GateGuard, timeline: Arc<Timeline>) -> anyhow::Result<()> {
tracing::info!(timeline_id=%timeline.timeline_id, "Flushing...");
timeline.freeze_and_flush().await?;
tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads...");
if let Some(client) = &timeline.remote_client {
client.wait_completion().await?;
}
Ok(())
}
// We do not use a JoinSet for these tasks, because we don't want them to be
// aborted when this function's future is cancelled: they should stay alive
// holding their GateGuard until they complete, to ensure their I/Os complete
// before Timeline shutdown completes.
let mut results = FuturesUnordered::new();
for (_timeline_id, timeline) in timelines {
// Run each timeline's flush in a task holding the timeline's gate: this
// means that if this function's future is cancelled, the Timeline shutdown
// will still wait for any I/O in here to complete.
let gate = match timeline.gate.enter() {
Ok(g) => g,
Err(_) => continue,
};
let jh = tokio::task::spawn(async move { flush_timeline(gate, timeline).await });
results.push(jh);
}
while let Some(r) = results.next().await {
if let Err(e) = r {
if !e.is_cancelled() && !e.is_panic() {
tracing::error!("unexpected join error: {e:?}");
}
}
}
// The flushes we did above were just writes, but the Tenant might have had
// pending deletions as well from recent compaction/gc: we want to flush those
// as well. This requires flushing the global delete queue. This is cheap
// because it's typically a no-op.
match self.deletion_queue_client.flush_execute().await {
Ok(_) => {}
Err(DeletionQueueError::ShuttingDown) => {}
}
Ok(())
}
}
fn remove_timeline_and_uninit_mark(

View File

@@ -29,7 +29,9 @@ use crate::control_plane_client::{
use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::TENANT_MANAGER as METRICS;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::config::{
AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, TenantConfOpt,
};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
@@ -754,6 +756,7 @@ pub(crate) async fn create_tenant(
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
let location_conf = LocationConf::attached_single(tenant_conf, generation);
info!("Creating tenant at location {location_conf:?}");
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
@@ -861,6 +864,7 @@ impl TenantManager {
&self,
tenant_shard_id: TenantShardId,
new_location_config: LocationConf,
flush: Option<Duration>,
ctx: &RequestContext,
) -> Result<(), anyhow::Error> {
debug_assert_current_span_has_tenant_id();
@@ -869,7 +873,7 @@ impl TenantManager {
// Special case fast-path for updates to Tenant: if our upsert is only updating configuration,
// then we do not need to set the slot to InProgress, we can just call into the
// existng tenant.
{
let modify_tenant = {
let locked = self.tenants.read().unwrap();
let peek_slot =
tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
@@ -880,22 +884,50 @@ impl TenantManager {
// take our fast path and just provide the updated configuration
// to the tenant.
tenant.set_new_location_config(AttachedTenantConf::try_from(
new_location_config,
new_location_config.clone(),
)?);
// Persist the new config in the background, to avoid holding up any
// locks while we do so.
// TODO
return Ok(());
Some(tenant.clone())
} else {
// Different generations, fall through to general case
None
}
}
_ => {
// Not an Attached->Attached transition, fall through to general case
None
}
}
};
// Fast-path continued: having dropped out of the self.tenants lock, do the async
// phase of waiting for flush, before returning.
if let Some(tenant) = modify_tenant {
// Transition to AttachedStale means we may well hold a valid generation
// still, and have been requested to go stale as part of a migration. If
// the caller set `flush`, then flush to remote storage.
if let LocationMode::Attached(AttachedLocationConfig {
generation: _,
attach_mode: AttachmentMode::Stale,
}) = &new_location_config.mode
{
if let Some(flush_timeout) = flush {
match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
Ok(Err(e)) => {
return Err(e);
}
Ok(Ok(_)) => return Ok(()),
Err(_) => {
tracing::warn!(
timeout_ms = flush_timeout.as_millis(),
"Timed out waiting for flush to remote storage, proceeding anyway."
)
}
}
}
}
return Ok(());
}
// General case for upserts to TenantsMap, excluding the case above: we will substitute an