Compare commits

...

10 Commits

Author SHA1 Message Date
Arseny Sher
22e9702525 migrate script 2023-12-25 23:04:34 +03:00
John Spray
ac38d3a88c remote_storage: don't count 404s as errors (#6201)
## Problem

Currently a chart of S3 error rate is misleading: it can show errors any
time we are attaching a tenant (probing for index_part generation,
checking for remote delete marker).

Considering 404 successful isn't perfectly elegant, but it enables the
error rate to be used a a more meaningful alert signal: it would
indicate if we were having auth issues, sending bad requests, getting
throttled ,etc.

## Summary of changes

Track 404 requests in the AttemptOutcome::Ok bucket instead of the
AttemptOutcome::Err bucket.
2023-12-20 17:00:29 +00:00
Arthur Petukhovsky
0f56104a61 Make sk_collect_dumps also possible with teleport (#4739)
Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
2023-12-20 15:06:55 +00:00
John Spray
f260f1565e pageserver: fixes + test updates for sharding (#6186)
This is a precursor to:
- https://github.com/neondatabase/neon/pull/6185

While that PR contains big changes to neon_local and attachment_service,
this PR contains a few unrelated standalone changes generated while
working on that branch:
- Fix restarting a pageserver when it contains multiple shards for the
same tenant
- When using location_config api to attach a tenant, create its
timelines dir
- Update test paths where generations were previously optional to make
them always-on: this avoids tests having to spuriously assert that
attachment_service is not None in order to make the linter happy.
- Add a TenantShardId python implementation for subsequent use in test
helpers that will be made shard-aware
- Teach scrubber to read across shards when checking for layer
existence: this is a refactor to track the list of existent layers at
tenant-level rather than locally to each timeline. This is a precursor
to testing shard splitting.
2023-12-20 12:26:20 +00:00
Joonas Koivunen
c29df80634 fix(layer): move backoff to spawned task (#5746)
Move the backoff to spawned task as it can still be useful; make the
sleep cancellable.
2023-12-20 10:26:06 +02:00
Em Sharnoff
58dbca6ce3 Bump vm-builder v0.19.0 -> v0.21.0 (#6197)
Only applicable change was neondatabase/autoscaling#650, reducing the
vector scrape interval (inside the VM) from 15 seconds to 1 second.
2023-12-19 23:48:41 +00:00
Arthur Petukhovsky
613906acea Support custom types in broker (#5761)
Old methods are unchanged for backwards compatibility. Added
`SafekeeperDiscoveryRequest` and `SafekeeperDiscoveryResponse` types to
serve as example, and also as a prerequisite for
https://github.com/neondatabase/neon/issues/5471
2023-12-19 17:06:43 +00:00
Christian Schwarz
82809d2ec2 fix metric pageserver_initial_logical_size_start_calculation (#6191)
It wasn't being incremented.

Fixup of

    commit 1c88824ed0
    Author: Christian Schwarz <christian@neon.tech>
    Date:   Fri Dec 1 12:52:59 2023 +0100

        initial logical size calculation: add a bunch of metrics (#5995)
2023-12-19 17:44:49 +01:00
Anastasia Lubennikova
0bd79eb063 Handle role deletion when project has no databases. (#6170)
There is still default 'postgres' database, that may contain objects
owned by the role or some ACLs. We need to reassign objects in this
database too.

## Problem
If customer deleted all databases and then tries to delete role, that
has some non-standard ACLs,
`apply_config` operation will stuck because of failing role deletion.
2023-12-19 16:27:47 +00:00
Konstantin Knizhnik
8ff5387da1 eliminate GCC warning for unchecked result of fread (#6167)
## Problem


GCCproduce warning that bread result is not checked. It doesn't affect
program logic, but better live without warnings.

## Summary of changes

Check read result.

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist
2023-12-19 18:17:11 +02:00
35 changed files with 986 additions and 296 deletions

View File

@@ -857,7 +857,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.19.0
VM_BUILDER_VERSION: v0.21.0
steps:
- name: Checkout

View File

@@ -370,33 +370,49 @@ pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Cli
Ok(())
}
fn reassign_owned_objects_in_one_db(
conf: Config,
role_name: &PgIdent,
db_owner: &PgIdent,
) -> Result<()> {
let mut client = conf.connect(NoTls)?;
// This will reassign all dependent objects to the db owner
let reassign_query = format!(
"REASSIGN OWNED BY {} TO {}",
role_name.pg_quote(),
db_owner.pg_quote()
);
info!(
"reassigning objects owned by '{}' in db '{}' to '{}'",
role_name,
conf.get_dbname().unwrap_or(""),
db_owner
);
client.simple_query(&reassign_query)?;
// This now will only drop privileges of the role
let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
client.simple_query(&drop_query)?;
Ok(())
}
// Reassign all owned objects in all databases to the owner of the database.
fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
for db in &spec.cluster.databases {
if db.owner != *role_name {
let mut conf = Config::from_str(connstr)?;
conf.dbname(&db.name);
let mut client = conf.connect(NoTls)?;
// This will reassign all dependent objects to the db owner
let reassign_query = format!(
"REASSIGN OWNED BY {} TO {}",
role_name.pg_quote(),
db.owner.pg_quote()
);
info!(
"reassigning objects owned by '{}' in db '{}' to '{}'",
role_name, &db.name, &db.owner
);
client.simple_query(&reassign_query)?;
// This now will only drop privileges of the role
let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
client.simple_query(&drop_query)?;
reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
}
}
// Also handle case when there are no databases in the spec.
// In this case we need to reassign objects in the default database.
let conf = Config::from_str(connstr)?;
let db_owner = PgIdent::from_str("cloud_admin")?;
reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
Ok(())
}

View File

@@ -159,7 +159,7 @@ impl From<[u8; 18]> for TenantShardId {
/// shard we're dealing with, but do not need to know the full ShardIdentity (because
/// we won't be doing any page->shard mapping), and do not need to know the fully qualified
/// TenantShardId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct ShardIndex {
pub shard_number: ShardNumber,
pub shard_count: ShardCount,

View File

@@ -218,14 +218,6 @@ impl S3Bucket {
let started_at = ScopeGuard::into_inner(started_at);
if get_object.is_err() {
metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Err,
started_at,
);
}
match get_object {
Ok(object_output) => {
let metadata = object_output.metadata().cloned().map(StorageMetadata);
@@ -241,11 +233,27 @@ impl S3Bucket {
})
}
Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
// Count this in the AttemptOutcome::Ok bucket, because 404 is not
// an error: we expect to sometimes fetch an object and find it missing,
// e.g. when probing for timeline indices.
metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Ok,
started_at,
);
Err(DownloadError::NotFound)
}
Err(e) => Err(DownloadError::Other(
anyhow::Error::new(e).context("download s3 object"),
)),
Err(e) => {
metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Err,
started_at,
);
Err(DownloadError::Other(
anyhow::Error::new(e).context("download s3 object"),
))
}
}
}
}

View File

@@ -522,14 +522,18 @@ pub(crate) mod initial_logical_size {
impl StartCalculation {
pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
let circumstances_label: &'static str = circumstances.into();
self.0.with_label_values(&["first", circumstances_label]);
self.0
.with_label_values(&["first", circumstances_label])
.inc();
OngoingCalculationGuard {
inc_drop_calculation: Some(DROP_CALCULATION.first.clone()),
}
}
pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
let circumstances_label: &'static str = circumstances.into();
self.0.with_label_values(&["retry", circumstances_label]);
self.0
.with_label_values(&["retry", circumstances_label])
.inc();
OngoingCalculationGuard {
inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()),
}

View File

@@ -514,10 +514,7 @@ pub async fn init_tenant_mgr(
&ctx,
) {
Ok(tenant) => {
tenants.insert(
TenantShardId::unsharded(tenant.tenant_id()),
TenantSlot::Attached(tenant),
);
tenants.insert(tenant_shard_id, TenantSlot::Attached(tenant));
}
Err(e) => {
error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
@@ -962,35 +959,27 @@ impl TenantManager {
}
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
// Directory structure is the same for attached and secondary modes:
// create it if it doesn't exist. Timeline load/creation expects the
// timelines/ subdir to already exist.
//
// Does not need to be fsync'd because local storage is just a cache.
tokio::fs::create_dir_all(&timelines_path)
.await
.with_context(|| format!("Creating {timelines_path}"))?;
// Before activating either secondary or attached mode, persist the
// configuration, so that on restart we will re-attach (or re-start
// secondary) on the tenant.
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => {
// Directory doesn't need to be fsync'd because if we crash it can
// safely be recreated next time this tenant location is configured.
tokio::fs::create_dir_all(&tenant_path)
.await
.with_context(|| format!("Creating {tenant_path}"))?;
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
TenantSlot::Secondary
}
LocationMode::Secondary(_) => TenantSlot::Secondary,
LocationMode::Attached(_attach_config) => {
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
// Directory doesn't need to be fsync'd because we do not depend on
// it to exist after crashes: it may be recreated when tenant is
// re-attached, see https://github.com/neondatabase/neon/issues/5550
tokio::fs::create_dir_all(&tenant_path)
.await
.with_context(|| format!("Creating {timelines_path}"))?;
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
let shard_identity = new_location_config.shard;
let tenant = tenant_spawn(
self.conf,

View File

@@ -878,6 +878,23 @@ impl LayerInner {
Ok(())
}
Err(e) => {
let consecutive_failures =
this.consecutive_failures.fetch_add(1, Ordering::Relaxed);
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,
1.5,
60.0,
);
let backoff = std::time::Duration::from_secs_f64(backoff);
tokio::select! {
_ = tokio::time::sleep(backoff) => {},
_ = crate::task_mgr::shutdown_token().cancelled_owned() => {},
_ = timeline.cancel.cancelled() => {},
};
Err(e)
}
};
@@ -926,21 +943,9 @@ impl LayerInner {
Ok(permit)
}
Ok((Err(e), _permit)) => {
// FIXME: this should be with the spawned task and be cancellation sensitive
//
// while we should not need this, this backoff has turned out to be useful with
// a bug of unexpectedly deleted remote layer file (#5787).
let consecutive_failures =
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
// sleep already happened in the spawned task, if it was not cancelled
let consecutive_failures = self.consecutive_failures.load(Ordering::Relaxed);
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,
1.5,
60.0,
);
let backoff = std::time::Duration::from_secs_f64(backoff);
tokio::time::sleep(backoff).await;
Err(DownloadError::DownloadFailed)
}
Err(_gone) => Err(DownloadError::DownloadCancelled),

View File

@@ -1712,9 +1712,9 @@ walprop_pg_after_election(WalProposer *wp)
f = fopen("restart.lsn", "rb");
if (f != NULL && !wp->config->syncSafekeepers)
{
fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
size_t rc = fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
fclose(f);
if (lrRestartLsn != InvalidXLogRecPtr)
if (rc == 1 && lrRestartLsn != InvalidXLogRecPtr)
{
elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));

View File

@@ -1,9 +1,12 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use anyhow::Context;
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver_api::shard::ShardIndex;
use tracing::{error, info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;
use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
@@ -40,7 +43,7 @@ impl TimelineAnalysis {
pub(crate) fn branch_cleanup_and_check_errors(
id: &TenantShardTimelineId,
s3_root: &RootTarget,
tenant_objects: &mut TenantObjectListing,
s3_active_branch: Option<&BranchData>,
console_branch: Option<BranchData>,
s3_data: Option<S3TimelineBlobData>,
@@ -72,8 +75,8 @@ pub(crate) fn branch_cleanup_and_check_errors(
match s3_data.blob_data {
BlobDataParseResult::Parsed {
index_part,
index_part_generation,
mut s3_layers,
index_part_generation: _index_part_generation,
s3_layers: _s3_layers,
} => {
if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
result.errors.push(format!(
@@ -111,65 +114,19 @@ pub(crate) fn branch_cleanup_and_check_errors(
))
}
let layer_map_key = (layer, metadata.generation);
if !s3_layers.remove(&layer_map_key) {
if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
// FIXME: this will emit false positives if an index was
// uploaded concurrently with our scan. To make this check
// correct, we need to try sending a HEAD request for the
// layer we think is missing.
result.errors.push(format!(
"index_part.json contains a layer {}{} that is not present in remote storage",
layer_map_key.0.file_name(),
layer_map_key.1.get_suffix()
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage",
layer.file_name(),
metadata.generation.get_suffix(),
metadata.shard
))
}
}
let orphan_layers: Vec<(LayerFileName, Generation)> = s3_layers
.into_iter()
.filter(|(_layer_name, gen)|
// A layer is only considered orphaned if it has a generation below
// the index. If the generation is >= the index, then the layer may
// be an upload from a running pageserver, or even an upload from
// a new generation that didn't upload an index yet.
//
// Even so, a layer that is not referenced by the index could just
// be something enqueued for deletion, so while this check is valid
// for indicating that a layer is garbage, it is not an indicator
// of a problem.
gen < &index_part_generation)
.collect();
if !orphan_layers.is_empty() {
// An orphan layer is not an error: it's arguably not even a warning, but it is helpful to report
// these as a hint that there is something worth cleaning up here.
result.warnings.push(format!(
"index_part.json does not contain layers from S3: {:?}",
orphan_layers
.iter()
.map(|(layer_name, gen)| format!(
"{}{}",
layer_name.file_name(),
gen.get_suffix()
))
.collect::<Vec<_>>(),
));
result.garbage_keys.extend(orphan_layers.iter().map(
|(layer_name, layer_gen)| {
let mut key = s3_root.timeline_root(id).prefix_in_bucket;
let delimiter = s3_root.delimiter();
if !key.ends_with(delimiter) {
key.push_str(delimiter);
}
key.push_str(&format!(
"{}{}",
&layer_name.file_name(),
layer_gen.get_suffix()
));
key
},
));
}
}
BlobDataParseResult::Relic => {}
BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
@@ -204,6 +161,83 @@ pub(crate) fn branch_cleanup_and_check_errors(
result
}
#[derive(Default)]
pub(crate) struct LayerRef {
ref_count: usize,
}
/// Top-level index of objects in a tenant. This may be used by any shard-timeline within
/// the tenant to query whether an object exists.
#[derive(Default)]
pub(crate) struct TenantObjectListing {
shard_timelines:
HashMap<(ShardIndex, TimelineId), HashMap<(LayerFileName, Generation), LayerRef>>,
}
impl TenantObjectListing {
/// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
/// list of layer keys for the Tenant.
pub(crate) fn push(
&mut self,
ttid: TenantShardTimelineId,
layers: HashSet<(LayerFileName, Generation)>,
) {
let shard_index = ShardIndex::new(
ttid.tenant_shard_id.shard_number,
ttid.tenant_shard_id.shard_count,
);
let replaced = self.shard_timelines.insert(
(shard_index, ttid.timeline_id),
layers
.into_iter()
.map(|l| (l, LayerRef::default()))
.collect(),
);
assert!(
replaced.is_none(),
"Built from an S3 object listing, which should never repeat a key"
);
}
/// Having loaded a timeline index, check if a layer referenced by the index exists. If it does,
/// the layer's refcount will be incremented. Later, after calling this for all references in all indices
/// in a tenant, orphan layers may be detected by their zero refcounts.
///
/// Returns true if the layer exists
pub(crate) fn check_ref(
&mut self,
timeline_id: TimelineId,
layer_file: &LayerFileName,
metadata: &IndexLayerMetadata,
) -> bool {
let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
return false;
};
let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
return false;
};
layer_ref.ref_count += 1;
true
}
pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerFileName, Generation)> {
let mut result = Vec::new();
for ((shard_index, timeline_id), layers) in &self.shard_timelines {
for ((layer_file, generation), layer_ref) in layers {
if layer_ref.ref_count == 0 {
result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
}
}
}
result
}
}
#[derive(Debug)]
pub(crate) struct S3TimelineBlobData {
pub(crate) blob_data: BlobDataParseResult,

View File

@@ -2,22 +2,25 @@ use std::collections::{HashMap, HashSet};
use crate::checks::{
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
TimelineAnalysis,
TenantObjectListing, TimelineAnalysis,
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use futures_util::{pin_mut, StreamExt, TryStreamExt};
use histogram::Histogram;
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use utils::id::TenantId;
#[derive(Serialize)]
pub struct MetadataSummary {
count: usize,
with_errors: HashSet<TenantShardTimelineId>,
with_warnings: HashSet<TenantShardTimelineId>,
with_garbage: HashSet<TenantShardTimelineId>,
with_orphans: HashSet<TenantShardTimelineId>,
indices_by_version: HashMap<usize, usize>,
layer_count: MinMaxHisto,
@@ -87,7 +90,7 @@ impl MetadataSummary {
count: 0,
with_errors: HashSet::new(),
with_warnings: HashSet::new(),
with_garbage: HashSet::new(),
with_orphans: HashSet::new(),
indices_by_version: HashMap::new(),
layer_count: MinMaxHisto::new(),
timeline_size_bytes: MinMaxHisto::new(),
@@ -141,6 +144,10 @@ impl MetadataSummary {
}
}
fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
self.with_orphans.insert(*ttid);
}
/// Long-form output for printing at end of a scan
pub fn summary_string(&self) -> String {
let version_summary: String = itertools::join(
@@ -154,7 +161,7 @@ impl MetadataSummary {
"Timelines: {0}
With errors: {1}
With warnings: {2}
With garbage: {3}
With orphan layers: {3}
Index versions: {version_summary}
Timeline size bytes: {4}
Layer size bytes: {5}
@@ -163,7 +170,7 @@ Timeline layer count: {6}
self.count,
self.with_errors.len(),
self.with_warnings.len(),
self.with_garbage.len(),
self.with_orphans.len(),
self.timeline_size_bytes.oneline(),
self.layer_size_bytes.oneline(),
self.layer_count.oneline(),
@@ -191,7 +198,7 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metada
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();
// Generate a stream of S3TimelineBlobData
@@ -204,17 +211,118 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metada
Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
let timelines = timelines.try_buffered(CONCURRENCY);
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
// shards in the same tenant might refer to one anothers' keys if a shard split has happened.
let mut tenant_id = None;
let mut tenant_objects = TenantObjectListing::default();
let mut tenant_timeline_results = Vec::new();
fn analyze_tenant(
tenant_id: TenantId,
summary: &mut MetadataSummary,
mut tenant_objects: TenantObjectListing,
timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
) {
let mut timeline_generations = HashMap::new();
for (ttid, data) in timelines {
// Stash the generation of each timeline, for later use identifying orphan layers
if let BlobDataParseResult::Parsed {
index_part: _index_part,
index_part_generation,
s3_layers: _s3_layers,
} = &data.blob_data
{
timeline_generations.insert(ttid, *index_part_generation);
}
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
// reference counts for layers across the tenant.
let analysis =
branch_cleanup_and_check_errors(&ttid, &mut tenant_objects, None, None, Some(data));
summary.update_analysis(&ttid, &analysis);
}
// Identifying orphan layers must be done on a tenant-wide basis, because individual
// shards' layers may be referenced by other shards.
//
// Orphan layers are not a corruption, and not an indication of a problem. They are just
// consuming some space in remote storage, and may be cleaned up at leisure.
for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
let ttid = TenantShardTimelineId {
tenant_shard_id: TenantShardId {
tenant_id,
shard_count: shard_index.shard_count,
shard_number: shard_index.shard_number,
},
timeline_id,
};
if let Some(timeline_generation) = timeline_generations.get(&ttid) {
if &generation >= timeline_generation {
// Candidate orphan layer is in the current or future generation relative
// to the index we read for this timeline shard, so its absence from the index
// doesn't make it an orphan: more likely, it is a case where the layer was
// uploaded, but the index referencing the layer wasn't written yet.
continue;
}
}
let orphan_path = remote_layer_path(
&tenant_id,
&timeline_id,
shard_index,
&layer_file,
generation,
);
tracing::info!("Orphan layer detected: {orphan_path}");
summary.notify_timeline_orphan(&ttid);
}
}
// Iterate through all the timeline results. These are in key-order, so
// all results for the same tenant will be adjacent. We accumulate these,
// and then call `analyze_tenant` to flush, when we see the next tenant ID.
let mut summary = MetadataSummary::new();
pin_mut!(timelines);
while let Some(i) = timelines.next().await {
let (ttid, data) = i?;
summary.update_data(&data);
let analysis = branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data));
match tenant_id {
None => tenant_id = Some(ttid.tenant_shard_id.tenant_id),
Some(prev_tenant_id) => {
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
let tenant_objects = std::mem::take(&mut tenant_objects);
let timelines = std::mem::take(&mut tenant_timeline_results);
analyze_tenant(prev_tenant_id, &mut summary, tenant_objects, timelines);
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
}
}
}
summary.update_analysis(&ttid, &analysis);
if let BlobDataParseResult::Parsed {
index_part: _index_part,
index_part_generation: _index_part_generation,
s3_layers,
} = &data.blob_data
{
tenant_objects.push(ttid, s3_layers.clone());
}
tenant_timeline_results.push((ttid, data));
}
if !tenant_timeline_results.is_empty() {
analyze_tenant(
tenant_id.expect("Must be set if results are present"),
&mut summary,
tenant_objects,
tenant_timeline_results,
);
}
Ok(summary)

View File

@@ -1,2 +1,4 @@
result
*.json
hosts
poetry.lock

View File

@@ -0,0 +1,11 @@
[defaults]
host_key_checking = False
inventory=./hosts
remote_tmp=/tmp
remote_user=developer
callbacks_enabled = profile_tasks
[ssh_connection]
scp_if_ssh = True
ssh_args = -F ./ssh.cfg
pipelining = True

View File

@@ -0,0 +1,16 @@
[tool.poetry]
name = "sk-collect-dumps"
version = "0.1.0"
description = ""
authors = ["Arseny Sher <sher-ars@yandex.ru>"]
readme = "README.md"
packages = [{include = "sk_collect_dumps"}]
[tool.poetry.dependencies]
python = "^3.11"
ansible = "^9.1.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -1,25 +1,43 @@
# Collect /v1/debug_dump from all safekeeper nodes
1. Run ansible playbooks to collect .json dumps from all safekeepers and store them in `./result` directory.
2. Run `DB_CONNSTR=... ./upload.sh prod_feb30` to upload dumps to `prod_feb30` table in specified postgres database.
## How to use ansible (staging)
3. Issue admin token (add/remove .stage from url for staging/prod and setting proper API key):
```
AWS_DEFAULT_PROFILE=dev ansible-playbook -i ../../.github/ansible/staging.us-east-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
# staging:
AUTH_TOKEN=$(curl https://console.stage.neon.tech/regions/console/api/v1/admin/issue_token -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer $NEON_STAGING_KEY" -X POST -d '{"ttl_seconds": 43200, "scope": "safekeeperdata"}' 2>/dev/null | jq --raw-output '.jwt')
# prod:
AUTH_TOKEN=$(curl https://console.neon.tech/regions/console/api/v1/admin/issue_token -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer $NEON_PROD_KEY" -X POST -d '{"ttl_seconds": 43200, "scope": "safekeeperdata"}' 2>/dev/null | jq --raw-output '.jwt')
# check
echo $AUTH_TOKEN
```
2. Run ansible playbooks to collect .json dumps from all safekeepers and store them in `./result` directory.
AWS_DEFAULT_PROFILE=dev ansible-playbook -i ../../.github/ansible/staging.eu-west-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
There are two ways to do that, with ssm or tsh. ssm:
```
# in aws repo, cd .github/ansible and run e.g. (adjusting profile and region in vars and limit):
AWS_DEFAULT_PROFILE=dev ansible-playbook -i inventory_aws_ec2.yaml -i staging.us-east-2.vars.yaml -e @ssm_config -l 'safekeeper:&us_east_2' -e "auth_token=${AUTH_TOKEN}" ~/neon/neon/scripts/sk_collect_dumps/remote.yaml
```
It will put the results to .results directory *near the playbook*.
tsh:
Update the inventory, if needed, selecting .build/.tech and optionally region:
```
rm -f hosts && echo '[safekeeper]' >> hosts
# staging:
tsh ls | awk '{print $1}' | grep safekeeper | grep "neon.build" | grep us-east-2 >> hosts
# prod:
tsh ls | awk '{print $1}' | grep safekeeper | grep "neon.tech" | grep us-east-2 >> hosts
```
## How to use ansible (prod)
Test ansible connection:
```
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.us-west-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.us-east-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.eu-central-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.ap-southeast-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
ansible all -m ping -v
```
Download the dumps:
```
mkdir -p result && rm -f result/*
ansible-playbook -e "auth_token=${AUTH_TOKEN}" remote.yaml
```
3. Run `DB_CONNSTR=... ./upload.sh prod_feb30` to upload dumps to `prod_feb30` table in specified postgres database.

View File

@@ -1,18 +1,37 @@
- name: Fetch state dumps from safekeepers
hosts: safekeepers
hosts: safekeeper
gather_facts: False
remote_user: "{{ remote_user }}"
tasks:
- name: Download file
- name: Dump file
get_url:
url: "http://{{ inventory_hostname }}:7676/v1/debug_dump?dump_all=true&dump_disk_content=false"
dest: "/tmp/{{ inventory_hostname }}.json"
dest: "/tmp/{{ inventory_hostname }}-dump.json"
headers:
Authorization: "Bearer {{ auth_token }}"
- name: Fetch file from remote hosts
- name: install rsync
ansible.builtin.apt:
name: rsync
update_cache: yes
become: yes
ignore_errors: true # it can be already installed and we don't always have sudo
- name: Fetch file from remote hosts (works only with ssm)
fetch:
src: "/tmp/{{ inventory_hostname }}.json"
dest: "./result/{{ inventory_hostname }}.json"
src: "/tmp/{{ inventory_hostname }}-dump.json"
dest: "./result/{{ inventory_hostname }}-dump.json"
flat: yes
fail_on_missing: no
when: ansible_connection == "aws_ssm"
# xxx not sure how to make ansible 'synchronize' work with tsh
- name: Fetch file from remote hosts
shell: rsync -e 'tsh ssh' -azvP "developer@{{ inventory_hostname }}:/tmp/{{ inventory_hostname }}-dump.json" "./result/{{ inventory_hostname }}-dump.json"
delegate_to: localhost
when: ansible_connection != "aws_ssm"
- name: remove remote dumps
ansible.builtin.file:
path: "/tmp/{{ inventory_hostname }}-dump.json"
state: absent

View File

@@ -0,0 +1,13 @@
# Begin generated Teleport configuration for teleport.aws.neon.tech by tsh
# Common flags for all teleport.aws.neon.tech hosts
Host *
HostKeyAlgorithms rsa-sha2-512-cert-v01@openssh.com,rsa-sha2-256-cert-v01@openssh.com,ssh-rsa-cert-v01@openssh.com
# Flags for all teleport.aws.neon.tech hosts except the proxy
Host * !teleport.aws.neon.tech
Port 3022
ProxyCommand "/usr/local/bin/tsh" proxy ssh --cluster=teleport.aws.neon.tech --proxy=teleport.aws.neon.tech:443 %r@%h:%p
User developer
# End generated Teleport configuration

View File

@@ -31,22 +31,22 @@ SELECT
(data->>'tenant_id') AS tenant_id,
(data->>'timeline_id') AS timeline_id,
(data->'memory'->>'active')::bool AS active,
(data->'memory'->>'flush_lsn')::bigint AS flush_lsn,
(data->'memory'->'mem_state'->>'backup_lsn')::bigint AS backup_lsn,
(data->'memory'->'mem_state'->>'commit_lsn')::bigint AS commit_lsn,
(data->'memory'->'mem_state'->>'peer_horizon_lsn')::bigint AS peer_horizon_lsn,
(data->'memory'->'mem_state'->>'remote_consistent_lsn')::bigint AS remote_consistent_lsn,
(data->'memory'->>'write_lsn')::bigint AS write_lsn,
(data->'memory'->>'flush_lsn')::pg_lsn AS flush_lsn,
(data->'memory'->'mem_state'->>'backup_lsn')::pg_lsn AS backup_lsn,
(data->'memory'->'mem_state'->>'commit_lsn')::pg_lsn AS commit_lsn,
(data->'memory'->'mem_state'->>'peer_horizon_lsn')::pg_lsn AS peer_horizon_lsn,
(data->'memory'->'mem_state'->>'remote_consistent_lsn')::pg_lsn AS remote_consistent_lsn,
(data->'memory'->>'write_lsn')::pg_lsn AS write_lsn,
(data->'memory'->>'num_computes')::bigint AS num_computes,
(data->'memory'->>'epoch_start_lsn')::bigint AS epoch_start_lsn,
(data->'memory'->>'epoch_start_lsn')::pg_lsn AS epoch_start_lsn,
(data->'memory'->>'last_removed_segno')::bigint AS last_removed_segno,
(data->'memory'->>'is_cancelled')::bool AS is_cancelled,
(data->'control_file'->>'backup_lsn')::bigint AS disk_backup_lsn,
(data->'control_file'->>'commit_lsn')::bigint AS disk_commit_lsn,
(data->'control_file'->>'backup_lsn')::pg_lsn AS disk_backup_lsn,
(data->'control_file'->>'commit_lsn')::pg_lsn AS disk_commit_lsn,
(data->'control_file'->'acceptor_state'->>'term')::bigint AS disk_term,
(data->'control_file'->>'local_start_lsn')::bigint AS local_start_lsn,
(data->'control_file'->>'peer_horizon_lsn')::bigint AS disk_peer_horizon_lsn,
(data->'control_file'->>'timeline_start_lsn')::bigint AS timeline_start_lsn,
(data->'control_file'->>'remote_consistent_lsn')::bigint AS disk_remote_consistent_lsn
(data->'control_file'->>'local_start_lsn')::pg_lsn AS local_start_lsn,
(data->'control_file'->>'peer_horizon_lsn')::pg_lsn AS disk_peer_horizon_lsn,
(data->'control_file'->>'timeline_start_lsn')::pg_lsn AS timeline_start_lsn,
(data->'control_file'->>'remote_consistent_lsn')::pg_lsn AS disk_remote_consistent_lsn
FROM tmp_json
EOF

View File

@@ -0,0 +1,10 @@
#!/bin/bash
# export NEON_API_KEY=
while IFS= read -r ENDPOINT
do
echo "$ENDPOINT"
# curl -X POST -H "Authorization: Bearer $NEON_PROD_KEY" -H "Accept: application/json" -H "Content-Type: application/json" https://console.neon.tech/regions/console/api/v1/admin/endpoints/$ENDPOINT/restart
curl -X POST -H "Authorization: Bearer $NEON_API_KEY" -H "Accept: application/json" -H "Content-Type: application/json" https://console.neon.tech/regions/aws-us-east-2/api/v1/admin/endpoints/$ENDPOINT/restart
done < endpoints_cplane.txt

View File

@@ -0,0 +1,137 @@
import argparse
import sys
import psycopg2
import psycopg2.extras
import os
import requests
def migrate_project(conn, from_sk: dict[str, any], to_sk: dict[str, any], project_id: str, dry_run=True):
print("###############################################################")
with conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT * FROM projects WHERE id = %s", (project_id,))
project = cur.fetchone()
if project is None:
print("Project with id {} does not exist".format(project_id))
return
assert project['deleted'] == False, "Project with id {} is deleted".format(project_id)
with conn.cursor() as cur:
cur.execute("SELECT safekeeper_id FROM projects_safekeepers WHERE project_id = %s", (project_id, ))
sk_ids = list(map(lambda x: x[0], cur.fetchall()))
assert from_sk['id'] in sk_ids
assert to_sk['id'] not in sk_ids
with conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT * FROM branches WHERE project_id = %s AND deleted = 'f'", (project_id, ))
branches = cur.fetchall()
for branch in branches:
if branch['deleted'] != False:
continue
tenant_id = project['tenant_id']
timeline_id = branch['timeline_id']
print("tenant_id: {}, timeline_id: {}".format(tenant_id, timeline_id))
print(f"Migrating from {from_sk['host']} to {to_sk['host']}, project={project_id}, branch={branch['id']}, deleted={branch['deleted']}")
print(list(sk_ids))
sk_hosts = list(map(
lambda x: f"http://{safekeepers[x]['host']}:{safekeepers[x]['http_port']}",
filter(lambda x: x != from_sk['id'], sk_ids)
))
# make HTTP request to /pull_timeline
# url = f"http://{to_sk['host']}:{to_sk['http_port']}/v1/tenant/{tenant_id}/timeline/{timeline_id}"
url = f"http://{to_sk['host']}:{to_sk['http_port']}/v1/pull_timeline"
body = {
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"http_hosts": sk_hosts,
}
print(body)
print("Making HTTP request to {}".format(url), flush=True)
if not dry_run:
response = requests.post(url, json=body)
# response = requests.get(url)
if response.status_code != 200 and f"error decoding response body: missing field `tenant_id` at line 1 column 104" in response.text:
print(f"WARN: Skipping branch {branch['id']} because it's empty on all safekeepers")
continue
if response.status_code != 200 and f"Timeline {timeline_id} already exists" in response.text:
print(f"WARN: Skipping timeline {timeline_id} because it is already exists (was migrated earlier)")
continue
if response.status_code != 200:
print("ERROR: {}".format(response.text))
return
print(response.text)
print(f"Updating safekeeper {from_sk['id']} -> {to_sk['id']} for project={project_id} in the database")
if not dry_run:
with conn.cursor() as cur:
cur.execute("UPDATE projects_safekeepers SET safekeeper_id = %s WHERE project_id = %s AND safekeeper_id = %s RETURNING *", (to_sk['id'], project_id, from_sk['id']))
print(cur.fetchone())
conn.commit()
def find_projects(sk_from_id: int):
with conn.cursor() as cur:
cur.execute("SELECT p.id FROM projects p, projects_safekeepers ps WHERE ps.project_id = p.id AND NOT p.deleted AND ps.safekeeper_id = %s", (sk_from_id, ))
project_ids = list(map(lambda x: x[0], cur.fetchall()))
return project_ids
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='migrate sk')
parser.add_argument("-d", help="database URL", type=str, required=True)
parser.add_argument("--from-sk", help="from sk id as in the cplane db", type=int, required=True)
parser.add_argument("--to-sk", help="to sk id as in the cplane db", type=int, required=True)
parser.add_argument("--not-dry-run", help="", action='store_true')
parser.add_argument("--project-id", help="project to migrate", type=str, default=None)
args = parser.parse_args()
# Connect to postgresql database
conn = psycopg2.connect(args.d)
safekeepers = dict()
# We need to fetch all objects from "safekeepers" table and store them in "safekeepers" list
# Create cursor
cur = conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor)
# Execute query
cur.execute("SELECT * FROM safekeepers")
# Fetch all rows
rows = cur.fetchall()
# Close cursor
cur.close()
# Iterate over rows
for row in rows:
safekeepers[row['id']] = row
# Print all safekeepers
# print(safekeepers)
assert args.from_sk in safekeepers, "Safekeeper with id {} does not exist".format(args.from_sk)
from_sk_hostname = safekeepers[args.from_sk]['host']
assert safekeepers[args.from_sk]['active'] == False, "Safekeeper with id {} should be inactive".format(args.from_sk)
assert args.to_sk in safekeepers, "Safekeeper with id {} does not exist".format(args.to_sk)
to_sk_hostname = safekeepers[args.to_sk]['host']
assert safekeepers[args.to_sk]['active'] == True, "Safekeeper with id {} should be active".format(args.to_sk)
print(f"migrating from id {args.from_sk} {from_sk_hostname} to {args.to_sk} {to_sk_hostname}")
if args.project_id is not None:
project_ids = [args.project_id]
else:
project_ids = find_projects(args.from_sk)
print(project_ids)
for project_id in project_ids:
migrate_project(conn, safekeepers[args.from_sk], safekeepers[args.to_sk], project_id)

View File

@@ -3,9 +3,12 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use clap::Parser;
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SubscribeByFilterRequest,
TenantTimelineId as ProtoTenantTimelineId, TypeSubscription, TypedMessage,
};
use storage_broker::{BrokerClientChannel, DEFAULT_ENDPOINT};
use tokio::time;
@@ -91,15 +94,23 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
};
let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
let ttid = ProtoTenantTimelineId {
tenant_id: vec![0xFF; 16],
timeline_id: tli_from_u64(i),
});
let request = SubscribeSafekeeperInfoRequest {
subscription_key: Some(key),
};
let mut stream = client
.subscribe_safekeeper_info(request)
let request = SubscribeByFilterRequest {
types: vec![TypeSubscription {
r#type: MessageType::SafekeeperTimelineInfo.into(),
}],
tenant_timeline_id: Some(FilterTenantTimelineId {
enabled: true,
tenant_timeline_id: Some(ttid),
}),
};
let mut stream: tonic::Streaming<TypedMessage> = client
.subscribe_by_filter(request)
.await
.unwrap()
.into_inner();

View File

@@ -10,6 +10,12 @@ service BrokerService {
// Publish safekeeper updates.
rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (google.protobuf.Empty) {};
// Subscribe to all messages, limited by a filter.
rpc SubscribeByFilter(SubscribeByFilterRequest) returns (stream TypedMessage) {};
// Publish one message.
rpc PublishOne(TypedMessage) returns (google.protobuf.Empty) {};
}
message SubscribeSafekeeperInfoRequest {
@@ -48,3 +54,55 @@ message TenantTimelineId {
bytes tenant_id = 1;
bytes timeline_id = 2;
}
message FilterTenantTimelineId {
// If true, only messages related to `tenant_timeline_id` will be emitted.
// Otherwise, messages for all timelines will be emitted.
bool enabled = 1;
TenantTimelineId tenant_timeline_id = 2;
}
message TypeSubscription {
MessageType type = 1;
}
message SubscribeByFilterRequest {
// Subscription will emit messages only of the specified types. You need to specify
// at least one type to receive any messages.
repeated TypeSubscription types = 1;
// If set and enabled, subscription will emit messages only for the specified tenant/timeline.
optional FilterTenantTimelineId tenant_timeline_id = 2;
}
enum MessageType {
UNKNOWN = 0;
SAFEKEEPER_TIMELINE_INFO = 2;
SAFEKEEPER_DISCOVERY_REQUEST = 3;
SAFEKEEPER_DISCOVERY_RESPONSE = 4;
}
// A message with a type.
message TypedMessage {
MessageType type = 1;
optional SafekeeperTimelineInfo safekeeper_timeline_info = 2;
optional SafekeeperDiscoveryRequest safekeeper_discovery_request = 3;
optional SafekeeperDiscoveryResponse safekeeper_discovery_response = 4;
}
message SafekeeperDiscoveryRequest {
TenantTimelineId tenant_timeline_id = 1;
}
// Shorter version of SafekeeperTimelineInfo, contains only necessary fields.
message SafekeeperDiscoveryResponse {
uint64 safekeeper_id = 1;
TenantTimelineId tenant_timeline_id = 2;
// WAL available to download.
uint64 commit_lsn = 3;
// A connection string to use for WAL downloading.
string safekeeper_connstr = 4;
// Availability zone of a safekeeper.
optional string availability_zone = 5;
}

View File

@@ -35,10 +35,16 @@ use tracing::*;
use utils::signals::ShutdownSignals;
use metrics::{Encoder, TextEncoder};
use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
use storage_broker::metrics::{
BROADCASTED_MESSAGES_TOTAL, BROADCAST_DROPPED_MESSAGES_TOTAL, NUM_PUBS, NUM_SUBS_ALL,
NUM_SUBS_TIMELINE, PROCESSED_MESSAGES_TOTAL, PUBLISHED_ONEOFF_MESSAGES_TOTAL,
};
use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
};
use storage_broker::{
parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
};
@@ -73,8 +79,103 @@ struct Args {
log_format: String,
}
type PubId = u64; // id of publisher for registering in maps
type SubId = u64; // id of subscriber for registering in maps
/// Id of publisher for registering in maps
type PubId = u64;
/// Id of subscriber for registering in maps
type SubId = u64;
/// Single enum type for all messages.
#[derive(Clone, Debug, PartialEq)]
#[allow(clippy::enum_variant_names)]
enum Message {
SafekeeperTimelineInfo(SafekeeperTimelineInfo),
SafekeeperDiscoveryRequest(SafekeeperDiscoveryRequest),
SafekeeperDiscoveryResponse(SafekeeperDiscoveryResponse),
}
impl Message {
/// Convert proto message to internal message.
pub fn from(proto_msg: TypedMessage) -> Result<Self, Status> {
match proto_msg.r#type() {
MessageType::SafekeeperTimelineInfo => Ok(Message::SafekeeperTimelineInfo(
proto_msg.safekeeper_timeline_info.ok_or_else(|| {
Status::new(Code::InvalidArgument, "missing safekeeper_timeline_info")
})?,
)),
MessageType::SafekeeperDiscoveryRequest => Ok(Message::SafekeeperDiscoveryRequest(
proto_msg.safekeeper_discovery_request.ok_or_else(|| {
Status::new(
Code::InvalidArgument,
"missing safekeeper_discovery_request",
)
})?,
)),
MessageType::SafekeeperDiscoveryResponse => Ok(Message::SafekeeperDiscoveryResponse(
proto_msg.safekeeper_discovery_response.ok_or_else(|| {
Status::new(
Code::InvalidArgument,
"missing safekeeper_discovery_response",
)
})?,
)),
MessageType::Unknown => Err(Status::new(
Code::InvalidArgument,
format!("invalid message type: {:?}", proto_msg.r#type),
)),
}
}
/// Get the tenant_timeline_id from the message.
pub fn tenant_timeline_id(&self) -> Result<Option<TenantTimelineId>, Status> {
match self {
Message::SafekeeperTimelineInfo(msg) => Ok(msg
.tenant_timeline_id
.as_ref()
.map(parse_proto_ttid)
.transpose()?),
Message::SafekeeperDiscoveryRequest(msg) => Ok(msg
.tenant_timeline_id
.as_ref()
.map(parse_proto_ttid)
.transpose()?),
Message::SafekeeperDiscoveryResponse(msg) => Ok(msg
.tenant_timeline_id
.as_ref()
.map(parse_proto_ttid)
.transpose()?),
}
}
/// Convert internal message to the protobuf struct.
pub fn as_typed_message(&self) -> TypedMessage {
let mut res = TypedMessage {
r#type: self.message_type() as i32,
..Default::default()
};
match self {
Message::SafekeeperTimelineInfo(msg) => {
res.safekeeper_timeline_info = Some(msg.clone())
}
Message::SafekeeperDiscoveryRequest(msg) => {
res.safekeeper_discovery_request = Some(msg.clone())
}
Message::SafekeeperDiscoveryResponse(msg) => {
res.safekeeper_discovery_response = Some(msg.clone())
}
}
res
}
/// Get the message type.
pub fn message_type(&self) -> MessageType {
match self {
Message::SafekeeperTimelineInfo(_) => MessageType::SafekeeperTimelineInfo,
Message::SafekeeperDiscoveryRequest(_) => MessageType::SafekeeperDiscoveryRequest,
Message::SafekeeperDiscoveryResponse(_) => MessageType::SafekeeperDiscoveryResponse,
}
}
}
#[derive(Copy, Clone, Debug)]
enum SubscriptionKey {
@@ -83,7 +184,7 @@ enum SubscriptionKey {
}
impl SubscriptionKey {
// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
/// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
match key {
ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
@@ -92,14 +193,29 @@ impl SubscriptionKey {
}
}
}
/// Parse from FilterTenantTimelineId
pub fn from_proto_filter_tenant_timeline_id(
f: &FilterTenantTimelineId,
) -> Result<Self, Status> {
if !f.enabled {
return Ok(SubscriptionKey::All);
}
let ttid =
parse_proto_ttid(f.tenant_timeline_id.as_ref().ok_or_else(|| {
Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
})?)?;
Ok(SubscriptionKey::Timeline(ttid))
}
}
// Channel to timeline subscribers.
/// Channel to timeline subscribers.
struct ChanToTimelineSub {
chan: broadcast::Sender<SafekeeperTimelineInfo>,
// Tracked separately to know when delete the shmem entry. receiver_count()
// is unhandy for that as unregistering and dropping the receiver side
// happens at different moments.
chan: broadcast::Sender<Message>,
/// Tracked separately to know when delete the shmem entry. receiver_count()
/// is unhandy for that as unregistering and dropping the receiver side
/// happens at different moments.
num_subscribers: u64,
}
@@ -110,7 +226,7 @@ struct SharedState {
num_subs_to_timelines: i64,
chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
num_subs_to_all: i64,
chan_to_all_subs: broadcast::Sender<SafekeeperTimelineInfo>,
chan_to_all_subs: broadcast::Sender<Message>,
}
impl SharedState {
@@ -146,7 +262,7 @@ impl SharedState {
&mut self,
sub_key: SubscriptionKey,
timeline_chan_size: usize,
) -> (SubId, broadcast::Receiver<SafekeeperTimelineInfo>) {
) -> (SubId, broadcast::Receiver<Message>) {
let sub_id = self.next_sub_id;
self.next_sub_id += 1;
let sub_rx = match sub_key {
@@ -262,6 +378,29 @@ impl Registry {
subscriber.id, subscriber.key, subscriber.remote_addr
);
}
/// Send msg to relevant subscribers.
pub fn send_msg(&self, msg: &Message) -> Result<(), Status> {
PROCESSED_MESSAGES_TOTAL.inc();
// send message to subscribers for everything
let shared_state = self.shared_state.read();
// Err means there is no subscribers, it is fine.
shared_state.chan_to_all_subs.send(msg.clone()).ok();
// send message to per timeline subscribers, if there is ttid
let ttid = msg.tenant_timeline_id()?;
if let Some(ttid) = ttid {
if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
// Err can't happen here, as tx is destroyed only after removing
// from the map the last subscriber along with tx.
subs.chan
.send(msg.clone())
.expect("rx is still in the map with zero subscribers");
}
}
Ok(())
}
}
// Private subscriber state.
@@ -269,7 +408,7 @@ struct Subscriber {
id: SubId,
key: SubscriptionKey,
// Subscriber receives messages from publishers here.
sub_rx: broadcast::Receiver<SafekeeperTimelineInfo>,
sub_rx: broadcast::Receiver<Message>,
// to unregister itself from shared state in Drop
registry: Registry,
// for logging
@@ -291,26 +430,9 @@ struct Publisher {
}
impl Publisher {
// Send msg to relevant subscribers.
pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
// send message to subscribers for everything
let shared_state = self.registry.shared_state.read();
// Err means there is no subscribers, it is fine.
shared_state.chan_to_all_subs.send(msg.clone()).ok();
// send message to per timeline subscribers
let ttid =
parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| {
Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
})?)?;
if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
// Err can't happen here, as tx is destroyed only after removing
// from the map the last subscriber along with tx.
subs.chan
.send(msg.clone())
.expect("rx is still in the map with zero subscribers");
}
Ok(())
/// Send msg to relevant subscribers.
pub fn send_msg(&mut self, msg: &Message) -> Result<(), Status> {
self.registry.send_msg(msg)
}
}
@@ -339,7 +461,7 @@ impl BrokerService for Broker {
loop {
match stream.next().await {
Some(Ok(msg)) => publisher.send_msg(&msg)?,
Some(Ok(msg)) => publisher.send_msg(&Message::SafekeeperTimelineInfo(msg))?,
Some(Err(e)) => return Err(e), // grpc error from the stream
None => break, // closed stream
}
@@ -371,8 +493,15 @@ impl BrokerService for Broker {
let mut missed_msgs: u64 = 0;
loop {
match subscriber.sub_rx.recv().await {
Ok(info) => yield info,
Ok(info) => {
match info {
Message::SafekeeperTimelineInfo(info) => yield info,
_ => {},
}
BROADCASTED_MESSAGES_TOTAL.inc();
},
Err(RecvError::Lagged(skipped_msg)) => {
BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
missed_msgs += skipped_msg;
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
@@ -392,6 +521,78 @@ impl BrokerService for Broker {
Box::pin(output) as Self::SubscribeSafekeeperInfoStream
))
}
type SubscribeByFilterStream =
Pin<Box<dyn Stream<Item = Result<TypedMessage, Status>> + Send + 'static>>;
/// Subscribe to all messages, limited by a filter.
async fn subscribe_by_filter(
&self,
request: Request<SubscribeByFilterRequest>,
) -> std::result::Result<Response<Self::SubscribeByFilterStream>, Status> {
let remote_addr = request
.remote_addr()
.expect("TCPConnectInfo inserted by handler");
let proto_filter = request.into_inner();
let ttid_filter = proto_filter
.tenant_timeline_id
.as_ref()
.ok_or_else(|| Status::new(Code::InvalidArgument, "missing tenant_timeline_id"))?;
let sub_key = SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?;
let types_set = proto_filter
.types
.iter()
.map(|t| t.r#type)
.collect::<std::collections::HashSet<_>>();
let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
// transform rx into stream with item = Result, as method result demands
let output = async_stream::try_stream! {
let mut warn_interval = time::interval(Duration::from_millis(1000));
let mut missed_msgs: u64 = 0;
loop {
match subscriber.sub_rx.recv().await {
Ok(msg) => {
let msg_type = msg.message_type() as i32;
if types_set.contains(&msg_type) {
yield msg.as_typed_message();
BROADCASTED_MESSAGES_TOTAL.inc();
}
},
Err(RecvError::Lagged(skipped_msg)) => {
BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
missed_msgs += skipped_msg;
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
missed_msgs = 0;
}
}
Err(RecvError::Closed) => {
// can't happen, we never drop the channel while there is a subscriber
Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
}
}
}
};
Ok(Response::new(
Box::pin(output) as Self::SubscribeByFilterStream
))
}
/// Publish one message.
async fn publish_one(
&self,
request: Request<TypedMessage>,
) -> std::result::Result<Response<()>, Status> {
let msg = Message::from(request.into_inner())?;
PUBLISHED_ONEOFF_MESSAGES_TOTAL.inc();
self.registry.send_msg(&msg)?;
Ok(Response::new(()))
}
}
// We serve only metrics and healthcheck through http1.
@@ -515,8 +716,8 @@ mod tests {
use tokio::sync::broadcast::error::TryRecvError;
use utils::id::{TenantId, TimelineId};
fn msg(timeline_id: Vec<u8>) -> SafekeeperTimelineInfo {
SafekeeperTimelineInfo {
fn msg(timeline_id: Vec<u8>) -> Message {
Message::SafekeeperTimelineInfo(SafekeeperTimelineInfo {
safekeeper_id: 1,
tenant_timeline_id: Some(ProtoTenantTimelineId {
tenant_id: vec![0x00; 16],
@@ -533,7 +734,7 @@ mod tests {
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
local_start_lsn: 0,
availability_zone: None,
}
})
}
fn tli_from_u64(i: u64) -> Vec<u8> {

View File

@@ -1,6 +1,6 @@
//! Broker metrics.
use metrics::{register_int_gauge, IntGauge};
use metrics::{register_int_counter, register_int_gauge, IntCounter, IntGauge};
use once_cell::sync::Lazy;
pub static NUM_PUBS: Lazy<IntGauge> = Lazy::new(|| {
@@ -23,3 +23,35 @@ pub static NUM_SUBS_ALL: Lazy<IntGauge> = Lazy::new(|| {
)
.expect("Failed to register metric")
});
pub static PROCESSED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"storage_broker_processed_messages_total",
"Number of messages received by storage broker, before routing and broadcasting"
)
.expect("Failed to register metric")
});
pub static BROADCASTED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"storage_broker_broadcasted_messages_total",
"Number of messages broadcasted (sent over network) to subscribers"
)
.expect("Failed to register metric")
});
pub static BROADCAST_DROPPED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"storage_broker_broadcast_dropped_messages_total",
"Number of messages dropped due to channel capacity overflow"
)
.expect("Failed to register metric")
});
pub static PUBLISHED_ONEOFF_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"storage_broker_published_oneoff_messages_total",
"Number of one-off messages sent via PublishOne method"
)
.expect("Failed to register metric")
});

View File

@@ -457,7 +457,6 @@ class NeonEnvBuilder:
self.preserve_database_files = preserve_database_files
self.initial_tenant = initial_tenant or TenantId.generate()
self.initial_timeline = initial_timeline or TimelineId.generate()
self.enable_generations = True
self.scrub_on_exit = False
self.test_output_dir = test_output_dir
@@ -677,8 +676,7 @@ class NeonEnvBuilder:
pageserver.stop(immediate=True)
if self.env.attachment_service is not None:
self.env.attachment_service.stop(immediate=True)
self.env.attachment_service.stop(immediate=True)
cleanup_error = None
@@ -772,13 +770,9 @@ class NeonEnv:
self.initial_tenant = config.initial_tenant
self.initial_timeline = config.initial_timeline
if config.enable_generations:
attachment_service_port = self.port_distributor.get_port()
self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}"
self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self)
else:
self.control_plane_api = None
self.attachment_service = None
attachment_service_port = self.port_distributor.get_port()
self.control_plane_api: str = f"http://127.0.0.1:{attachment_service_port}"
self.attachment_service: NeonAttachmentService = NeonAttachmentService(self)
# Create a config file corresponding to the options
cfg: Dict[str, Any] = {
@@ -851,8 +845,7 @@ class NeonEnv:
# Start up broker, pageserver and all safekeepers
self.broker.try_start()
if self.attachment_service is not None:
self.attachment_service.start()
self.attachment_service.start()
for pageserver in self.pageservers:
pageserver.start()
@@ -1834,20 +1827,19 @@ class NeonPageserver(PgProtocol):
"""
client = self.http_client()
return client.tenant_attach(
tenant_id, config, config_null, generation=self.maybe_get_generation(tenant_id)
tenant_id,
config,
config_null,
generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id),
)
def tenant_detach(self, tenant_id: TenantId):
if self.env.attachment_service is not None:
self.env.attachment_service.attach_hook_drop(tenant_id)
self.env.attachment_service.attach_hook_drop(tenant_id)
client = self.http_client()
return client.tenant_detach(tenant_id)
def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs):
# This API is only for use when generations are enabled
assert self.env.attachment_service is not None
if config["mode"].startswith("Attached") and "generation" not in config:
config["generation"] = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
@@ -1873,26 +1865,15 @@ class NeonPageserver(PgProtocol):
generation: Optional[int] = None,
) -> TenantId:
if generation is None:
generation = self.maybe_get_generation(tenant_id)
generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
client = self.http_client(auth_token=auth_token)
return client.tenant_create(tenant_id, conf, generation=generation)
def tenant_load(self, tenant_id: TenantId):
client = self.http_client()
return client.tenant_load(tenant_id, generation=self.maybe_get_generation(tenant_id))
def maybe_get_generation(self, tenant_id: TenantId):
"""
For tests that would like to use an HTTP client directly instead of using
the `tenant_attach` and `tenant_create` helpers here: issue a generation
number for a tenant.
Returns None if the attachment service is not enabled (legacy mode)
"""
if self.env.attachment_service is not None:
return self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
else:
return None
return client.tenant_load(
tenant_id, generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
)
def append_pageserver_param_overrides(

View File

@@ -125,3 +125,51 @@ class TenantId(Id):
class TimelineId(Id):
def __repr__(self) -> str:
return f'TimelineId("{self.id.hex()}")'
# Workaround for compat with python 3.9, which does not have `typing.Self`
TTenantShardId = TypeVar("TTenantShardId", bound="TenantShardId")
class TenantShardId:
def __init__(self, tenant_id: TenantId, shard_number: int, shard_count: int):
self.tenant_id = tenant_id
self.shard_number = shard_number
self.shard_count = shard_count
assert self.shard_number < self.shard_count or self.shard_count == 0
@classmethod
def parse(cls: Type[TTenantShardId], input) -> TTenantShardId:
if len(input) == 32:
return cls(
tenant_id=TenantId(input),
shard_number=0,
shard_count=0,
)
elif len(input) == 37:
return cls(
tenant_id=TenantId(input[0:32]),
shard_number=int(input[33:35], 16),
shard_count=int(input[35:37], 16),
)
else:
raise ValueError(f"Invalid TenantShardId '{input}'")
def __str__(self):
return f"{self.tenant_id}-{self.shard_number:02x}{self.shard_count:02x}"
def _tuple(self) -> tuple[TenantId, int, int]:
return (self.tenant_id, self.shard_number, self.shard_count)
def __lt__(self, other) -> bool:
if not isinstance(other, type(self)):
return NotImplemented
return self._tuple() < other._tuple()
def __eq__(self, other) -> bool:
if not isinstance(other, type(self)):
return NotImplemented
return self._tuple() == other._tuple()
def __hash__(self) -> int:
return hash(self._tuple())

View File

@@ -61,7 +61,6 @@ def measure_recovery_time(env: NeonCompare):
# of view, but the same as far as the safekeeper/WAL is concerned. To work around that,
# we will explicitly create the tenant in the same generation that it was previously
# attached in.
assert env.env.attachment_service is not None
attach_status = env.env.attachment_service.inspect(tenant_id=env.tenant)
assert attach_status is not None
(attach_gen, _) = attach_status

View File

@@ -136,10 +136,7 @@ def test_no_config(positive_env: NeonEnv, content_type: Optional[str]):
ps_http.tenant_detach(tenant_id)
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
body = {}
gen = env.pageserver.maybe_get_generation(tenant_id)
if gen is not None:
body["generation"] = gen
body = {"generation": env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)}
ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",

View File

@@ -87,7 +87,6 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
#
# Since we're dual-attached, need to tip-off attachment service to treat the one we're
# about to start as the attached pageserver
assert env.attachment_service is not None
env.attachment_service.attach_hook_issue(env.initial_tenant, env.pageservers[0].id)
env.pageservers[0].start()
env.pageservers[1].stop()

View File

@@ -157,7 +157,6 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
time.sleep(1.1) # so that we can use change in pre_stat.st_mtime to detect overwrites
def get_generation_number():
assert env.attachment_service is not None
attachment = env.attachment_service.inspect(tenant_id)
assert attachment is not None
return attachment[0]

View File

@@ -72,7 +72,9 @@ def check_client(env: NeonEnv, client: PageserverHttpClient):
# create new tenant and check it is also there
tenant_id = TenantId.generate()
client.tenant_create(tenant_id, generation=env.pageserver.maybe_get_generation(tenant_id))
client.tenant_create(
tenant_id, generation=env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)
)
assert tenant_id in {TenantId(t["id"]) for t in client.tenant_list()}
timelines = client.timeline_list(tenant_id)

View File

@@ -187,7 +187,6 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
- After upgrade, the bucket should contain a mixture.
- In both cases, postgres I/O should work.
"""
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3,
)
@@ -196,7 +195,6 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
env.broker.try_start()
for sk in env.safekeepers:
sk.start()
assert env.attachment_service is not None
env.attachment_service.start()
env.pageserver.start(overrides=('--pageserver-config-override=control_plane_api=""',))
@@ -262,12 +260,10 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
some_other_pageserver = 1234
ps_http = env.pageserver.http_client()
@@ -341,7 +337,6 @@ def test_deletion_queue_recovery(
:param validate_before: whether to wait for deletions to be validated before restart. This
makes them elegible to be executed after restart, if the same node keeps the attachment.
"""
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3,
)
@@ -405,7 +400,6 @@ def test_deletion_queue_recovery(
if keep_attachment == KeepAttachment.LOSE:
some_other_pageserver = 101010
assert env.attachment_service is not None
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
env.pageserver.start()
@@ -453,7 +447,6 @@ def test_deletion_queue_recovery(
def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3,
)
@@ -473,7 +466,6 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
)
# Simulate a major incident: the control plane goes offline
assert env.attachment_service is not None
env.attachment_service.stop()
# Remember how many validations had happened before the control plane went offline
@@ -545,7 +537,6 @@ def test_eviction_across_generations(neon_env_builder: NeonEnvBuilder):
and must be constructed using the proper generation for the layer, which may not be the same generation
that the tenant is running in.
"""
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3,
)
@@ -575,7 +566,6 @@ def test_multi_attach(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 3
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,

View File

@@ -9,9 +9,7 @@ from fixtures.utils import wait_until
# Test restarting page server, while safekeeper and compute node keep
# running.
@pytest.mark.parametrize("generations", [True, False])
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool):
neon_env_builder.enable_generations = generations
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()

View File

@@ -57,13 +57,11 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
states are valid, so that we may test it in this way: the API should always
work as long as the tenant exists.
"""
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 3
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
pageservers = env.pageservers
list([p.http_client() for p in pageservers])
@@ -210,13 +208,11 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
"""
Test the sequence of location states that are used in a live migration.
"""
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

View File

@@ -60,8 +60,6 @@ def test_remote_storage_backup_and_restore(
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
neon_env_builder.enable_generations = generations
# Exercise retry code path by making all uploads and downloads fail for the
# first time. The retries print INFO-messages to the log; we will check
# that they are present after the test.

View File

@@ -263,15 +263,6 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
ps_http, env.initial_tenant, timeline_id, iterations=iterations
)
if failpoint == "timeline-delete-after-index-delete":
m = ps_http.get_metrics()
assert (
m.query_one(
"remote_storage_s3_request_seconds_count",
filter={"request_type": "get_object", "result": "ok"},
).value
== 1 # index part for initial timeline
)
elif check is Check.RETRY_WITHOUT_RESTART:
# this should succeed
# this also checks that delete can be retried even when timeline is in Broken state