From 3dbd34aa78258928344d4de80ddcdcf46b35dfbc Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Sat, 7 Sep 2024 00:42:55 +0300
Subject: [PATCH] feat(storcon): forward gc blocking and unblocking (#8956)

Currently, using gc blocking and unblocking with storage-controller-managed
pageservers is painful. Implement the API on the storage controller.

Fixes: #8893
---
 pageserver/client/src/lib.rs                 | 18 ++++
 pageserver/client/src/mgmt_api.rs            | 16 ++++
 storage_controller/src/http.rs               | 39 +++++++-
 storage_controller/src/pageserver_client.rs  | 23 ++++-
 storage_controller/src/service.rs            | 54 +++++++++++-
 .../regress/test_timeline_gc_blocking.py     | 88 +++++++++++++++----
 6 files changed, 220 insertions(+), 18 deletions(-)

diff --git a/pageserver/client/src/lib.rs b/pageserver/client/src/lib.rs
index 4a3f4dea47..cc8db37173 100644
--- a/pageserver/client/src/lib.rs
+++ b/pageserver/client/src/lib.rs
@@ -1,2 +1,20 @@
 pub mod mgmt_api;
 pub mod page_service;
+
+/// Distinguishes the two operations of `timeline_block_unblock_gc`. This could be a bool.
+// If the file structure is per-kind rather than per-feature, where should this live?
+#[derive(Clone, Copy)]
+pub enum BlockUnblock {
+    Block,
+    Unblock,
+}
+
+impl std::fmt::Display for BlockUnblock {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let s = match self {
+            BlockUnblock::Block => "block",
+            BlockUnblock::Unblock => "unblock",
+        };
+        f.write_str(s)
+    }
+}
diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs
index 737cb00835..a68f45a6d9 100644
--- a/pageserver/client/src/mgmt_api.rs
+++ b/pageserver/client/src/mgmt_api.rs
@@ -12,6 +12,8 @@ use utils::{
 
 pub use reqwest::Body as ReqwestBody;
 
+use crate::BlockUnblock;
+
 pub mod util;
 
 #[derive(Debug, Clone)]
@@ -454,6 +456,20 @@ impl Client {
             .map_err(Error::ReceiveBody)
     }
 
+    pub async fn timeline_block_unblock_gc(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        dir: BlockUnblock,
+    ) -> Result<()> {
+        let uri = format!(
+            "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
+            self.mgmt_api_endpoint,
+        );
+
+        self.request(Method::POST, &uri, ()).await.map(|_| ())
+    }
+
     pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
         let uri = format!(
             "{}/v1/tenant/{}/reset",
diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 96bdd5039d..a6638f5191 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -21,7 +21,7 @@ use pageserver_api::models::{
     TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest,
 };
 use pageserver_api::shard::TenantShardId;
-use pageserver_client::mgmt_api;
+use pageserver_client::{mgmt_api, BlockUnblock};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio_util::sync::CancellationToken;
@@ -369,6 +369,23 @@ async fn handle_tenant_timeline_detach_ancestor(
     json_response(StatusCode::OK, res)
 }
 
+async fn handle_tenant_timeline_block_unblock_gc(
+    service: Arc<Service>,
+    req: Request<Body>,
+    dir: BlockUnblock,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+    check_permissions(&req, Scope::PageServerApi)?;
+
+    let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
+
+    service
+        .tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
+        .await?;
+
+    json_response(StatusCode::OK, ())
+}
+
 async fn handle_tenant_timeline_passthrough(
     service: Arc<Service>,
     req: Request<Body>,
@@ -1292,6 +1309,26 @@ pub fn make_router(
                 )
             },
         )
+        .post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc", + |r| { + tenant_service_handler( + r, + |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block), + RequestName("v1_tenant_timeline_block_unblock_gc"), + ) + }, + ) + .post( + "/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc", + |r| { + tenant_service_handler( + r, + |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock), + RequestName("v1_tenant_timeline_block_unblock_gc"), + ) + }, + ) // Tenant detail GET passthrough to shard zero: .get("/v1/tenant/:tenant_id", |r| { tenant_service_handler( diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs index 20770ed703..961a1f78dd 100644 --- a/storage_controller/src/pageserver_client.rs +++ b/storage_controller/src/pageserver_client.rs @@ -7,7 +7,10 @@ use pageserver_api::{ }, shard::TenantShardId, }; -use pageserver_client::mgmt_api::{Client, Result}; +use pageserver_client::{ + mgmt_api::{Client, Result}, + BlockUnblock, +}; use reqwest::StatusCode; use utils::id::{NodeId, TenantId, TimelineId}; @@ -258,6 +261,24 @@ impl PageserverClient { ) } + pub(crate) async fn timeline_block_unblock_gc( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + dir: BlockUnblock, + ) -> Result<()> { + // measuring these makes no sense because we synchronize with the gc loop and remote + // storage on block_gc so there should be huge outliers + measured_request!( + "timeline_block_unblock_gc", + crate::metrics::Method::Post, + &self.node_id_label, + self.inner + .timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir) + .await + ) + } + pub(crate) async fn get_utilization(&self) -> Result { measured_request!( "utilization", diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 6365423e10..be3efaf688 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -69,7 +69,7 @@ use pageserver_api::{ ValidateResponse, ValidateResponseTenant, }, }; -use pageserver_client::mgmt_api; +use pageserver_client::{mgmt_api, BlockUnblock}; use tokio::sync::mpsc::error::TrySendError; use tokio_util::sync::CancellationToken; use utils::{ @@ -142,6 +142,7 @@ enum TenantOperations { AttachHook, TimelineArchivalConfig, TimelineDetachAncestor, + TimelineGcBlockUnblock, } #[derive(Clone, strum_macros::Display)] @@ -3197,6 +3198,57 @@ impl Service { }).await? 
     }
 
+    pub(crate) async fn tenant_timeline_block_unblock_gc(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        dir: BlockUnblock,
+    ) -> Result<(), ApiError> {
+        let _tenant_lock = trace_shared_lock(
+            &self.tenant_op_locks,
+            tenant_id,
+            TenantOperations::TimelineGcBlockUnblock,
+        )
+        .await;
+
+        self.tenant_remote_mutation(tenant_id, move |targets| async move {
+            if targets.is_empty() {
+                return Err(ApiError::NotFound(
+                    anyhow::anyhow!("Tenant not found").into(),
+                ));
+            }
+
+            async fn do_one(
+                tenant_shard_id: TenantShardId,
+                timeline_id: TimelineId,
+                node: Node,
+                jwt: Option<String>,
+                dir: BlockUnblock,
+            ) -> Result<(), ApiError> {
+                let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
+
+                client
+                    .timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
+                    .await
+                    .map_err(|e| passthrough_api_error(&node, e))
+            }
+
+            // No shard needs to go first or last; the operation should be idempotent.
+            self.tenant_for_shards(targets, |tenant_shard_id, node| {
+                futures::FutureExt::boxed(do_one(
+                    tenant_shard_id,
+                    timeline_id,
+                    node,
+                    self.config.jwt_token.clone(),
+                    dir,
+                ))
+            })
+            .await
+        })
+        .await??;
+        Ok(())
+    }
+
     /// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
     ///
     /// On success, the returned vector contains exactly the same number of elements as the input `locations`.
diff --git a/test_runner/regress/test_timeline_gc_blocking.py b/test_runner/regress/test_timeline_gc_blocking.py
index 24de894687..ddfe9b911f 100644
--- a/test_runner/regress/test_timeline_gc_blocking.py
+++ b/test_runner/regress/test_timeline_gc_blocking.py
@@ -1,17 +1,32 @@
 import time
+from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
+from typing import List, Optional
 
+import pytest
+from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
+    LogCursor,
     NeonEnvBuilder,
+    NeonPageserver,
 )
 from fixtures.pageserver.utils import wait_timeline_detail_404
 
 
-def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
+@pytest.mark.parametrize("sharded", [True, False])
+def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool):
+    neon_env_builder.num_pageservers = 2 if sharded else 1
     env = neon_env_builder.init_start(
-        initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"}
+        initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"},
+        initial_tenant_shard_count=2 if sharded else None,
     )
-    ps = env.pageserver
-    http = ps.http_client()
+
+    if sharded:
+        http = env.storage_controller.pageserver_api()
+    else:
+        http = env.pageserver.http_client()
+
+    pss = ManyPageservers(list(map(lambda ps: ScrollableLog(ps, None), env.pageservers)))
 
     foo_branch = env.neon_cli.create_branch("foo", "main", env.initial_tenant)
 
@@ -22,9 +37,8 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
     tenant_before = http.tenant_status(env.initial_tenant)
 
     wait_for_another_gc_round()
-    _, offset = ps.assert_log_contains(gc_active_line)
-
-    assert ps.log_contains(gc_skipped_line, offset) is None
+    pss.assert_log_contains(gc_active_line)
+    pss.assert_log_does_not_contain(gc_skipped_line)
 
     http.timeline_block_gc(env.initial_tenant, foo_branch)
 
@@ -34,34 +48,78 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
     assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
 
     wait_for_another_gc_round()
-    _, offset = ps.assert_log_contains(gc_skipped_line, offset)
+    pss.assert_log_contains(gc_skipped_line)
 
-    ps.restart()
-    ps.quiesce_tenants()
+    pss.restart()
+    pss.quiesce_tenants()
 
-    _, offset = env.pageserver.assert_log_contains(init_gc_skipped, offset)
+    pss.assert_log_contains(init_gc_skipped)
 
     wait_for_another_gc_round()
-    _, offset = ps.assert_log_contains(gc_skipped_line, offset)
+    pss.assert_log_contains(gc_skipped_line)
 
     # deletion unblocks gc
     http.timeline_delete(env.initial_tenant, foo_branch)
     wait_timeline_detail_404(http, env.initial_tenant, foo_branch, 10, 1.0)
 
     wait_for_another_gc_round()
-    _, offset = ps.assert_log_contains(gc_active_line, offset)
+    pss.assert_log_contains(gc_active_line)
 
     http.timeline_block_gc(env.initial_tenant, env.initial_timeline)
 
     wait_for_another_gc_round()
-    _, offset = ps.assert_log_contains(gc_skipped_line, offset)
+    pss.assert_log_contains(gc_skipped_line)
 
     # removing the manual block also unblocks gc
     http.timeline_unblock_gc(env.initial_tenant, env.initial_timeline)
 
     wait_for_another_gc_round()
-    _, offset = ps.assert_log_contains(gc_active_line, offset)
+    pss.assert_log_contains(gc_active_line)
 
 
 def wait_for_another_gc_round():
     time.sleep(2)
+
+
+@dataclass
+class ScrollableLog:
+    pageserver: NeonPageserver
+    offset: Optional[LogCursor]
+
+    def assert_log_contains(self, what: str):
+        msg, offset = self.pageserver.assert_log_contains(what, offset=self.offset)
+        old = self.offset
+        self.offset = offset
+        log.info(f"{old} -> {offset}: {msg}")
+
+    def assert_log_does_not_contain(self, what: str):
+        assert self.pageserver.log_contains(what) is None
+
+
+@dataclass(frozen=True)
+class ManyPageservers:
+    many: List[ScrollableLog]
+
+    def assert_log_contains(self, what: str):
+        for one in self.many:
+            one.assert_log_contains(what)
+
+    def assert_log_does_not_contain(self, what: str):
+        for one in self.many:
+            one.assert_log_does_not_contain(what)
+
+    def restart(self):
+        def do_restart(x: ScrollableLog):
+            x.pageserver.restart()
+
+        # Consume the map iterator so worker exceptions propagate; the `with`
+        # block already joins the pool on exit, so no explicit shutdown is needed.
+        with ThreadPoolExecutor(max_workers=len(self.many)) as rt:
+            list(rt.map(do_restart, self.many))
+
+    def quiesce_tenants(self):
+        def do_quiesce(x: ScrollableLog):
+            x.pageserver.quiesce_tenants()
+
+        with ThreadPoolExecutor(max_workers=len(self.many)) as rt:
+            list(rt.map(do_quiesce, self.many))
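
Editor's note: for readers who want to exercise the new endpoints directly rather
than through the test fixtures, below is a minimal sketch of a client driving the
storage controller. The controller URL, tenant/timeline ids, and token value are
illustrative assumptions, not values from the patch; only the
/v1/tenant/{tenant_id}/timeline/{timeline_id}/{block,unblock}_gc paths and the
PageServerApi scope requirement come from the change itself.

    # Hypothetical sketch; STORCON_URL, TENANT, TIMELINE, and JWT are placeholders.
    import requests

    STORCON_URL = "http://127.0.0.1:1234"  # assumed storage controller address
    TENANT = "0123456789abcdef0123456789abcdef"  # assumed tenant id (hex)
    TIMELINE = "fedcba9876543210fedcba9876543210"  # assumed timeline id (hex)
    JWT = None  # a token with pageserverapi scope, if auth is enabled


    def gc_block_unblock(direction: str) -> None:
        """POST to the {direction}_gc endpoint this patch adds to the controller."""
        assert direction in ("block", "unblock")
        headers = {"Authorization": f"Bearer {JWT}"} if JWT else {}
        resp = requests.post(
            f"{STORCON_URL}/v1/tenant/{TENANT}/timeline/{TIMELINE}/{direction}_gc",
            headers=headers,
        )
        resp.raise_for_status()


    gc_block_unblock("block")
    # While blocked, tenant status reports a reason such as
    # "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }", as the test asserts.
    gc_block_unblock("unblock")

The controller fans the call out to every shard concurrently via
tenant_for_shards; because blocking and unblocking are idempotent on each
pageserver, no ordering between shards is required.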