Compare commits

...

10 Commits

Author SHA1 Message Date
Arseny Sher
22e9702525 migrate script 2023-12-25 23:04:34 +03:00
John Spray
ac38d3a88c remote_storage: don't count 404s as errors (#6201)
## Problem

Currently a chart of S3 error rate is misleading: it can show errors any
time we are attaching a tenant (probing for index_part generation,
checking for remote delete marker).

Considering 404 successful isn't perfectly elegant, but it enables the
error rate to be used as a more meaningful alert signal: it would
indicate if we were having auth issues, sending bad requests, getting
throttled, etc.

## Summary of changes

Track 404 requests in the AttemptOutcome::Ok bucket instead of the
AttemptOutcome::Err bucket.
2023-12-20 17:00:29 +00:00
Arthur Petukhovsky
0f56104a61 Make sk_collect_dumps also possible with teleport (#4739)
Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
2023-12-20 15:06:55 +00:00
John Spray
f260f1565e pageserver: fixes + test updates for sharding (#6186)
This is a precursor to:
- https://github.com/neondatabase/neon/pull/6185

While that PR contains big changes to neon_local and attachment_service,
this PR contains a few unrelated standalone changes generated while
working on that branch:
- Fix restarting a pageserver when it contains multiple shards for the
same tenant
- When using location_config api to attach a tenant, create its
timelines dir
- Update test paths where generations were previously optional to make
them always-on: this avoids tests having to spuriously assert that
attachment_service is not None in order to make the linter happy.
- Add a TenantShardId python implementation for subsequent use in test
helpers that will be made shard-aware
- Teach scrubber to read across shards when checking for layer
existence: this is a refactor to track the list of existent layers at
tenant-level rather than locally to each timeline. This is a precursor
to testing shard splitting.
2023-12-20 12:26:20 +00:00
Joonas Koivunen
c29df80634 fix(layer): move backoff to spawned task (#5746)
Move the backoff to spawned task as it can still be useful; make the
sleep cancellable.
2023-12-20 10:26:06 +02:00
Em Sharnoff
58dbca6ce3 Bump vm-builder v0.19.0 -> v0.21.0 (#6197)
Only applicable change was neondatabase/autoscaling#650, reducing the
vector scrape interval (inside the VM) from 15 seconds to 1 second.
2023-12-19 23:48:41 +00:00
Arthur Petukhovsky
613906acea Support custom types in broker (#5761)
Old methods are unchanged for backwards compatibility. Added
`SafekeeperDiscoveryRequest` and `SafekeeperDiscoveryResponse` types to
serve as example, and also as a prerequisite for
https://github.com/neondatabase/neon/issues/5471
2023-12-19 17:06:43 +00:00
Christian Schwarz
82809d2ec2 fix metric pageserver_initial_logical_size_start_calculation (#6191)
It wasn't being incremented.

Fixup of

    commit 1c88824ed0
    Author: Christian Schwarz <christian@neon.tech>
    Date:   Fri Dec 1 12:52:59 2023 +0100

        initial logical size calculation: add a bunch of metrics (#5995)
2023-12-19 17:44:49 +01:00
Anastasia Lubennikova
0bd79eb063 Handle role deletion when project has no databases. (#6170)
There is still the default 'postgres' database, which may contain objects
owned by the role or some ACLs. We need to reassign objects in this
database too.

## Problem
If a customer deletes all databases and then tries to delete a role that
has some non-standard ACLs,
the `apply_config` operation will get stuck because the role deletion fails.
2023-12-19 16:27:47 +00:00
Konstantin Knizhnik
8ff5387da1 eliminate GCC warning for unchecked result of fread (#6167)
## Problem


GCC produces a warning that the fread result is not checked. It doesn't affect
program logic, but it is better to live without warnings.

## Summary of changes

Check read result.

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist
2023-12-19 18:17:11 +02:00
35 changed files with 986 additions and 296 deletions

View File

@@ -857,7 +857,7 @@ jobs:
run: run:
shell: sh -eu {0} shell: sh -eu {0}
env: env:
VM_BUILDER_VERSION: v0.19.0 VM_BUILDER_VERSION: v0.21.0
steps: steps:
- name: Checkout - name: Checkout

View File

@@ -370,33 +370,49 @@ pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Cli
Ok(()) Ok(())
} }
/// Connects to the database selected by `conf` and hands everything owned by
/// `role_name` in that database over to `db_owner`, then drops the privileges
/// the role still holds there.
///
/// Any connection or query failure is propagated to the caller.
fn reassign_owned_objects_in_one_db(
    conf: Config,
    role_name: &PgIdent,
    db_owner: &PgIdent,
) -> Result<()> {
    let mut pg = conf.connect(NoTls)?;

    // Step 1: transfer ownership of all dependent objects to the db owner.
    let reassign_sql = format!(
        "REASSIGN OWNED BY {} TO {}",
        role_name.pg_quote(),
        db_owner.pg_quote()
    );
    info!(
        "reassigning objects owned by '{}' in db '{}' to '{}'",
        role_name,
        conf.get_dbname().unwrap_or(""),
        db_owner
    );
    pg.simple_query(&reassign_sql)?;

    // Step 2: with ownership already moved, this only drops the role's privileges.
    let drop_sql = format!("DROP OWNED BY {}", role_name.pg_quote());
    pg.simple_query(&drop_sql)?;

    Ok(())
}
// Reassign all owned objects in all databases to the owner of the database. // Reassign all owned objects in all databases to the owner of the database.
fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> { fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
for db in &spec.cluster.databases { for db in &spec.cluster.databases {
if db.owner != *role_name { if db.owner != *role_name {
let mut conf = Config::from_str(connstr)?; let mut conf = Config::from_str(connstr)?;
conf.dbname(&db.name); conf.dbname(&db.name);
reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
let mut client = conf.connect(NoTls)?;
// This will reassign all dependent objects to the db owner
let reassign_query = format!(
"REASSIGN OWNED BY {} TO {}",
role_name.pg_quote(),
db.owner.pg_quote()
);
info!(
"reassigning objects owned by '{}' in db '{}' to '{}'",
role_name, &db.name, &db.owner
);
client.simple_query(&reassign_query)?;
// This now will only drop privileges of the role
let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
client.simple_query(&drop_query)?;
} }
} }
// Also handle case when there are no databases in the spec.
// In this case we need to reassign objects in the default database.
let conf = Config::from_str(connstr)?;
let db_owner = PgIdent::from_str("cloud_admin")?;
reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
Ok(()) Ok(())
} }

View File

@@ -159,7 +159,7 @@ impl From<[u8; 18]> for TenantShardId {
/// shard we're dealing with, but do not need to know the full ShardIdentity (because /// shard we're dealing with, but do not need to know the full ShardIdentity (because
/// we won't be doing any page->shard mapping), and do not need to know the fully qualified /// we won't be doing any page->shard mapping), and do not need to know the fully qualified
/// TenantShardId. /// TenantShardId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)] #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct ShardIndex { pub struct ShardIndex {
pub shard_number: ShardNumber, pub shard_number: ShardNumber,
pub shard_count: ShardCount, pub shard_count: ShardCount,

View File

@@ -218,14 +218,6 @@ impl S3Bucket {
let started_at = ScopeGuard::into_inner(started_at); let started_at = ScopeGuard::into_inner(started_at);
if get_object.is_err() {
metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Err,
started_at,
);
}
match get_object { match get_object {
Ok(object_output) => { Ok(object_output) => {
let metadata = object_output.metadata().cloned().map(StorageMetadata); let metadata = object_output.metadata().cloned().map(StorageMetadata);
@@ -241,11 +233,27 @@ impl S3Bucket {
}) })
} }
Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => { Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
// Count this in the AttemptOutcome::Ok bucket, because 404 is not
// an error: we expect to sometimes fetch an object and find it missing,
// e.g. when probing for timeline indices.
metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Ok,
started_at,
);
Err(DownloadError::NotFound) Err(DownloadError::NotFound)
} }
Err(e) => Err(DownloadError::Other( Err(e) => {
anyhow::Error::new(e).context("download s3 object"), metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
)), kind,
AttemptOutcome::Err,
started_at,
);
Err(DownloadError::Other(
anyhow::Error::new(e).context("download s3 object"),
))
}
} }
} }
} }

View File

@@ -522,14 +522,18 @@ pub(crate) mod initial_logical_size {
impl StartCalculation { impl StartCalculation {
pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard { pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
let circumstances_label: &'static str = circumstances.into(); let circumstances_label: &'static str = circumstances.into();
self.0.with_label_values(&["first", circumstances_label]); self.0
.with_label_values(&["first", circumstances_label])
.inc();
OngoingCalculationGuard { OngoingCalculationGuard {
inc_drop_calculation: Some(DROP_CALCULATION.first.clone()), inc_drop_calculation: Some(DROP_CALCULATION.first.clone()),
} }
} }
pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard { pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
let circumstances_label: &'static str = circumstances.into(); let circumstances_label: &'static str = circumstances.into();
self.0.with_label_values(&["retry", circumstances_label]); self.0
.with_label_values(&["retry", circumstances_label])
.inc();
OngoingCalculationGuard { OngoingCalculationGuard {
inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()), inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()),
} }

View File

@@ -514,10 +514,7 @@ pub async fn init_tenant_mgr(
&ctx, &ctx,
) { ) {
Ok(tenant) => { Ok(tenant) => {
tenants.insert( tenants.insert(tenant_shard_id, TenantSlot::Attached(tenant));
TenantShardId::unsharded(tenant.tenant_id()),
TenantSlot::Attached(tenant),
);
} }
Err(e) => { Err(e) => {
error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}"); error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
@@ -962,35 +959,27 @@ impl TenantManager {
} }
let tenant_path = self.conf.tenant_path(&tenant_shard_id); let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
// Directory structure is the same for attached and secondary modes:
// create it if it doesn't exist. Timeline load/creation expects the
// timelines/ subdir to already exist.
//
// Does not need to be fsync'd because local storage is just a cache.
tokio::fs::create_dir_all(&timelines_path)
.await
.with_context(|| format!("Creating {timelines_path}"))?;
// Before activating either secondary or attached mode, persist the
// configuration, so that on restart we will re-attach (or re-start
// secondary) on the tenant.
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
let new_slot = match &new_location_config.mode { let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => { LocationMode::Secondary(_) => TenantSlot::Secondary,
// Directory doesn't need to be fsync'd because if we crash it can
// safely be recreated next time this tenant location is configured.
tokio::fs::create_dir_all(&tenant_path)
.await
.with_context(|| format!("Creating {tenant_path}"))?;
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
TenantSlot::Secondary
}
LocationMode::Attached(_attach_config) => { LocationMode::Attached(_attach_config) => {
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
// Directory doesn't need to be fsync'd because we do not depend on
// it to exist after crashes: it may be recreated when tenant is
// re-attached, see https://github.com/neondatabase/neon/issues/5550
tokio::fs::create_dir_all(&tenant_path)
.await
.with_context(|| format!("Creating {timelines_path}"))?;
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
let shard_identity = new_location_config.shard; let shard_identity = new_location_config.shard;
let tenant = tenant_spawn( let tenant = tenant_spawn(
self.conf, self.conf,

View File

@@ -878,6 +878,23 @@ impl LayerInner {
Ok(()) Ok(())
} }
Err(e) => { Err(e) => {
let consecutive_failures =
this.consecutive_failures.fetch_add(1, Ordering::Relaxed);
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,
1.5,
60.0,
);
let backoff = std::time::Duration::from_secs_f64(backoff);
tokio::select! {
_ = tokio::time::sleep(backoff) => {},
_ = crate::task_mgr::shutdown_token().cancelled_owned() => {},
_ = timeline.cancel.cancelled() => {},
};
Err(e) Err(e)
} }
}; };
@@ -926,21 +943,9 @@ impl LayerInner {
Ok(permit) Ok(permit)
} }
Ok((Err(e), _permit)) => { Ok((Err(e), _permit)) => {
// FIXME: this should be with the spawned task and be cancellation sensitive // sleep already happened in the spawned task, if it was not cancelled
// let consecutive_failures = self.consecutive_failures.load(Ordering::Relaxed);
// while we should not need this, this backoff has turned out to be useful with
// a bug of unexpectedly deleted remote layer file (#5787).
let consecutive_failures =
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
tracing::error!(consecutive_failures, "layer file download failed: {e:#}"); tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,
1.5,
60.0,
);
let backoff = std::time::Duration::from_secs_f64(backoff);
tokio::time::sleep(backoff).await;
Err(DownloadError::DownloadFailed) Err(DownloadError::DownloadFailed)
} }
Err(_gone) => Err(DownloadError::DownloadCancelled), Err(_gone) => Err(DownloadError::DownloadCancelled),

View File

@@ -1712,9 +1712,9 @@ walprop_pg_after_election(WalProposer *wp)
f = fopen("restart.lsn", "rb"); f = fopen("restart.lsn", "rb");
if (f != NULL && !wp->config->syncSafekeepers) if (f != NULL && !wp->config->syncSafekeepers)
{ {
fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f); size_t rc = fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
fclose(f); fclose(f);
if (lrRestartLsn != InvalidXLogRecPtr) if (rc == 1 && lrRestartLsn != InvalidXLogRecPtr)
{ {
elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn)); elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));

View File

@@ -1,9 +1,12 @@
use std::collections::HashSet; use std::collections::{HashMap, HashSet};
use anyhow::Context; use anyhow::Context;
use aws_sdk_s3::{types::ObjectIdentifier, Client}; use aws_sdk_s3::{types::ObjectIdentifier, Client};
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver_api::shard::ShardIndex;
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use utils::generation::Generation; use utils::generation::Generation;
use utils::id::TimelineId;
use crate::cloud_admin_api::BranchData; use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing; use crate::metadata_stream::stream_listing;
@@ -40,7 +43,7 @@ impl TimelineAnalysis {
pub(crate) fn branch_cleanup_and_check_errors( pub(crate) fn branch_cleanup_and_check_errors(
id: &TenantShardTimelineId, id: &TenantShardTimelineId,
s3_root: &RootTarget, tenant_objects: &mut TenantObjectListing,
s3_active_branch: Option<&BranchData>, s3_active_branch: Option<&BranchData>,
console_branch: Option<BranchData>, console_branch: Option<BranchData>,
s3_data: Option<S3TimelineBlobData>, s3_data: Option<S3TimelineBlobData>,
@@ -72,8 +75,8 @@ pub(crate) fn branch_cleanup_and_check_errors(
match s3_data.blob_data { match s3_data.blob_data {
BlobDataParseResult::Parsed { BlobDataParseResult::Parsed {
index_part, index_part,
index_part_generation, index_part_generation: _index_part_generation,
mut s3_layers, s3_layers: _s3_layers,
} => { } => {
if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) { if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
result.errors.push(format!( result.errors.push(format!(
@@ -111,65 +114,19 @@ pub(crate) fn branch_cleanup_and_check_errors(
)) ))
} }
let layer_map_key = (layer, metadata.generation); if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
if !s3_layers.remove(&layer_map_key) {
// FIXME: this will emit false positives if an index was // FIXME: this will emit false positives if an index was
// uploaded concurrently with our scan. To make this check // uploaded concurrently with our scan. To make this check
// correct, we need to try sending a HEAD request for the // correct, we need to try sending a HEAD request for the
// layer we think is missing. // layer we think is missing.
result.errors.push(format!( result.errors.push(format!(
"index_part.json contains a layer {}{} that is not present in remote storage", "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage",
layer_map_key.0.file_name(), layer.file_name(),
layer_map_key.1.get_suffix() metadata.generation.get_suffix(),
metadata.shard
)) ))
} }
} }
let orphan_layers: Vec<(LayerFileName, Generation)> = s3_layers
.into_iter()
.filter(|(_layer_name, gen)|
// A layer is only considered orphaned if it has a generation below
// the index. If the generation is >= the index, then the layer may
// be an upload from a running pageserver, or even an upload from
// a new generation that didn't upload an index yet.
//
// Even so, a layer that is not referenced by the index could just
// be something enqueued for deletion, so while this check is valid
// for indicating that a layer is garbage, it is not an indicator
// of a problem.
gen < &index_part_generation)
.collect();
if !orphan_layers.is_empty() {
// An orphan layer is not an error: it's arguably not even a warning, but it is helpful to report
// these as a hint that there is something worth cleaning up here.
result.warnings.push(format!(
"index_part.json does not contain layers from S3: {:?}",
orphan_layers
.iter()
.map(|(layer_name, gen)| format!(
"{}{}",
layer_name.file_name(),
gen.get_suffix()
))
.collect::<Vec<_>>(),
));
result.garbage_keys.extend(orphan_layers.iter().map(
|(layer_name, layer_gen)| {
let mut key = s3_root.timeline_root(id).prefix_in_bucket;
let delimiter = s3_root.delimiter();
if !key.ends_with(delimiter) {
key.push_str(delimiter);
}
key.push_str(&format!(
"{}{}",
&layer_name.file_name(),
layer_gen.get_suffix()
));
key
},
));
}
} }
BlobDataParseResult::Relic => {} BlobDataParseResult::Relic => {}
BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend( BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
@@ -204,6 +161,83 @@ pub(crate) fn branch_cleanup_and_check_errors(
result result
} }
/// Per-layer bookkeeping entry stored in [`TenantObjectListing`]. Starts out with a
/// zero count via `Default`.
#[derive(Default)]
pub(crate) struct LayerRef {
/// Number of times `TenantObjectListing::check_ref` found this layer. Layers whose
/// count is still zero are the ones returned by `TenantObjectListing::get_orphans`.
ref_count: usize,
}
/// Top-level index of objects in a tenant. This may be used by any shard-timeline within
/// the tenant to query whether an object exists.
#[derive(Default)]
pub(crate) struct TenantObjectListing {
/// Keyed by (shard, timeline). Each value maps a layer key (file name + generation)
/// to the [`LayerRef`] holding that layer's reference count.
shard_timelines:
HashMap<(ShardIndex, TimelineId), HashMap<(LayerFileName, Generation), LayerRef>>,
}
impl TenantObjectListing {
/// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
/// list of layer keys for the Tenant.
pub(crate) fn push(
&mut self,
ttid: TenantShardTimelineId,
layers: HashSet<(LayerFileName, Generation)>,
) {
let shard_index = ShardIndex::new(
ttid.tenant_shard_id.shard_number,
ttid.tenant_shard_id.shard_count,
);
let replaced = self.shard_timelines.insert(
(shard_index, ttid.timeline_id),
layers
.into_iter()
.map(|l| (l, LayerRef::default()))
.collect(),
);
assert!(
replaced.is_none(),
"Built from an S3 object listing, which should never repeat a key"
);
}
/// Having loaded a timeline index, check if a layer referenced by the index exists. If it does,
/// the layer's refcount will be incremented. Later, after calling this for all references in all indices
/// in a tenant, orphan layers may be detected by their zero refcounts.
///
/// Returns true if the layer exists
pub(crate) fn check_ref(
&mut self,
timeline_id: TimelineId,
layer_file: &LayerFileName,
metadata: &IndexLayerMetadata,
) -> bool {
let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
return false;
};
let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
return false;
};
layer_ref.ref_count += 1;
true
}
pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerFileName, Generation)> {
let mut result = Vec::new();
for ((shard_index, timeline_id), layers) in &self.shard_timelines {
for ((layer_file, generation), layer_ref) in layers {
if layer_ref.ref_count == 0 {
result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
}
}
}
result
}
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct S3TimelineBlobData { pub(crate) struct S3TimelineBlobData {
pub(crate) blob_data: BlobDataParseResult, pub(crate) blob_data: BlobDataParseResult,

View File

@@ -2,22 +2,25 @@ use std::collections::{HashMap, HashSet};
use crate::checks::{ use crate::checks::{
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData, branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
TimelineAnalysis, TenantObjectListing, TimelineAnalysis,
}; };
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants}; use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId}; use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client; use aws_sdk_s3::Client;
use futures_util::{pin_mut, StreamExt, TryStreamExt}; use futures_util::{pin_mut, StreamExt, TryStreamExt};
use histogram::Histogram; use histogram::Histogram;
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver::tenant::IndexPart; use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use serde::Serialize; use serde::Serialize;
use utils::id::TenantId;
#[derive(Serialize)] #[derive(Serialize)]
pub struct MetadataSummary { pub struct MetadataSummary {
count: usize, count: usize,
with_errors: HashSet<TenantShardTimelineId>, with_errors: HashSet<TenantShardTimelineId>,
with_warnings: HashSet<TenantShardTimelineId>, with_warnings: HashSet<TenantShardTimelineId>,
with_garbage: HashSet<TenantShardTimelineId>, with_orphans: HashSet<TenantShardTimelineId>,
indices_by_version: HashMap<usize, usize>, indices_by_version: HashMap<usize, usize>,
layer_count: MinMaxHisto, layer_count: MinMaxHisto,
@@ -87,7 +90,7 @@ impl MetadataSummary {
count: 0, count: 0,
with_errors: HashSet::new(), with_errors: HashSet::new(),
with_warnings: HashSet::new(), with_warnings: HashSet::new(),
with_garbage: HashSet::new(), with_orphans: HashSet::new(),
indices_by_version: HashMap::new(), indices_by_version: HashMap::new(),
layer_count: MinMaxHisto::new(), layer_count: MinMaxHisto::new(),
timeline_size_bytes: MinMaxHisto::new(), timeline_size_bytes: MinMaxHisto::new(),
@@ -141,6 +144,10 @@ impl MetadataSummary {
} }
} }
/// Add `ttid` to `with_orphans`, the set of timeline shards for which an orphan layer
/// was reported. Inserting the same id again has no further effect on the set.
fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
self.with_orphans.insert(*ttid);
}
/// Long-form output for printing at end of a scan /// Long-form output for printing at end of a scan
pub fn summary_string(&self) -> String { pub fn summary_string(&self) -> String {
let version_summary: String = itertools::join( let version_summary: String = itertools::join(
@@ -154,7 +161,7 @@ impl MetadataSummary {
"Timelines: {0} "Timelines: {0}
With errors: {1} With errors: {1}
With warnings: {2} With warnings: {2}
With garbage: {3} With orphan layers: {3}
Index versions: {version_summary} Index versions: {version_summary}
Timeline size bytes: {4} Timeline size bytes: {4}
Layer size bytes: {5} Layer size bytes: {5}
@@ -163,7 +170,7 @@ Timeline layer count: {6}
self.count, self.count,
self.with_errors.len(), self.with_errors.len(),
self.with_warnings.len(), self.with_warnings.len(),
self.with_garbage.len(), self.with_orphans.len(),
self.timeline_size_bytes.oneline(), self.timeline_size_bytes.oneline(),
self.layer_size_bytes.oneline(), self.layer_size_bytes.oneline(),
self.layer_count.oneline(), self.layer_count.oneline(),
@@ -191,7 +198,7 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metada
// Generate a stream of TenantTimelineId // Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t)); let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = timelines.try_buffer_unordered(CONCURRENCY); let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten(); let timelines = timelines.try_flatten();
// Generate a stream of S3TimelineBlobData // Generate a stream of S3TimelineBlobData
@@ -204,17 +211,118 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metada
Ok((ttid, data)) Ok((ttid, data))
} }
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid)); let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
let timelines = timelines.try_buffer_unordered(CONCURRENCY); let timelines = timelines.try_buffered(CONCURRENCY);
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
// shards in the same tenant might refer to one anothers' keys if a shard split has happened.
let mut tenant_id = None;
let mut tenant_objects = TenantObjectListing::default();
let mut tenant_timeline_results = Vec::new();
/// Run the per-timeline checks for every timeline shard of one tenant, then report the
/// layers that none of the tenant's indices referenced.
///
/// `tenant_objects` is expected to already contain the layer listings for `timelines`;
/// the checks update its reference counts, which are then used to find orphans.
fn analyze_tenant(
    tenant_id: TenantId,
    summary: &mut MetadataSummary,
    mut tenant_objects: TenantObjectListing,
    timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
) {
    // Generation of the index read for each timeline shard (only where one parsed);
    // consulted below when deciding whether an unreferenced layer is really an orphan.
    let mut index_generations = HashMap::new();

    for (ttid, data) in timelines {
        if let BlobDataParseResult::Parsed {
            index_part_generation,
            ..
        } = &data.blob_data
        {
            index_generations.insert(ttid, *index_part_generation);
        }

        // Checking this timeline shard's metadata also bumps the tenant-wide reference
        // counts in `tenant_objects` for each layer its index refers to.
        let analysis =
            branch_cleanup_and_check_errors(&ttid, &mut tenant_objects, None, None, Some(data));
        summary.update_analysis(&ttid, &analysis);
    }

    // Orphans can only be identified once the whole tenant has been processed, since a
    // layer listed under one shard may be referenced from another shard's index.
    //
    // An orphan layer is neither corruption nor a sign of a problem: it merely occupies
    // space in remote storage and may be cleaned up at leisure.
    for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
        let tenant_shard_id = TenantShardId {
            tenant_id,
            shard_count: shard_index.shard_count,
            shard_number: shard_index.shard_number,
        };
        let ttid = TenantShardTimelineId {
            tenant_shard_id,
            timeline_id,
        };

        // A layer in the same or a later generation than the index we read is more likely
        // an upload whose referencing index has not been written yet, so it is skipped.
        let maybe_not_indexed_yet = matches!(
            index_generations.get(&ttid),
            Some(index_generation) if &generation >= index_generation
        );
        if maybe_not_indexed_yet {
            continue;
        }

        let orphan_path = remote_layer_path(
            &tenant_id,
            &timeline_id,
            shard_index,
            &layer_file,
            generation,
        );
        tracing::info!("Orphan layer detected: {orphan_path}");
        summary.notify_timeline_orphan(&ttid);
    }
}
// Iterate through all the timeline results. These are in key-order, so
// all results for the same tenant will be adjacent. We accumulate these,
// and then call `analyze_tenant` to flush, when we see the next tenant ID.
let mut summary = MetadataSummary::new(); let mut summary = MetadataSummary::new();
pin_mut!(timelines); pin_mut!(timelines);
while let Some(i) = timelines.next().await { while let Some(i) = timelines.next().await {
let (ttid, data) = i?; let (ttid, data) = i?;
summary.update_data(&data); summary.update_data(&data);
let analysis = branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data)); match tenant_id {
None => tenant_id = Some(ttid.tenant_shard_id.tenant_id),
Some(prev_tenant_id) => {
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
let tenant_objects = std::mem::take(&mut tenant_objects);
let timelines = std::mem::take(&mut tenant_timeline_results);
analyze_tenant(prev_tenant_id, &mut summary, tenant_objects, timelines);
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
}
}
}
summary.update_analysis(&ttid, &analysis); if let BlobDataParseResult::Parsed {
index_part: _index_part,
index_part_generation: _index_part_generation,
s3_layers,
} = &data.blob_data
{
tenant_objects.push(ttid, s3_layers.clone());
}
tenant_timeline_results.push((ttid, data));
}
if !tenant_timeline_results.is_empty() {
analyze_tenant(
tenant_id.expect("Must be set if results are present"),
&mut summary,
tenant_objects,
tenant_timeline_results,
);
} }
Ok(summary) Ok(summary)

View File

@@ -1,2 +1,4 @@
result result
*.json *.json
hosts
poetry.lock

View File

@@ -0,0 +1,11 @@
[defaults]
host_key_checking = False
inventory=./hosts
remote_tmp=/tmp
remote_user=developer
callbacks_enabled = profile_tasks
[ssh_connection]
scp_if_ssh = True
ssh_args = -F ./ssh.cfg
pipelining = True

View File

@@ -0,0 +1,16 @@
[tool.poetry]
name = "sk-collect-dumps"
version = "0.1.0"
description = ""
authors = ["Arseny Sher <sher-ars@yandex.ru>"]
readme = "README.md"
packages = [{include = "sk_collect_dumps"}]
[tool.poetry.dependencies]
python = "^3.11"
ansible = "^9.1.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -1,25 +1,43 @@
# Collect /v1/debug_dump from all safekeeper nodes # Collect /v1/debug_dump from all safekeeper nodes
1. Run ansible playbooks to collect .json dumps from all safekeepers and store them in `./result` directory. 3. Issue admin token (add/remove .stage from url for staging/prod and setting proper API key):
2. Run `DB_CONNSTR=... ./upload.sh prod_feb30` to upload dumps to `prod_feb30` table in specified postgres database.
## How to use ansible (staging)
``` ```
AWS_DEFAULT_PROFILE=dev ansible-playbook -i ../../.github/ansible/staging.us-east-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml # staging:
AUTH_TOKEN=$(curl https://console.stage.neon.tech/regions/console/api/v1/admin/issue_token -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer $NEON_STAGING_KEY" -X POST -d '{"ttl_seconds": 43200, "scope": "safekeeperdata"}' 2>/dev/null | jq --raw-output '.jwt')
# prod:
AUTH_TOKEN=$(curl https://console.neon.tech/regions/console/api/v1/admin/issue_token -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer $NEON_PROD_KEY" -X POST -d '{"ttl_seconds": 43200, "scope": "safekeeperdata"}' 2>/dev/null | jq --raw-output '.jwt')
# check
echo $AUTH_TOKEN
```
2. Run ansible playbooks to collect .json dumps from all safekeepers and store them in `./result` directory.
AWS_DEFAULT_PROFILE=dev ansible-playbook -i ../../.github/ansible/staging.eu-west-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml There are two ways to do that, with ssm or tsh. ssm:
```
# in aws repo, cd .github/ansible and run e.g. (adjusting profile and region in vars and limit):
AWS_DEFAULT_PROFILE=dev ansible-playbook -i inventory_aws_ec2.yaml -i staging.us-east-2.vars.yaml -e @ssm_config -l 'safekeeper:&us_east_2' -e "auth_token=${AUTH_TOKEN}" ~/neon/neon/scripts/sk_collect_dumps/remote.yaml
```
It will put the results into the `result` directory *near the playbook*.
tsh:
Update the inventory, if needed, selecting .build/.tech and optionally region:
```
rm -f hosts && echo '[safekeeper]' >> hosts
# staging:
tsh ls | awk '{print $1}' | grep safekeeper | grep "neon.build" | grep us-east-2 >> hosts
# prod:
tsh ls | awk '{print $1}' | grep safekeeper | grep "neon.tech" | grep us-east-2 >> hosts
``` ```
## How to use ansible (prod) Test ansible connection:
``` ```
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.us-west-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml ansible all -m ping -v
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.us-east-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.eu-central-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.ap-southeast-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
``` ```
Download the dumps:
```
mkdir -p result && rm -f result/*
ansible-playbook -e "auth_token=${AUTH_TOKEN}" remote.yaml
```
3. Run `DB_CONNSTR=... ./upload.sh prod_feb30` to upload dumps to `prod_feb30` table in specified postgres database.

View File

@@ -1,18 +1,37 @@
- name: Fetch state dumps from safekeepers - name: Fetch state dumps from safekeepers
hosts: safekeepers hosts: safekeeper
gather_facts: False gather_facts: False
remote_user: "{{ remote_user }}"
tasks: tasks:
- name: Download file - name: Dump file
get_url: get_url:
url: "http://{{ inventory_hostname }}:7676/v1/debug_dump?dump_all=true&dump_disk_content=false" url: "http://{{ inventory_hostname }}:7676/v1/debug_dump?dump_all=true&dump_disk_content=false"
dest: "/tmp/{{ inventory_hostname }}.json" dest: "/tmp/{{ inventory_hostname }}-dump.json"
headers:
Authorization: "Bearer {{ auth_token }}"
- name: Fetch file from remote hosts - name: install rsync
ansible.builtin.apt:
name: rsync
update_cache: yes
become: yes
ignore_errors: true # it can be already installed and we don't always have sudo
- name: Fetch file from remote hosts (works only with ssm)
fetch: fetch:
src: "/tmp/{{ inventory_hostname }}.json" src: "/tmp/{{ inventory_hostname }}-dump.json"
dest: "./result/{{ inventory_hostname }}.json" dest: "./result/{{ inventory_hostname }}-dump.json"
flat: yes flat: yes
fail_on_missing: no fail_on_missing: no
when: ansible_connection == "aws_ssm"
# xxx not sure how to make ansible 'synchronize' work with tsh
- name: Fetch file from remote hosts
shell: rsync -e 'tsh ssh' -azvP "developer@{{ inventory_hostname }}:/tmp/{{ inventory_hostname }}-dump.json" "./result/{{ inventory_hostname }}-dump.json"
delegate_to: localhost
when: ansible_connection != "aws_ssm"
- name: remove remote dumps
ansible.builtin.file:
path: "/tmp/{{ inventory_hostname }}-dump.json"
state: absent

View File

@@ -0,0 +1,13 @@
# Begin generated Teleport configuration for teleport.aws.neon.tech by tsh
# Common flags for all teleport.aws.neon.tech hosts
Host *
HostKeyAlgorithms rsa-sha2-512-cert-v01@openssh.com,rsa-sha2-256-cert-v01@openssh.com,ssh-rsa-cert-v01@openssh.com
# Flags for all teleport.aws.neon.tech hosts except the proxy
Host * !teleport.aws.neon.tech
Port 3022
ProxyCommand "/usr/local/bin/tsh" proxy ssh --cluster=teleport.aws.neon.tech --proxy=teleport.aws.neon.tech:443 %r@%h:%p
User developer
# End generated Teleport configuration

View File

@@ -31,22 +31,22 @@ SELECT
(data->>'tenant_id') AS tenant_id, (data->>'tenant_id') AS tenant_id,
(data->>'timeline_id') AS timeline_id, (data->>'timeline_id') AS timeline_id,
(data->'memory'->>'active')::bool AS active, (data->'memory'->>'active')::bool AS active,
(data->'memory'->>'flush_lsn')::bigint AS flush_lsn, (data->'memory'->>'flush_lsn')::pg_lsn AS flush_lsn,
(data->'memory'->'mem_state'->>'backup_lsn')::bigint AS backup_lsn, (data->'memory'->'mem_state'->>'backup_lsn')::pg_lsn AS backup_lsn,
(data->'memory'->'mem_state'->>'commit_lsn')::bigint AS commit_lsn, (data->'memory'->'mem_state'->>'commit_lsn')::pg_lsn AS commit_lsn,
(data->'memory'->'mem_state'->>'peer_horizon_lsn')::bigint AS peer_horizon_lsn, (data->'memory'->'mem_state'->>'peer_horizon_lsn')::pg_lsn AS peer_horizon_lsn,
(data->'memory'->'mem_state'->>'remote_consistent_lsn')::bigint AS remote_consistent_lsn, (data->'memory'->'mem_state'->>'remote_consistent_lsn')::pg_lsn AS remote_consistent_lsn,
(data->'memory'->>'write_lsn')::bigint AS write_lsn, (data->'memory'->>'write_lsn')::pg_lsn AS write_lsn,
(data->'memory'->>'num_computes')::bigint AS num_computes, (data->'memory'->>'num_computes')::bigint AS num_computes,
(data->'memory'->>'epoch_start_lsn')::bigint AS epoch_start_lsn, (data->'memory'->>'epoch_start_lsn')::pg_lsn AS epoch_start_lsn,
(data->'memory'->>'last_removed_segno')::bigint AS last_removed_segno, (data->'memory'->>'last_removed_segno')::bigint AS last_removed_segno,
(data->'memory'->>'is_cancelled')::bool AS is_cancelled, (data->'memory'->>'is_cancelled')::bool AS is_cancelled,
(data->'control_file'->>'backup_lsn')::bigint AS disk_backup_lsn, (data->'control_file'->>'backup_lsn')::pg_lsn AS disk_backup_lsn,
(data->'control_file'->>'commit_lsn')::bigint AS disk_commit_lsn, (data->'control_file'->>'commit_lsn')::pg_lsn AS disk_commit_lsn,
(data->'control_file'->'acceptor_state'->>'term')::bigint AS disk_term, (data->'control_file'->'acceptor_state'->>'term')::bigint AS disk_term,
(data->'control_file'->>'local_start_lsn')::bigint AS local_start_lsn, (data->'control_file'->>'local_start_lsn')::pg_lsn AS local_start_lsn,
(data->'control_file'->>'peer_horizon_lsn')::bigint AS disk_peer_horizon_lsn, (data->'control_file'->>'peer_horizon_lsn')::pg_lsn AS disk_peer_horizon_lsn,
(data->'control_file'->>'timeline_start_lsn')::bigint AS timeline_start_lsn, (data->'control_file'->>'timeline_start_lsn')::pg_lsn AS timeline_start_lsn,
(data->'control_file'->>'remote_consistent_lsn')::bigint AS disk_remote_consistent_lsn (data->'control_file'->>'remote_consistent_lsn')::pg_lsn AS disk_remote_consistent_lsn
FROM tmp_json FROM tmp_json
EOF EOF

View File

@@ -0,0 +1,10 @@
#!/bin/bash
# Restart every endpoint listed in endpoints_cplane.txt (one endpoint id per
# line) through the console admin API.
#
# The API key is taken from the environment:
# export NEON_API_KEY=

# Fail fast instead of sending requests with an empty bearer token.
: "${NEON_API_KEY:?NEON_API_KEY must be set}"

# `|| [[ -n "$ENDPOINT" ]]` keeps the last line even when the file does not end
# with a newline; plain `read` would return non-zero and drop it.
while IFS= read -r ENDPOINT || [[ -n "$ENDPOINT" ]]
do
    # Blank lines would otherwise produce a request to .../endpoints//restart.
    [[ -z "$ENDPOINT" ]] && continue
    echo "$ENDPOINT"
    # curl -X POST -H "Authorization: Bearer $NEON_PROD_KEY" -H "Accept: application/json" -H "Content-Type: application/json" https://console.neon.tech/regions/console/api/v1/admin/endpoints/$ENDPOINT/restart
    curl -X POST -H "Authorization: Bearer $NEON_API_KEY" -H "Accept: application/json" -H "Content-Type: application/json" "https://console.neon.tech/regions/aws-us-east-2/api/v1/admin/endpoints/${ENDPOINT}/restart"
done < endpoints_cplane.txt

View File

@@ -0,0 +1,137 @@
import argparse
import sys
import psycopg2
import psycopg2.extras
import os
import requests
def migrate_project(conn, from_sk: dict[str, any], to_sk: dict[str, any], project_id: str, dry_run=True):
    """Move one project from safekeeper `from_sk` to safekeeper `to_sk`.

    For every non-deleted branch of the project, the target safekeeper is asked
    (POST /v1/pull_timeline) to pull the timeline from the project's remaining
    safekeepers. Afterwards the project's row in `projects_safekeepers` is
    repointed from `from_sk` to `to_sk` and the transaction is committed.

    With `dry_run=True` (the default) the planned requests are only printed:
    no HTTP request is sent and the database is not modified.

    `from_sk` / `to_sk` are rows of the `safekeepers` table; the `id`, `host`
    and `http_port` keys are read here. Peer addresses are looked up in the
    module-level `safekeepers` mapping, which must be populated by the caller.

    Returns None in all cases; an unexpected HTTP failure aborts the project
    before the database is touched.
    """
    print("###############################################################")

    with conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM projects WHERE id = %s", (project_id,))
        project = cur.fetchone()

    if project is None:
        print(f"Project with id {project_id} does not exist")
        return
    assert project['deleted'] == False, f"Project with id {project_id} is deleted"

    # The project must currently live on the source and not yet on the target.
    with conn.cursor() as cur:
        cur.execute("SELECT safekeeper_id FROM projects_safekeepers WHERE project_id = %s", (project_id, ))
        sk_ids = [row[0] for row in cur.fetchall()]
    assert from_sk['id'] in sk_ids
    assert to_sk['id'] not in sk_ids

    with conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM branches WHERE project_id = %s AND deleted = 'f'", (project_id, ))
        branches = cur.fetchall()

    for branch in branches:
        if branch['deleted'] != False:
            continue

        tenant_id = project['tenant_id']
        timeline_id = branch['timeline_id']
        print(f"tenant_id: {tenant_id}, timeline_id: {timeline_id}")
        print(f"Migrating from {from_sk['host']} to {to_sk['host']}, project={project_id}, branch={branch['id']}, deleted={branch['deleted']}")
        print(list(sk_ids))

        # Every safekeeper of the project except the one being drained can
        # serve as a source for the pull.
        sk_hosts = [
            f"http://{safekeepers[sk_id]['host']}:{safekeepers[sk_id]['http_port']}"
            for sk_id in sk_ids
            if sk_id != from_sk['id']
        ]

        # make HTTP request to /pull_timeline
        url = f"http://{to_sk['host']}:{to_sk['http_port']}/v1/pull_timeline"
        body = {
            "tenant_id": str(tenant_id),
            "timeline_id": str(timeline_id),
            "http_hosts": sk_hosts,
        }
        print(body)
        print(f"Making HTTP request to {url}", flush=True)

        if dry_run:
            continue

        response = requests.post(url, json=body)
        failed = response.status_code != 200
        if failed and "error decoding response body: missing field `tenant_id` at line 1 column 104" in response.text:
            print(f"WARN: Skipping branch {branch['id']} because it's empty on all safekeepers")
            continue
        if failed and f"Timeline {timeline_id} already exists" in response.text:
            print(f"WARN: Skipping timeline {timeline_id} because it is already exists (was migrated earlier)")
            continue
        if failed:
            # Any other failure aborts the project before the DB is updated.
            print(f"ERROR: {response.text}")
            return
        print(response.text)

    print(f"Updating safekeeper {from_sk['id']} -> {to_sk['id']} for project={project_id} in the database")
    if not dry_run:
        with conn.cursor() as cur:
            cur.execute("UPDATE projects_safekeepers SET safekeeper_id = %s WHERE project_id = %s AND safekeeper_id = %s RETURNING *", (to_sk['id'], project_id, from_sk['id']))
            print(cur.fetchone())
        conn.commit()
def find_projects(sk_from_id: int, db_conn=None):
    """Return ids of all non-deleted projects assigned to safekeeper `sk_from_id`.

    `db_conn` is the database connection to query. When omitted, the
    module-level `conn` opened by the script entry point is used, so existing
    single-argument callers keep working.
    """
    if db_conn is None:
        # Fall back to the connection created under `if __name__ == '__main__'`.
        db_conn = conn
    with db_conn.cursor() as cur:
        cur.execute("SELECT p.id FROM projects p, projects_safekeepers ps WHERE ps.project_id = p.id AND NOT p.deleted AND ps.safekeeper_id = %s", (sk_from_id, ))
        return [row[0] for row in cur.fetchall()]
if __name__ == '__main__':
    # Script entry point: migrate one project (--project-id) or every live
    # project of the source safekeeper to the target safekeeper.
    parser = argparse.ArgumentParser(description='migrate sk')
    parser.add_argument("-d", help="database URL", type=str, required=True)
    parser.add_argument("--from-sk", help="from sk id as in the cplane db", type=int, required=True)
    parser.add_argument("--to-sk", help="to sk id as in the cplane db", type=int, required=True)
    parser.add_argument("--not-dry-run", help="actually send requests and update the database (default: dry run)", action='store_true')
    parser.add_argument("--project-id", help="project to migrate", type=str, default=None)
    args = parser.parse_args()

    # Connect to postgresql database. `conn` and `safekeepers` stay module-level
    # names because find_projects() and migrate_project() read them as globals.
    conn = psycopg2.connect(args.d)
    try:
        # Load the whole "safekeepers" table, keyed by safekeeper id.
        with conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT * FROM safekeepers")
            safekeepers = {row['id']: row for row in cur.fetchall()}

        # The source must already be deactivated and the target must be active.
        assert args.from_sk in safekeepers, "Safekeeper with id {} does not exist".format(args.from_sk)
        from_sk_hostname = safekeepers[args.from_sk]['host']
        assert safekeepers[args.from_sk]['active'] == False, "Safekeeper with id {} should be inactive".format(args.from_sk)

        assert args.to_sk in safekeepers, "Safekeeper with id {} does not exist".format(args.to_sk)
        to_sk_hostname = safekeepers[args.to_sk]['host']
        assert safekeepers[args.to_sk]['active'] == True, "Safekeeper with id {} should be active".format(args.to_sk)

        print(f"migrating from id {args.from_sk} {from_sk_hostname} to {args.to_sk} {to_sk_hostname}")

        if args.project_id is not None:
            project_ids = [args.project_id]
        else:
            project_ids = find_projects(args.from_sk)
        print(project_ids)

        for project_id in project_ids:
            # Honour --not-dry-run; previously the flag was parsed but never
            # forwarded, so the script could only ever run in dry-run mode.
            migrate_project(
                conn,
                safekeepers[args.from_sk],
                safekeepers[args.to_sk],
                project_id,
                dry_run=not args.not_dry_run,
            )
    finally:
        conn.close()

View File

@@ -3,9 +3,12 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use clap::Parser; use clap::Parser;
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest}; use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SubscribeByFilterRequest,
TenantTimelineId as ProtoTenantTimelineId, TypeSubscription, TypedMessage,
};
use storage_broker::{BrokerClientChannel, DEFAULT_ENDPOINT}; use storage_broker::{BrokerClientChannel, DEFAULT_ENDPOINT};
use tokio::time; use tokio::time;
@@ -91,15 +94,23 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(), None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
}; };
let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId { let ttid = ProtoTenantTimelineId {
tenant_id: vec![0xFF; 16], tenant_id: vec![0xFF; 16],
timeline_id: tli_from_u64(i), timeline_id: tli_from_u64(i),
});
let request = SubscribeSafekeeperInfoRequest {
subscription_key: Some(key),
}; };
let mut stream = client
.subscribe_safekeeper_info(request) let request = SubscribeByFilterRequest {
types: vec![TypeSubscription {
r#type: MessageType::SafekeeperTimelineInfo.into(),
}],
tenant_timeline_id: Some(FilterTenantTimelineId {
enabled: true,
tenant_timeline_id: Some(ttid),
}),
};
let mut stream: tonic::Streaming<TypedMessage> = client
.subscribe_by_filter(request)
.await .await
.unwrap() .unwrap()
.into_inner(); .into_inner();

View File

@@ -10,6 +10,12 @@ service BrokerService {
// Publish safekeeper updates. // Publish safekeeper updates.
rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (google.protobuf.Empty) {}; rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (google.protobuf.Empty) {};
// Subscribe to all messages, limited by a filter.
rpc SubscribeByFilter(SubscribeByFilterRequest) returns (stream TypedMessage) {};
// Publish one message.
rpc PublishOne(TypedMessage) returns (google.protobuf.Empty) {};
} }
message SubscribeSafekeeperInfoRequest { message SubscribeSafekeeperInfoRequest {
@@ -48,3 +54,55 @@ message TenantTimelineId {
bytes tenant_id = 1; bytes tenant_id = 1;
bytes timeline_id = 2; bytes timeline_id = 2;
} }
message FilterTenantTimelineId {
// If true, only messages related to `tenant_timeline_id` will be emitted.
// Otherwise, messages for all timelines will be emitted.
bool enabled = 1;
TenantTimelineId tenant_timeline_id = 2;
}
message TypeSubscription {
MessageType type = 1;
}
message SubscribeByFilterRequest {
// Subscription will emit messages only of the specified types. You need to specify
// at least one type to receive any messages.
repeated TypeSubscription types = 1;
// If set and enabled, subscription will emit messages only for the specified tenant/timeline.
optional FilterTenantTimelineId tenant_timeline_id = 2;
}
// Discriminator for the payload carried by a TypedMessage, and the unit of
// selection in SubscribeByFilterRequest.types.
// The non-zero values (2, 3, 4) equal the field numbers of the matching
// payload fields in TypedMessage; value 1 is not assigned.
enum MessageType {
UNKNOWN = 0;
SAFEKEEPER_TIMELINE_INFO = 2;
SAFEKEEPER_DISCOVERY_REQUEST = 3;
SAFEKEEPER_DISCOVERY_RESPONSE = 4;
}
// A message with a type.
// `type` says which of the optional payload fields below is expected to be
// populated; the payload field numbers (2, 3, 4) equal the corresponding
// MessageType values.
message TypedMessage {
MessageType type = 1;
optional SafekeeperTimelineInfo safekeeper_timeline_info = 2;
optional SafekeeperDiscoveryRequest safekeeper_discovery_request = 3;
optional SafekeeperDiscoveryResponse safekeeper_discovery_response = 4;
}
message SafekeeperDiscoveryRequest {
TenantTimelineId tenant_timeline_id = 1;
}
// Shorter version of SafekeeperTimelineInfo, contains only necessary fields.
message SafekeeperDiscoveryResponse {
uint64 safekeeper_id = 1;
TenantTimelineId tenant_timeline_id = 2;
// WAL available to download.
uint64 commit_lsn = 3;
// A connection string to use for WAL downloading.
string safekeeper_connstr = 4;
// Availability zone of a safekeeper.
optional string availability_zone = 5;
}

View File

@@ -35,10 +35,16 @@ use tracing::*;
use utils::signals::ShutdownSignals; use utils::signals::ShutdownSignals;
use metrics::{Encoder, TextEncoder}; use metrics::{Encoder, TextEncoder};
use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE}; use storage_broker::metrics::{
BROADCASTED_MESSAGES_TOTAL, BROADCAST_DROPPED_MESSAGES_TOTAL, NUM_PUBS, NUM_SUBS_ALL,
NUM_SUBS_TIMELINE, PROCESSED_MESSAGES_TOTAL, PUBLISHED_ONEOFF_MESSAGES_TOTAL,
};
use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer}; use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey; use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest}; use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
};
use storage_broker::{ use storage_broker::{
parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR, parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
}; };
@@ -73,8 +79,103 @@ struct Args {
log_format: String, log_format: String,
} }
type PubId = u64; // id of publisher for registering in maps /// Id of publisher for registering in maps
type SubId = u64; // id of subscriber for registering in maps type PubId = u64;
/// Id of subscriber for registering in maps
type SubId = u64;
/// Single enum type for all messages.
#[derive(Clone, Debug, PartialEq)]
#[allow(clippy::enum_variant_names)]
enum Message {
SafekeeperTimelineInfo(SafekeeperTimelineInfo),
SafekeeperDiscoveryRequest(SafekeeperDiscoveryRequest),
SafekeeperDiscoveryResponse(SafekeeperDiscoveryResponse),
}
impl Message {
/// Convert proto message to internal message.
pub fn from(proto_msg: TypedMessage) -> Result<Self, Status> {
match proto_msg.r#type() {
MessageType::SafekeeperTimelineInfo => Ok(Message::SafekeeperTimelineInfo(
proto_msg.safekeeper_timeline_info.ok_or_else(|| {
Status::new(Code::InvalidArgument, "missing safekeeper_timeline_info")
})?,
)),
MessageType::SafekeeperDiscoveryRequest => Ok(Message::SafekeeperDiscoveryRequest(
proto_msg.safekeeper_discovery_request.ok_or_else(|| {
Status::new(
Code::InvalidArgument,
"missing safekeeper_discovery_request",
)
})?,
)),
MessageType::SafekeeperDiscoveryResponse => Ok(Message::SafekeeperDiscoveryResponse(
proto_msg.safekeeper_discovery_response.ok_or_else(|| {
Status::new(
Code::InvalidArgument,
"missing safekeeper_discovery_response",
)
})?,
)),
MessageType::Unknown => Err(Status::new(
Code::InvalidArgument,
format!("invalid message type: {:?}", proto_msg.r#type),
)),
}
}
/// Get the tenant_timeline_id from the message.
pub fn tenant_timeline_id(&self) -> Result<Option<TenantTimelineId>, Status> {
match self {
Message::SafekeeperTimelineInfo(msg) => Ok(msg
.tenant_timeline_id
.as_ref()
.map(parse_proto_ttid)
.transpose()?),
Message::SafekeeperDiscoveryRequest(msg) => Ok(msg
.tenant_timeline_id
.as_ref()
.map(parse_proto_ttid)
.transpose()?),
Message::SafekeeperDiscoveryResponse(msg) => Ok(msg
.tenant_timeline_id
.as_ref()
.map(parse_proto_ttid)
.transpose()?),
}
}
/// Convert internal message to the protobuf struct.
pub fn as_typed_message(&self) -> TypedMessage {
let mut res = TypedMessage {
r#type: self.message_type() as i32,
..Default::default()
};
match self {
Message::SafekeeperTimelineInfo(msg) => {
res.safekeeper_timeline_info = Some(msg.clone())
}
Message::SafekeeperDiscoveryRequest(msg) => {
res.safekeeper_discovery_request = Some(msg.clone())
}
Message::SafekeeperDiscoveryResponse(msg) => {
res.safekeeper_discovery_response = Some(msg.clone())
}
}
res
}
/// Get the message type.
pub fn message_type(&self) -> MessageType {
match self {
Message::SafekeeperTimelineInfo(_) => MessageType::SafekeeperTimelineInfo,
Message::SafekeeperDiscoveryRequest(_) => MessageType::SafekeeperDiscoveryRequest,
Message::SafekeeperDiscoveryResponse(_) => MessageType::SafekeeperDiscoveryResponse,
}
}
}
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
enum SubscriptionKey { enum SubscriptionKey {
@@ -83,7 +184,7 @@ enum SubscriptionKey {
} }
impl SubscriptionKey { impl SubscriptionKey {
// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors). /// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> { pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
match key { match key {
ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All), ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
@@ -92,14 +193,29 @@ impl SubscriptionKey {
} }
} }
} }
/// Build a subscription key from a `FilterTenantTimelineId`.
///
/// A disabled filter maps to `SubscriptionKey::All`. An enabled filter must
/// carry a `tenant_timeline_id`, which is parsed into
/// `SubscriptionKey::Timeline`; a missing id yields `InvalidArgument`.
pub fn from_proto_filter_tenant_timeline_id(
    f: &FilterTenantTimelineId,
) -> Result<Self, Status> {
    if f.enabled {
        let proto_ttid = f
            .tenant_timeline_id
            .as_ref()
            .ok_or_else(|| Status::new(Code::InvalidArgument, "missing tenant_timeline_id"))?;
        Ok(SubscriptionKey::Timeline(parse_proto_ttid(proto_ttid)?))
    } else {
        Ok(SubscriptionKey::All)
    }
}
} }
// Channel to timeline subscribers. /// Channel to timeline subscribers.
struct ChanToTimelineSub { struct ChanToTimelineSub {
chan: broadcast::Sender<SafekeeperTimelineInfo>, chan: broadcast::Sender<Message>,
// Tracked separately to know when delete the shmem entry. receiver_count() /// Tracked separately to know when delete the shmem entry. receiver_count()
// is unhandy for that as unregistering and dropping the receiver side /// is unhandy for that as unregistering and dropping the receiver side
// happens at different moments. /// happens at different moments.
num_subscribers: u64, num_subscribers: u64,
} }
@@ -110,7 +226,7 @@ struct SharedState {
num_subs_to_timelines: i64, num_subs_to_timelines: i64,
chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>, chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
num_subs_to_all: i64, num_subs_to_all: i64,
chan_to_all_subs: broadcast::Sender<SafekeeperTimelineInfo>, chan_to_all_subs: broadcast::Sender<Message>,
} }
impl SharedState { impl SharedState {
@@ -146,7 +262,7 @@ impl SharedState {
&mut self, &mut self,
sub_key: SubscriptionKey, sub_key: SubscriptionKey,
timeline_chan_size: usize, timeline_chan_size: usize,
) -> (SubId, broadcast::Receiver<SafekeeperTimelineInfo>) { ) -> (SubId, broadcast::Receiver<Message>) {
let sub_id = self.next_sub_id; let sub_id = self.next_sub_id;
self.next_sub_id += 1; self.next_sub_id += 1;
let sub_rx = match sub_key { let sub_rx = match sub_key {
@@ -262,6 +378,29 @@ impl Registry {
subscriber.id, subscriber.key, subscriber.remote_addr subscriber.id, subscriber.key, subscriber.remote_addr
); );
} }
/// Send msg to relevant subscribers.
///
/// The message goes to the channel for "all" subscribers and, when it carries
/// a tenant/timeline id with registered per-timeline subscribers, to that
/// timeline's channel as well.
///
/// # Errors
///
/// Returns the error from `Message::tenant_timeline_id` when the id cannot be
/// parsed. The id is validated before anything is sent, so a rejected message
/// is not delivered to any subscriber.
///
/// # Panics
///
/// Panics if a per-timeline channel is still in the map but has no receivers.
pub fn send_msg(&self, msg: &Message) -> Result<(), Status> {
    PROCESSED_MESSAGES_TOTAL.inc();

    // Parse the ttid up front, before taking the lock: a malformed message
    // must not be broadcast and then reported to the publisher as failed.
    let ttid = msg.tenant_timeline_id()?;

    let shared_state = self.shared_state.read();

    // send message to subscribers for everything
    // Err means there is no subscribers, it is fine.
    shared_state.chan_to_all_subs.send(msg.clone()).ok();

    // send message to per timeline subscribers, if there is ttid
    let timeline_subs = ttid
        .as_ref()
        .and_then(|ttid| shared_state.chans_to_timeline_subs.get(ttid));
    if let Some(subs) = timeline_subs {
        // Err can't happen here, as tx is destroyed only after removing
        // from the map the last subscriber along with tx.
        subs.chan
            .send(msg.clone())
            .expect("rx is still in the map with zero subscribers");
    }
    Ok(())
}
} }
// Private subscriber state. // Private subscriber state.
@@ -269,7 +408,7 @@ struct Subscriber {
id: SubId, id: SubId,
key: SubscriptionKey, key: SubscriptionKey,
// Subscriber receives messages from publishers here. // Subscriber receives messages from publishers here.
sub_rx: broadcast::Receiver<SafekeeperTimelineInfo>, sub_rx: broadcast::Receiver<Message>,
// to unregister itself from shared state in Drop // to unregister itself from shared state in Drop
registry: Registry, registry: Registry,
// for logging // for logging
@@ -291,26 +430,9 @@ struct Publisher {
} }
impl Publisher { impl Publisher {
// Send msg to relevant subscribers. /// Send msg to relevant subscribers.
pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> { pub fn send_msg(&mut self, msg: &Message) -> Result<(), Status> {
// send message to subscribers for everything self.registry.send_msg(msg)
let shared_state = self.registry.shared_state.read();
// Err means there is no subscribers, it is fine.
shared_state.chan_to_all_subs.send(msg.clone()).ok();
// send message to per timeline subscribers
let ttid =
parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| {
Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
})?)?;
if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
// Err can't happen here, as tx is destroyed only after removing
// from the map the last subscriber along with tx.
subs.chan
.send(msg.clone())
.expect("rx is still in the map with zero subscribers");
}
Ok(())
} }
} }
@@ -339,7 +461,7 @@ impl BrokerService for Broker {
loop { loop {
match stream.next().await { match stream.next().await {
Some(Ok(msg)) => publisher.send_msg(&msg)?, Some(Ok(msg)) => publisher.send_msg(&Message::SafekeeperTimelineInfo(msg))?,
Some(Err(e)) => return Err(e), // grpc error from the stream Some(Err(e)) => return Err(e), // grpc error from the stream
None => break, // closed stream None => break, // closed stream
} }
@@ -371,8 +493,15 @@ impl BrokerService for Broker {
let mut missed_msgs: u64 = 0; let mut missed_msgs: u64 = 0;
loop { loop {
match subscriber.sub_rx.recv().await { match subscriber.sub_rx.recv().await {
Ok(info) => yield info, Ok(info) => {
match info {
Message::SafekeeperTimelineInfo(info) => yield info,
_ => {},
}
BROADCASTED_MESSAGES_TOTAL.inc();
},
Err(RecvError::Lagged(skipped_msg)) => { Err(RecvError::Lagged(skipped_msg)) => {
BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
missed_msgs += skipped_msg; missed_msgs += skipped_msg;
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() { if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full", warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
@@ -392,6 +521,78 @@ impl BrokerService for Broker {
Box::pin(output) as Self::SubscribeSafekeeperInfoStream Box::pin(output) as Self::SubscribeSafekeeperInfoStream
)) ))
} }
type SubscribeByFilterStream =
Pin<Box<dyn Stream<Item = Result<TypedMessage, Status>> + Send + 'static>>;
/// Subscribe to all messages, limited by a filter.
///
/// Only messages whose type is listed in `types` are forwarded; an empty
/// list therefore yields no messages. The tenant/timeline filter is optional:
/// when it is absent or disabled the subscription covers all timelines,
/// otherwise only the given timeline.
///
/// # Errors
///
/// Returns `InvalidArgument` when the filter is enabled but carries no id, or
/// the id fails to parse.
async fn subscribe_by_filter(
    &self,
    request: Request<SubscribeByFilterRequest>,
) -> std::result::Result<Response<Self::SubscribeByFilterStream>, Status> {
    let remote_addr = request
        .remote_addr()
        .expect("TCPConnectInfo inserted by handler");
    let proto_filter = request.into_inner();

    // The proto declares the timeline filter `optional` ("If set and
    // enabled ..."), so a request without it subscribes to everything rather
    // than being rejected.
    let sub_key = match proto_filter.tenant_timeline_id.as_ref() {
        Some(ttid_filter) => SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?,
        None => SubscriptionKey::All,
    };

    // Requested types are kept as raw enum integers for cheap membership tests.
    let types_set = proto_filter
        .types
        .iter()
        .map(|t| t.r#type)
        .collect::<std::collections::HashSet<_>>();

    let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);

    // transform rx into stream with item = Result, as method result demands
    let output = async_stream::try_stream! {
        let mut warn_interval = time::interval(Duration::from_millis(1000));
        let mut missed_msgs: u64 = 0;
        loop {
            match subscriber.sub_rx.recv().await {
                Ok(msg) => {
                    let msg_type = msg.message_type() as i32;
                    if types_set.contains(&msg_type) {
                        yield msg.as_typed_message();
                        BROADCASTED_MESSAGES_TOTAL.inc();
                    }
                },
                Err(RecvError::Lagged(skipped_msg)) => {
                    BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
                    missed_msgs += skipped_msg;
                    // Log at most once per interval tick; the count accumulates in between.
                    if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
                        warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
                            subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
                        missed_msgs = 0;
                    }
                }
                Err(RecvError::Closed) => {
                    // can't happen, we never drop the channel while there is a subscriber
                    Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
                }
            }
        }
    };

    Ok(Response::new(
        Box::pin(output) as Self::SubscribeByFilterStream
    ))
}
/// Publish one message.
///
/// The request is converted to the internal `Message` first; a request that
/// fails conversion is rejected before the one-off counter is bumped and
/// before anything is routed to subscribers.
async fn publish_one(
    &self,
    request: Request<TypedMessage>,
) -> std::result::Result<Response<()>, Status> {
    let typed = request.into_inner();
    let msg = Message::from(typed)?;
    PUBLISHED_ONEOFF_MESSAGES_TOTAL.inc();
    self.registry.send_msg(&msg)?;
    Ok(Response::new(()))
}
} }
// We serve only metrics and healthcheck through http1. // We serve only metrics and healthcheck through http1.
@@ -515,8 +716,8 @@ mod tests {
use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::broadcast::error::TryRecvError;
use utils::id::{TenantId, TimelineId}; use utils::id::{TenantId, TimelineId};
fn msg(timeline_id: Vec<u8>) -> SafekeeperTimelineInfo { fn msg(timeline_id: Vec<u8>) -> Message {
SafekeeperTimelineInfo { Message::SafekeeperTimelineInfo(SafekeeperTimelineInfo {
safekeeper_id: 1, safekeeper_id: 1,
tenant_timeline_id: Some(ProtoTenantTimelineId { tenant_timeline_id: Some(ProtoTenantTimelineId {
tenant_id: vec![0x00; 16], tenant_id: vec![0x00; 16],
@@ -533,7 +734,7 @@ mod tests {
http_connstr: "neon-1-sk-1.local:7677".to_owned(), http_connstr: "neon-1-sk-1.local:7677".to_owned(),
local_start_lsn: 0, local_start_lsn: 0,
availability_zone: None, availability_zone: None,
} })
} }
fn tli_from_u64(i: u64) -> Vec<u8> { fn tli_from_u64(i: u64) -> Vec<u8> {

View File

@@ -1,6 +1,6 @@
//! Broker metrics. //! Broker metrics.
use metrics::{register_int_gauge, IntGauge}; use metrics::{register_int_counter, register_int_gauge, IntCounter, IntGauge};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
pub static NUM_PUBS: Lazy<IntGauge> = Lazy::new(|| { pub static NUM_PUBS: Lazy<IntGauge> = Lazy::new(|| {
@@ -23,3 +23,35 @@ pub static NUM_SUBS_ALL: Lazy<IntGauge> = Lazy::new(|| {
) )
.expect("Failed to register metric") .expect("Failed to register metric")
}); });
/// Counter exported as `storage_broker_processed_messages_total`.
///
/// Per its help text: messages received by the storage broker, counted before
/// routing and broadcasting. Registered lazily on first access; panics if the
/// registration fails.
pub static PROCESSED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
    let registration = register_int_counter!(
        "storage_broker_processed_messages_total",
        "Number of messages received by storage broker, before routing and broadcasting"
    );
    registration.expect("Failed to register metric")
});
/// Counter exported as `storage_broker_broadcasted_messages_total`.
///
/// Per its help text: messages broadcasted (sent over the network) to
/// subscribers. Registered lazily on first access; panics if the registration
/// fails.
pub static BROADCASTED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
    let registration = register_int_counter!(
        "storage_broker_broadcasted_messages_total",
        "Number of messages broadcasted (sent over network) to subscribers"
    );
    registration.expect("Failed to register metric")
});
/// Counter exported as `storage_broker_broadcast_dropped_messages_total`.
///
/// Per its help text: messages dropped because a channel's capacity
/// overflowed. Registered lazily on first access; panics if the registration
/// fails.
pub static BROADCAST_DROPPED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
    let registration = register_int_counter!(
        "storage_broker_broadcast_dropped_messages_total",
        "Number of messages dropped due to channel capacity overflow"
    );
    registration.expect("Failed to register metric")
});
/// Counter exported as `storage_broker_published_oneoff_messages_total`.
///
/// Per its help text: one-off messages sent via the PublishOne method.
/// Registered lazily on first access; panics if the registration fails.
pub static PUBLISHED_ONEOFF_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
    let registration = register_int_counter!(
        "storage_broker_published_oneoff_messages_total",
        "Number of one-off messages sent via PublishOne method"
    );
    registration.expect("Failed to register metric")
});

View File

@@ -457,7 +457,6 @@ class NeonEnvBuilder:
self.preserve_database_files = preserve_database_files self.preserve_database_files = preserve_database_files
self.initial_tenant = initial_tenant or TenantId.generate() self.initial_tenant = initial_tenant or TenantId.generate()
self.initial_timeline = initial_timeline or TimelineId.generate() self.initial_timeline = initial_timeline or TimelineId.generate()
self.enable_generations = True
self.scrub_on_exit = False self.scrub_on_exit = False
self.test_output_dir = test_output_dir self.test_output_dir = test_output_dir
@@ -677,8 +676,7 @@ class NeonEnvBuilder:
pageserver.stop(immediate=True) pageserver.stop(immediate=True)
if self.env.attachment_service is not None: self.env.attachment_service.stop(immediate=True)
self.env.attachment_service.stop(immediate=True)
cleanup_error = None cleanup_error = None
@@ -772,13 +770,9 @@ class NeonEnv:
self.initial_tenant = config.initial_tenant self.initial_tenant = config.initial_tenant
self.initial_timeline = config.initial_timeline self.initial_timeline = config.initial_timeline
if config.enable_generations: attachment_service_port = self.port_distributor.get_port()
attachment_service_port = self.port_distributor.get_port() self.control_plane_api: str = f"http://127.0.0.1:{attachment_service_port}"
self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}" self.attachment_service: NeonAttachmentService = NeonAttachmentService(self)
self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self)
else:
self.control_plane_api = None
self.attachment_service = None
# Create a config file corresponding to the options # Create a config file corresponding to the options
cfg: Dict[str, Any] = { cfg: Dict[str, Any] = {
@@ -851,8 +845,7 @@ class NeonEnv:
# Start up broker, pageserver and all safekeepers # Start up broker, pageserver and all safekeepers
self.broker.try_start() self.broker.try_start()
if self.attachment_service is not None: self.attachment_service.start()
self.attachment_service.start()
for pageserver in self.pageservers: for pageserver in self.pageservers:
pageserver.start() pageserver.start()
@@ -1834,20 +1827,19 @@ class NeonPageserver(PgProtocol):
""" """
client = self.http_client() client = self.http_client()
return client.tenant_attach( return client.tenant_attach(
tenant_id, config, config_null, generation=self.maybe_get_generation(tenant_id) tenant_id,
config,
config_null,
generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id),
) )
def tenant_detach(self, tenant_id: TenantId): def tenant_detach(self, tenant_id: TenantId):
if self.env.attachment_service is not None: self.env.attachment_service.attach_hook_drop(tenant_id)
self.env.attachment_service.attach_hook_drop(tenant_id)
client = self.http_client() client = self.http_client()
return client.tenant_detach(tenant_id) return client.tenant_detach(tenant_id)
def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs): def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs):
# This API is only for use when generations are enabled
assert self.env.attachment_service is not None
if config["mode"].startswith("Attached") and "generation" not in config: if config["mode"].startswith("Attached") and "generation" not in config:
config["generation"] = self.env.attachment_service.attach_hook_issue(tenant_id, self.id) config["generation"] = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
@@ -1873,26 +1865,15 @@ class NeonPageserver(PgProtocol):
generation: Optional[int] = None, generation: Optional[int] = None,
) -> TenantId: ) -> TenantId:
if generation is None: if generation is None:
generation = self.maybe_get_generation(tenant_id) generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
client = self.http_client(auth_token=auth_token) client = self.http_client(auth_token=auth_token)
return client.tenant_create(tenant_id, conf, generation=generation) return client.tenant_create(tenant_id, conf, generation=generation)
def tenant_load(self, tenant_id: TenantId): def tenant_load(self, tenant_id: TenantId):
client = self.http_client() client = self.http_client()
return client.tenant_load(tenant_id, generation=self.maybe_get_generation(tenant_id)) return client.tenant_load(
tenant_id, generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
def maybe_get_generation(self, tenant_id: TenantId): )
"""
For tests that would like to use an HTTP client directly instead of using
the `tenant_attach` and `tenant_create` helpers here: issue a generation
number for a tenant.
Returns None if the attachment service is not enabled (legacy mode)
"""
if self.env.attachment_service is not None:
return self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
else:
return None
def append_pageserver_param_overrides( def append_pageserver_param_overrides(

View File

@@ -125,3 +125,51 @@ class TenantId(Id):
class TimelineId(Id): class TimelineId(Id):
def __repr__(self) -> str: def __repr__(self) -> str:
return f'TimelineId("{self.id.hex()}")' return f'TimelineId("{self.id.hex()}")'
# Workaround for compat with python 3.9, which does not have `typing.Self`
TTenantShardId = TypeVar("TTenantShardId", bound="TenantShardId")
class TenantShardId:
    """
    Identifier of a single tenant shard: a tenant ID plus a shard number and shard count.

    The string form produced by `__str__` is the tenant ID, a "-" separator, then the
    shard number and the shard count as two hex digits each. `parse` also accepts a bare
    32-character tenant ID, which yields shard_number=0 and shard_count=0.

    Instances are hashable and are ordered/compared by the tuple
    (tenant_id, shard_number, shard_count).
    """

    def __init__(self, tenant_id: TenantId, shard_number: int, shard_count: int):
        self.tenant_id = tenant_id
        self.shard_number = shard_number
        self.shard_count = shard_count
        # A shard number must index into the shard count; shard_count == 0 is exempt
        # because that is what `parse` produces for a bare tenant ID.
        assert self.shard_number < self.shard_count or self.shard_count == 0

    @classmethod
    def parse(cls: Type[TTenantShardId], input) -> TTenantShardId:
        """
        Parse either a bare 32-character tenant ID or the 37-character
        "<tenant_id>-<shard_number><shard_count>" form (hex, two digits each).

        Raises ValueError if the input has any other length, if the 37-character form
        does not have "-" at the separator position, or if the shard fields are not
        valid hex.
        """
        if len(input) == 32:
            return cls(
                tenant_id=TenantId(input),
                shard_number=0,
                shard_count=0,
            )
        elif len(input) == 37 and input[32] == "-":
            # Index 32 is the separator written by __str__; reject anything else there
            # rather than silently skipping over it.
            return cls(
                tenant_id=TenantId(input[0:32]),
                shard_number=int(input[33:35], 16),
                shard_count=int(input[35:37], 16),
            )
        else:
            raise ValueError(f"Invalid TenantShardId '{input}'")

    def __str__(self):
        return f"{self.tenant_id}-{self.shard_number:02x}{self.shard_count:02x}"

    def _tuple(self) -> tuple[TenantId, int, int]:
        """Key used for ordering, equality and hashing."""
        return (self.tenant_id, self.shard_number, self.shard_count)

    def __lt__(self, other) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._tuple() < other._tuple()

    def __eq__(self, other) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._tuple() == other._tuple()

    def __hash__(self) -> int:
        return hash(self._tuple())

View File

@@ -61,7 +61,6 @@ def measure_recovery_time(env: NeonCompare):
# of view, but the same as far as the safekeeper/WAL is concerned. To work around that, # of view, but the same as far as the safekeeper/WAL is concerned. To work around that,
# we will explicitly create the tenant in the same generation that it was previously # we will explicitly create the tenant in the same generation that it was previously
# attached in. # attached in.
assert env.env.attachment_service is not None
attach_status = env.env.attachment_service.inspect(tenant_id=env.tenant) attach_status = env.env.attachment_service.inspect(tenant_id=env.tenant)
assert attach_status is not None assert attach_status is not None
(attach_gen, _) = attach_status (attach_gen, _) = attach_status

View File

@@ -136,10 +136,7 @@ def test_no_config(positive_env: NeonEnv, content_type: Optional[str]):
ps_http.tenant_detach(tenant_id) ps_http.tenant_detach(tenant_id)
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()] assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
body = {} body = {"generation": env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)}
gen = env.pageserver.maybe_get_generation(tenant_id)
if gen is not None:
body["generation"] = gen
ps_http.post( ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach", f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",

View File

@@ -87,7 +87,6 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
# #
# Since we're dual-attached, need to tip-off attachment service to treat the one we're # Since we're dual-attached, need to tip-off attachment service to treat the one we're
# about to start as the attached pageserver # about to start as the attached pageserver
assert env.attachment_service is not None
env.attachment_service.attach_hook_issue(env.initial_tenant, env.pageservers[0].id) env.attachment_service.attach_hook_issue(env.initial_tenant, env.pageservers[0].id)
env.pageservers[0].start() env.pageservers[0].start()
env.pageservers[1].stop() env.pageservers[1].stop()

View File

@@ -157,7 +157,6 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
time.sleep(1.1) # so that we can use change in pre_stat.st_mtime to detect overwrites time.sleep(1.1) # so that we can use change in pre_stat.st_mtime to detect overwrites
def get_generation_number(): def get_generation_number():
assert env.attachment_service is not None
attachment = env.attachment_service.inspect(tenant_id) attachment = env.attachment_service.inspect(tenant_id)
assert attachment is not None assert attachment is not None
return attachment[0] return attachment[0]

View File

@@ -72,7 +72,9 @@ def check_client(env: NeonEnv, client: PageserverHttpClient):
# create new tenant and check it is also there # create new tenant and check it is also there
tenant_id = TenantId.generate() tenant_id = TenantId.generate()
client.tenant_create(tenant_id, generation=env.pageserver.maybe_get_generation(tenant_id)) client.tenant_create(
tenant_id, generation=env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)
)
assert tenant_id in {TenantId(t["id"]) for t in client.tenant_list()} assert tenant_id in {TenantId(t["id"]) for t in client.tenant_list()}
timelines = client.timeline_list(tenant_id) timelines = client.timeline_list(tenant_id)

View File

@@ -187,7 +187,6 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
- After upgrade, the bucket should contain a mixture. - After upgrade, the bucket should contain a mixture.
- In both cases, postgres I/O should work. - In both cases, postgres I/O should work.
""" """
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage( neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3, RemoteStorageKind.MOCK_S3,
) )
@@ -196,7 +195,6 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
env.broker.try_start() env.broker.try_start()
for sk in env.safekeepers: for sk in env.safekeepers:
sk.start() sk.start()
assert env.attachment_service is not None
env.attachment_service.start() env.attachment_service.start()
env.pageserver.start(overrides=('--pageserver-config-override=control_plane_api=""',)) env.pageserver.start(overrides=('--pageserver-config-override=control_plane_api=""',))
@@ -262,12 +260,10 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
def test_deferred_deletion(neon_env_builder: NeonEnvBuilder): def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage( neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3, RemoteStorageKind.MOCK_S3,
) )
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
some_other_pageserver = 1234 some_other_pageserver = 1234
ps_http = env.pageserver.http_client() ps_http = env.pageserver.http_client()
@@ -341,7 +337,6 @@ def test_deletion_queue_recovery(
:param validate_before: whether to wait for deletions to be validated before restart. This :param validate_before: whether to wait for deletions to be validated before restart. This
makes them eligible to be executed after restart, if the same node keeps the attachment. makes them eligible to be executed after restart, if the same node keeps the attachment.
""" """
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage( neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3, RemoteStorageKind.MOCK_S3,
) )
@@ -405,7 +400,6 @@ def test_deletion_queue_recovery(
if keep_attachment == KeepAttachment.LOSE: if keep_attachment == KeepAttachment.LOSE:
some_other_pageserver = 101010 some_other_pageserver = 101010
assert env.attachment_service is not None
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver) env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
env.pageserver.start() env.pageserver.start()
@@ -453,7 +447,6 @@ def test_deletion_queue_recovery(
def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage( neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3, RemoteStorageKind.MOCK_S3,
) )
@@ -473,7 +466,6 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
) )
# Simulate a major incident: the control plane goes offline # Simulate a major incident: the control plane goes offline
assert env.attachment_service is not None
env.attachment_service.stop() env.attachment_service.stop()
# Remember how many validations had happened before the control plane went offline # Remember how many validations had happened before the control plane went offline
@@ -545,7 +537,6 @@ def test_eviction_across_generations(neon_env_builder: NeonEnvBuilder):
and must be constructed using the proper generation for the layer, which may not be the same generation and must be constructed using the proper generation for the layer, which may not be the same generation
that the tenant is running in. that the tenant is running in.
""" """
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage( neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3, RemoteStorageKind.MOCK_S3,
) )
@@ -575,7 +566,6 @@ def test_multi_attach(
neon_env_builder: NeonEnvBuilder, neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin, pg_bin: PgBin,
): ):
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 3 neon_env_builder.num_pageservers = 3
neon_env_builder.enable_pageserver_remote_storage( neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3, remote_storage_kind=RemoteStorageKind.MOCK_S3,

View File

@@ -9,9 +9,7 @@ from fixtures.utils import wait_until
# Test restarting page server, while safekeeper and compute node keep # Test restarting page server, while safekeeper and compute node keep
# running. # running.
@pytest.mark.parametrize("generations", [True, False]) def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool):
neon_env_builder.enable_generations = generations
neon_env_builder.enable_pageserver_remote_storage(s3_storage()) neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit() neon_env_builder.enable_scrub_on_exit()

View File

@@ -57,13 +57,11 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
states are valid, so that we may test it in this way: the API should always states are valid, so that we may test it in this way: the API should always
work as long as the tenant exists. work as long as the tenant exists.
""" """
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 3 neon_env_builder.num_pageservers = 3
neon_env_builder.enable_pageserver_remote_storage( neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3, remote_storage_kind=RemoteStorageKind.MOCK_S3,
) )
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
pageservers = env.pageservers pageservers = env.pageservers
list([p.http_client() for p in pageservers]) list([p.http_client() for p in pageservers])
@@ -210,13 +208,11 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
""" """
Test the sequence of location states that are used in a live migration. Test the sequence of location states that are used in a live migration.
""" """
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 2 neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage( neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3, remote_storage_kind=RemoteStorageKind.MOCK_S3,
) )
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
tenant_id = env.initial_tenant tenant_id = env.initial_tenant
timeline_id = env.initial_timeline timeline_id = env.initial_timeline

View File

@@ -60,8 +60,6 @@ def test_remote_storage_backup_and_restore(
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind) neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
neon_env_builder.enable_generations = generations
# Exercise retry code path by making all uploads and downloads fail for the # Exercise retry code path by making all uploads and downloads fail for the
# first time. The retries print INFO-messages to the log; we will check # first time. The retries print INFO-messages to the log; we will check
# that they are present after the test. # that they are present after the test.

View File

@@ -263,15 +263,6 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
ps_http, env.initial_tenant, timeline_id, iterations=iterations ps_http, env.initial_tenant, timeline_id, iterations=iterations
) )
if failpoint == "timeline-delete-after-index-delete":
m = ps_http.get_metrics()
assert (
m.query_one(
"remote_storage_s3_request_seconds_count",
filter={"request_type": "get_object", "result": "ok"},
).value
== 1 # index part for initial timeline
)
elif check is Check.RETRY_WITHOUT_RESTART: elif check is Check.RETRY_WITHOUT_RESTART:
# this should succeed # this should succeed
# this also checks that delete can be retried even when timeline is in Broken state # this also checks that delete can be retried even when timeline is in Broken state