Support offloaded timelines during shard split (#9489)
Previously, we did not copy the `index-part.json` of offloaded timelines to the new shard's location, so the new shard did not even know the timeline existed. In #9444 we copy over the manifest, but we also need to do this for `index-part.json`.

As the operations are mostly the same for offloaded and non-offloaded timelines, we can iterate over both kinds in the same loop, after introducing a `TimelineOrOffloadedArcRef` type to generalize over the two cases. This is analogous to the deletion code added in #8907.

The added test also exercises the sharded archival config endpoint, which had not previously been covered by tests.

Part of #8088
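The core pattern — one loop that walks both live and offloaded timelines by borrowing each as a small reference enum — can be illustrated in isolation. Below is a minimal, hypothetical Rust sketch of that approach; the struct fields and the "copy index" step are placeholders for illustration, not the actual pageserver types or API:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Placeholder types standing in for the pageserver's Timeline / OffloadedTimeline.
struct Timeline {
    timeline_id: u64,
}
struct OffloadedTimeline {
    timeline_id: u64,
}

// Borrowed view over either kind, so one loop can handle both.
enum TimelineOrOffloadedArcRef<'a> {
    Timeline(&'a Arc<Timeline>),
    Offloaded(&'a Arc<OffloadedTimeline>),
}

impl TimelineOrOffloadedArcRef<'_> {
    fn timeline_id(&self) -> u64 {
        match self {
            Self::Timeline(t) => t.timeline_id,
            Self::Offloaded(o) => o.timeline_id,
        }
    }
}

impl<'a> From<&'a Arc<Timeline>> for TimelineOrOffloadedArcRef<'a> {
    fn from(t: &'a Arc<Timeline>) -> Self {
        Self::Timeline(t)
    }
}

impl<'a> From<&'a Arc<OffloadedTimeline>> for TimelineOrOffloadedArcRef<'a> {
    fn from(o: &'a Arc<OffloadedTimeline>) -> Self {
        Self::Offloaded(o)
    }
}

fn main() {
    let timelines: HashMap<u64, Arc<Timeline>> =
        HashMap::from([(1, Arc::new(Timeline { timeline_id: 1 }))]);
    let offloaded: HashMap<u64, Arc<OffloadedTimeline>> =
        HashMap::from([(2, Arc::new(OffloadedTimeline { timeline_id: 2 }))]);

    // One loop covers both maps, mirroring how the shard split copies
    // index-part.json for every timeline, offloaded or not.
    for tl in timelines
        .values()
        .map(TimelineOrOffloadedArcRef::from)
        .chain(offloaded.values().map(TimelineOrOffloadedArcRef::from))
    {
        println!("would copy index-part.json for timeline {}", tl.timeline_id());
    }
}
```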
@@ -597,17 +597,21 @@ pub enum TimelineOrOffloaded {
 }
 
 impl TimelineOrOffloaded {
-    pub fn tenant_shard_id(&self) -> TenantShardId {
+    pub fn arc_ref(&self) -> TimelineOrOffloadedArcRef<'_> {
         match self {
-            TimelineOrOffloaded::Timeline(timeline) => timeline.tenant_shard_id,
-            TimelineOrOffloaded::Offloaded(offloaded) => offloaded.tenant_shard_id,
+            TimelineOrOffloaded::Timeline(timeline) => {
+                TimelineOrOffloadedArcRef::Timeline(timeline)
+            }
+            TimelineOrOffloaded::Offloaded(offloaded) => {
+                TimelineOrOffloadedArcRef::Offloaded(offloaded)
+            }
         }
     }
+    pub fn tenant_shard_id(&self) -> TenantShardId {
+        self.arc_ref().tenant_shard_id()
+    }
     pub fn timeline_id(&self) -> TimelineId {
-        match self {
-            TimelineOrOffloaded::Timeline(timeline) => timeline.timeline_id,
-            TimelineOrOffloaded::Offloaded(offloaded) => offloaded.timeline_id,
-        }
+        self.arc_ref().timeline_id()
     }
     pub fn delete_progress(&self) -> &Arc<tokio::sync::Mutex<DeleteTimelineFlow>> {
         match self {
@@ -615,7 +619,7 @@ impl TimelineOrOffloaded {
             TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.delete_progress,
         }
     }
-    pub fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
+    fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
         match self {
             TimelineOrOffloaded::Timeline(timeline) => timeline.remote_client.clone(),
             TimelineOrOffloaded::Offloaded(offloaded) => match offloaded.remote_client.clone() {
@@ -632,6 +636,38 @@ impl TimelineOrOffloaded {
     }
 }
 
+pub enum TimelineOrOffloadedArcRef<'a> {
+    Timeline(&'a Arc<Timeline>),
+    Offloaded(&'a Arc<OffloadedTimeline>),
+}
+
+impl TimelineOrOffloadedArcRef<'_> {
+    pub fn tenant_shard_id(&self) -> TenantShardId {
+        match self {
+            TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.tenant_shard_id,
+            TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.tenant_shard_id,
+        }
+    }
+    pub fn timeline_id(&self) -> TimelineId {
+        match self {
+            TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.timeline_id,
+            TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.timeline_id,
+        }
+    }
+}
+
+impl<'a> From<&'a Arc<Timeline>> for TimelineOrOffloadedArcRef<'a> {
+    fn from(timeline: &'a Arc<Timeline>) -> Self {
+        Self::Timeline(timeline)
+    }
+}
+
+impl<'a> From<&'a Arc<OffloadedTimeline>> for TimelineOrOffloadedArcRef<'a> {
+    fn from(timeline: &'a Arc<OffloadedTimeline>) -> Self {
+        Self::Offloaded(timeline)
+    }
+}
+
 #[derive(Debug, thiserror::Error, PartialEq, Eq)]
 pub enum GetTimelineError {
     #[error("Timeline is shutting down")]
@@ -2940,33 +2976,58 @@ impl Tenant {
         &self,
         child_shards: &Vec<TenantShardId>,
     ) -> anyhow::Result<()> {
-        let timelines = self.timelines.lock().unwrap().clone();
-        for timeline in timelines.values() {
+        let (timelines, offloaded) = {
+            let timelines = self.timelines.lock().unwrap();
+            let offloaded = self.timelines_offloaded.lock().unwrap();
+            (timelines.clone(), offloaded.clone())
+        };
+        let timelines_iter = timelines
+            .values()
+            .map(TimelineOrOffloadedArcRef::<'_>::from)
+            .chain(
+                offloaded
+                    .values()
+                    .map(TimelineOrOffloadedArcRef::<'_>::from),
+            );
+        for timeline in timelines_iter {
             // We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
             // to ensure that they do not start a split if currently in the process of doing these.
 
-            // Upload an index from the parent: this is partly to provide freshness for the
-            // child tenants that will copy it, and partly for general ease-of-debugging: there will
-            // always be a parent shard index in the same generation as we wrote the child shard index.
-            tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index");
-            timeline
-                .remote_client
-                .schedule_index_upload_for_file_changes()?;
-            timeline.remote_client.wait_completion().await?;
+            let timeline_id = timeline.timeline_id();
+
+            if let TimelineOrOffloadedArcRef::Timeline(timeline) = timeline {
+                // Upload an index from the parent: this is partly to provide freshness for the
+                // child tenants that will copy it, and partly for general ease-of-debugging: there will
+                // always be a parent shard index in the same generation as we wrote the child shard index.
+                tracing::info!(%timeline_id, "Uploading index");
+                timeline
+                    .remote_client
+                    .schedule_index_upload_for_file_changes()?;
+                timeline.remote_client.wait_completion().await?;
+            }
+
+            let remote_client = match timeline {
+                TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.remote_client.clone(),
+                TimelineOrOffloadedArcRef::Offloaded(offloaded) => {
+                    let remote_client = self
+                        .build_timeline_client(offloaded.timeline_id, self.remote_storage.clone());
+                    Arc::new(remote_client)
+                }
+            };
 
             // Shut down the timeline's remote client: this means that the indices we write
             // for child shards will not be invalidated by the parent shard deleting layers.
-            tracing::info!(timeline_id=%timeline.timeline_id, "Shutting down remote storage client");
-            timeline.remote_client.shutdown().await;
+            tracing::info!(%timeline_id, "Shutting down remote storage client");
+            remote_client.shutdown().await;
 
             // Download methods can still be used after shutdown, as they don't flow through the remote client's
             // queue. In principal the RemoteTimelineClient could provide this without downloading it, but this
             // operation is rare, so it's simpler to just download it (and robustly guarantees that the index
            // we use here really is the remotely persistent one).
-            tracing::info!(timeline_id=%timeline.timeline_id, "Downloading index_part from parent");
-            let result = timeline.remote_client
+            tracing::info!(%timeline_id, "Downloading index_part from parent");
+            let result = remote_client
                 .download_index_file(&self.cancel)
-                .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
+                .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))
                 .await?;
             let index_part = match result {
                 MaybeDeletedIndexPart::Deleted(_) => {
@@ -2976,11 +3037,11 @@ impl Tenant {
             };
 
             for child_shard in child_shards {
-                tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index_part for child {}", child_shard.to_index());
+                tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index());
                 upload_index_part(
                     &self.remote_storage,
                     child_shard,
-                    &timeline.timeline_id,
+                    &timeline_id,
                     self.generation,
                     &index_part,
                     &self.cancel,
@@ -2989,8 +3050,6 @@ impl Tenant {
             }
         }
 
-        // TODO: also copy index files of offloaded timelines
-
         let tenant_manifest = self.tenant_manifest();
         // TODO: generation support
         let generation = remote_timeline_client::TENANT_MANIFEST_GENERATION;
@@ -1278,10 +1278,14 @@ impl RemoteTimelineClient {
         let fut = {
             let mut guard = self.upload_queue.lock().unwrap();
             let upload_queue = match &mut *guard {
-                UploadQueue::Stopped(_) => return,
+                UploadQueue::Stopped(_) => {
+                    scopeguard::ScopeGuard::into_inner(sg);
+                    return;
+                }
                 UploadQueue::Uninitialized => {
                     // transition into Stopped state
                     self.stop_impl(&mut guard);
+                    scopeguard::ScopeGuard::into_inner(sg);
                     return;
                 }
                 UploadQueue::Initialized(ref mut init) => init,
@@ -44,7 +44,14 @@ from urllib3.util.retry import Retry
 
 from fixtures import overlayfs
 from fixtures.auth_tokens import AuthKeys, TokenScope
-from fixtures.common_types import Lsn, NodeId, TenantId, TenantShardId, TimelineId
+from fixtures.common_types import (
+    Lsn,
+    NodeId,
+    TenantId,
+    TenantShardId,
+    TimelineArchivalState,
+    TimelineId,
+)
 from fixtures.endpoint.http import EndpointHttpClient
 from fixtures.log_helper import log
 from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
@@ -2132,6 +2139,24 @@ class NeonStorageController(MetricsGetter, LogUtils):
         response.raise_for_status()
         return response.json()
 
+    def timeline_archival_config(
+        self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        state: TimelineArchivalState,
+    ):
+        config = {"state": state.value}
+        log.info(
+            f"requesting timeline archival config {config} for tenant {tenant_id} and timeline {timeline_id}"
+        )
+        res = self.request(
+            "PUT",
+            f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/archival_config",
+            json=config,
+            headers=self.headers(TokenScope.ADMIN),
+        )
+        return res.json()
+
     def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
         if isinstance(config_strings, tuple):
             pairs = [config_strings]
@@ -142,6 +142,19 @@ class TenantConfig:
         )
 
 
+@dataclass
+class TimelinesInfoAndOffloaded:
+    timelines: list[dict[str, Any]]
+    offloaded: list[dict[str, Any]]
+
+    @classmethod
+    def from_json(cls, d: dict[str, Any]) -> TimelinesInfoAndOffloaded:
+        return TimelinesInfoAndOffloaded(
+            timelines=d["timelines"],
+            offloaded=d["offloaded"],
+        )
+
+
 class PageserverHttpClient(requests.Session, MetricsGetter):
     def __init__(
         self,
@@ -464,6 +477,18 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
         assert isinstance(res_json, list)
         return res_json
 
+    def timeline_and_offloaded_list(
+        self,
+        tenant_id: Union[TenantId, TenantShardId],
+    ) -> TimelinesInfoAndOffloaded:
+        res = self.get(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline_and_offloaded",
+        )
+        self.verbose_error(res)
+        res_json = res.json()
+        assert isinstance(res_json, dict)
+        return TimelinesInfoAndOffloaded.from_json(res_json)
+
     def timeline_create(
         self,
         pg_version: PgVersion,
@@ -3,11 +3,11 @@ from __future__ import annotations
 import os
 import time
 from collections import defaultdict
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 import pytest
 import requests
-from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
+from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineArchivalState, TimelineId
 from fixtures.compute_reconfigure import ComputeReconfigure
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
@@ -353,6 +353,145 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
     workload.validate()
 
 
+def test_sharding_split_offloading(neon_env_builder: NeonEnvBuilder):
+    """
+    Test that during a split, we don't miss archived and offloaded timelines.
+    """
+
+    TENANT_CONF = {
+        # small checkpointing and compaction targets to ensure we generate many upload operations
+        "checkpoint_distance": 128 * 1024,
+        "compaction_threshold": 1,
+        "compaction_target_size": 128 * 1024,
+        # no PITR horizon, we specify the horizon when we request on-demand GC
+        "pitr_interval": "3600s",
+        # disable background compaction, GC and offloading. We invoke it manually when we want it to happen.
+        "gc_period": "0s",
+        "compaction_period": "0s",
+        # Disable automatic creation of image layers, as we will create them explicitly when we want them
+        "image_creation_threshold": 9999,
+        "image_layer_creation_check_threshold": 0,
+        "lsn_lease_length": "0s",
+    }
+
+    neon_env_builder.storage_controller_config = {
+        # Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts.
+        "max_offline": "30s",
+        "max_warming_up": "300s",
+    }
+
+    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
+    tenant_id = env.initial_tenant
+    timeline_id_main = env.initial_timeline
+
+    # Check that we created with an unsharded TenantShardId: this is the default,
+    # but check it in case we change the default in future
+    assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 0)) is not None
+
+    workload_main = Workload(env, tenant_id, timeline_id_main, branch_name="main")
+    workload_main.init()
+    workload_main.write_rows(256)
+    workload_main.validate()
+    workload_main.stop()
+
+    # Create two timelines, archive one, offload the other
+    timeline_id_archived = env.create_branch("archived_not_offloaded")
+    timeline_id_offloaded = env.create_branch("archived_offloaded")
+
+    def timeline_id_set_for(list: list[dict[str, Any]]) -> set[TimelineId]:
+        return set(
+            map(
+                lambda t: TimelineId(t["timeline_id"]),
+                list,
+            )
+        )
+
+    expected_offloaded_set = {timeline_id_offloaded}
+    expected_timeline_set = {timeline_id_main, timeline_id_archived}
+
+    with env.get_tenant_pageserver(tenant_id).http_client() as http_client:
+        http_client.timeline_archival_config(
+            tenant_id, timeline_id_archived, TimelineArchivalState.ARCHIVED
+        )
+        http_client.timeline_archival_config(
+            tenant_id, timeline_id_offloaded, TimelineArchivalState.ARCHIVED
+        )
+        http_client.timeline_offload(tenant_id, timeline_id_offloaded)
+        list = http_client.timeline_and_offloaded_list(tenant_id)
+        assert timeline_id_set_for(list.offloaded) == expected_offloaded_set
+        assert timeline_id_set_for(list.timelines) == expected_timeline_set
+
+        # Do a full image layer generation before splitting
+        http_client.timeline_checkpoint(
+            tenant_id, timeline_id_main, force_image_layer_creation=True, wait_until_uploaded=True
+        )
+
+    # Split one shard into two
+    shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=2)
+
+    # Let all shards move into their stable locations, so that during subsequent steps we
+    # don't have reconciles in progress (simpler to reason about what messages we expect in logs)
+    env.storage_controller.reconcile_until_idle()
+
+    # Check we got the shard IDs we expected
+    assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 2)) is not None
+    assert env.storage_controller.inspect(TenantShardId(tenant_id, 1, 2)) is not None
+
+    workload_main.validate()
+    workload_main.stop()
+
+    env.storage_controller.consistency_check()
+
+    # Ensure each shard has the same list of timelines and offloaded timelines
+    for shard in shards:
+        ps = env.get_tenant_pageserver(shard)
+
+        list = ps.http_client().timeline_and_offloaded_list(shard)
+        assert timeline_id_set_for(list.offloaded) == expected_offloaded_set
+        assert timeline_id_set_for(list.timelines) == expected_timeline_set
+
+        ps.http_client().timeline_compact(shard, timeline_id_main)
+
+    # Check that we can still read all the data
+    workload_main.validate()
+
+    # Force a restart, which requires the state to be persisted.
+    env.pageserver.stop()
+    env.pageserver.start()
+
+    # Ensure each shard has the same list of timelines and offloaded timelines
+    for shard in shards:
+        ps = env.get_tenant_pageserver(shard)
+
+        list = ps.http_client().timeline_and_offloaded_list(shard)
+        assert timeline_id_set_for(list.offloaded) == expected_offloaded_set
+        assert timeline_id_set_for(list.timelines) == expected_timeline_set
+
+        ps.http_client().timeline_compact(shard, timeline_id_main)
+
+    # Compaction shouldn't make anything unreadable
+    workload_main.validate()
+
+    # Do sharded unarchival
+    env.storage_controller.timeline_archival_config(
+        tenant_id, timeline_id_offloaded, TimelineArchivalState.UNARCHIVED
+    )
+    env.storage_controller.timeline_archival_config(
+        tenant_id, timeline_id_archived, TimelineArchivalState.UNARCHIVED
+    )
+
+    for shard in shards:
+        ps = env.get_tenant_pageserver(shard)
+
+        list = ps.http_client().timeline_and_offloaded_list(shard)
+        assert timeline_id_set_for(list.offloaded) == set()
+        assert timeline_id_set_for(list.timelines) == {
+            timeline_id_main,
+            timeline_id_archived,
+            timeline_id_offloaded,
+        }
+
+
 def test_sharding_split_smoke(
     neon_env_builder: NeonEnvBuilder,
 ):