From 4d426f6fbe596a12c19b86bbf43313e3452ac73b Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 29 Feb 2024 13:26:29 +0200 Subject: [PATCH] feat: support lazy, queued tenant attaches (#6907) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add off-by-default support for lazy queued tenant activation on attach. This should be useful on bulk migrations as some tenants will be activated faster due to operations or endpoint startup. Eventually all tenants will get activated by reusing the same mechanism we have at startup (`PageserverConf::concurrent_tenant_warmup`). The difference to lazy attached tenants to startup ones is that we leave their initial logical size calculation be triggered by WalReceiver or consumption metrics. Fixes: #6315 Co-authored-by: Arpad Müller --- pageserver/src/config.rs | 6 +- pageserver/src/http/openapi_spec.yml | 6 + pageserver/src/http/routes.rs | 25 ++- pageserver/src/tenant.rs | 68 ++++---- pageserver/src/tenant/delete.rs | 2 +- pageserver/src/tenant/mgr.rs | 12 +- test_runner/fixtures/pageserver/http.py | 9 +- test_runner/regress/test_timeline_size.py | 200 ++++++++++++++++++++-- 8 files changed, 255 insertions(+), 73 deletions(-) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index d18b8d6885..0a7172bde2 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -212,9 +212,9 @@ pub struct PageServerConf { pub log_format: LogFormat, - /// Number of tenants which will be concurrently loaded from remote storage proactively on startup, - /// does not limit tenants loaded in response to client I/O. A lower value implicitly deprioritizes - /// loading such tenants, vs. other work in the system. + /// Number of tenants which will be concurrently loaded from remote storage proactively on startup or attach. + /// + /// A lower value implicitly deprioritizes loading such tenants, vs. other work in the system. pub concurrent_tenant_warmup: ConfigurableSemaphore, /// Number of concurrent [`Tenant::gather_size_inputs`](crate::tenant::Tenant::gather_size_inputs) allowed. diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 5afb3ba63d..19b5fb7e79 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -579,6 +579,12 @@ paths: required: false schema: type: integer + - name: lazy + in: query + required: false + schema: + type: boolean + description: Set to true for attaches to queue up until activated by compute. Eager (false) is the default. put: description: | Configures a _tenant location_, that is how a particular pageserver handles diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 12bd21fd7b..9d92fbaee0 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -816,13 +816,7 @@ async fn tenant_attach_handler( let tenant = state .tenant_manager - .upsert_location( - tenant_shard_id, - location_conf, - None, - SpawnMode::Normal, - &ctx, - ) + .upsert_location(tenant_shard_id, location_conf, None, SpawnMode::Eager, &ctx) .await?; let Some(tenant) = tenant else { @@ -1418,6 +1412,7 @@ async fn put_tenant_location_config_handler( let request_data: TenantLocationConfigRequest = json_request(&mut request).await?; let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis); + let lazy = parse_query_param(&request, "lazy")?.unwrap_or(false); check_permission(&request, Some(tenant_shard_id.tenant_id))?; let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); @@ -1448,15 +1443,17 @@ async fn put_tenant_location_config_handler( let location_conf = LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?; + // lazy==true queues up for activation or jumps the queue like normal when a compute connects, + // similar to at startup ordering. + let spawn_mode = if lazy { + tenant::SpawnMode::Lazy + } else { + tenant::SpawnMode::Eager + }; + let attached = state .tenant_manager - .upsert_location( - tenant_shard_id, - location_conf, - flush, - tenant::SpawnMode::Normal, - &ctx, - ) + .upsert_location(tenant_shard_id, location_conf, flush, spawn_mode, &ctx) .await? .is_some(); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 6a63a2adeb..f027e9d4b1 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -227,7 +227,11 @@ pub(crate) struct TenantPreload { /// When we spawn a tenant, there is a special mode for tenant creation that /// avoids trying to read anything from remote storage. pub(crate) enum SpawnMode { - Normal, + /// Activate as soon as possible + Eager, + /// Lazy activation in the background, with the option to skip the queue if the need comes up + Lazy, + /// Tenant has been created during the lifetime of this process Create, } @@ -700,41 +704,37 @@ impl Tenant { .and_then(|x| x.initial_tenant_load_remote.take()); enum AttachType<'a> { - // During pageserver startup, we are attaching this tenant lazily in the background - Warmup(tokio::sync::SemaphorePermit<'a>), - // During pageserver startup, we are attaching this tenant as soon as we can, - // because a client tried to access it. + /// We are attaching this tenant lazily in the background. + Warmup { + _permit: tokio::sync::SemaphorePermit<'a>, + during_startup: bool + }, + /// We are attaching this tenant as soon as we can, because for example an + /// endpoint tried to access it. OnDemand, - // During normal operations after startup, we are attaching a tenant. + /// During normal operations after startup, we are attaching a tenant, and + /// eager attach was requested. Normal, } - // Before doing any I/O, wait for either or: - // - A client to attempt to access to this tenant (on-demand loading) - // - A permit to become available in the warmup semaphore (background warmup) - // - // Some-ness of init_order is how we know if we're attaching during startup or later - // in process lifetime. - let attach_type = if init_order.is_some() { + let attach_type = if matches!(mode, SpawnMode::Lazy) { + // Before doing any I/O, wait for at least one of: + // - A client attempting to access to this tenant (on-demand loading) + // - A permit becoming available in the warmup semaphore (background warmup) + tokio::select!( - _ = tenant_clone.activate_now_sem.acquire() => { + permit = tenant_clone.activate_now_sem.acquire() => { + let _ = permit.expect("activate_now_sem is never closed"); tracing::info!("Activating tenant (on-demand)"); AttachType::OnDemand }, - permit_result = conf.concurrent_tenant_warmup.inner().acquire() => { - match permit_result { - Ok(p) => { - tracing::info!("Activating tenant (warmup)"); - AttachType::Warmup(p) - } - Err(_) => { - // This is unexpected: the warmup semaphore should stay alive - // for the lifetime of init_order. Log a warning and proceed. - tracing::warn!("warmup_limit semaphore unexpectedly closed"); - AttachType::Normal - } + permit = conf.concurrent_tenant_warmup.inner().acquire() => { + let _permit = permit.expect("concurrent_tenant_warmup semaphore is never closed"); + tracing::info!("Activating tenant (warmup)"); + AttachType::Warmup { + _permit, + during_startup: init_order.is_some() } - } _ = tenant_clone.cancel.cancelled() => { // This is safe, but should be pretty rare: it is interesting if a tenant @@ -749,6 +749,8 @@ impl Tenant { }, ) } else { + // SpawnMode::{Create,Eager} always cause jumping ahead of the + // concurrent_tenant_warmup queue AttachType::Normal }; @@ -756,7 +758,7 @@ impl Tenant { (SpawnMode::Create, _) => { None }, - (SpawnMode::Normal, Some(remote_storage)) => { + (SpawnMode::Eager | SpawnMode::Lazy, Some(remote_storage)) => { let _preload_timer = TENANT.preload.start_timer(); let res = tenant_clone .preload(remote_storage, task_mgr::shutdown_token()) @@ -769,7 +771,7 @@ impl Tenant { } } } - (SpawnMode::Normal, None) => { + (_, None) => { let _preload_timer = TENANT.preload.start_timer(); None } @@ -828,7 +830,7 @@ impl Tenant { let attached = { let _attach_timer = match mode { SpawnMode::Create => None, - SpawnMode::Normal => {Some(TENANT.attach.start_timer())} + SpawnMode::Eager | SpawnMode::Lazy => Some(TENANT.attach.start_timer()), }; tenant_clone.attach(preload, mode, &ctx).await }; @@ -850,7 +852,7 @@ impl Tenant { // It also prevents the warmup proccess competing with the concurrency limit on // logical size calculations: if logical size calculation semaphore is saturated, // then warmup will wait for that before proceeding to the next tenant. - if let AttachType::Warmup(_permit) = attach_type { + if matches!(attach_type, AttachType::Warmup { during_startup: true, .. }) { let mut futs: FuturesUnordered<_> = tenant_clone.timelines.lock().unwrap().values().cloned().map(|t| t.await_initial_logical_size()).collect(); tracing::info!("Waiting for initial logical sizes while warming up..."); while futs.next().await.is_some() {} @@ -923,7 +925,7 @@ impl Tenant { deleting: false, timelines: HashMap::new(), }, - (None, SpawnMode::Normal) => { + (None, _) => { anyhow::bail!("local-only deployment is no longer supported, https://github.com/neondatabase/neon/issues/5624"); } }; @@ -3769,7 +3771,7 @@ pub(crate) mod harness { let preload = tenant .preload(&self.remote_storage, CancellationToken::new()) .await?; - tenant.attach(Some(preload), SpawnMode::Normal, ctx).await?; + tenant.attach(Some(preload), SpawnMode::Eager, ctx).await?; tenant.state.send_replace(TenantState::Active); for timeline in tenant.timelines.lock().unwrap().values() { diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index 3d138da7af..ffb7206b1e 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -420,7 +420,7 @@ impl DeleteTenantFlow { .expect("cant be stopping or broken"); tenant - .attach(preload, super::SpawnMode::Normal, ctx) + .attach(preload, super::SpawnMode::Eager, ctx) .await .context("attach")?; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 8f0f73d4b5..805d44f93d 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -595,7 +595,7 @@ pub async fn init_tenant_mgr( shard_identity, Some(init_order.clone()), &TENANTS, - SpawnMode::Normal, + SpawnMode::Lazy, &ctx, ) { Ok(tenant) => { @@ -1106,9 +1106,9 @@ impl TenantManager { // Edge case: if we were called with SpawnMode::Create, but a Tenant already existed, then // the caller thinks they're creating but the tenant already existed. We must switch to - // Normal mode so that when starting this Tenant we properly probe remote storage for timelines, + // Eager mode so that when starting this Tenant we properly probe remote storage for timelines, // rather than assuming it to be empty. - spawn_mode = SpawnMode::Normal; + spawn_mode = SpawnMode::Eager; } Some(TenantSlot::Secondary(state)) => { info!("Shutting down secondary tenant"); @@ -1300,7 +1300,7 @@ impl TenantManager { shard_identity, None, self.tenants, - SpawnMode::Normal, + SpawnMode::Eager, ctx, )?; @@ -1521,7 +1521,7 @@ impl TenantManager { *child_shard, child_location_conf, None, - SpawnMode::Normal, + SpawnMode::Eager, ctx, ) .await?; @@ -2064,7 +2064,7 @@ pub(crate) async fn load_tenant( shard_identity, None, &TENANTS, - SpawnMode::Normal, + SpawnMode::Eager, ctx, ) .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?; diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index ad3efb5837..b8e20c451f 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -286,7 +286,11 @@ class PageserverHttpClient(requests.Session, MetricsGetter): self.verbose_error(res) def tenant_location_conf( - self, tenant_id: Union[TenantId, TenantShardId], location_conf=dict[str, Any], flush_ms=None + self, + tenant_id: Union[TenantId, TenantShardId], + location_conf=dict[str, Any], + flush_ms=None, + lazy: Optional[bool] = None, ): body = location_conf.copy() body["tenant_id"] = str(tenant_id) @@ -295,6 +299,9 @@ class PageserverHttpClient(requests.Session, MetricsGetter): if flush_ms is not None: params["flush_ms"] = str(flush_ms) + if lazy is not None: + params["lazy"] = "true" if lazy else "false" + res = self.put( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/location_config", json=body, diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index 327e5abe26..cbf7059c92 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -2,6 +2,7 @@ import concurrent.futures import math import random import time +from collections import defaultdict from contextlib import closing from pathlib import Path from typing import Optional @@ -14,6 +15,7 @@ from fixtures.neon_fixtures import ( Endpoint, NeonEnv, NeonEnvBuilder, + NeonPageserver, PgBin, VanillaPostgres, wait_for_last_flush_lsn, @@ -839,22 +841,40 @@ def test_ondemand_activation(neon_env_builder: NeonEnvBuilder): ) # Deleting a stuck tenant should prompt it to go active + # in some cases, it has already been activated because it's behind the detach + delete_lazy_activating(delete_tenant_id, env.pageserver, expect_attaching=False) + tenant_ids.remove(delete_tenant_id) + + # Check that all the stuck tenants proceed to active (apart from the one that deletes, and the one + # we detached) + wait_until(10, 1, all_active) + assert len(get_tenant_states()) == n_tenants - 2 + + +def delete_lazy_activating( + delete_tenant_id: TenantId, pageserver: NeonPageserver, expect_attaching: bool +): + pageserver_http = pageserver.http_client() + + # Deletion itself won't complete due to our failpoint: Tenant::shutdown can't complete while calculating + # logical size is paused in a failpoint. So instead we will use a log observation to check that + # on-demand activation was triggered by the tenant deletion + log_match = f".*attach{{tenant_id={delete_tenant_id} shard_id=0000 gen=[0-9a-f]+}}: Activating tenant \\(on-demand\\).*" + + if expect_attaching: + assert pageserver_http.tenant_status(delete_tenant_id)["state"]["slug"] == "Attaching" + with concurrent.futures.ThreadPoolExecutor() as executor: log.info("Starting background delete") + def activated_on_demand(): + assert pageserver.log_contains(log_match) is not None + def delete_tenant(): - env.pageserver.http_client().tenant_delete(delete_tenant_id) + pageserver_http.tenant_delete(delete_tenant_id) background_delete = executor.submit(delete_tenant) - # Deletion itself won't complete due to our failpoint: Tenant::shutdown can't complete while calculating - # logical size is paused in a failpoint. So instead we will use a log observation to check that - # on-demand activation was triggered by the tenant deletion - log_match = f".*attach{{tenant_id={delete_tenant_id} shard_id=0000 gen=[0-9a-f]+}}: Activating tenant \\(on-demand\\).*" - - def activated_on_demand(): - assert env.pageserver.log_contains(log_match) is not None - log.info(f"Waiting for activation message '{log_match}'") try: wait_until(10, 1, activated_on_demand) @@ -868,12 +888,6 @@ def test_ondemand_activation(neon_env_builder: NeonEnvBuilder): # Poll for deletion to complete wait_tenant_status_404(pageserver_http, tenant_id=delete_tenant_id, iterations=40) - tenant_ids.remove(delete_tenant_id) - - # Check that all the stuck tenants proceed to active (apart from the one that deletes, and the one - # we detached) - wait_until(10, 1, all_active) - assert len(get_tenant_states()) == n_tenants - 2 def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder): @@ -939,3 +953,159 @@ def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder): client.configure_failpoints( [("initial-size-calculation-permit-pause", "off"), ("walreceiver-after-ingest", "off")] ) + + +def test_eager_attach_does_not_queue_up(neon_env_builder: NeonEnvBuilder): + neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = '1'" + + env = neon_env_builder.init_start() + + # the supporting_second does nothing except queue behind env.initial_tenant + # for purposes of showing that eager_tenant breezes past the queue + supporting_second, _ = env.neon_cli.create_tenant() + eager_tenant, _ = env.neon_cli.create_tenant() + + client = env.pageserver.http_client() + client.tenant_location_conf( + eager_tenant, + { + "mode": "Detached", + "secondary_conf": None, + "tenant_conf": {}, + "generation": None, + }, + ) + + env.pageserver.stop() + + # pause at logical size calculation, also pause before walreceiver can give feedback so it will give priority to logical size calculation + env.pageserver.start( + extra_env_vars={ + "FAILPOINTS": "timeline-calculate-logical-size-pause=pause;walreceiver-after-ingest=pause" + } + ) + + tenant_ids = [env.initial_tenant, supporting_second] + + def get_tenant_states() -> dict[str, list[TenantId]]: + states = defaultdict(list) + for id in tenant_ids: + state = client.tenant_status(id)["state"]["slug"] + states[state].append(id) + return dict(states) + + def one_is_active(): + states = get_tenant_states() + log.info(f"{states}") + assert len(states["Active"]) == 1 + + wait_until(10, 1, one_is_active) + + def other_is_attaching(): + states = get_tenant_states() + assert len(states["Attaching"]) == 1 + + wait_until(10, 1, other_is_attaching) + + def eager_tenant_is_active(): + resp = client.tenant_status(eager_tenant) + assert resp["state"]["slug"] == "Active" + + gen = env.attachment_service.attach_hook_issue(eager_tenant, env.pageserver.id) + client.tenant_location_conf( + eager_tenant, + { + "mode": "AttachedSingle", + "secondary_conf": None, + "tenant_conf": {}, + "generation": gen, + }, + lazy=False, + ) + wait_until(10, 1, eager_tenant_is_active) + + other_is_attaching() + + client.configure_failpoints( + [("timeline-calculate-logical-size-pause", "off"), ("walreceiver-after-ingest", "off")] + ) + + +@pytest.mark.parametrize("activation_method", ["endpoint", "branch", "delete"]) +def test_lazy_attach_activation(neon_env_builder: NeonEnvBuilder, activation_method: str): + # env.initial_tenant will take up this permit when attaching with lazy because of a failpoint activated after restart + neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = '1'" + + env = neon_env_builder.init_start() + + # because this returns (also elsewhere in this file), we know that SpawnMode::Create skips the queue + lazy_tenant, _ = env.neon_cli.create_tenant() + + client = env.pageserver.http_client() + client.tenant_location_conf( + lazy_tenant, + { + "mode": "Detached", + "secondary_conf": None, + "tenant_conf": {}, + "generation": None, + }, + ) + + env.pageserver.stop() + + # pause at logical size calculation, also pause before walreceiver can give feedback so it will give priority to logical size calculation + env.pageserver.start( + extra_env_vars={ + "FAILPOINTS": "timeline-calculate-logical-size-pause=pause;walreceiver-after-ingest=pause" + } + ) + + def initial_tenant_is_active(): + resp = client.tenant_status(env.initial_tenant) + assert resp["state"]["slug"] == "Active" + + wait_until(10, 1, initial_tenant_is_active) + + # even though the initial tenant is now active, because it was startup time + # attach, it will consume the only permit because logical size calculation + # is paused. + + gen = env.attachment_service.attach_hook_issue(lazy_tenant, env.pageserver.id) + client.tenant_location_conf( + lazy_tenant, + { + "mode": "AttachedSingle", + "secondary_conf": None, + "tenant_conf": {}, + "generation": gen, + }, + lazy=True, + ) + + def lazy_tenant_is_attaching(): + resp = client.tenant_status(lazy_tenant) + assert resp["state"]["slug"] == "Attaching" + + # paused logical size calculation of env.initial_tenant is keeping it attaching + wait_until(10, 1, lazy_tenant_is_attaching) + + for _ in range(5): + lazy_tenant_is_attaching() + time.sleep(0.5) + + def lazy_tenant_is_active(): + resp = client.tenant_status(lazy_tenant) + assert resp["state"]["slug"] == "Active" + + if activation_method == "endpoint": + with env.endpoints.create_start("main", tenant_id=lazy_tenant): + # starting up the endpoint should make it jump the queue + wait_until(10, 1, lazy_tenant_is_active) + elif activation_method == "branch": + env.neon_cli.create_timeline("second_branch", lazy_tenant) + wait_until(10, 1, lazy_tenant_is_active) + elif activation_method == "delete": + delete_lazy_activating(lazy_tenant, env.pageserver, expect_attaching=True) + else: + raise RuntimeError(activation_method)