feat(storcon): timeline detach ancestor passthrough (#8353)

Currently storage controller does not support forwarding timeline detach
ancestor requests to pageservers. Add support for forwarding `PUT
.../:tenant_id/timelines/:timeline_id/detach_ancestor`. Implement the
support mostly as is, because the timeline detach ancestor will be made
(mostly) idempotent in future PR.

Cc: #6994
This commit is contained in:
Joonas Koivunen
2024-07-15 18:08:24 +03:00
committed by GitHub
parent b49b450dc4
commit 324e4e008f
7 changed files with 281 additions and 26 deletions

View File

@@ -1,6 +1,6 @@
use utils::id::TimelineId;
#[derive(Default, serde::Serialize)]
#[derive(Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct AncestorDetached {
pub reparented_timelines: Vec<TimelineId>,
}

View File

@@ -1,6 +1,7 @@
use std::collections::HashMap;
use bytes::Bytes;
use detach_ancestor::AncestorDetached;
use pageserver_api::{models::*, shard::TenantShardId};
use reqwest::{IntoUrl, Method, StatusCode};
use utils::{
@@ -418,6 +419,23 @@ impl Client {
}
}
pub async fn timeline_detach_ancestor(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<AncestorDetached> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor",
self.mgmt_api_endpoint
);
self.request(Method::PUT, &uri, ())
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{}/reset",

View File

@@ -330,6 +330,22 @@ async fn handle_tenant_timeline_delete(
.await
}
async fn handle_tenant_timeline_detach_ancestor(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
let res = service
.tenant_timeline_detach_ancestor(tenant_id, timeline_id)
.await?;
json_response(StatusCode::OK, res)
}
async fn handle_tenant_timeline_passthrough(
service: Arc<Service>,
req: Request<Body>,
@@ -1006,6 +1022,16 @@ pub fn make_router(
RequestName("v1_tenant_timeline"),
)
})
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/detach_ancestor",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_detach_ancestor,
RequestName("v1_tenant_timeline_detach_ancestor"),
)
},
)
// Tenant detail GET passthrough to shard zero:
.get("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(

View File

@@ -1,8 +1,9 @@
use pageserver_api::{
models::{
LocationConfig, LocationConfigListResponse, PageserverUtilization, SecondaryProgress,
TenantScanRemoteStorageResponse, TenantShardSplitRequest, TenantShardSplitResponse,
TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest, TopTenantShardsResponse,
detach_ancestor::AncestorDetached, LocationConfig, LocationConfigListResponse,
PageserverUtilization, SecondaryProgress, TenantScanRemoteStorageResponse,
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
TopTenantShardsRequest, TopTenantShardsResponse,
},
shard::TenantShardId,
};
@@ -226,6 +227,21 @@ impl PageserverClient {
)
}
pub(crate) async fn timeline_detach_ancestor(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<AncestorDetached> {
measured_request!(
"timeline_detach_ancestor",
crate::metrics::Method::Put,
&self.node_id_label,
self.inner
.timeline_detach_ancestor(tenant_shard_id, timeline_id)
.await
)
}
pub(crate) async fn get_utilization(&self) -> Result<PageserverUtilization> {
measured_request!(
"utilization",

View File

@@ -117,6 +117,7 @@ enum TenantOperations {
TimelineCreate,
TimelineDelete,
AttachHook,
TimelineDetachAncestor,
}
#[derive(Clone, strum_macros::Display)]
@@ -2376,18 +2377,18 @@ impl Service {
tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
client
.tenant_time_travel_remote_storage(
tenant_shard_id,
&timestamp,
&done_if_after,
)
.await
.map_err(|e| {
ApiError::InternalServerError(anyhow::anyhow!(
"Error doing time travel recovery for shard {tenant_shard_id} on node {}: {e}",
node
))
})?;
.tenant_time_travel_remote_storage(
tenant_shard_id,
&timestamp,
&done_if_after,
)
.await
.map_err(|e| {
ApiError::InternalServerError(anyhow::anyhow!(
"Error doing time travel recovery for shard {tenant_shard_id} on node {}: {e}",
node
))
})?;
}
}
Ok(())
@@ -2757,7 +2758,7 @@ impl Service {
// Create timeline on remaining shards with number >0
if !targets.is_empty() {
// If we had multiple shards, issue requests for the remainder now.
let jwt = self.config.jwt_token.clone();
let jwt = &self.config.jwt_token;
self.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
let create_req = create_req.clone();
Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
@@ -2768,6 +2769,115 @@ impl Service {
Ok(timeline_info)
}
pub(crate) async fn tenant_timeline_detach_ancestor(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<models::detach_ancestor::AncestorDetached, ApiError> {
tracing::info!("Detaching timeline {tenant_id}/{timeline_id}",);
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineDetachAncestor,
)
.await;
self.ensure_attached_wait(tenant_id).await?;
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
for (tenant_shard_id, shard) in
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
{
let node_id = shard.intent.get_attached().ok_or_else(|| {
ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
})?;
let node = locked
.nodes
.get(&node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
}
targets
};
if targets.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
));
}
async fn detach_one(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
node: Node,
jwt: Option<String>,
) -> Result<(ShardNumber, models::detach_ancestor::AncestorDetached), ApiError> {
tracing::info!(
"Detaching timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
);
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
client
.timeline_detach_ancestor(tenant_shard_id, timeline_id)
.await
.map_err(|e| {
use mgmt_api::Error;
match e {
// no ancestor (ever)
Error::ApiError(StatusCode::CONFLICT, msg) => {
ApiError::Conflict(format!("{node}: {msg}"))
}
// too many ancestors
Error::ApiError(StatusCode::BAD_REQUEST, msg) => {
ApiError::BadRequest(anyhow::anyhow!("{node}: {msg}"))
}
// rest can be mapped
other => passthrough_api_error(&node, other),
}
})
.map(|res| (tenant_shard_id.shard_number, res))
}
// no shard needs to go first/last; the operation should be idempotent
// TODO: it would be great to ensure that all shards return the same error
let mut results = self
.tenant_for_shards(targets, |tenant_shard_id, node| {
futures::FutureExt::boxed(detach_one(
tenant_shard_id,
timeline_id,
node,
self.config.jwt_token.clone(),
))
})
.await?;
let any = results.pop().expect("we must have at least one response");
// FIXME: the ordering is not stable yet on pageserver, should be (ancestor_lsn,
// TimelineId)
let mismatching = results
.iter()
.filter(|(_, res)| res != &any.1)
.collect::<Vec<_>>();
if !mismatching.is_empty() {
let matching = results.len() - mismatching.len();
tracing::error!(
matching,
compared_against=?any,
?mismatching,
"shards returned different results"
);
}
Ok(any.1)
}
/// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
///
/// On success, the returned vector contains exactly the same number of elements as the input `locations`.
@@ -2894,8 +3004,8 @@ impl Service {
.await
.map_err(|e| {
ApiError::InternalServerError(anyhow::anyhow!(
"Error deleting timeline {timeline_id} on {tenant_shard_id} on node {node}: {e}",
))
"Error deleting timeline {timeline_id} on {tenant_shard_id} on node {node}: {e}",
))
})
}

View File

@@ -2400,7 +2400,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]:
"""
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr: str, "listen_http_port: int}
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr": str, "listen_http_port": int}
"""
response = self.request(
"GET",

View File

@@ -11,11 +11,12 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
flush_ep_to_pageserver,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import HistoricLayerInfo, PageserverApiException
from fixtures.pageserver.utils import wait_timeline_detail_404
from fixtures.remote_storage import LocalFsStorage
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_timeline_detail_404
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.utils import assert_pageserver_backups_equal
@@ -559,11 +560,24 @@ def test_compaction_induced_by_detaches_in_history(
assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, set())
def test_timeline_ancestor_errors(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
@pytest.mark.parametrize("sharded", [True, False])
def test_timeline_ancestor_detach_errors(neon_env_builder: NeonEnvBuilder, sharded: bool):
shards = 2 if sharded else 1
client = env.pageserver.http_client()
neon_env_builder.num_pageservers = shards
env = neon_env_builder.init_start(initial_tenant_shard_count=shards if sharded else None)
pageservers = dict((int(p.id), p) for p in env.pageservers)
for ps in pageservers.values():
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
if sharded:
# FIXME: should this be in the neon_env_builder.init_start?
env.storage_controller.reconcile_until_idle()
client = env.storage_controller.pageserver_api()
else:
client = env.pageserver.http_client()
with pytest.raises(PageserverApiException, match=".* no ancestors") as info:
client.detach_ancestor(env.initial_tenant, env.initial_timeline)
@@ -577,6 +591,17 @@ def test_timeline_ancestor_errors(neon_env_builder: NeonEnvBuilder):
client.detach_ancestor(env.initial_tenant, second_branch)
assert info.value.status_code == 400
client.detach_ancestor(env.initial_tenant, first_branch)
# FIXME: this should be done by the http req handler
for ps in pageservers.values():
ps.quiesce_tenants()
with pytest.raises(PageserverApiException, match=".* no ancestors") as info:
client.detach_ancestor(env.initial_tenant, first_branch)
# FIXME: this should be 200 OK because we've already completed it
assert info.value.status_code == 409
client.tenant_delete(env.initial_tenant)
with pytest.raises(PageserverApiException) as e:
@@ -584,6 +609,58 @@ def test_timeline_ancestor_errors(neon_env_builder: NeonEnvBuilder):
assert e.value.status_code == 404
def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
branch_name = "soon_detached"
shard_count = 4
neon_env_builder.num_pageservers = shard_count
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
for ps in env.pageservers:
ps.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
# FIXME: should this be in the neon_env_builder.init_start?
env.storage_controller.reconcile_until_idle()
shards = env.storage_controller.locate(env.initial_tenant)
branch_timeline_id = env.neon_cli.create_branch(branch_name, tenant_id=env.initial_tenant)
with env.endpoints.create_start(branch_name, tenant_id=env.initial_tenant) as ep:
ep.safe_psql(
"create table foo as select 1::bigint, i::bigint from generate_series(1, 10000) v(i)"
)
lsn = flush_ep_to_pageserver(env, ep, env.initial_tenant, branch_timeline_id)
pageservers = dict((int(p.id), p) for p in env.pageservers)
for shard_info in shards:
node_id = int(shard_info["node_id"])
shard_id = shard_info["shard_id"]
detail = pageservers[node_id].http_client().timeline_detail(shard_id, branch_timeline_id)
assert Lsn(detail["last_record_lsn"]) >= lsn
assert Lsn(detail["initdb_lsn"]) < lsn
assert TimelineId(detail["ancestor_timeline_id"]) == env.initial_timeline
env.storage_controller.pageserver_api().detach_ancestor(env.initial_tenant, branch_timeline_id)
for shard_info in shards:
node_id = int(shard_info["node_id"])
shard_id = shard_info["shard_id"]
# TODO: ensure quescing is done on pageserver?
pageservers[node_id].quiesce_tenants()
detail = pageservers[node_id].http_client().timeline_detail(shard_id, branch_timeline_id)
wait_for_last_record_lsn(
pageservers[node_id].http_client(), shard_id, branch_timeline_id, lsn
)
assert detail.get("ancestor_timeline_id") is None
with env.endpoints.create_start(branch_name, tenant_id=env.initial_tenant) as ep:
count = int(ep.safe_psql("select count(*) from foo")[0][0])
assert count == 10000
# TODO:
# - after starting the operation, tenant is deleted
# - after starting the operation, pageserver is shutdown, restarted
@@ -591,3 +668,11 @@ def test_timeline_ancestor_errors(neon_env_builder: NeonEnvBuilder):
# - deletion of reparented while reparenting should fail once, then succeed (?)
# - branch near existing L1 boundary, image layers?
# - investigate: why are layers started at uneven lsn? not just after branching, but in general.
#
# TEST: 1. tad which partially succeeds, one returns 500
# 2. create branch below timeline? or delete timeline below
# 3. on retry all should report the same reparented timelines
#
# TEST: 1. tad is started, one node stalls, other restarts
# 2. client timeout before stall over
# 3. on retry with stalled and other being able to proceed