diff --git a/Cargo.lock b/Cargo.lock
index ac8cceb5f6..51c433cd07 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -284,6 +284,7 @@ dependencies = [
  "diesel_migrations",
  "futures",
  "git-version",
+ "humantime",
  "hyper",
  "metrics",
  "once_cell",
diff --git a/control_plane/attachment_service/Cargo.toml b/control_plane/attachment_service/Cargo.toml
index 9e1c6377ee..bfdfd4c77d 100644
--- a/control_plane/attachment_service/Cargo.toml
+++ b/control_plane/attachment_service/Cargo.toml
@@ -18,6 +18,7 @@ clap.workspace = true
 futures.workspace = true
 git-version.workspace = true
 hyper.workspace = true
+humantime.workspace = true
 once_cell.workspace = true
 pageserver_api.workspace = true
 pageserver_client.workspace = true
diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index 67ab37dfc1..d85753bedc 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -4,7 +4,7 @@ use hyper::{Body, Request, Response};
 use hyper::{StatusCode, Uri};
 use pageserver_api::models::{
     TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
-    TimelineCreateRequest,
+    TenantTimeTravelRequest, TimelineCreateRequest,
 };
 use pageserver_api::shard::TenantShardId;
 use pageserver_client::mgmt_api;
@@ -12,7 +12,7 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 use utils::auth::SwappableJwtAuth;
 use utils::http::endpoint::{auth_middleware, request_span};
-use utils::http::request::parse_request_param;
+use utils::http::request::{must_get_query_param, parse_request_param};
 use utils::id::{TenantId, TimelineId};
 use utils::{
@@ -180,6 +180,39 @@ async fn handle_tenant_location_config(
     )
 }
 
+async fn handle_tenant_time_travel_remote_storage(
+    service: Arc<Service>,
+    mut req: Request<Body>,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+    let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
+
+    let timestamp_raw = must_get_query_param(&req, "travel_to")?;
+    let _timestamp = humantime::parse_rfc3339(&timestamp_raw).map_err(|_e| {
+        ApiError::BadRequest(anyhow::anyhow!(
+            "Invalid time for travel_to: {timestamp_raw:?}"
+        ))
+    })?;
+
+    let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
+    let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
+        ApiError::BadRequest(anyhow::anyhow!(
+            "Invalid time for done_if_after: {done_if_after_raw:?}"
+        ))
+    })?;
+
+    service
+        .tenant_time_travel_remote_storage(
+            &time_travel_req,
+            tenant_id,
+            timestamp_raw,
+            done_if_after_raw,
+        )
+        .await?;
+
+    json_response(StatusCode::OK, ())
+}
+
 async fn handle_tenant_delete(
     service: Arc<Service>,
     req: Request<Body>,
@@ -477,6 +510,9 @@ pub fn make_router(
         .put("/v1/tenant/:tenant_id/location_config", |r| {
             tenant_service_handler(r, handle_tenant_location_config)
         })
+        .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
+            tenant_service_handler(r, handle_tenant_time_travel_remote_storage)
+        })
         // Timeline operations
         .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
             tenant_service_handler(r, handle_tenant_timeline_delete)
diff --git a/control_plane/attachment_service/src/scheduler.rs b/control_plane/attachment_service/src/scheduler.rs
index 39d8d0a260..fb3c7f634c 100644
--- a/control_plane/attachment_service/src/scheduler.rs
+++ b/control_plane/attachment_service/src/scheduler.rs
@@ -175,10 +175,7 @@ impl Scheduler {
         }
     }
-    pub(crate) fn schedule_shard(
-        &mut self,
-        hard_exclude: &[NodeId],
-    ) -> Result<NodeId, ScheduleError> {
+    pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result<NodeId, ScheduleError> {
         if self.nodes.is_empty() {
             return Err(ScheduleError::NoPageservers);
         }
     }
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 0236496c61..74e1296709 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -1,4 +1,5 @@
 use std::{
+    borrow::Cow,
     cmp::Ordering,
     collections::{BTreeMap, HashMap, HashSet},
     str::FromStr,
@@ -25,7 +26,7 @@ use pageserver_api::{
         self, LocationConfig, LocationConfigListResponse, LocationConfigMode,
         ShardParameters, TenantConfig, TenantCreateRequest, TenantLocationConfigRequest,
         TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
-        TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
+        TenantShardSplitResponse, TenantTimeTravelRequest, TimelineCreateRequest, TimelineInfo,
     },
     shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
 };
@@ -1329,6 +1330,95 @@ impl Service {
         Ok(result)
     }
 
+    pub(crate) async fn tenant_time_travel_remote_storage(
+        &self,
+        time_travel_req: &TenantTimeTravelRequest,
+        tenant_id: TenantId,
+        timestamp: Cow<'_, str>,
+        done_if_after: Cow<'_, str>,
+    ) -> Result<(), ApiError> {
+        let node = {
+            let locked = self.inner.read().unwrap();
+            // Just a sanity check to prevent misuse: the API expects that the tenant is fully
+            // detached everywhere, and nothing writes to S3 storage. Here, we verify that,
+            // but only at the start of the process, so it's really just to prevent operator
+            // mistakes.
+            for (shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
+                if shard.intent.get_attached().is_some() || !shard.intent.get_secondary().is_empty()
+                {
+                    return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                        "We want tenant to be attached in shard with tenant_shard_id={shard_id}"
+                    )));
+                }
+                let maybe_attached = shard
+                    .observed
+                    .locations
+                    .iter()
+                    .filter_map(|(node_id, observed_location)| {
+                        observed_location
+                            .conf
+                            .as_ref()
+                            .map(|loc| (node_id, observed_location, loc.mode))
+                    })
+                    .find(|(_, _, mode)| *mode != LocationConfigMode::Detached);
+                if let Some((node_id, _observed_location, mode)) = maybe_attached {
+                    return Err(ApiError::InternalServerError(anyhow::anyhow!("We observed attached={mode:?} tenant in node_id={node_id} shard with tenant_shard_id={shard_id}")));
+                }
+            }
+            let scheduler = &locked.scheduler;
+            // Right now we only perform the operation on a single node without parallelization
+            // TODO fan out the operation to multiple nodes for better performance
+            let node_id = scheduler.schedule_shard(&[])?;
+            let node = locked
+                .nodes
+                .get(&node_id)
+                .expect("Pageservers may not be deleted while lock is active");
+            node.clone()
+        };
+
+        // The shard count is encoded in the remote storage's URL, so we need to handle all historically used shard counts
+        let mut counts = time_travel_req
+            .shard_counts
+            .iter()
+            .copied()
+            .collect::<HashSet<_>>()
+            .into_iter()
+            .collect::<Vec<_>>();
+        counts.sort_unstable();
+
+        for count in counts {
+            let shard_ids = (0..count.count())
+                .map(|i| TenantShardId {
+                    tenant_id,
+                    shard_number: ShardNumber(i),
+                    shard_count: count,
+                })
+                .collect::<Vec<_>>();
+            for tenant_shard_id in shard_ids {
+                let client =
+                    mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
+
+                tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
+
+                client
+                    .tenant_time_travel_remote_storage(
+                        tenant_shard_id,
+                        &timestamp,
+                        &done_if_after,
+                    )
+                    .await
+                    .map_err(|e| {
+                        ApiError::InternalServerError(anyhow::anyhow!(
+                            "Error doing time travel recovery for shard {tenant_shard_id} on node {}: {e}",
+                            node.id
+                        ))
+                    })?;
+            }
+        }
+
+        Ok(())
+    }
+
     pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
         self.ensure_attached_wait(tenant_id).await?;
 
diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs
index 4ec6fdca67..7970207e27 100644
--- a/control_plane/attachment_service/src/tenant_state.rs
+++ b/control_plane/attachment_service/src/tenant_state.rs
@@ -495,6 +495,13 @@ impl TenantState {
             }
         }
 
+        for node_id in self.observed.locations.keys() {
+            if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
+                // We have observed state that isn't part of our intent: need to clean it up.
+                return true;
+            }
+        }
+
         // Even if there is no pageserver work to be done, if we have a pending notification to computes,
         // wake up a reconciler to send it.
         if self.pending_compute_notification {
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index 5c0d008943..f824003d01 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -616,7 +616,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
             let tenant_id = get_tenant_id(create_match, env)?;
             let new_branch_name = create_match
                 .get_one::<String>("branch-name")
-                .ok_or_else(|| anyhow!("No branch name provided"))?;
+                .ok_or_else(|| anyhow!("No branch name provided"))?; // TODO
             let pg_version = create_match
                 .get_one::<u32>("pg-version")
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index af3c8018c4..b68ab9fd59 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -344,7 +344,7 @@ impl ThrottleConfig {
 /// A flattened analog of a `pagesever::tenant::LocationMode`, which
 /// lists out all possible states (and the virtual "Detached" state)
 /// in a flat form rather than using rust-style enums.
-#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
+#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
 pub enum LocationConfigMode {
     AttachedSingle,
     AttachedMulti,
@@ -408,6 +408,12 @@ pub struct TenantLocationConfigRequest {
     pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
 }
 
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct TenantTimeTravelRequest {
+    pub shard_counts: Vec<ShardCount>,
+}
+
 #[derive(Serialize, Deserialize, Debug)]
 #[serde(deny_unknown_fields)]
 pub struct TenantShardLocation {
diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs
index baea747d3c..969d0d99c0 100644
--- a/pageserver/client/src/mgmt_api.rs
+++ b/pageserver/client/src/mgmt_api.rs
@@ -217,6 +217,20 @@ impl Client {
         }
     }
 
+    pub async fn tenant_time_travel_remote_storage(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timestamp: &str,
+        done_if_after: &str,
+    ) -> Result<()> {
+        let uri = format!(
+            "{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
+            self.mgmt_api_endpoint
+        );
+        self.request(Method::PUT, &uri, ()).await?;
+        Ok(())
+    }
+
     pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
         let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
         self.request(Method::PUT, &uri, req).await?;
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index ce5ef66d22..79a4c7cde8 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -512,7 +512,7 @@ class NeonEnvBuilder:
     def init_start(
         self,
-        initial_tenant_conf: Optional[Dict[str, str]] = None,
+        initial_tenant_conf: Optional[Dict[str, Any]] = None,
         default_remote_storage_if_missing: bool = True,
         initial_tenant_shard_count: Optional[int] = None,
         initial_tenant_shard_stripe_size: Optional[int] = None,
@@ -1497,7 +1497,7 @@ class NeonCli(AbstractNeonCli):
         self,
         tenant_id: Optional[TenantId] = None,
         timeline_id: Optional[TimelineId] = None,
-        conf: Optional[Dict[str, str]] = None,
+        conf: Optional[Dict[str, Any]] = None,
         shard_count: Optional[int] = None,
         shard_stripe_size: Optional[int] = None,
         set_default: bool = False,
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index d4583308ff..98eb89d30c 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -395,12 +395,20 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
         tenant_id: Union[TenantId, TenantShardId],
         timestamp: datetime,
         done_if_after: datetime,
+        shard_counts: Optional[List[int]] = None,
     ):
         """
         Issues a request to perform time travel operations on the remote storage
         """
+
+        if shard_counts is None:
+            shard_counts = []
+        body: Dict[str, Any] = {
+            "shard_counts": shard_counts,
+        }
         res = self.put(
-            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/time_travel_remote_storage?travel_to={timestamp.isoformat()}Z&done_if_after={done_if_after.isoformat()}Z"
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/time_travel_remote_storage?travel_to={timestamp.isoformat()}Z&done_if_after={done_if_after.isoformat()}Z",
+            json=body,
         )
         self.verbose_error(res)
diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py
index 1812eb438d..225cfcd143 100644
--- a/test_runner/fixtures/pageserver/utils.py
+++ b/test_runner/fixtures/pageserver/utils.py
@@ -482,8 +482,8 @@ def tenant_delete_wait_completed(
 MANY_SMALL_LAYERS_TENANT_CONFIG = {
     "gc_period": "0s",
     "compaction_period": "0s",
-    "checkpoint_distance": f"{1024**2}",
-    "image_creation_threshold": "100",
+    "checkpoint_distance": 1024**2,
+    "image_creation_threshold": 100,
 }
diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py
index d2334c7776..6525f9733f 100644
--- a/test_runner/regress/test_sharding_service.py
+++ b/test_runner/regress/test_sharding_service.py
@@ -1,13 +1,30 @@
 import time
 from collections import defaultdict
+from datetime import datetime, timezone
+from typing import List
 
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
+from fixtures.neon_fixtures import (
+    NeonEnv,
+    NeonEnvBuilder,
+    PgBin,
+)
 from fixtures.pageserver.http import PageserverHttpClient
-from fixtures.pageserver.utils import tenant_delete_wait_completed, timeline_delete_wait_completed
+from fixtures.pageserver.utils import (
+    MANY_SMALL_LAYERS_TENANT_CONFIG,
+    enable_remote_storage_versioning,
+    list_prefix,
+    remote_storage_delete_key,
+    tenant_delete_wait_completed,
+    timeline_delete_wait_completed,
+)
 from fixtures.pg_version import PgVersion
+from fixtures.remote_storage import RemoteStorageKind, s3_storage
 from fixtures.types import TenantId, TimelineId
-from fixtures.utils import wait_until
+from fixtures.utils import run_pg_bench_small, wait_until
+from mypy_boto3_s3.type_defs import (
+    ObjectTypeDef,
+)
 from pytest_httpserver import HTTPServer
 from werkzeug.wrappers.request import Request
 from werkzeug.wrappers.response import Response
@@ -457,3 +474,113 @@ def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
     # Check that the 'drop' APIs didn't leave things in a state that would fail a consistency check: they're
     # meant to be unclean wrt the pageserver state, but not leave a broken storage controller behind.
     env.attachment_service.consistency_check()
+
+
+def test_sharding_service_s3_time_travel_recovery(
+    neon_env_builder: NeonEnvBuilder,
+    pg_bin: PgBin,
+):
+    """
+    Test for S3 time travel
+    """
+
+    remote_storage_kind = s3_storage()
+    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
+
+    # Mock S3 doesn't have versioning enabled by default, enable it
+    # (also do it before there is any writes to the bucket)
+    if remote_storage_kind == RemoteStorageKind.MOCK_S3:
+        remote_storage = neon_env_builder.pageserver_remote_storage
+        assert remote_storage, "remote storage not configured"
+        enable_remote_storage_versioning(remote_storage)
+
+    neon_env_builder.num_pageservers = 1
+
+    env = neon_env_builder.init_start()
+    virtual_ps_http = PageserverHttpClient(env.attachment_service_port, lambda: True)
+
+    tenant_id = TenantId.generate()
+    env.attachment_service.tenant_create(
+        tenant_id,
+        shard_count=2,
+        shard_stripe_size=8192,
+        tenant_config=MANY_SMALL_LAYERS_TENANT_CONFIG,
+    )
+
+    # Check that the consistency check passes
+    env.attachment_service.consistency_check()
+
+    branch_name = "main"
+    timeline_id = env.neon_cli.create_timeline(
+        branch_name,
+        tenant_id=tenant_id,
+    )
+    # Write some nontrivial amount of data into the endpoint and wait until it is uploaded
+    with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
+        run_pg_bench_small(pg_bin, endpoint.connstr())
+        endpoint.safe_psql("CREATE TABLE created_foo(id integer);")
+        # last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
+
+    # Give the data time to be uploaded
+    time.sleep(4)
+
+    # Detach the tenant
+    virtual_ps_http.tenant_location_conf(
+        tenant_id,
+        {
+            "mode": "Detached",
+            "secondary_conf": None,
+            "tenant_conf": {},
+            "generation": None,
+        },
+    )
+
+    time.sleep(4)
+    ts_before_disaster = datetime.now(tz=timezone.utc).replace(tzinfo=None)
+    time.sleep(4)
+
+    # Simulate a "disaster": delete some random files from remote storage for one of the shards
+    assert env.pageserver_remote_storage
+    shard_id_for_list = "0002"
+    objects: List[ObjectTypeDef] = list_prefix(
+        env.pageserver_remote_storage,
+        f"tenants/{tenant_id}-{shard_id_for_list}/timelines/{timeline_id}/",
+    ).get("Contents", [])
+    assert len(objects) > 1
+    log.info(f"Found {len(objects)} objects in remote storage")
+    should_delete = False
+    for obj in objects:
+        obj_key = obj["Key"]
+        should_delete = not should_delete
+        if not should_delete:
+            log.info(f"Keeping key on remote storage: {obj_key}")
+            continue
+        log.info(f"Deleting key from remote storage: {obj_key}")
+        remote_storage_delete_key(env.pageserver_remote_storage, obj_key)
+        pass
+
+    time.sleep(4)
+    ts_after_disaster = datetime.now(tz=timezone.utc).replace(tzinfo=None)
+    time.sleep(4)
+
+    # Do time travel recovery
+    virtual_ps_http.tenant_time_travel_remote_storage(
+        tenant_id, ts_before_disaster, ts_after_disaster, shard_counts=[2]
+    )
+    time.sleep(4)
+
+    # Attach the tenant again
+    virtual_ps_http.tenant_location_conf(
+        tenant_id,
+        {
+            "mode": "AttachedSingle",
+            "secondary_conf": None,
+            "tenant_conf": {},
+            "generation": 100,
+        },
+    )
+
+    with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
+        endpoint.safe_psql("SELECT * FROM created_foo;")
+
+    env.attachment_service.consistency_check()
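
For reference, the new storage controller endpoint can also be exercised outside the test fixtures, roughly as sketched below. This is a minimal, hypothetical example (controller address, tenant id, and time window are placeholders, and an Authorization header would be needed if the controller runs with auth enabled); the tenant must already be detached everywhere, which the handler checks at the start as a guard against operator mistakes.

    # Hypothetical sketch mirroring PageserverHttpClient.tenant_time_travel_remote_storage
    from datetime import datetime, timezone

    import requests

    controller = "http://127.0.0.1:1234"  # placeholder storage controller address
    tenant_id = "0123456789abcdef0123456789abcdef"  # placeholder tenant id

    # Timestamps are sent as RFC 3339 with a trailing 'Z', which humantime::parse_rfc3339 accepts
    def rfc3339(dt: datetime) -> str:
        return dt.astimezone(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    travel_to = rfc3339(datetime(2024, 3, 1, 12, 0, tzinfo=timezone.utc))  # cf. ts_before_disaster in the test
    done_if_after = rfc3339(datetime(2024, 3, 1, 12, 30, tzinfo=timezone.utc))  # cf. ts_after_disaster in the test

    res = requests.put(
        f"{controller}/v1/tenant/{tenant_id}/time_travel_remote_storage"
        f"?travel_to={travel_to}&done_if_after={done_if_after}",
        # Every shard count the tenant has ever used, since each count has its own remote storage prefix
        json={"shard_counts": [2]},
    )
    res.raise_for_status()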