mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 03:52:56 +00:00
feat(storcon): forward gc blocking and unblocking (#8956)
Currently using gc blocking and unblocking with storage controller managed pageservers is painful. Implement the API on storage controller. Fixes: #8893
This commit is contained in:
@@ -1,2 +1,20 @@
|
||||
pub mod mgmt_api;
|
||||
pub mod page_service;
|
||||
|
||||
/// For timeline_block_unblock_gc, distinguish the two different operations. This could be a bool.
|
||||
// If file structure is per-kind not per-feature then where to put this?
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum BlockUnblock {
|
||||
Block,
|
||||
Unblock,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for BlockUnblock {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let s = match self {
|
||||
BlockUnblock::Block => "block",
|
||||
BlockUnblock::Unblock => "unblock",
|
||||
};
|
||||
f.write_str(s)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@ use utils::{
|
||||
|
||||
pub use reqwest::Body as ReqwestBody;
|
||||
|
||||
use crate::BlockUnblock;
|
||||
|
||||
pub mod util;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -454,6 +456,20 @@ impl Client {
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn timeline_block_unblock_gc(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
dir: BlockUnblock,
|
||||
) -> Result<()> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/{dir}_gc",
|
||||
self.mgmt_api_endpoint,
|
||||
);
|
||||
|
||||
self.request(Method::POST, &uri, ()).await.map(|_| ())
|
||||
}
|
||||
|
||||
pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/reset",
|
||||
|
||||
@@ -21,7 +21,7 @@ use pageserver_api::models::{
|
||||
TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
use pageserver_client::{mgmt_api, BlockUnblock};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -369,6 +369,23 @@ async fn handle_tenant_timeline_detach_ancestor(
|
||||
json_response(StatusCode::OK, res)
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_block_unblock_gc(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
dir: BlockUnblock,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
|
||||
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
||||
|
||||
service
|
||||
.tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_passthrough(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -1292,6 +1309,26 @@ pub fn make_router(
|
||||
)
|
||||
},
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc",
|
||||
|r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
|s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block),
|
||||
RequestName("v1_tenant_timeline_block_unblock_gc"),
|
||||
)
|
||||
},
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc",
|
||||
|r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
|s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock),
|
||||
RequestName("v1_tenant_timeline_block_unblock_gc"),
|
||||
)
|
||||
},
|
||||
)
|
||||
// Tenant detail GET passthrough to shard zero:
|
||||
.get("/v1/tenant/:tenant_id", |r| {
|
||||
tenant_service_handler(
|
||||
|
||||
@@ -7,7 +7,10 @@ use pageserver_api::{
|
||||
},
|
||||
shard::TenantShardId,
|
||||
};
|
||||
use pageserver_client::mgmt_api::{Client, Result};
|
||||
use pageserver_client::{
|
||||
mgmt_api::{Client, Result},
|
||||
BlockUnblock,
|
||||
};
|
||||
use reqwest::StatusCode;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
@@ -258,6 +261,24 @@ impl PageserverClient {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn timeline_block_unblock_gc(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
dir: BlockUnblock,
|
||||
) -> Result<()> {
|
||||
// measuring these makes no sense because we synchronize with the gc loop and remote
|
||||
// storage on block_gc so there should be huge outliers
|
||||
measured_request!(
|
||||
"timeline_block_unblock_gc",
|
||||
crate::metrics::Method::Post,
|
||||
&self.node_id_label,
|
||||
self.inner
|
||||
.timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
|
||||
.await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_utilization(&self) -> Result<PageserverUtilization> {
|
||||
measured_request!(
|
||||
"utilization",
|
||||
|
||||
@@ -69,7 +69,7 @@ use pageserver_api::{
|
||||
ValidateResponse, ValidateResponseTenant,
|
||||
},
|
||||
};
|
||||
use pageserver_client::mgmt_api;
|
||||
use pageserver_client::{mgmt_api, BlockUnblock};
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{
|
||||
@@ -142,6 +142,7 @@ enum TenantOperations {
|
||||
AttachHook,
|
||||
TimelineArchivalConfig,
|
||||
TimelineDetachAncestor,
|
||||
TimelineGcBlockUnblock,
|
||||
}
|
||||
|
||||
#[derive(Clone, strum_macros::Display)]
|
||||
@@ -3197,6 +3198,57 @@ impl Service {
|
||||
}).await?
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_block_unblock_gc(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
dir: BlockUnblock,
|
||||
) -> Result<(), ApiError> {
|
||||
let _tenant_lock = trace_shared_lock(
|
||||
&self.tenant_op_locks,
|
||||
tenant_id,
|
||||
TenantOperations::TimelineGcBlockUnblock,
|
||||
)
|
||||
.await;
|
||||
|
||||
self.tenant_remote_mutation(tenant_id, move |targets| async move {
|
||||
if targets.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
));
|
||||
}
|
||||
|
||||
async fn do_one(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
node: Node,
|
||||
jwt: Option<String>,
|
||||
dir: BlockUnblock,
|
||||
) -> Result<(), ApiError> {
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
|
||||
client
|
||||
.timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
|
||||
.await
|
||||
.map_err(|e| passthrough_api_error(&node, e))
|
||||
}
|
||||
|
||||
// no shard needs to go first/last; the operation should be idempotent
|
||||
self.tenant_for_shards(targets, |tenant_shard_id, node| {
|
||||
futures::FutureExt::boxed(do_one(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.jwt_token.clone(),
|
||||
dir,
|
||||
))
|
||||
})
|
||||
.await
|
||||
})
|
||||
.await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
|
||||
///
|
||||
/// On success, the returned vector contains exactly the same number of elements as the input `locations`.
|
||||
|
||||
@@ -1,17 +1,32 @@
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
LogCursor,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_timeline_detail_404
|
||||
|
||||
|
||||
def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize("sharded", [True, False])
|
||||
def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool):
|
||||
neon_env_builder.num_pageservers = 2 if sharded else 1
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"}
|
||||
initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"},
|
||||
initial_tenant_shard_count=2 if sharded else None,
|
||||
)
|
||||
ps = env.pageserver
|
||||
http = ps.http_client()
|
||||
|
||||
if sharded:
|
||||
http = env.storage_controller.pageserver_api()
|
||||
else:
|
||||
http = env.pageserver.http_client()
|
||||
|
||||
pss = ManyPageservers(list(map(lambda ps: ScrollableLog(ps, None), env.pageservers)))
|
||||
|
||||
foo_branch = env.neon_cli.create_branch("foo", "main", env.initial_tenant)
|
||||
|
||||
@@ -22,9 +37,8 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_before = http.tenant_status(env.initial_tenant)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_active_line)
|
||||
|
||||
assert ps.log_contains(gc_skipped_line, offset) is None
|
||||
pss.assert_log_contains(gc_active_line)
|
||||
pss.assert_log_does_not_contain(gc_skipped_line)
|
||||
|
||||
http.timeline_block_gc(env.initial_tenant, foo_branch)
|
||||
|
||||
@@ -34,34 +48,78 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
|
||||
pss.assert_log_contains(gc_skipped_line)
|
||||
|
||||
ps.restart()
|
||||
ps.quiesce_tenants()
|
||||
pss.restart()
|
||||
pss.quiesce_tenants()
|
||||
|
||||
_, offset = env.pageserver.assert_log_contains(init_gc_skipped, offset)
|
||||
pss.assert_log_contains(init_gc_skipped)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
|
||||
pss.assert_log_contains(gc_skipped_line)
|
||||
|
||||
# deletion unblocks gc
|
||||
http.timeline_delete(env.initial_tenant, foo_branch)
|
||||
wait_timeline_detail_404(http, env.initial_tenant, foo_branch, 10, 1.0)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_active_line, offset)
|
||||
pss.assert_log_contains(gc_active_line)
|
||||
|
||||
http.timeline_block_gc(env.initial_tenant, env.initial_timeline)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
|
||||
pss.assert_log_contains(gc_skipped_line)
|
||||
|
||||
# removing the manual block also unblocks gc
|
||||
http.timeline_unblock_gc(env.initial_tenant, env.initial_timeline)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_active_line, offset)
|
||||
pss.assert_log_contains(gc_active_line)
|
||||
|
||||
|
||||
def wait_for_another_gc_round():
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ScrollableLog:
|
||||
pageserver: NeonPageserver
|
||||
offset: Optional[LogCursor]
|
||||
|
||||
def assert_log_contains(self, what: str):
|
||||
msg, offset = self.pageserver.assert_log_contains(what, offset=self.offset)
|
||||
old = self.offset
|
||||
self.offset = offset
|
||||
log.info(f"{old} -> {offset}: {msg}")
|
||||
|
||||
def assert_log_does_not_contain(self, what: str):
|
||||
assert self.pageserver.log_contains(what) is None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ManyPageservers:
|
||||
many: List[ScrollableLog]
|
||||
|
||||
def assert_log_contains(self, what: str):
|
||||
for one in self.many:
|
||||
one.assert_log_contains(what)
|
||||
|
||||
def assert_log_does_not_contain(self, what: str):
|
||||
for one in self.many:
|
||||
one.assert_log_does_not_contain(what)
|
||||
|
||||
def restart(self):
|
||||
def do_restart(x: ScrollableLog):
|
||||
x.pageserver.restart()
|
||||
|
||||
with ThreadPoolExecutor(max_workers=len(self.many)) as rt:
|
||||
rt.map(do_restart, self.many)
|
||||
rt.shutdown(wait=True)
|
||||
|
||||
def quiesce_tenants(self):
|
||||
def do_quiesce(x: ScrollableLog):
|
||||
x.pageserver.quiesce_tenants()
|
||||
|
||||
with ThreadPoolExecutor(max_workers=len(self.many)) as rt:
|
||||
rt.map(do_quiesce, self.many)
|
||||
rt.shutdown(wait=True)
|
||||
|
||||
Reference in New Issue
Block a user