mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 20:40:37 +00:00
pageserver: fixes + test updates for sharding (#6186)
This is a precursor to: - https://github.com/neondatabase/neon/pull/6185 While that PR contains big changes to neon_local and attachment_service, this PR contains a few unrelated standalone changes generated while working on that branch: - Fix restarting a pageserver when it contains multiple shards for the same tenant - When using location_config api to attach a tenant, create its timelines dir - Update test paths where generations were previously optional to make them always-on: this avoids tests having to spuriously assert that attachment_service is not None in order to make the linter happy. - Add a TenantShardId python implementation for subsequent use in test helpers that will be made shard-aware - Teach scrubber to read across shards when checking for layer existence: this is a refactor to track the list of existent layers at tenant-level rather than locally to each timeline. This is a precursor to testing shard splitting.
This commit is contained in:
@@ -159,7 +159,7 @@ impl From<[u8; 18]> for TenantShardId {
|
||||
/// shard we're dealing with, but do not need to know the full ShardIdentity (because
|
||||
/// we won't be doing any page->shard mapping), and do not need to know the fully qualified
|
||||
/// TenantShardId.
|
||||
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
|
||||
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
|
||||
pub struct ShardIndex {
|
||||
pub shard_number: ShardNumber,
|
||||
pub shard_count: ShardCount,
|
||||
|
||||
@@ -514,10 +514,7 @@ pub async fn init_tenant_mgr(
|
||||
&ctx,
|
||||
) {
|
||||
Ok(tenant) => {
|
||||
tenants.insert(
|
||||
TenantShardId::unsharded(tenant.tenant_id()),
|
||||
TenantSlot::Attached(tenant),
|
||||
);
|
||||
tenants.insert(tenant_shard_id, TenantSlot::Attached(tenant));
|
||||
}
|
||||
Err(e) => {
|
||||
error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
|
||||
@@ -962,35 +959,27 @@ impl TenantManager {
|
||||
}
|
||||
|
||||
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
|
||||
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
|
||||
|
||||
// Directory structure is the same for attached and secondary modes:
|
||||
// create it if it doesn't exist. Timeline load/creation expects the
|
||||
// timelines/ subdir to already exist.
|
||||
//
|
||||
// Does not need to be fsync'd because local storage is just a cache.
|
||||
tokio::fs::create_dir_all(&timelines_path)
|
||||
.await
|
||||
.with_context(|| format!("Creating {timelines_path}"))?;
|
||||
|
||||
// Before activating either secondary or attached mode, persist the
|
||||
// configuration, so that on restart we will re-attach (or re-start
|
||||
// secondary) on the tenant.
|
||||
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
|
||||
let new_slot = match &new_location_config.mode {
|
||||
LocationMode::Secondary(_) => {
|
||||
// Directory doesn't need to be fsync'd because if we crash it can
|
||||
// safely be recreated next time this tenant location is configured.
|
||||
tokio::fs::create_dir_all(&tenant_path)
|
||||
.await
|
||||
.with_context(|| format!("Creating {tenant_path}"))?;
|
||||
|
||||
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
|
||||
TenantSlot::Secondary
|
||||
}
|
||||
LocationMode::Secondary(_) => TenantSlot::Secondary,
|
||||
LocationMode::Attached(_attach_config) => {
|
||||
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
|
||||
|
||||
// Directory doesn't need to be fsync'd because we do not depend on
|
||||
// it to exist after crashes: it may be recreated when tenant is
|
||||
// re-attached, see https://github.com/neondatabase/neon/issues/5550
|
||||
tokio::fs::create_dir_all(&tenant_path)
|
||||
.await
|
||||
.with_context(|| format!("Creating {timelines_path}"))?;
|
||||
|
||||
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
|
||||
let shard_identity = new_location_config.shard;
|
||||
let tenant = tenant_spawn(
|
||||
self.conf,
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_sdk_s3::{types::ObjectIdentifier, Client};
|
||||
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use tracing::{error, info, warn};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use crate::cloud_admin_api::BranchData;
|
||||
use crate::metadata_stream::stream_listing;
|
||||
@@ -40,7 +43,7 @@ impl TimelineAnalysis {
|
||||
|
||||
pub(crate) fn branch_cleanup_and_check_errors(
|
||||
id: &TenantShardTimelineId,
|
||||
s3_root: &RootTarget,
|
||||
tenant_objects: &mut TenantObjectListing,
|
||||
s3_active_branch: Option<&BranchData>,
|
||||
console_branch: Option<BranchData>,
|
||||
s3_data: Option<S3TimelineBlobData>,
|
||||
@@ -72,8 +75,8 @@ pub(crate) fn branch_cleanup_and_check_errors(
|
||||
match s3_data.blob_data {
|
||||
BlobDataParseResult::Parsed {
|
||||
index_part,
|
||||
index_part_generation,
|
||||
mut s3_layers,
|
||||
index_part_generation: _index_part_generation,
|
||||
s3_layers: _s3_layers,
|
||||
} => {
|
||||
if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
|
||||
result.errors.push(format!(
|
||||
@@ -111,65 +114,19 @@ pub(crate) fn branch_cleanup_and_check_errors(
|
||||
))
|
||||
}
|
||||
|
||||
let layer_map_key = (layer, metadata.generation);
|
||||
if !s3_layers.remove(&layer_map_key) {
|
||||
if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
|
||||
// FIXME: this will emit false positives if an index was
|
||||
// uploaded concurrently with our scan. To make this check
|
||||
// correct, we need to try sending a HEAD request for the
|
||||
// layer we think is missing.
|
||||
result.errors.push(format!(
|
||||
"index_part.json contains a layer {}{} that is not present in remote storage",
|
||||
layer_map_key.0.file_name(),
|
||||
layer_map_key.1.get_suffix()
|
||||
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage",
|
||||
layer.file_name(),
|
||||
metadata.generation.get_suffix(),
|
||||
metadata.shard
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
let orphan_layers: Vec<(LayerFileName, Generation)> = s3_layers
|
||||
.into_iter()
|
||||
.filter(|(_layer_name, gen)|
|
||||
// A layer is only considered orphaned if it has a generation below
|
||||
// the index. If the generation is >= the index, then the layer may
|
||||
// be an upload from a running pageserver, or even an upload from
|
||||
// a new generation that didn't upload an index yet.
|
||||
//
|
||||
// Even so, a layer that is not referenced by the index could just
|
||||
// be something enqueued for deletion, so while this check is valid
|
||||
// for indicating that a layer is garbage, it is not an indicator
|
||||
// of a problem.
|
||||
gen < &index_part_generation)
|
||||
.collect();
|
||||
|
||||
if !orphan_layers.is_empty() {
|
||||
// An orphan layer is not an error: it's arguably not even a warning, but it is helpful to report
|
||||
// these as a hint that there is something worth cleaning up here.
|
||||
result.warnings.push(format!(
|
||||
"index_part.json does not contain layers from S3: {:?}",
|
||||
orphan_layers
|
||||
.iter()
|
||||
.map(|(layer_name, gen)| format!(
|
||||
"{}{}",
|
||||
layer_name.file_name(),
|
||||
gen.get_suffix()
|
||||
))
|
||||
.collect::<Vec<_>>(),
|
||||
));
|
||||
result.garbage_keys.extend(orphan_layers.iter().map(
|
||||
|(layer_name, layer_gen)| {
|
||||
let mut key = s3_root.timeline_root(id).prefix_in_bucket;
|
||||
let delimiter = s3_root.delimiter();
|
||||
if !key.ends_with(delimiter) {
|
||||
key.push_str(delimiter);
|
||||
}
|
||||
key.push_str(&format!(
|
||||
"{}{}",
|
||||
&layer_name.file_name(),
|
||||
layer_gen.get_suffix()
|
||||
));
|
||||
key
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
BlobDataParseResult::Relic => {}
|
||||
BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
|
||||
@@ -204,6 +161,83 @@ pub(crate) fn branch_cleanup_and_check_errors(
|
||||
result
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct LayerRef {
|
||||
ref_count: usize,
|
||||
}
|
||||
|
||||
/// Top-level index of objects in a tenant. This may be used by any shard-timeline within
|
||||
/// the tenant to query whether an object exists.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct TenantObjectListing {
|
||||
shard_timelines:
|
||||
HashMap<(ShardIndex, TimelineId), HashMap<(LayerFileName, Generation), LayerRef>>,
|
||||
}
|
||||
|
||||
impl TenantObjectListing {
|
||||
/// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
|
||||
/// list of layer keys for the Tenant.
|
||||
pub(crate) fn push(
|
||||
&mut self,
|
||||
ttid: TenantShardTimelineId,
|
||||
layers: HashSet<(LayerFileName, Generation)>,
|
||||
) {
|
||||
let shard_index = ShardIndex::new(
|
||||
ttid.tenant_shard_id.shard_number,
|
||||
ttid.tenant_shard_id.shard_count,
|
||||
);
|
||||
let replaced = self.shard_timelines.insert(
|
||||
(shard_index, ttid.timeline_id),
|
||||
layers
|
||||
.into_iter()
|
||||
.map(|l| (l, LayerRef::default()))
|
||||
.collect(),
|
||||
);
|
||||
|
||||
assert!(
|
||||
replaced.is_none(),
|
||||
"Built from an S3 object listing, which should never repeat a key"
|
||||
);
|
||||
}
|
||||
|
||||
/// Having loaded a timeline index, check if a layer referenced by the index exists. If it does,
|
||||
/// the layer's refcount will be incremented. Later, after calling this for all references in all indices
|
||||
/// in a tenant, orphan layers may be detected by their zero refcounts.
|
||||
///
|
||||
/// Returns true if the layer exists
|
||||
pub(crate) fn check_ref(
|
||||
&mut self,
|
||||
timeline_id: TimelineId,
|
||||
layer_file: &LayerFileName,
|
||||
metadata: &IndexLayerMetadata,
|
||||
) -> bool {
|
||||
let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
layer_ref.ref_count += 1;
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerFileName, Generation)> {
|
||||
let mut result = Vec::new();
|
||||
for ((shard_index, timeline_id), layers) in &self.shard_timelines {
|
||||
for ((layer_file, generation), layer_ref) in layers {
|
||||
if layer_ref.ref_count == 0 {
|
||||
result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct S3TimelineBlobData {
|
||||
pub(crate) blob_data: BlobDataParseResult,
|
||||
|
||||
@@ -2,22 +2,25 @@ use std::collections::{HashMap, HashSet};
|
||||
|
||||
use crate::checks::{
|
||||
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
|
||||
TimelineAnalysis,
|
||||
TenantObjectListing, TimelineAnalysis,
|
||||
};
|
||||
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
|
||||
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
|
||||
use aws_sdk_s3::Client;
|
||||
use futures_util::{pin_mut, StreamExt, TryStreamExt};
|
||||
use histogram::Histogram;
|
||||
use pageserver::tenant::remote_timeline_client::remote_layer_path;
|
||||
use pageserver::tenant::IndexPart;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use serde::Serialize;
|
||||
use utils::id::TenantId;
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct MetadataSummary {
|
||||
count: usize,
|
||||
with_errors: HashSet<TenantShardTimelineId>,
|
||||
with_warnings: HashSet<TenantShardTimelineId>,
|
||||
with_garbage: HashSet<TenantShardTimelineId>,
|
||||
with_orphans: HashSet<TenantShardTimelineId>,
|
||||
indices_by_version: HashMap<usize, usize>,
|
||||
|
||||
layer_count: MinMaxHisto,
|
||||
@@ -87,7 +90,7 @@ impl MetadataSummary {
|
||||
count: 0,
|
||||
with_errors: HashSet::new(),
|
||||
with_warnings: HashSet::new(),
|
||||
with_garbage: HashSet::new(),
|
||||
with_orphans: HashSet::new(),
|
||||
indices_by_version: HashMap::new(),
|
||||
layer_count: MinMaxHisto::new(),
|
||||
timeline_size_bytes: MinMaxHisto::new(),
|
||||
@@ -141,6 +144,10 @@ impl MetadataSummary {
|
||||
}
|
||||
}
|
||||
|
||||
fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
|
||||
self.with_orphans.insert(*ttid);
|
||||
}
|
||||
|
||||
/// Long-form output for printing at end of a scan
|
||||
pub fn summary_string(&self) -> String {
|
||||
let version_summary: String = itertools::join(
|
||||
@@ -154,7 +161,7 @@ impl MetadataSummary {
|
||||
"Timelines: {0}
|
||||
With errors: {1}
|
||||
With warnings: {2}
|
||||
With garbage: {3}
|
||||
With orphan layers: {3}
|
||||
Index versions: {version_summary}
|
||||
Timeline size bytes: {4}
|
||||
Layer size bytes: {5}
|
||||
@@ -163,7 +170,7 @@ Timeline layer count: {6}
|
||||
self.count,
|
||||
self.with_errors.len(),
|
||||
self.with_warnings.len(),
|
||||
self.with_garbage.len(),
|
||||
self.with_orphans.len(),
|
||||
self.timeline_size_bytes.oneline(),
|
||||
self.layer_size_bytes.oneline(),
|
||||
self.layer_count.oneline(),
|
||||
@@ -191,7 +198,7 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metada
|
||||
|
||||
// Generate a stream of TenantTimelineId
|
||||
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
|
||||
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
|
||||
let timelines = timelines.try_buffered(CONCURRENCY);
|
||||
let timelines = timelines.try_flatten();
|
||||
|
||||
// Generate a stream of S3TimelineBlobData
|
||||
@@ -204,17 +211,118 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metada
|
||||
Ok((ttid, data))
|
||||
}
|
||||
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
|
||||
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
|
||||
let timelines = timelines.try_buffered(CONCURRENCY);
|
||||
|
||||
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
|
||||
// shards in the same tenant might refer to one anothers' keys if a shard split has happened.
|
||||
|
||||
let mut tenant_id = None;
|
||||
let mut tenant_objects = TenantObjectListing::default();
|
||||
let mut tenant_timeline_results = Vec::new();
|
||||
|
||||
fn analyze_tenant(
|
||||
tenant_id: TenantId,
|
||||
summary: &mut MetadataSummary,
|
||||
mut tenant_objects: TenantObjectListing,
|
||||
timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
|
||||
) {
|
||||
let mut timeline_generations = HashMap::new();
|
||||
for (ttid, data) in timelines {
|
||||
// Stash the generation of each timeline, for later use identifying orphan layers
|
||||
if let BlobDataParseResult::Parsed {
|
||||
index_part: _index_part,
|
||||
index_part_generation,
|
||||
s3_layers: _s3_layers,
|
||||
} = &data.blob_data
|
||||
{
|
||||
timeline_generations.insert(ttid, *index_part_generation);
|
||||
}
|
||||
|
||||
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
|
||||
// reference counts for layers across the tenant.
|
||||
let analysis =
|
||||
branch_cleanup_and_check_errors(&ttid, &mut tenant_objects, None, None, Some(data));
|
||||
summary.update_analysis(&ttid, &analysis);
|
||||
}
|
||||
|
||||
// Identifying orphan layers must be done on a tenant-wide basis, because individual
|
||||
// shards' layers may be referenced by other shards.
|
||||
//
|
||||
// Orphan layers are not a corruption, and not an indication of a problem. They are just
|
||||
// consuming some space in remote storage, and may be cleaned up at leisure.
|
||||
for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
|
||||
let ttid = TenantShardTimelineId {
|
||||
tenant_shard_id: TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: shard_index.shard_count,
|
||||
shard_number: shard_index.shard_number,
|
||||
},
|
||||
timeline_id,
|
||||
};
|
||||
|
||||
if let Some(timeline_generation) = timeline_generations.get(&ttid) {
|
||||
if &generation >= timeline_generation {
|
||||
// Candidate orphan layer is in the current or future generation relative
|
||||
// to the index we read for this timeline shard, so its absence from the index
|
||||
// doesn't make it an orphan: more likely, it is a case where the layer was
|
||||
// uploaded, but the index referencing the layer wasn't written yet.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let orphan_path = remote_layer_path(
|
||||
&tenant_id,
|
||||
&timeline_id,
|
||||
shard_index,
|
||||
&layer_file,
|
||||
generation,
|
||||
);
|
||||
|
||||
tracing::info!("Orphan layer detected: {orphan_path}");
|
||||
|
||||
summary.notify_timeline_orphan(&ttid);
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate through all the timeline results. These are in key-order, so
|
||||
// all results for the same tenant will be adjacent. We accumulate these,
|
||||
// and then call `analyze_tenant` to flush, when we see the next tenant ID.
|
||||
let mut summary = MetadataSummary::new();
|
||||
pin_mut!(timelines);
|
||||
while let Some(i) = timelines.next().await {
|
||||
let (ttid, data) = i?;
|
||||
summary.update_data(&data);
|
||||
|
||||
let analysis = branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data));
|
||||
match tenant_id {
|
||||
None => tenant_id = Some(ttid.tenant_shard_id.tenant_id),
|
||||
Some(prev_tenant_id) => {
|
||||
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
|
||||
let tenant_objects = std::mem::take(&mut tenant_objects);
|
||||
let timelines = std::mem::take(&mut tenant_timeline_results);
|
||||
analyze_tenant(prev_tenant_id, &mut summary, tenant_objects, timelines);
|
||||
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
summary.update_analysis(&ttid, &analysis);
|
||||
if let BlobDataParseResult::Parsed {
|
||||
index_part: _index_part,
|
||||
index_part_generation: _index_part_generation,
|
||||
s3_layers,
|
||||
} = &data.blob_data
|
||||
{
|
||||
tenant_objects.push(ttid, s3_layers.clone());
|
||||
}
|
||||
tenant_timeline_results.push((ttid, data));
|
||||
}
|
||||
|
||||
if !tenant_timeline_results.is_empty() {
|
||||
analyze_tenant(
|
||||
tenant_id.expect("Must be set if results are present"),
|
||||
&mut summary,
|
||||
tenant_objects,
|
||||
tenant_timeline_results,
|
||||
);
|
||||
}
|
||||
|
||||
Ok(summary)
|
||||
|
||||
@@ -457,7 +457,6 @@ class NeonEnvBuilder:
|
||||
self.preserve_database_files = preserve_database_files
|
||||
self.initial_tenant = initial_tenant or TenantId.generate()
|
||||
self.initial_timeline = initial_timeline or TimelineId.generate()
|
||||
self.enable_generations = True
|
||||
self.scrub_on_exit = False
|
||||
self.test_output_dir = test_output_dir
|
||||
|
||||
@@ -677,8 +676,7 @@ class NeonEnvBuilder:
|
||||
|
||||
pageserver.stop(immediate=True)
|
||||
|
||||
if self.env.attachment_service is not None:
|
||||
self.env.attachment_service.stop(immediate=True)
|
||||
self.env.attachment_service.stop(immediate=True)
|
||||
|
||||
cleanup_error = None
|
||||
|
||||
@@ -772,13 +770,9 @@ class NeonEnv:
|
||||
self.initial_tenant = config.initial_tenant
|
||||
self.initial_timeline = config.initial_timeline
|
||||
|
||||
if config.enable_generations:
|
||||
attachment_service_port = self.port_distributor.get_port()
|
||||
self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}"
|
||||
self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self)
|
||||
else:
|
||||
self.control_plane_api = None
|
||||
self.attachment_service = None
|
||||
attachment_service_port = self.port_distributor.get_port()
|
||||
self.control_plane_api: str = f"http://127.0.0.1:{attachment_service_port}"
|
||||
self.attachment_service: NeonAttachmentService = NeonAttachmentService(self)
|
||||
|
||||
# Create a config file corresponding to the options
|
||||
cfg: Dict[str, Any] = {
|
||||
@@ -851,8 +845,7 @@ class NeonEnv:
|
||||
# Start up broker, pageserver and all safekeepers
|
||||
self.broker.try_start()
|
||||
|
||||
if self.attachment_service is not None:
|
||||
self.attachment_service.start()
|
||||
self.attachment_service.start()
|
||||
|
||||
for pageserver in self.pageservers:
|
||||
pageserver.start()
|
||||
@@ -1834,20 +1827,19 @@ class NeonPageserver(PgProtocol):
|
||||
"""
|
||||
client = self.http_client()
|
||||
return client.tenant_attach(
|
||||
tenant_id, config, config_null, generation=self.maybe_get_generation(tenant_id)
|
||||
tenant_id,
|
||||
config,
|
||||
config_null,
|
||||
generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id),
|
||||
)
|
||||
|
||||
def tenant_detach(self, tenant_id: TenantId):
|
||||
if self.env.attachment_service is not None:
|
||||
self.env.attachment_service.attach_hook_drop(tenant_id)
|
||||
self.env.attachment_service.attach_hook_drop(tenant_id)
|
||||
|
||||
client = self.http_client()
|
||||
return client.tenant_detach(tenant_id)
|
||||
|
||||
def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs):
|
||||
# This API is only for use when generations are enabled
|
||||
assert self.env.attachment_service is not None
|
||||
|
||||
if config["mode"].startswith("Attached") and "generation" not in config:
|
||||
config["generation"] = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
|
||||
|
||||
@@ -1873,26 +1865,15 @@ class NeonPageserver(PgProtocol):
|
||||
generation: Optional[int] = None,
|
||||
) -> TenantId:
|
||||
if generation is None:
|
||||
generation = self.maybe_get_generation(tenant_id)
|
||||
generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
|
||||
client = self.http_client(auth_token=auth_token)
|
||||
return client.tenant_create(tenant_id, conf, generation=generation)
|
||||
|
||||
def tenant_load(self, tenant_id: TenantId):
|
||||
client = self.http_client()
|
||||
return client.tenant_load(tenant_id, generation=self.maybe_get_generation(tenant_id))
|
||||
|
||||
def maybe_get_generation(self, tenant_id: TenantId):
|
||||
"""
|
||||
For tests that would like to use an HTTP client directly instead of using
|
||||
the `tenant_attach` and `tenant_create` helpers here: issue a generation
|
||||
number for a tenant.
|
||||
|
||||
Returns None if the attachment service is not enabled (legacy mode)
|
||||
"""
|
||||
if self.env.attachment_service is not None:
|
||||
return self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
|
||||
else:
|
||||
return None
|
||||
return client.tenant_load(
|
||||
tenant_id, generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
|
||||
)
|
||||
|
||||
|
||||
def append_pageserver_param_overrides(
|
||||
|
||||
@@ -125,3 +125,51 @@ class TenantId(Id):
|
||||
class TimelineId(Id):
|
||||
def __repr__(self) -> str:
|
||||
return f'TimelineId("{self.id.hex()}")'
|
||||
|
||||
|
||||
# Workaround for compat with python 3.9, which does not have `typing.Self`
|
||||
TTenantShardId = TypeVar("TTenantShardId", bound="TenantShardId")
|
||||
|
||||
|
||||
class TenantShardId:
|
||||
def __init__(self, tenant_id: TenantId, shard_number: int, shard_count: int):
|
||||
self.tenant_id = tenant_id
|
||||
self.shard_number = shard_number
|
||||
self.shard_count = shard_count
|
||||
assert self.shard_number < self.shard_count or self.shard_count == 0
|
||||
|
||||
@classmethod
|
||||
def parse(cls: Type[TTenantShardId], input) -> TTenantShardId:
|
||||
if len(input) == 32:
|
||||
return cls(
|
||||
tenant_id=TenantId(input),
|
||||
shard_number=0,
|
||||
shard_count=0,
|
||||
)
|
||||
elif len(input) == 37:
|
||||
return cls(
|
||||
tenant_id=TenantId(input[0:32]),
|
||||
shard_number=int(input[33:35], 16),
|
||||
shard_count=int(input[35:37], 16),
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Invalid TenantShardId '{input}'")
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.tenant_id}-{self.shard_number:02x}{self.shard_count:02x}"
|
||||
|
||||
def _tuple(self) -> tuple[TenantId, int, int]:
|
||||
return (self.tenant_id, self.shard_number, self.shard_count)
|
||||
|
||||
def __lt__(self, other) -> bool:
|
||||
if not isinstance(other, type(self)):
|
||||
return NotImplemented
|
||||
return self._tuple() < other._tuple()
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if not isinstance(other, type(self)):
|
||||
return NotImplemented
|
||||
return self._tuple() == other._tuple()
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash(self._tuple())
|
||||
|
||||
@@ -61,7 +61,6 @@ def measure_recovery_time(env: NeonCompare):
|
||||
# of view, but the same as far as the safekeeper/WAL is concerned. To work around that,
|
||||
# we will explicitly create the tenant in the same generation that it was previously
|
||||
# attached in.
|
||||
assert env.env.attachment_service is not None
|
||||
attach_status = env.env.attachment_service.inspect(tenant_id=env.tenant)
|
||||
assert attach_status is not None
|
||||
(attach_gen, _) = attach_status
|
||||
|
||||
@@ -136,10 +136,7 @@ def test_no_config(positive_env: NeonEnv, content_type: Optional[str]):
|
||||
ps_http.tenant_detach(tenant_id)
|
||||
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
|
||||
|
||||
body = {}
|
||||
gen = env.pageserver.maybe_get_generation(tenant_id)
|
||||
if gen is not None:
|
||||
body["generation"] = gen
|
||||
body = {"generation": env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)}
|
||||
|
||||
ps_http.post(
|
||||
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
|
||||
|
||||
@@ -87,7 +87,6 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
#
|
||||
# Since we're dual-attached, need to tip-off attachment service to treat the one we're
|
||||
# about to start as the attached pageserver
|
||||
assert env.attachment_service is not None
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, env.pageservers[0].id)
|
||||
env.pageservers[0].start()
|
||||
env.pageservers[1].stop()
|
||||
|
||||
@@ -157,7 +157,6 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
|
||||
time.sleep(1.1) # so that we can use change in pre_stat.st_mtime to detect overwrites
|
||||
|
||||
def get_generation_number():
|
||||
assert env.attachment_service is not None
|
||||
attachment = env.attachment_service.inspect(tenant_id)
|
||||
assert attachment is not None
|
||||
return attachment[0]
|
||||
|
||||
@@ -72,7 +72,9 @@ def check_client(env: NeonEnv, client: PageserverHttpClient):
|
||||
|
||||
# create new tenant and check it is also there
|
||||
tenant_id = TenantId.generate()
|
||||
client.tenant_create(tenant_id, generation=env.pageserver.maybe_get_generation(tenant_id))
|
||||
client.tenant_create(
|
||||
tenant_id, generation=env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)
|
||||
)
|
||||
assert tenant_id in {TenantId(t["id"]) for t in client.tenant_list()}
|
||||
|
||||
timelines = client.timeline_list(tenant_id)
|
||||
|
||||
@@ -187,7 +187,6 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
- After upgrade, the bucket should contain a mixture.
|
||||
- In both cases, postgres I/O should work.
|
||||
"""
|
||||
neon_env_builder.enable_generations = True
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
@@ -196,7 +195,6 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
env.broker.try_start()
|
||||
for sk in env.safekeepers:
|
||||
sk.start()
|
||||
assert env.attachment_service is not None
|
||||
env.attachment_service.start()
|
||||
|
||||
env.pageserver.start(overrides=('--pageserver-config-override=control_plane_api=""',))
|
||||
@@ -262,12 +260,10 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
|
||||
def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_generations = True
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
assert env.attachment_service is not None
|
||||
|
||||
some_other_pageserver = 1234
|
||||
ps_http = env.pageserver.http_client()
|
||||
@@ -341,7 +337,6 @@ def test_deletion_queue_recovery(
|
||||
:param validate_before: whether to wait for deletions to be validated before restart. This
|
||||
makes them elegible to be executed after restart, if the same node keeps the attachment.
|
||||
"""
|
||||
neon_env_builder.enable_generations = True
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
@@ -405,7 +400,6 @@ def test_deletion_queue_recovery(
|
||||
|
||||
if keep_attachment == KeepAttachment.LOSE:
|
||||
some_other_pageserver = 101010
|
||||
assert env.attachment_service is not None
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
|
||||
|
||||
env.pageserver.start()
|
||||
@@ -453,7 +447,6 @@ def test_deletion_queue_recovery(
|
||||
|
||||
|
||||
def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
neon_env_builder.enable_generations = True
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
@@ -473,7 +466,6 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
)
|
||||
|
||||
# Simulate a major incident: the control plane goes offline
|
||||
assert env.attachment_service is not None
|
||||
env.attachment_service.stop()
|
||||
|
||||
# Remember how many validations had happened before the control plane went offline
|
||||
@@ -545,7 +537,6 @@ def test_eviction_across_generations(neon_env_builder: NeonEnvBuilder):
|
||||
and must be constructed using the proper generation for the layer, which may not be the same generation
|
||||
that the tenant is running in.
|
||||
"""
|
||||
neon_env_builder.enable_generations = True
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
@@ -575,7 +566,6 @@ def test_multi_attach(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_bin: PgBin,
|
||||
):
|
||||
neon_env_builder.enable_generations = True
|
||||
neon_env_builder.num_pageservers = 3
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
|
||||
@@ -9,9 +9,7 @@ from fixtures.utils import wait_until
|
||||
|
||||
# Test restarting page server, while safekeeper and compute node keep
|
||||
# running.
|
||||
@pytest.mark.parametrize("generations", [True, False])
|
||||
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool):
|
||||
neon_env_builder.enable_generations = generations
|
||||
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
neon_env_builder.enable_scrub_on_exit()
|
||||
|
||||
|
||||
@@ -57,13 +57,11 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
|
||||
states are valid, so that we may test it in this way: the API should always
|
||||
work as long as the tenant exists.
|
||||
"""
|
||||
neon_env_builder.enable_generations = True
|
||||
neon_env_builder.num_pageservers = 3
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
assert env.attachment_service is not None
|
||||
|
||||
pageservers = env.pageservers
|
||||
list([p.http_client() for p in pageservers])
|
||||
@@ -210,13 +208,11 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test the sequence of location states that are used in a live migration.
|
||||
"""
|
||||
neon_env_builder.enable_generations = True
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
assert env.attachment_service is not None
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
@@ -60,8 +60,6 @@ def test_remote_storage_backup_and_restore(
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
neon_env_builder.enable_generations = generations
|
||||
|
||||
# Exercise retry code path by making all uploads and downloads fail for the
|
||||
# first time. The retries print INFO-messages to the log; we will check
|
||||
# that they are present after the test.
|
||||
|
||||
Reference in New Issue
Block a user