Compare commits

...

7 Commits

Author SHA1 Message Date
John Spray
fec5ac5838 storage controller: a more comprehensive log on tenant creation 2025-01-17 09:54:26 +00:00
John Spray
da13154791 storcon: revise fill logic to prioritize AZ (#10411)
## Problem

Node fills were limited to moving (total shards / node_count) shards. In
systems that aren't perfectly balanced already, that leads us to skip
migrating some of the shards that belong on this node, generating work
for the optimizer later to gradually move them back.

## Summary of changes

- Where a shard has a preferred AZ and is currently attached outside
this AZ, then always promote it during fill, irrespective of target fill
count
2025-01-16 17:33:46 +00:00
John Spray
2e13a3aa7a storage controller: handle legacy TenantConf in consistency_check (#10422)
## Problem

We were comparing serialized configs from the database with serialized
configs from memory. If fields have been added/removed to TenantConfig,
this generates spurious consistency errors. This is fine in test
environments, but limits the usefulness of this debug API in the field.

Closes: https://github.com/neondatabase/neon/issues/10369

## Summary of changes

- Do a decode/encode cycle on the config before comparing it, so that it
will have exactly the expected fields.
2025-01-16 16:56:44 +00:00
Alex Chi Z.
cccc196848 refactor(pageserver): make partitioning an ArcSwap (#10377)
## Problem

gc-compaction needs the partitioning data to decide the job split. This
refactor allows concurrent access/computing the partitioning.

## Summary of changes

Make `partitioning` an ArcSwap so that others can access the
partitioning while we compute it. Fully eliminate the `repartition is
called concurrently` warning when gc-compaction is going on.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-16 15:33:37 +00:00
Arpad Müller
e436dcad57 Rename "disabled" safekeeper scheduling policy to "pause" (#10410)
Rename the safekeeper scheduling policy "disabled" to "pause".

A rename was requested in
https://github.com/neondatabase/neon/pull/10400#discussion_r1916259124,
as the "disabled" policy is meant to be analogous to the "pause" policy
for pageservers.

Also simplify the `SkSchedulingPolicyArg::from_str` function, relying on
the `from_str` implementation of `SkSchedulingPolicy`. Latter is used
for the database format as well, so it is quite stable. If we ever want
to change the UI, we'll need to duplicate the function again but this is
cheap.
2025-01-16 14:30:49 +00:00
John Spray
21d7b6a258 tests: refactor test_tenant_delete_races_timeline_creation (#10425)
## Problem

Threads spawned in `test_tenant_delete_races_timeline_creation` are not
joined before the test ends, and can generate
`PytestUnhandledThreadExceptionWarning` in other tests.


https://neon-github-public-dev.s3.amazonaws.com/reports/pr-10419/12805365523/index.html#/testresult/53a72568acd04dbd

## Summary of changes

- Wrap threads in ThreadPoolExecutor which will join them before the
test ends
- Remove a spurious deletion call -- the background thread doing
deletion ought to succeed.
2025-01-16 14:11:33 +00:00
JC Grünhage
86dbc44db1 CI: Run check-codestyle-rust as part of pre-merge-checks (#10387)
## Problem

When multiple changes are grouped in a merge group to be merged as part
of the merge queue, the changes might individually pass
`check-codestyle-rust` but not in their combined form.

## Summary of changes

- Move `check-codestyle-rust` into a reusable workflow that is called
from it's previous location in `build_and_test.yml`, and additionally
call it from `pre_merge_checks.yml`. The additional call does not run on
ARM, only x86, to ensure the merge queue continues being responsive.
- Trigger `pre_merge_checks.yml` on PRs that change any of the workflows
running in `pre_merge_checks.yml`, so that we get feedback on those
early an not only after trying to merge those changes.
2025-01-16 09:20:24 +00:00
14 changed files with 339 additions and 192 deletions

View File

@@ -0,0 +1,91 @@
name: Check Codestyle Rust
on:
workflow_call:
inputs:
build-tools-image:
description: "build-tools image"
required: true
type: string
archs:
description: "Json array of architectures to run on"
type: string
defaults:
run:
shell: bash -euxo pipefail {0}
jobs:
check-codestyle-rust:
strategy:
matrix:
arch: ${{ fromJson(inputs.archs) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'small-arm64' || 'small')) }}
container:
image: ${{ inputs.build-tools-image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
- name: Cache cargo deps
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
!~/.cargo/registry/src
~/.cargo/git
target
key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
# Some of our rust modules use FFI and need those to be checked
- name: Get postgres headers
run: make postgres-headers -j$(nproc)
# cargo hack runs the given cargo subcommand (clippy in this case) for all feature combinations.
# This will catch compiler & clippy warnings in all feature combinations.
# TODO: use cargo hack for build and test as well, but, that's quite expensive.
# NB: keep clippy args in sync with ./run_clippy.sh
#
# The only difference between "clippy --debug" and "clippy --release" is that in --release mode,
# #[cfg(debug_assertions)] blocks are not built. It's not worth building everything for second
# time just for that, so skip "clippy --release".
- run: |
CLIPPY_COMMON_ARGS="$( source .neon_clippy_args; echo "$CLIPPY_COMMON_ARGS")"
if [ "$CLIPPY_COMMON_ARGS" = "" ]; then
echo "No clippy args found in .neon_clippy_args"
exit 1
fi
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
- name: Run cargo clippy (debug)
run: cargo hack --features default --ignore-unknown-features --feature-powerset clippy $CLIPPY_COMMON_ARGS
- name: Check documentation generation
run: cargo doc --workspace --no-deps --document-private-items
env:
RUSTDOCFLAGS: "-Dwarnings -Arustdoc::private_intra_doc_links"
# Use `${{ !cancelled() }}` to run quck tests after the longer clippy run
- name: Check formatting
if: ${{ !cancelled() }}
run: cargo fmt --all -- --check
# https://github.com/facebookincubator/cargo-guppy/tree/bec4e0eb29dcd1faac70b1b5360267fc02bf830e/tools/cargo-hakari#2-keep-the-workspace-hack-up-to-date-in-ci
- name: Check rust dependencies
if: ${{ !cancelled() }}
run: |
cargo hakari generate --diff # workspace-hack Cargo.toml is up-to-date
cargo hakari manage-deps --dry-run # all workspace crates depend on workspace-hack
# https://github.com/EmbarkStudios/cargo-deny
- name: Check rust licenses/bans/advisories/sources
if: ${{ !cancelled() }}
run: cargo deny check --hide-inclusion-graph

View File

@@ -164,77 +164,11 @@ jobs:
check-codestyle-rust:
needs: [ check-permissions, build-build-tools-image ]
strategy:
matrix:
arch: [ x64, arm64 ]
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'small-arm64' || 'small')) }}
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
- name: Cache cargo deps
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
!~/.cargo/registry/src
~/.cargo/git
target
key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
# Some of our rust modules use FFI and need those to be checked
- name: Get postgres headers
run: make postgres-headers -j$(nproc)
# cargo hack runs the given cargo subcommand (clippy in this case) for all feature combinations.
# This will catch compiler & clippy warnings in all feature combinations.
# TODO: use cargo hack for build and test as well, but, that's quite expensive.
# NB: keep clippy args in sync with ./run_clippy.sh
#
# The only difference between "clippy --debug" and "clippy --release" is that in --release mode,
# #[cfg(debug_assertions)] blocks are not built. It's not worth building everything for second
# time just for that, so skip "clippy --release".
- run: |
CLIPPY_COMMON_ARGS="$( source .neon_clippy_args; echo "$CLIPPY_COMMON_ARGS")"
if [ "$CLIPPY_COMMON_ARGS" = "" ]; then
echo "No clippy args found in .neon_clippy_args"
exit 1
fi
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
- name: Run cargo clippy (debug)
run: cargo hack --features default --ignore-unknown-features --feature-powerset clippy $CLIPPY_COMMON_ARGS
- name: Check documentation generation
run: cargo doc --workspace --no-deps --document-private-items
env:
RUSTDOCFLAGS: "-Dwarnings -Arustdoc::private_intra_doc_links"
# Use `${{ !cancelled() }}` to run quck tests after the longer clippy run
- name: Check formatting
if: ${{ !cancelled() }}
run: cargo fmt --all -- --check
# https://github.com/facebookincubator/cargo-guppy/tree/bec4e0eb29dcd1faac70b1b5360267fc02bf830e/tools/cargo-hakari#2-keep-the-workspace-hack-up-to-date-in-ci
- name: Check rust dependencies
if: ${{ !cancelled() }}
run: |
cargo hakari generate --diff # workspace-hack Cargo.toml is up-to-date
cargo hakari manage-deps --dry-run # all workspace crates depend on workspace-hack
# https://github.com/EmbarkStudios/cargo-deny
- name: Check rust licenses/bans/advisories/sources
if: ${{ !cancelled() }}
run: cargo deny check --hide-inclusion-graph
uses: ./.github/workflows/_check-codestyle-rust.yml
with:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
archs: '["x64", "arm64"]'
secrets: inherit
build-and-test-locally:
needs: [ tag, build-build-tools-image ]

View File

@@ -1,6 +1,12 @@
name: Pre-merge checks
on:
pull_request:
paths:
- .github/workflows/_check-codestyle-python.yml
- .github/workflows/_check-codestyle-rust.yml
- .github/workflows/build-build-tools-image.yml
- .github/workflows/pre-merge-checks.yml
merge_group:
branches:
- main
@@ -17,8 +23,10 @@ jobs:
runs-on: ubuntu-22.04
outputs:
python-changed: ${{ steps.python-src.outputs.any_changed }}
rust-changed: ${{ steps.rust-src.outputs.any_changed }}
steps:
- uses: actions/checkout@v4
- uses: tj-actions/changed-files@4edd678ac3f81e2dc578756871e4d00c19191daf # v45.0.4
id: python-src
with:
@@ -30,11 +38,25 @@ jobs:
poetry.lock
pyproject.toml
- uses: tj-actions/changed-files@4edd678ac3f81e2dc578756871e4d00c19191daf # v45.0.4
id: rust-src
with:
files: |
.github/workflows/_check-codestyle-rust.yml
.github/workflows/build-build-tools-image.yml
.github/workflows/pre-merge-checks.yml
**/**.rs
**/Cargo.toml
Cargo.toml
Cargo.lock
- name: PRINT ALL CHANGED FILES FOR DEBUG PURPOSES
env:
PYTHON_CHANGED_FILES: ${{ steps.python-src.outputs.all_changed_files }}
RUST_CHANGED_FILES: ${{ steps.rust-src.outputs.all_changed_files }}
run: |
echo "${PYTHON_CHANGED_FILES}"
echo "${RUST_CHANGED_FILES}"
build-build-tools-image:
if: needs.get-changed-files.outputs.python-changed == 'true'
@@ -55,6 +77,16 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm-x64
secrets: inherit
check-codestyle-rust:
if: needs.get-changed-files.outputs.rust-changed == 'true'
needs: [ get-changed-files, build-build-tools-image ]
uses: ./.github/workflows/_check-codestyle-rust.yml
with:
# `-bookworm-x64` suffix should match the combination in `build-build-tools-image`
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm-x64
archs: '["x64"]'
secrets: inherit
# To get items from the merge queue merged into main we need to satisfy "Status checks that are required".
# Currently we require 2 jobs (checks with exact name):
# - conclusion
@@ -67,6 +99,7 @@ jobs:
needs:
- get-changed-files
- check-codestyle-python
- check-codestyle-rust
runs-on: ubuntu-22.04
steps:
- name: Create fake `neon-cloud-e2e` check

View File

@@ -298,14 +298,7 @@ impl FromStr for SkSchedulingPolicyArg {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"active" => Ok(Self(SkSchedulingPolicy::Active)),
"disabled" => Ok(Self(SkSchedulingPolicy::Disabled)),
"decomissioned" => Ok(Self(SkSchedulingPolicy::Decomissioned)),
_ => Err(anyhow::anyhow!(
"Unknown scheduling policy '{s}', try active,disabled,decomissioned"
)),
}
SkSchedulingPolicy::from_str(s).map(Self)
}
}

View File

@@ -324,7 +324,7 @@ impl From<NodeSchedulingPolicy> for String {
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum SkSchedulingPolicy {
Active,
Disabled,
Pause,
Decomissioned,
}
@@ -334,9 +334,13 @@ impl FromStr for SkSchedulingPolicy {
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"active" => Self::Active,
"disabled" => Self::Disabled,
"pause" => Self::Pause,
"decomissioned" => Self::Decomissioned,
_ => return Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
_ => {
return Err(anyhow::anyhow!(
"Unknown scheduling policy '{s}', try active,pause,decomissioned"
))
}
})
}
}
@@ -346,7 +350,7 @@ impl From<SkSchedulingPolicy> for String {
use SkSchedulingPolicy::*;
match value {
Active => "active",
Disabled => "disabled",
Pause => "pause",
Decomissioned => "decomissioned",
}
.to_string()

View File

@@ -0,0 +1,54 @@
//! A wrapper around `ArcSwap` that ensures there is only one writer at a time and writes
//! don't block reads.
use arc_swap::ArcSwap;
use std::sync::Arc;
use tokio::sync::TryLockError;
pub struct GuardArcSwap<T> {
inner: ArcSwap<T>,
guard: tokio::sync::Mutex<()>,
}
pub struct Guard<'a, T> {
_guard: tokio::sync::MutexGuard<'a, ()>,
inner: &'a ArcSwap<T>,
}
impl<T> GuardArcSwap<T> {
pub fn new(inner: T) -> Self {
Self {
inner: ArcSwap::new(Arc::new(inner)),
guard: tokio::sync::Mutex::new(()),
}
}
pub fn read(&self) -> Arc<T> {
self.inner.load_full()
}
pub async fn write_guard(&self) -> Guard<'_, T> {
Guard {
_guard: self.guard.lock().await,
inner: &self.inner,
}
}
pub fn try_write_guard(&self) -> Result<Guard<'_, T>, TryLockError> {
let guard = self.guard.try_lock()?;
Ok(Guard {
_guard: guard,
inner: &self.inner,
})
}
}
impl<T> Guard<'_, T> {
pub fn read(&self) -> Arc<T> {
self.inner.load_full()
}
pub fn write(&mut self, value: T) {
self.inner.store(Arc::new(value));
}
}

View File

@@ -98,6 +98,8 @@ pub mod try_rcu;
pub mod pprof;
pub mod guard_arc_swap;
// Re-export used in macro. Avoids adding git-version as dep in target crates.
#[doc(hidden)]
pub use git_version;

View File

@@ -51,7 +51,9 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{
fs_ext, pausable_failpoint,
fs_ext,
guard_arc_swap::GuardArcSwap,
pausable_failpoint,
postgres_client::PostgresClientProtocol,
sync::gate::{Gate, GateGuard},
};
@@ -353,8 +355,8 @@ pub struct Timeline {
// though let's keep them both for better error visibility.
pub initdb_lsn: Lsn,
/// When did we last calculate the partitioning? Make it pub to test cases.
pub(super) partitioning: tokio::sync::Mutex<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,
/// The repartitioning result. Allows a single writer and multiple readers.
pub(crate) partitioning: GuardArcSwap<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
@@ -2340,7 +2342,8 @@ impl Timeline {
// initial logical size is 0.
LogicalSize::empty_initial()
},
partitioning: tokio::sync::Mutex::new((
partitioning: GuardArcSwap::new((
(KeyPartitioning::new(), KeyPartitioning::new().into_sparse()),
Lsn(0),
)),
@@ -4028,18 +4031,15 @@ impl Timeline {
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> {
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
let Ok(mut guard) = self.partitioning.try_write_guard() else {
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
// and hence before the compaction task starts.
// Note that there are a third "caller" that will take the `partitioning` lock. It is `gc_compaction_split_jobs` for
// gc-compaction where it uses the repartition data to determine the split jobs. In the future, it might use its own
// heuristics, but for now, we should allow concurrent access to it and let the caller retry compaction.
return Err(CompactionError::Other(anyhow!(
"repartition() called concurrently, this is rare and a retry should be fine"
"repartition() called concurrently"
)));
};
let ((dense_partition, sparse_partition), partition_lsn) = &*partitioning_guard;
let ((dense_partition, sparse_partition), partition_lsn) = &*guard.read();
if lsn < *partition_lsn {
return Err(CompactionError::Other(anyhow!(
"repartition() called with LSN going backwards, this should not happen"
@@ -4067,9 +4067,9 @@ impl Timeline {
let sparse_partitioning = SparseKeyPartitioning {
parts: vec![sparse_ks],
}; // no partitioning for metadata keys for now
*partitioning_guard = ((dense_partitioning, sparse_partitioning), lsn);
Ok((partitioning_guard.0.clone(), partitioning_guard.1))
let result = ((dense_partitioning, sparse_partitioning), lsn);
guard.write(result.clone());
Ok(result)
}
// Is it time to create a new image layer for the given partition?

View File

@@ -2146,12 +2146,7 @@ impl Timeline {
let mut compact_jobs = Vec::new();
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
let ((dense_ks, sparse_ks), _) = {
let Ok(partition) = self.partitioning.try_lock() else {
bail!("failed to acquire partition lock during gc-compaction");
};
partition.clone()
};
let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
// Truncate the key range to be within user specified compaction range.
fn truncate_to(
source_start: &Key,

View File

@@ -0,0 +1,2 @@
ALTER TABLE safekeepers ALTER COLUMN scheduling_policy SET DEFAULT 'disabled';
UPDATE safekeepers SET scheduling_policy = 'disabled' WHERE scheduling_policy = 'pause';

View File

@@ -0,0 +1,2 @@
ALTER TABLE safekeepers ALTER COLUMN scheduling_policy SET DEFAULT 'pause';
UPDATE safekeepers SET scheduling_policy = 'pause' WHERE scheduling_policy = 'disabled';

View File

@@ -2109,12 +2109,6 @@ impl Service {
create_req.new_tenant_id.tenant_id
};
tracing::info!(
"Creating tenant {}, shard_count={:?}",
create_req.new_tenant_id,
create_req.shard_parameters.count,
);
let create_ids = (0..create_req.shard_parameters.count.count())
.map(|i| TenantShardId {
tenant_id,
@@ -2155,6 +2149,14 @@ impl Service {
}
};
tracing::info!(
generation=?initial_generation,
preferred_az_id=?preferred_az_id,
tenant_id=%create_req.new_tenant_id,
shard_count=?create_req.shard_parameters.count,
"Creating tenant",
);
// Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
// to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
// during the creation, rather than risking leaving orphan objects in S3.
@@ -5411,6 +5413,15 @@ impl Service {
expect_shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
// Because JSON contents of persistent tenants might disagree with the fields in current `TenantConfig`
// definition, we will do an encode/decode cycle to ensure any legacy fields are dropped and any new
// fields are added, before doing a comparison.
for tsp in &mut persistent_shards {
let config: TenantConfig = serde_json::from_str(&tsp.config)
.map_err(|e| ApiError::InternalServerError(e.into()))?;
tsp.config = serde_json::to_string(&config).expect("Encoding config is infallible");
}
if persistent_shards != expect_shards {
tracing::error!("Consistency check failed on shards.");
@@ -7270,19 +7281,14 @@ impl Service {
Ok(())
}
/// Create a node fill plan (pick secondaries to promote) that meets the following requirements:
/// 1. The node should be filled until it reaches the expected cluster average of
/// attached shards. If there are not enough secondaries on the node, the plan stops early.
/// 2. Select tenant shards to promote such that the number of attached shards is balanced
/// throughout the cluster. We achieve this by picking tenant shards from each node,
/// starting from the ones with the largest number of attached shards, until the node
/// reaches the expected cluster average.
/// 3. Avoid promoting more shards of the same tenant than required. The upper bound
/// for the number of tenants from the same shard promoted to the node being filled is:
/// shard count for the tenant divided by the number of nodes in the cluster.
/// Create a node fill plan (pick secondaries to promote), based on:
/// 1. Shards which have a secondary on this node, and this node is in their home AZ, and are currently attached to a node
/// outside their home AZ, should be migrated back here.
/// 2. If after step 1 we have not migrated enough shards for this node to have its fair share of
/// attached shards, we will promote more shards from the nodes with the most attached shards, unless
/// those shards have a home AZ that doesn't match the node we're filling.
fn fill_node_plan(&self, node_id: NodeId) -> Vec<TenantShardId> {
let mut locked = self.inner.write().unwrap();
let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
let (nodes, tenants, _scheduler) = locked.parts_mut();
let node_az = nodes
@@ -7291,53 +7297,79 @@ impl Service {
.get_availability_zone_id()
.clone();
let mut tids_by_node = tenants
.iter_mut()
.filter_map(|(tid, tenant_shard)| {
if !matches!(
tenant_shard.get_scheduling_policy(),
ShardSchedulingPolicy::Active
) {
// Only include tenants in fills if they have a normal (Active) scheduling policy. We
// even exclude Essential, because moving to fill a node is not essential to keeping this
// tenant available.
return None;
}
// The tenant shard IDs that we plan to promote from secondary to attached on this node
let mut plan = Vec::new();
// AZ check: when filling nodes after a restart, our intent is to move _back_ the
// shards which belong on this node, not to promote shards whose scheduling preference
// would be on their currently attached node. So will avoid promoting shards whose
// home AZ doesn't match the AZ of the node we're filling.
match tenant_shard.preferred_az() {
None => {
// Shard doesn't have an AZ preference: it is elegible to be moved.
}
Some(az) if az == &node_az => {
// This shard's home AZ is equal to the node we're filling: it is
// elegible to be moved: fall through;
}
Some(_) => {
// This shard's home AZ is somewhere other than the node we're filling:
// do not include it in the fill plan.
return None;
}
}
// Collect shards which do not have a preferred AZ & are elegible for moving in stage 2
let mut free_tids_by_node: HashMap<NodeId, Vec<TenantShardId>> = HashMap::new();
if tenant_shard.intent.get_secondary().contains(&node_id) {
// Don't respect AZ preferences if there is only one AZ. This comes up in tests, but it could
// conceivably come up in real life if deploying a single-AZ region intentionally.
let respect_azs = nodes
.values()
.map(|n| n.get_availability_zone_id())
.unique()
.count()
> 1;
// Step 1: collect all shards that we are required to migrate back to this node because their AZ preference
// requires it.
for (tsid, tenant_shard) in tenants {
if !tenant_shard.intent.get_secondary().contains(&node_id) {
// Shard doesn't have a secondary on this node, ignore it.
continue;
}
// AZ check: when filling nodes after a restart, our intent is to move _back_ the
// shards which belong on this node, not to promote shards whose scheduling preference
// would be on their currently attached node. So will avoid promoting shards whose
// home AZ doesn't match the AZ of the node we're filling.
match tenant_shard.preferred_az() {
_ if !respect_azs => {
if let Some(primary) = tenant_shard.intent.get_attached() {
return Some((*primary, *tid));
free_tids_by_node.entry(*primary).or_default().push(*tsid);
}
}
None => {
// Shard doesn't have an AZ preference: it is elegible to be moved, but we
// will only do so if our target shard count requires it.
if let Some(primary) = tenant_shard.intent.get_attached() {
free_tids_by_node.entry(*primary).or_default().push(*tsid);
}
}
Some(az) if az == &node_az => {
// This shard's home AZ is equal to the node we're filling: it should
// be moved back to this node as part of filling, unless its currently
// attached location is also in its home AZ.
if let Some(primary) = tenant_shard.intent.get_attached() {
if nodes
.get(primary)
.expect("referenced node must exist")
.get_availability_zone_id()
!= tenant_shard
.preferred_az()
.expect("tenant must have an AZ preference")
{
plan.push(*tsid)
}
} else {
plan.push(*tsid)
}
}
Some(_) => {
// This shard's home AZ is somewhere other than the node we're filling,
// it may not be moved back to this node as part of filling. Ignore it
}
}
}
None
})
.into_group_map();
// Step 2: also promote any AZ-agnostic shards as required to achieve the target number of attachments
let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
let expected_attached = locked.scheduler.expected_attached_shard_count();
let nodes_by_load = locked.scheduler.nodes_by_attached_shard_count();
let mut promoted_per_tenant: HashMap<TenantId, usize> = HashMap::new();
let mut plan = Vec::new();
for (node_id, attached) in nodes_by_load {
let available = locked.nodes.get(&node_id).is_some_and(|n| n.is_available());
@@ -7346,7 +7378,7 @@ impl Service {
}
if plan.len() >= fill_requirement
|| tids_by_node.is_empty()
|| free_tids_by_node.is_empty()
|| attached <= expected_attached
{
break;
@@ -7358,7 +7390,7 @@ impl Service {
let mut remove_node = false;
while take > 0 {
match tids_by_node.get_mut(&node_id) {
match free_tids_by_node.get_mut(&node_id) {
Some(tids) => match tids.pop() {
Some(tid) => {
let max_promote_for_tenant = std::cmp::max(
@@ -7384,7 +7416,7 @@ impl Service {
}
if remove_node {
tids_by_node.remove(&node_id);
free_tids_by_node.remove(&node_id);
}
}

View File

@@ -3211,7 +3211,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
# some small tests for the scheduling policy querying and returning APIs
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
assert newest_info["scheduling_policy"] == "Disabled"
assert newest_info["scheduling_policy"] == "Pause"
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import json
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
import pytest
@@ -253,29 +254,8 @@ def test_tenant_delete_races_timeline_creation(neon_env_builder: NeonEnvBuilder)
ps_http.configure_failpoints((BEFORE_INITDB_UPLOAD_FAILPOINT, "pause"))
def timeline_create():
try:
ps_http.timeline_create(env.pg_version, tenant_id, TimelineId.generate(), timeout=1)
raise RuntimeError("creation succeeded even though it shouldn't")
except ReadTimeout:
pass
Thread(target=timeline_create).start()
def hit_initdb_upload_failpoint():
env.pageserver.assert_log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}")
wait_until(hit_initdb_upload_failpoint)
def creation_connection_timed_out():
env.pageserver.assert_log_contains(
"POST.*/timeline.* request was dropped before completing"
)
# Wait so that we hit the timeout and the connection is dropped
# (But timeline creation still continues)
wait_until(creation_connection_timed_out)
ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "pause"))
ps_http.timeline_create(env.pg_version, tenant_id, TimelineId.generate(), timeout=1)
raise RuntimeError("creation succeeded even though it shouldn't")
def tenant_delete():
def tenant_delete_inner():
@@ -283,21 +263,46 @@ def test_tenant_delete_races_timeline_creation(neon_env_builder: NeonEnvBuilder)
wait_until(tenant_delete_inner)
Thread(target=tenant_delete).start()
# We will spawn background threads for timeline creation and tenant deletion. They will both
# get blocked on our failpoint.
with ThreadPoolExecutor(max_workers=1) as executor:
create_fut = executor.submit(timeline_create)
def deletion_arrived():
env.pageserver.assert_log_contains(
f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause"
)
def hit_initdb_upload_failpoint():
env.pageserver.assert_log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}")
wait_until(deletion_arrived)
wait_until(hit_initdb_upload_failpoint)
ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "off"))
def creation_connection_timed_out():
env.pageserver.assert_log_contains(
"POST.*/timeline.* request was dropped before completing"
)
# Disable the failpoint and wait for deletion to finish
ps_http.configure_failpoints((BEFORE_INITDB_UPLOAD_FAILPOINT, "off"))
# Wait so that we hit the timeout and the connection is dropped
# (But timeline creation still continues)
wait_until(creation_connection_timed_out)
ps_http.tenant_delete(tenant_id)
with pytest.raises(ReadTimeout):
# Our creation failed from the client's point of view.
create_fut.result()
ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "pause"))
delete_fut = executor.submit(tenant_delete)
def deletion_arrived():
env.pageserver.assert_log_contains(
f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause"
)
wait_until(deletion_arrived)
ps_http.configure_failpoints((DELETE_BEFORE_CLEANUP_FAILPOINT, "off"))
# Disable the failpoint and wait for deletion to finish
ps_http.configure_failpoints((BEFORE_INITDB_UPLOAD_FAILPOINT, "off"))
delete_fut.result()
# Physical deletion should have happened
assert_prefix_empty(