Implement a sharded time travel recovery endpoint (#6821)
The sharding service didn't have support for S3 disaster recovery. This PR adds a new endpoint to the attachment service. It is slightly different from the endpoint on the pageserver in that it takes the shard count history of the tenant as JSON parameters: we need to do time travel recovery for the shard count at the target time and the shard count at the current moment in time, as well as any past shard counts that either of those still references.

Fixes #6604, part of https://github.com/neondatabase/cloud/issues/8233

Co-authored-by: John Spray <john@neon.tech>
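For illustration only, a minimal sketch of how a client might call the new attachment-service endpoint; the host, port, tenant id, timestamps, and the optional auth header are placeholders, while the route, query parameters, and JSON body follow the handler and the TenantTimeTravelRequest type added in the diff below.

import requests

# Placeholder values: adjust to your attachment service address and tenant.
ATTACHMENT_SERVICE = "http://127.0.0.1:1234"
TENANT_ID = "0123456789abcdef0123456789abcdef"

resp = requests.put(
    f"{ATTACHMENT_SERVICE}/v1/tenant/{TENANT_ID}/time_travel_remote_storage",
    params={
        # RFC 3339 timestamps; the handler parses them with humantime::parse_rfc3339.
        "travel_to": "2024-02-20T12:00:00Z",
        "done_if_after": "2024-02-20T12:30:00Z",
    },
    # Every shard count the tenant has ever used; the shard count is encoded in
    # the remote storage paths, so each one needs its own recovery pass.
    json={"shard_counts": [2]},
    # headers={"Authorization": f"Bearer {jwt}"},  # if the service requires auth
)
resp.raise_for_status()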
Cargo.lock (generated)
@@ -284,6 +284,7 @@ dependencies = [
  "diesel_migrations",
  "futures",
  "git-version",
+ "humantime",
  "hyper",
  "metrics",
  "once_cell",

@@ -18,6 +18,7 @@ clap.workspace = true
 futures.workspace = true
 git-version.workspace = true
 hyper.workspace = true
+humantime.workspace = true
 once_cell.workspace = true
 pageserver_api.workspace = true
 pageserver_client.workspace = true

@@ -4,7 +4,7 @@ use hyper::{Body, Request, Response};
 use hyper::{StatusCode, Uri};
 use pageserver_api::models::{
     TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
-    TimelineCreateRequest,
+    TenantTimeTravelRequest, TimelineCreateRequest,
 };
 use pageserver_api::shard::TenantShardId;
 use pageserver_client::mgmt_api;

@@ -12,7 +12,7 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 use utils::auth::SwappableJwtAuth;
 use utils::http::endpoint::{auth_middleware, request_span};
-use utils::http::request::parse_request_param;
+use utils::http::request::{must_get_query_param, parse_request_param};
 use utils::id::{TenantId, TimelineId};

 use utils::{

@@ -180,6 +180,39 @@ async fn handle_tenant_location_config(
     )
 }

+async fn handle_tenant_time_travel_remote_storage(
+    service: Arc<Service>,
+    mut req: Request<Body>,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+    let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
+
+    let timestamp_raw = must_get_query_param(&req, "travel_to")?;
+    let _timestamp = humantime::parse_rfc3339(&timestamp_raw).map_err(|_e| {
+        ApiError::BadRequest(anyhow::anyhow!(
+            "Invalid time for travel_to: {timestamp_raw:?}"
+        ))
+    })?;
+
+    let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
+    let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
+        ApiError::BadRequest(anyhow::anyhow!(
+            "Invalid time for done_if_after: {done_if_after_raw:?}"
+        ))
+    })?;
+
+    service
+        .tenant_time_travel_remote_storage(
+            &time_travel_req,
+            tenant_id,
+            timestamp_raw,
+            done_if_after_raw,
+        )
+        .await?;
+
+    json_response(StatusCode::OK, ())
+}
+
 async fn handle_tenant_delete(
     service: Arc<Service>,
     req: Request<Body>,

@@ -477,6 +510,9 @@ pub fn make_router(
         .put("/v1/tenant/:tenant_id/location_config", |r| {
             tenant_service_handler(r, handle_tenant_location_config)
         })
+        .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
+            tenant_service_handler(r, handle_tenant_time_travel_remote_storage)
+        })
         // Timeline operations
         .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
             tenant_service_handler(r, handle_tenant_timeline_delete)

@@ -175,10 +175,7 @@ impl Scheduler {
         }
     }

-    pub(crate) fn schedule_shard(
-        &mut self,
-        hard_exclude: &[NodeId],
-    ) -> Result<NodeId, ScheduleError> {
+    pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result<NodeId, ScheduleError> {
         if self.nodes.is_empty() {
             return Err(ScheduleError::NoPageservers);
         }

@@ -1,4 +1,5 @@
 use std::{
+    borrow::Cow,
     cmp::Ordering,
     collections::{BTreeMap, HashMap, HashSet},
     str::FromStr,

@@ -25,7 +26,7 @@ use pageserver_api::{
         self, LocationConfig, LocationConfigListResponse, LocationConfigMode, ShardParameters,
         TenantConfig, TenantCreateRequest, TenantLocationConfigRequest,
         TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
-        TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
+        TenantShardSplitResponse, TenantTimeTravelRequest, TimelineCreateRequest, TimelineInfo,
     },
     shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
 };

@@ -1329,6 +1330,95 @@ impl Service {
         Ok(result)
     }

+    pub(crate) async fn tenant_time_travel_remote_storage(
+        &self,
+        time_travel_req: &TenantTimeTravelRequest,
+        tenant_id: TenantId,
+        timestamp: Cow<'_, str>,
+        done_if_after: Cow<'_, str>,
+    ) -> Result<(), ApiError> {
+        let node = {
+            let locked = self.inner.read().unwrap();
+            // Just a sanity check to prevent misuse: the API expects that the tenant is fully
+            // detached everywhere, and nothing writes to S3 storage. Here, we verify that,
+            // but only at the start of the process, so it's really just to prevent operator
+            // mistakes.
+            for (shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
+                if shard.intent.get_attached().is_some() || !shard.intent.get_secondary().is_empty()
+                {
+                    return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                        "We want tenant to be attached in shard with tenant_shard_id={shard_id}"
+                    )));
+                }
+                let maybe_attached = shard
+                    .observed
+                    .locations
+                    .iter()
+                    .filter_map(|(node_id, observed_location)| {
+                        observed_location
+                            .conf
+                            .as_ref()
+                            .map(|loc| (node_id, observed_location, loc.mode))
+                    })
+                    .find(|(_, _, mode)| *mode != LocationConfigMode::Detached);
+                if let Some((node_id, _observed_location, mode)) = maybe_attached {
+                    return Err(ApiError::InternalServerError(anyhow::anyhow!("We observed attached={mode:?} tenant in node_id={node_id} shard with tenant_shard_id={shard_id}")));
+                }
+            }
+            let scheduler = &locked.scheduler;
+            // Right now we only perform the operation on a single node without parallelization
+            // TODO fan out the operation to multiple nodes for better performance
+            let node_id = scheduler.schedule_shard(&[])?;
+            let node = locked
+                .nodes
+                .get(&node_id)
+                .expect("Pageservers may not be deleted while lock is active");
+            node.clone()
+        };

+        // The shard count is encoded in the remote storage's URL, so we need to handle all historically used shard counts
+        let mut counts = time_travel_req
+            .shard_counts
+            .iter()
+            .copied()
+            .collect::<HashSet<_>>()
+            .into_iter()
+            .collect::<Vec<_>>();
+        counts.sort_unstable();

+        for count in counts {
+            let shard_ids = (0..count.count())
+                .map(|i| TenantShardId {
+                    tenant_id,
+                    shard_number: ShardNumber(i),
+                    shard_count: count,
+                })
+                .collect::<Vec<_>>();
+            for tenant_shard_id in shard_ids {
+                let client =
+                    mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());

+                tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);

+                client
+                    .tenant_time_travel_remote_storage(
+                        tenant_shard_id,
+                        &timestamp,
+                        &done_if_after,
+                    )
+                    .await
+                    .map_err(|e| {
+                        ApiError::InternalServerError(anyhow::anyhow!(
+                            "Error doing time travel recovery for shard {tenant_shard_id} on node {}: {e}",
+                            node.id
+                        ))
+                    })?;
+            }
+        }

+        Ok(())
+    }
+
     pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
         self.ensure_attached_wait(tenant_id).await?;

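To make the fan-out above concrete: the controller schedules a single pageserver, then issues one recovery request per shard for every shard count the tenant has ever used, because the shard count is part of each shard's remote storage prefix. A rough, self-contained Python illustration follows; the tenant id and counts are placeholder values, and the hex shard-id suffix is inferred from the "0002" prefix used in the regression test further down.

# Illustration only: enumerate the per-shard requests that the controller's
# tenant_time_travel_remote_storage loop would issue. Placeholder values; the
# "<shard_number><shard_count>" hex suffix mirrors the "0002" example in the test.
tenant_id = "0123456789abcdef0123456789abcdef"  # placeholder
requested_shard_counts = [2, 2, 4]  # TenantTimeTravelRequest.shard_counts, possibly with duplicates

for count in sorted(set(requested_shard_counts)):  # de-duplicate, like the HashSet in the Rust code
    for shard_number in range(count):
        tenant_shard_id = f"{tenant_id}-{shard_number:02x}{count:02x}"
        # One PUT /v1/tenant/{tenant_shard_id}/time_travel_remote_storage?... per shard
        print(f"would run time travel recovery for shard {tenant_shard_id}")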
@@ -495,6 +495,13 @@ impl TenantState {
             }
         }

+        for node_id in self.observed.locations.keys() {
+            if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
+                // We have observed state that isn't part of our intent: need to clean it up.
+                return true;
+            }
+        }
+
         // Even if there is no pageserver work to be done, if we have a pending notification to computes,
         // wake up a reconciler to send it.
         if self.pending_compute_notification {

@@ -616,7 +616,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
             let tenant_id = get_tenant_id(create_match, env)?;
             let new_branch_name = create_match
                 .get_one::<String>("branch-name")
-                .ok_or_else(|| anyhow!("No branch name provided"))?;
+                .ok_or_else(|| anyhow!("No branch name provided"))?; // TODO

             let pg_version = create_match
                 .get_one::<u32>("pg-version")

@@ -344,7 +344,7 @@ impl ThrottleConfig {
 /// A flattened analog of a `pagesever::tenant::LocationMode`, which
 /// lists out all possible states (and the virtual "Detached" state)
 /// in a flat form rather than using rust-style enums.
-#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
+#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
 pub enum LocationConfigMode {
     AttachedSingle,
     AttachedMulti,

@@ -408,6 +408,12 @@ pub struct TenantLocationConfigRequest {
     pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
 }

+#[derive(Serialize, Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct TenantTimeTravelRequest {
+    pub shard_counts: Vec<ShardCount>,
+}
+
 #[derive(Serialize, Deserialize, Debug)]
 #[serde(deny_unknown_fields)]
 pub struct TenantShardLocation {

@@ -217,6 +217,20 @@ impl Client {
         }
     }

+    pub async fn tenant_time_travel_remote_storage(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timestamp: &str,
+        done_if_after: &str,
+    ) -> Result<()> {
+        let uri = format!(
+            "{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
+            self.mgmt_api_endpoint
+        );
+        self.request(Method::PUT, &uri, ()).await?;
+        Ok(())
+    }
+
     pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
         let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
         self.request(Method::PUT, &uri, req).await?;

@@ -512,7 +512,7 @@ class NeonEnvBuilder:

     def init_start(
         self,
-        initial_tenant_conf: Optional[Dict[str, str]] = None,
+        initial_tenant_conf: Optional[Dict[str, Any]] = None,
         default_remote_storage_if_missing: bool = True,
         initial_tenant_shard_count: Optional[int] = None,
         initial_tenant_shard_stripe_size: Optional[int] = None,

@@ -1497,7 +1497,7 @@ class NeonCli(AbstractNeonCli):
         self,
         tenant_id: Optional[TenantId] = None,
         timeline_id: Optional[TimelineId] = None,
-        conf: Optional[Dict[str, str]] = None,
+        conf: Optional[Dict[str, Any]] = None,
         shard_count: Optional[int] = None,
         shard_stripe_size: Optional[int] = None,
         set_default: bool = False,

@@ -395,12 +395,20 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
         tenant_id: Union[TenantId, TenantShardId],
         timestamp: datetime,
         done_if_after: datetime,
+        shard_counts: Optional[List[int]] = None,
     ):
         """
         Issues a request to perform time travel operations on the remote storage
         """
+
+        if shard_counts is None:
+            shard_counts = []
+        body: Dict[str, Any] = {
+            "shard_counts": shard_counts,
+        }
         res = self.put(
-            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/time_travel_remote_storage?travel_to={timestamp.isoformat()}Z&done_if_after={done_if_after.isoformat()}Z"
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/time_travel_remote_storage?travel_to={timestamp.isoformat()}Z&done_if_after={done_if_after.isoformat()}Z",
+            json=body,
         )
         self.verbose_error(res)

@@ -482,8 +482,8 @@ def tenant_delete_wait_completed(

 MANY_SMALL_LAYERS_TENANT_CONFIG = {
     "gc_period": "0s",
     "compaction_period": "0s",
-    "checkpoint_distance": f"{1024**2}",
-    "image_creation_threshold": "100",
+    "checkpoint_distance": 1024**2,
+    "image_creation_threshold": 100,
 }

@@ -1,13 +1,30 @@
 import time
 from collections import defaultdict
 from datetime import datetime, timezone
 from typing import List

 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
+from fixtures.neon_fixtures import (
+    NeonEnv,
+    NeonEnvBuilder,
+    PgBin,
+)
 from fixtures.pageserver.http import PageserverHttpClient
-from fixtures.pageserver.utils import tenant_delete_wait_completed, timeline_delete_wait_completed
+from fixtures.pageserver.utils import (
+    MANY_SMALL_LAYERS_TENANT_CONFIG,
+    enable_remote_storage_versioning,
+    list_prefix,
+    remote_storage_delete_key,
+    tenant_delete_wait_completed,
+    timeline_delete_wait_completed,
+)
 from fixtures.pg_version import PgVersion
 from fixtures.remote_storage import RemoteStorageKind, s3_storage
 from fixtures.types import TenantId, TimelineId
-from fixtures.utils import wait_until
+from fixtures.utils import run_pg_bench_small, wait_until
+from mypy_boto3_s3.type_defs import (
+    ObjectTypeDef,
+)
+from pytest_httpserver import HTTPServer
+from werkzeug.wrappers.request import Request
+from werkzeug.wrappers.response import Response

@@ -457,3 +474,113 @@ def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
     # Check that the 'drop' APIs didn't leave things in a state that would fail a consistency check: they're
     # meant to be unclean wrt the pageserver state, but not leave a broken storage controller behind.
     env.attachment_service.consistency_check()
+
+
+def test_sharding_service_s3_time_travel_recovery(
+    neon_env_builder: NeonEnvBuilder,
+    pg_bin: PgBin,
+):
+    """
+    Test for S3 time travel
+    """
+
+    remote_storage_kind = s3_storage()
+    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
+
+    # Mock S3 doesn't have versioning enabled by default, enable it
+    # (also do it before there is any writes to the bucket)
+    if remote_storage_kind == RemoteStorageKind.MOCK_S3:
+        remote_storage = neon_env_builder.pageserver_remote_storage
+        assert remote_storage, "remote storage not configured"
+        enable_remote_storage_versioning(remote_storage)
+
+    neon_env_builder.num_pageservers = 1
+
+    env = neon_env_builder.init_start()
+    virtual_ps_http = PageserverHttpClient(env.attachment_service_port, lambda: True)
+
+    tenant_id = TenantId.generate()
+    env.attachment_service.tenant_create(
+        tenant_id,
+        shard_count=2,
+        shard_stripe_size=8192,
+        tenant_config=MANY_SMALL_LAYERS_TENANT_CONFIG,
+    )
+
+    # Check that the consistency check passes
+    env.attachment_service.consistency_check()
+
+    branch_name = "main"
+    timeline_id = env.neon_cli.create_timeline(
+        branch_name,
+        tenant_id=tenant_id,
+    )
+    # Write some nontrivial amount of data into the endpoint and wait until it is uploaded
+    with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
+        run_pg_bench_small(pg_bin, endpoint.connstr())
+        endpoint.safe_psql("CREATE TABLE created_foo(id integer);")
+        # last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
+
+    # Give the data time to be uploaded
+    time.sleep(4)
+
+    # Detach the tenant
+    virtual_ps_http.tenant_location_conf(
+        tenant_id,
+        {
+            "mode": "Detached",
+            "secondary_conf": None,
+            "tenant_conf": {},
+            "generation": None,
+        },
+    )
+
+    time.sleep(4)
+    ts_before_disaster = datetime.now(tz=timezone.utc).replace(tzinfo=None)
+    time.sleep(4)
+
+    # Simulate a "disaster": delete some random files from remote storage for one of the shards
+    assert env.pageserver_remote_storage
+    shard_id_for_list = "0002"
+    objects: List[ObjectTypeDef] = list_prefix(
+        env.pageserver_remote_storage,
+        f"tenants/{tenant_id}-{shard_id_for_list}/timelines/{timeline_id}/",
+    ).get("Contents", [])
+    assert len(objects) > 1
+    log.info(f"Found {len(objects)} objects in remote storage")
+    should_delete = False
+    for obj in objects:
+        obj_key = obj["Key"]
+        should_delete = not should_delete
+        if not should_delete:
+            log.info(f"Keeping key on remote storage: {obj_key}")
+            continue
+        log.info(f"Deleting key from remote storage: {obj_key}")
+        remote_storage_delete_key(env.pageserver_remote_storage, obj_key)
+        pass
+
+    time.sleep(4)
+    ts_after_disaster = datetime.now(tz=timezone.utc).replace(tzinfo=None)
+    time.sleep(4)
+
+    # Do time travel recovery
+    virtual_ps_http.tenant_time_travel_remote_storage(
+        tenant_id, ts_before_disaster, ts_after_disaster, shard_counts=[2]
+    )
+    time.sleep(4)
+
+    # Attach the tenant again
+    virtual_ps_http.tenant_location_conf(
+        tenant_id,
+        {
+            "mode": "AttachedSingle",
+            "secondary_conf": None,
+            "tenant_conf": {},
+            "generation": 100,
+        },
+    )
+
+    with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
+        endpoint.safe_psql("SELECT * FROM created_foo;")
+
+    env.attachment_service.consistency_check()