From 4d9036bf1f3fec2a2285bf5ced349195c36c56f8 Mon Sep 17 00:00:00 2001
From: Arpad Müller
Date: Fri, 25 Oct 2024 12:32:46 +0200
Subject: [PATCH] Support offloaded timelines during shard split (#9489)

Before, we didn't copy over the `index-part.json` of offloaded timelines
to the new shard's location, resulting in the new shard not knowing that
the timeline even exists. In #9444, we started copying over the manifest,
but we also need to do the same for `index-part.json`.

As the operations are mostly the same for offloaded and non-offloaded
timelines, we can iterate over both in the same loop, after introducing a
`TimelineOrOffloadedArcRef` type that generalizes over the two cases.
This is analogous to the deletion code added in #8907.

The added test also ensures that the sharded archival config endpoint
works, something that was not previously covered by tests.

Part of #8088
---
 pageserver/src/tenant.rs                      | 113 ++++++++++----
 .../src/tenant/remote_timeline_client.rs      |   6 +-
 test_runner/fixtures/neon_fixtures.py         |  27 +++-
 test_runner/fixtures/pageserver/http.py       |  25 +++
 test_runner/regress/test_sharding.py          | 143 +++++++++++++++++-
 5 files changed, 283 insertions(+), 31 deletions(-)

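Note: the core pattern of this change is a borrowed enum over `Arc<Timeline>` /
`Arc<OffloadedTimeline>` plus `Iterator::chain`, so one loop in the split-preparation
code visits both the live and the offloaded timeline maps. For illustration, here is a
minimal, self-contained Rust sketch of that pattern; the structs, their `u64` ids, and
`main` are simplified stand-ins and not pageserver code — only the enum shape and the
chained iteration mirror the actual change.

use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-ins for the pageserver's Timeline / OffloadedTimeline types.
struct Timeline {
    timeline_id: u64,
}
struct OffloadedTimeline {
    timeline_id: u64,
}

// Borrowed view over either kind of timeline, analogous to TimelineOrOffloadedArcRef<'a>.
enum TimelineOrOffloadedArcRef<'a> {
    Timeline(&'a Arc<Timeline>),
    Offloaded(&'a Arc<OffloadedTimeline>),
}

impl TimelineOrOffloadedArcRef<'_> {
    fn timeline_id(&self) -> u64 {
        match self {
            Self::Timeline(t) => t.timeline_id,
            Self::Offloaded(o) => o.timeline_id,
        }
    }
}

impl<'a> From<&'a Arc<Timeline>> for TimelineOrOffloadedArcRef<'a> {
    fn from(t: &'a Arc<Timeline>) -> Self {
        Self::Timeline(t)
    }
}

impl<'a> From<&'a Arc<OffloadedTimeline>> for TimelineOrOffloadedArcRef<'a> {
    fn from(o: &'a Arc<OffloadedTimeline>) -> Self {
        Self::Offloaded(o)
    }
}

fn main() {
    let timelines: HashMap<u64, Arc<Timeline>> =
        HashMap::from([(1, Arc::new(Timeline { timeline_id: 1 }))]);
    let offloaded: HashMap<u64, Arc<OffloadedTimeline>> =
        HashMap::from([(2, Arc::new(OffloadedTimeline { timeline_id: 2 }))]);

    // One loop covers both maps; per-variant work (e.g. flushing the upload queue
    // only for live timelines) goes behind a `match` inside the loop body.
    for timeline in timelines
        .values()
        .map(TimelineOrOffloadedArcRef::<'_>::from)
        .chain(offloaded.values().map(TimelineOrOffloadedArcRef::<'_>::from))
    {
        println!(
            "would copy index-part.json for timeline {}",
            timeline.timeline_id()
        );
    }
}

Holding `&'a Arc<...>` instead of cloning keeps the loop from bumping reference counts,
and the two `From` impls are what let `.map(TimelineOrOffloadedArcRef::<'_>::from)` lift
both map value types into the shared enum without explicit closures.
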
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index d8ce916bcb..968d093a80 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -597,17 +597,21 @@ pub enum TimelineOrOffloaded {
 }
 
 impl TimelineOrOffloaded {
-    pub fn tenant_shard_id(&self) -> TenantShardId {
+    pub fn arc_ref(&self) -> TimelineOrOffloadedArcRef<'_> {
         match self {
-            TimelineOrOffloaded::Timeline(timeline) => timeline.tenant_shard_id,
-            TimelineOrOffloaded::Offloaded(offloaded) => offloaded.tenant_shard_id,
+            TimelineOrOffloaded::Timeline(timeline) => {
+                TimelineOrOffloadedArcRef::Timeline(timeline)
+            }
+            TimelineOrOffloaded::Offloaded(offloaded) => {
+                TimelineOrOffloadedArcRef::Offloaded(offloaded)
+            }
         }
     }
+    pub fn tenant_shard_id(&self) -> TenantShardId {
+        self.arc_ref().tenant_shard_id()
+    }
     pub fn timeline_id(&self) -> TimelineId {
-        match self {
-            TimelineOrOffloaded::Timeline(timeline) => timeline.timeline_id,
-            TimelineOrOffloaded::Offloaded(offloaded) => offloaded.timeline_id,
-        }
+        self.arc_ref().timeline_id()
     }
     pub fn delete_progress(&self) -> &Arc<tokio::sync::Mutex<DeleteTimelineFlow>> {
         match self {
@@ -615,7 +619,7 @@ impl TimelineOrOffloaded {
             TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.delete_progress,
         }
     }
-    pub fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
+    fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
         match self {
             TimelineOrOffloaded::Timeline(timeline) => timeline.remote_client.clone(),
             TimelineOrOffloaded::Offloaded(offloaded) => match offloaded.remote_client.clone() {
@@ -632,6 +636,38 @@ impl TimelineOrOffloaded {
     }
 }
 
+pub enum TimelineOrOffloadedArcRef<'a> {
+    Timeline(&'a Arc<Timeline>),
+    Offloaded(&'a Arc<OffloadedTimeline>),
+}
+
+impl TimelineOrOffloadedArcRef<'_> {
+    pub fn tenant_shard_id(&self) -> TenantShardId {
+        match self {
+            TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.tenant_shard_id,
+            TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.tenant_shard_id,
+        }
+    }
+    pub fn timeline_id(&self) -> TimelineId {
+        match self {
+            TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.timeline_id,
+            TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.timeline_id,
+        }
+    }
+}
+
+impl<'a> From<&'a Arc<Timeline>> for TimelineOrOffloadedArcRef<'a> {
+    fn from(timeline: &'a Arc<Timeline>) -> Self {
+        Self::Timeline(timeline)
+    }
+}
+
+impl<'a> From<&'a Arc<OffloadedTimeline>> for TimelineOrOffloadedArcRef<'a> {
+    fn from(timeline: &'a Arc<OffloadedTimeline>) -> Self {
+        Self::Offloaded(timeline)
+    }
+}
+
 #[derive(Debug, thiserror::Error, PartialEq, Eq)]
 pub enum GetTimelineError {
     #[error("Timeline is shutting down")]
@@ -2940,33 +2976,58 @@ impl Tenant {
         &self,
         child_shards: &Vec<TenantShardId>,
     ) -> anyhow::Result<()> {
-        let timelines = self.timelines.lock().unwrap().clone();
-        for timeline in timelines.values() {
+        let (timelines, offloaded) = {
+            let timelines = self.timelines.lock().unwrap();
+            let offloaded = self.timelines_offloaded.lock().unwrap();
+            (timelines.clone(), offloaded.clone())
+        };
+        let timelines_iter = timelines
+            .values()
+            .map(TimelineOrOffloadedArcRef::<'_>::from)
+            .chain(
+                offloaded
+                    .values()
+                    .map(TimelineOrOffloadedArcRef::<'_>::from),
+            );
+        for timeline in timelines_iter {
             // We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
             // to ensure that they do not start a split if currently in the process of doing these.
 
-            // Upload an index from the parent: this is partly to provide freshness for the
-            // child tenants that will copy it, and partly for general ease-of-debugging: there will
-            // always be a parent shard index in the same generation as we wrote the child shard index.
-            tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index");
-            timeline
-                .remote_client
-                .schedule_index_upload_for_file_changes()?;
-            timeline.remote_client.wait_completion().await?;
+            let timeline_id = timeline.timeline_id();
+
+            if let TimelineOrOffloadedArcRef::Timeline(timeline) = timeline {
+                // Upload an index from the parent: this is partly to provide freshness for the
+                // child tenants that will copy it, and partly for general ease-of-debugging: there will
+                // always be a parent shard index in the same generation as we wrote the child shard index.
+                tracing::info!(%timeline_id, "Uploading index");
+                timeline
+                    .remote_client
+                    .schedule_index_upload_for_file_changes()?;
+                timeline.remote_client.wait_completion().await?;
+            }
+
+            let remote_client = match timeline {
+                TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.remote_client.clone(),
+                TimelineOrOffloadedArcRef::Offloaded(offloaded) => {
+                    let remote_client = self
+                        .build_timeline_client(offloaded.timeline_id, self.remote_storage.clone());
+                    Arc::new(remote_client)
+                }
+            };
 
             // Shut down the timeline's remote client: this means that the indices we write
             // for child shards will not be invalidated by the parent shard deleting layers.
-            tracing::info!(timeline_id=%timeline.timeline_id, "Shutting down remote storage client");
-            timeline.remote_client.shutdown().await;
+            tracing::info!(%timeline_id, "Shutting down remote storage client");
+            remote_client.shutdown().await;
 
             // Download methods can still be used after shutdown, as they don't flow through the remote client's
             // queue. In principal the RemoteTimelineClient could provide this without downloading it, but this
            // operation is rare, so it's simpler to just download it (and robustly guarantees that the index
             // we use here really is the remotely persistent one).
-            tracing::info!(timeline_id=%timeline.timeline_id, "Downloading index_part from parent");
-            let result = timeline.remote_client
+            tracing::info!(%timeline_id, "Downloading index_part from parent");
+            let result = remote_client
                 .download_index_file(&self.cancel)
-                .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
+                .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))
                 .await?;
             let index_part = match result {
                 MaybeDeletedIndexPart::Deleted(_) => {
@@ -2976,11 +3037,11 @@ impl Tenant {
             };
 
             for child_shard in child_shards {
-                tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index_part for child {}", child_shard.to_index());
+                tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index());
                 upload_index_part(
                     &self.remote_storage,
                     child_shard,
-                    &timeline.timeline_id,
+                    &timeline_id,
                     self.generation,
                     &index_part,
                     &self.cancel,
@@ -2989,8 +3050,6 @@ impl Tenant {
             }
         }
 
-        // TODO: also copy index files of offloaded timelines
-
         let tenant_manifest = self.tenant_manifest();
         // TODO: generation support
         let generation = remote_timeline_client::TENANT_MANIFEST_GENERATION;
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 066fd12a9a..1c72c7fff8 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -1278,10 +1278,14 @@ impl RemoteTimelineClient {
         let fut = {
             let mut guard = self.upload_queue.lock().unwrap();
             let upload_queue = match &mut *guard {
-                UploadQueue::Stopped(_) => return,
+                UploadQueue::Stopped(_) => {
+                    scopeguard::ScopeGuard::into_inner(sg);
+                    return;
+                }
                 UploadQueue::Uninitialized => {
                     // transition into Stopped state
                     self.stop_impl(&mut guard);
+                    scopeguard::ScopeGuard::into_inner(sg);
                     return;
                 }
                 UploadQueue::Initialized(ref mut init) => init,
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 747c2c0d63..a1ea056213 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -44,7 +44,14 @@ from urllib3.util.retry import Retry
 
 from fixtures import overlayfs
 from fixtures.auth_tokens import AuthKeys, TokenScope
-from fixtures.common_types import Lsn, NodeId, TenantId, TenantShardId, TimelineId
+from fixtures.common_types import (
+    Lsn,
+    NodeId,
+    TenantId,
+    TenantShardId,
+    TimelineArchivalState,
+    TimelineId,
+)
 from fixtures.endpoint.http import EndpointHttpClient
 from fixtures.log_helper import log
 from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
@@ -2132,6 +2139,24 @@ class NeonStorageController(MetricsGetter, LogUtils):
         response.raise_for_status()
         return response.json()
 
+    def timeline_archival_config(
+        self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        state: TimelineArchivalState,
+    ):
+        config = {"state": state.value}
+        log.info(
+            f"requesting timeline archival config {config} for tenant {tenant_id} and timeline {timeline_id}"
+        )
+        res = self.request(
+            "PUT",
+            f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/archival_config",
+            json=config,
+            headers=self.headers(TokenScope.ADMIN),
+        )
+        return res.json()
+
     def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
         if isinstance(config_strings, tuple):
             pairs = [config_strings]
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index db83c3ec89..706bc550e5 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -142,6 +142,19 @@ class TenantConfig:
         )
 
 
+@dataclass
+class TimelinesInfoAndOffloaded:
+    timelines: list[dict[str, Any]]
+    offloaded: list[dict[str, Any]]
+
+    @classmethod
+    def from_json(cls, d: dict[str, Any]) -> TimelinesInfoAndOffloaded:
+        return TimelinesInfoAndOffloaded(
+            timelines=d["timelines"],
+            offloaded=d["offloaded"],
+        )
+
+
 class PageserverHttpClient(requests.Session, MetricsGetter):
     def __init__(
         self,
@@ -464,6 +477,18 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
         assert isinstance(res_json, list)
         return res_json
 
+    def timeline_and_offloaded_list(
+        self,
+        tenant_id: Union[TenantId, TenantShardId],
+    ) -> TimelinesInfoAndOffloaded:
+        res = self.get(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline_and_offloaded",
+        )
+        self.verbose_error(res)
+        res_json = res.json()
+        assert isinstance(res_json, dict)
+        return TimelinesInfoAndOffloaded.from_json(res_json)
+
     def timeline_create(
         self,
         pg_version: PgVersion,
diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py
index b1abcaa763..6c2a059098 100644
--- a/test_runner/regress/test_sharding.py
+++ b/test_runner/regress/test_sharding.py
@@ -3,11 +3,11 @@ from __future__ import annotations
 import os
 import time
 from collections import defaultdict
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 import pytest
 import requests
-from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
+from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineArchivalState, TimelineId
 from fixtures.compute_reconfigure import ComputeReconfigure
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
@@ -353,6 +353,145 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
     workload.validate()
 
 
+def test_sharding_split_offloading(neon_env_builder: NeonEnvBuilder):
+    """
+    Test that during a split, we don't miss archived and offloaded timelines.
+    """
+
+    TENANT_CONF = {
+        # small checkpointing and compaction targets to ensure we generate many upload operations
+        "checkpoint_distance": 128 * 1024,
+        "compaction_threshold": 1,
+        "compaction_target_size": 128 * 1024,
+        # no PITR horizon, we specify the horizon when we request on-demand GC
+        "pitr_interval": "3600s",
+        # disable background compaction, GC and offloading. We invoke them manually when we want them to happen.
+        "gc_period": "0s",
+        "compaction_period": "0s",
+        # Disable automatic creation of image layers, as we will create them explicitly when we want them
+        "image_creation_threshold": 9999,
+        "image_layer_creation_check_threshold": 0,
+        "lsn_lease_length": "0s",
+    }
+
+    neon_env_builder.storage_controller_config = {
+        # Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts.
+ "max_offline": "30s", + "max_warming_up": "300s", + } + + env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) + tenant_id = env.initial_tenant + timeline_id_main = env.initial_timeline + + # Check that we created with an unsharded TenantShardId: this is the default, + # but check it in case we change the default in future + assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 0)) is not None + + workload_main = Workload(env, tenant_id, timeline_id_main, branch_name="main") + workload_main.init() + workload_main.write_rows(256) + workload_main.validate() + workload_main.stop() + + # Create two timelines, archive one, offload the other + timeline_id_archived = env.create_branch("archived_not_offloaded") + timeline_id_offloaded = env.create_branch("archived_offloaded") + + def timeline_id_set_for(list: list[dict[str, Any]]) -> set[TimelineId]: + return set( + map( + lambda t: TimelineId(t["timeline_id"]), + list, + ) + ) + + expected_offloaded_set = {timeline_id_offloaded} + expected_timeline_set = {timeline_id_main, timeline_id_archived} + + with env.get_tenant_pageserver(tenant_id).http_client() as http_client: + http_client.timeline_archival_config( + tenant_id, timeline_id_archived, TimelineArchivalState.ARCHIVED + ) + http_client.timeline_archival_config( + tenant_id, timeline_id_offloaded, TimelineArchivalState.ARCHIVED + ) + http_client.timeline_offload(tenant_id, timeline_id_offloaded) + list = http_client.timeline_and_offloaded_list(tenant_id) + assert timeline_id_set_for(list.offloaded) == expected_offloaded_set + assert timeline_id_set_for(list.timelines) == expected_timeline_set + + # Do a full image layer generation before splitting + http_client.timeline_checkpoint( + tenant_id, timeline_id_main, force_image_layer_creation=True, wait_until_uploaded=True + ) + + # Split one shard into two + shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=2) + + # Let all shards move into their stable locations, so that during subsequent steps we + # don't have reconciles in progress (simpler to reason about what messages we expect in logs) + env.storage_controller.reconcile_until_idle() + + # Check we got the shard IDs we expected + assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 2)) is not None + assert env.storage_controller.inspect(TenantShardId(tenant_id, 1, 2)) is not None + + workload_main.validate() + workload_main.stop() + + env.storage_controller.consistency_check() + + # Ensure each shard has the same list of timelines and offloaded timelines + for shard in shards: + ps = env.get_tenant_pageserver(shard) + + list = ps.http_client().timeline_and_offloaded_list(shard) + assert timeline_id_set_for(list.offloaded) == expected_offloaded_set + assert timeline_id_set_for(list.timelines) == expected_timeline_set + + ps.http_client().timeline_compact(shard, timeline_id_main) + + # Check that we can still read all the data + workload_main.validate() + + # Force a restart, which requires the state to be persisted. 
+    env.pageserver.stop()
+    env.pageserver.start()
+
+    # Ensure each shard has the same list of timelines and offloaded timelines
+    for shard in shards:
+        ps = env.get_tenant_pageserver(shard)
+
+        list = ps.http_client().timeline_and_offloaded_list(shard)
+        assert timeline_id_set_for(list.offloaded) == expected_offloaded_set
+        assert timeline_id_set_for(list.timelines) == expected_timeline_set
+
+        ps.http_client().timeline_compact(shard, timeline_id_main)
+
+    # Compaction shouldn't make anything unreadable
+    workload_main.validate()
+
+    # Do sharded unarchival
+    env.storage_controller.timeline_archival_config(
+        tenant_id, timeline_id_offloaded, TimelineArchivalState.UNARCHIVED
+    )
+    env.storage_controller.timeline_archival_config(
+        tenant_id, timeline_id_archived, TimelineArchivalState.UNARCHIVED
+    )
+
+    for shard in shards:
+        ps = env.get_tenant_pageserver(shard)
+
+        list = ps.http_client().timeline_and_offloaded_list(shard)
+        assert timeline_id_set_for(list.offloaded) == set()
+        assert timeline_id_set_for(list.timelines) == {
+            timeline_id_main,
+            timeline_id_archived,
+            timeline_id_offloaded,
+        }
+
+
 def test_sharding_split_smoke(
     neon_env_builder: NeonEnvBuilder,
 ):