Merge branch 'main' into problame/move-pageserver-config-into-api-crate

Christian Schwarz
2024-05-03 15:20:24 +00:00
20 changed files with 801 additions and 163 deletions

View File

@@ -65,7 +65,7 @@ RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/
&& mv s5cmd /usr/local/bin/s5cmd
# LLVM
ENV LLVM_VERSION=17
ENV LLVM_VERSION=18
RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
&& echo "deb http://apt.llvm.org/bullseye/ llvm-toolchain-bullseye-${LLVM_VERSION} main" > /etc/apt/sources.list.d/llvm.stable.list \
&& apt update \
@@ -141,7 +141,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.77.0
ENV RUSTC_VERSION=1.78.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \

View File

@@ -490,7 +490,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
"rename_db" => {
let new_name = op.new_name.as_ref().unwrap();
if existing_dbs.get(&op.name).is_some() {
if existing_dbs.contains_key(&op.name) {
let query: String = format!(
"ALTER DATABASE {} RENAME TO {}",
op.name.pg_quote(),

View File

@@ -382,7 +382,10 @@ impl LocalEnv {
// Find neon binaries.
if env.neon_distrib_dir == Path::new("") {
env.neon_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
env::current_exe()?
.parent()
.unwrap()
.clone_into(&mut env.neon_distrib_dir);
}
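// Editorial sketch, not part of this commit: `clone_into` clones into an existing
// destination, reusing its allocation where possible, instead of building a fresh
// value and overwriting it (the same idea as the `clone_from` rewrites later in this
// commit, presumably prompted by the newer clippy shipped with the Rust 1.78 bump).
// The helper below is hypothetical, for illustration only.
fn set_distrib_dir(dest: &mut std::path::PathBuf, src: &std::path::Path) {
    // Before: *dest = src.to_owned();  -- always allocates a new PathBuf
    src.clone_into(dest); // may reuse dest's existing buffer
}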
if env.pageservers.is_empty() {

View File

@@ -80,7 +80,7 @@ impl Key {
}
/// Get the range of metadata keys.
pub fn metadata_key_range() -> Range<Self> {
pub const fn metadata_key_range() -> Range<Self> {
Key {
field1: METADATA_KEY_BEGIN_PREFIX,
field2: 0,
@@ -572,14 +572,17 @@ pub const AUX_FILES_KEY: Key = Key {
// Reverse mappings for a few Keys.
// These are needed by WAL redo manager.
/// Non inherited range for vectored get.
pub const NON_INHERITED_RANGE: Range<Key> = AUX_FILES_KEY..AUX_FILES_KEY.next();
/// Sparse keyspace range for vectored get. Missing key error will be ignored for this range.
pub const NON_INHERITED_SPARSE_RANGE: Range<Key> = Key::metadata_key_range();
// AUX_FILES currently stores only data for logical replication (slots etc.), and
// we don't preserve these on a branch because safekeepers can't follow a timeline
// switch (and this data should probably be optional in general), so ignore these.
#[inline(always)]
pub fn is_inherited_key(key: Key) -> bool {
!NON_INHERITED_RANGE.contains(&key)
!NON_INHERITED_RANGE.contains(&key) && !NON_INHERITED_SPARSE_RANGE.contains(&key)
}
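// Editorial sketch, not part of this commit: the intended effect of the added
// NON_INHERITED_SPARSE_RANGE check, using only the constants defined above.
fn example_inheritance() {
    // Aux-file data is not inherited by child timelines...
    assert!(!is_inherited_key(AUX_FILES_KEY));
    // ...and neither is anything in the sparse metadata key range now.
    assert!(!is_inherited_key(Key::metadata_key_range().start));
    // Ordinary relation data keys remain inherited across a branch point.
}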
#[inline(always)]

View File

@@ -97,7 +97,7 @@ impl ShardCount {
/// The internal value of a ShardCount may be zero, which means "1 shard, but use
/// legacy format for TenantShardId that excludes the shard suffix", also known
/// as `TenantShardId::unsharded`.
/// as [`TenantShardId::unsharded`].
///
/// This method returns the actual number of shards, i.e. if our internal value is
/// zero, we return 1 (unsharded tenants have 1 shard).
@@ -116,7 +116,9 @@ impl ShardCount {
self.0
}
///
/// Whether the `ShardCount` is for an unsharded tenant: such a tenant uses one shard, but
/// the legacy format for `TenantShardId`. See also the documentation for
/// [`Self::count`].
pub fn is_unsharded(&self) -> bool {
self.0 == 0
}
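// Editorial sketch, not part of this commit: how the zero-means-unsharded encoding
// reads through `count()` and `is_unsharded()` (assumes the raw u8 can be constructed
// directly here; otherwise go through ShardCount's constructor).
fn example_shard_count() {
    let unsharded = ShardCount(0);
    assert!(unsharded.is_unsharded());
    assert_eq!(unsharded.count(), 1); // internal 0 still means one shard

    let sharded = ShardCount(4);
    assert!(!sharded.is_unsharded());
    assert_eq!(sharded.count(), 4);
}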

View File

@@ -284,6 +284,34 @@ impl Client {
Ok((status, progress))
}
pub async fn tenant_secondary_status(
&self,
tenant_shard_id: TenantShardId,
) -> Result<SecondaryProgress> {
let path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{}/secondary/status",
self.mgmt_api_endpoint, tenant_shard_id
))
.expect("Cannot build URL");
self.request(Method::GET, path, ())
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
let path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{}/heatmap_upload",
self.mgmt_api_endpoint, tenant_id
))
.expect("Cannot build URL");
self.request(Method::POST, path, ()).await?;
Ok(())
}
pub async fn location_config(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -2160,6 +2160,27 @@ async fn secondary_download_handler(
json_response(status, progress)
}
async fn secondary_status_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let state = get_state(&request);
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let Some(secondary_tenant) = state
.tenant_manager
.get_secondary_tenant_shard(tenant_shard_id)
else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Shard {} not found", tenant_shard_id).into(),
));
};
let progress = secondary_tenant.progress.lock().unwrap().clone();
json_response(StatusCode::OK, progress)
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(
StatusCode::NOT_FOUND,
@@ -2521,6 +2542,9 @@ pub fn make_router(
.put("/v1/deletion_queue/flush", |r| {
api_handler(r, deletion_queue_flush)
})
.get("/v1/tenant/:tenant_shard_id/secondary/status", |r| {
api_handler(r, secondary_status_handler)
})
.post("/v1/tenant/:tenant_shard_id/secondary/download", |r| {
api_handler(r, secondary_download_handler)
})

View File

@@ -194,6 +194,11 @@ pub(crate) struct GetVectoredLatency {
map: EnumMap<TaskKind, Option<Histogram>>,
}
#[allow(dead_code)]
pub(crate) struct ScanLatency {
map: EnumMap<TaskKind, Option<Histogram>>,
}
impl GetVectoredLatency {
// Only these task types perform vectored gets. Filter all other tasks out to reduce total
// cardinality of the metric.
@@ -204,6 +209,48 @@ impl GetVectoredLatency {
}
}
impl ScanLatency {
// Only these task types perform scans. Filter all other tasks out to reduce total
// cardinality of the metric.
const TRACKED_TASK_KINDS: [TaskKind; 1] = [TaskKind::PageRequestHandler];
pub(crate) fn for_task_kind(&self, task_kind: TaskKind) -> Option<&Histogram> {
self.map[task_kind].as_ref()
}
}
pub(crate) struct ScanLatencyOngoingRecording<'a> {
parent: &'a Histogram,
start: std::time::Instant,
}
impl<'a> ScanLatencyOngoingRecording<'a> {
pub(crate) fn start_recording(parent: &'a Histogram) -> ScanLatencyOngoingRecording<'a> {
let start = Instant::now();
ScanLatencyOngoingRecording { parent, start }
}
pub(crate) fn observe(self, throttled: Option<Duration>) {
let elapsed = self.start.elapsed();
let ex_throttled = if let Some(throttled) = throttled {
elapsed.checked_sub(throttled)
} else {
Some(elapsed)
};
if let Some(ex_throttled) = ex_throttled {
self.parent.observe(ex_throttled.as_secs_f64());
} else {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!("error deducting time spent throttled; this message is logged at a global rate limit");
});
}
}
}
pub(crate) static GET_VECTORED_LATENCY: Lazy<GetVectoredLatency> = Lazy::new(|| {
let inner = register_histogram_vec!(
"pageserver_get_vectored_seconds",
@@ -227,6 +274,29 @@ pub(crate) static GET_VECTORED_LATENCY: Lazy<GetVectoredLatency> = Lazy::new(||
}
});
pub(crate) static SCAN_LATENCY: Lazy<ScanLatency> = Lazy::new(|| {
let inner = register_histogram_vec!(
"pageserver_scan_seconds",
"Time spent in scan, excluding time spent in timeline_get_throttle.",
&["task_kind"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric");
ScanLatency {
map: EnumMap::from_array(std::array::from_fn(|task_kind_idx| {
let task_kind = <TaskKind as enum_map::Enum>::from_usize(task_kind_idx);
if ScanLatency::TRACKED_TASK_KINDS.contains(&task_kind) {
let task_kind = task_kind.into();
Some(inner.with_label_values(&[task_kind]))
} else {
None
}
})),
}
});
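// Editorial sketch, not part of this commit: intended call pattern for the new
// scan-latency recording, mirroring how Timeline::scan uses it later in this commit.
// `throttled` is whatever Option<Duration> the throttle returned.
fn example_scan_recording(throttled: Option<std::time::Duration>) {
    let maybe_recording = SCAN_LATENCY
        .for_task_kind(TaskKind::PageRequestHandler)
        .map(ScanLatencyOngoingRecording::start_recording);

    // ... perform the scan ...

    if let Some(recording) = maybe_recording {
        // Observes elapsed time minus time spent throttled.
        recording.observe(throttled);
    }
}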
pub(crate) struct PageCacheMetricsForTaskKind {
pub read_accesses_materialized_page: IntCounter,
pub read_accesses_immutable: IntCounter,

View File

@@ -279,7 +279,7 @@ impl Timeline {
match RelDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => {
let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
let exists = dir.rels.contains(&(tag.relnode, tag.forknum));
Ok(exists)
}
Err(e) => Err(PageReconstructError::from(e)),
@@ -379,7 +379,7 @@ impl Timeline {
match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => {
let exists = dir.segments.get(&segno).is_some();
let exists = dir.segments.contains(&segno);
Ok(exists)
}
Err(e) => Err(PageReconstructError::from(e)),
@@ -1143,21 +1143,22 @@ impl<'a> DatadirModification<'a> {
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
.context("deserialize db")?;
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
// Didn't exist. Update dbdir
dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
self.pending_directory_entries
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
let mut rel_dir =
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
// Didn't exist. Update dbdir
e.insert(false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
self.pending_directory_entries
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
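// Editorial sketch, not part of this commit: the rewritten branch above relies on the
// HashMap entry API to avoid a separate lookup followed by an insert. A hypothetical
// standalone version of the same pattern:
fn mark_dbdir_created(
    dbdirs: &mut std::collections::HashMap<(u32, u32), bool>,
    id: (u32, u32),
) -> bool {
    use std::collections::hash_map::Entry;
    match dbdirs.entry(id) {
        Entry::Vacant(e) => {
            e.insert(false); // did not exist: record it; the caller creates a fresh RelDirectory
            true
        }
        Entry::Occupied(_) => false, // already present: the caller fetches the stored RelDirectory
    }
}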
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {

View File

@@ -3925,7 +3925,7 @@ mod tests {
use crate::DEFAULT_PG_VERSION;
use bytes::BytesMut;
use hex_literal::hex;
use pageserver_api::key::NON_INHERITED_RANGE;
use pageserver_api::key::{AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::CompactionAlgorithm;
use rand::{thread_rng, Rng};
@@ -4791,15 +4791,7 @@ mod tests {
.await;
let images = vectored_res?;
let mut key = NON_INHERITED_RANGE.start;
while key < NON_INHERITED_RANGE.end {
assert!(matches!(
images[&key],
Err(PageReconstructError::MissingKey(_))
));
key = key.next();
}
assert!(images.is_empty());
Ok(())
}
@@ -5500,4 +5492,116 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_metadata_scan() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_scan")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
const NUM_KEYS: usize = 1000;
const STEP: usize = 100; // random update + scan base_key + idx * STEP
let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
base_key.field1 = AUX_KEY_PREFIX;
let mut test_key = base_key;
// Track when each page was last modified. Used to assert that
// a read sees the latest page version.
let mut updated = [Lsn(0); NUM_KEYS];
let mut lsn = Lsn(0x10);
#[allow(clippy::needless_range_loop)]
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = (blknum * STEP) as u32;
let mut writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
updated[blknum] = lsn;
drop(writer);
}
let keyspace = KeySpace::single(base_key..base_key.add((NUM_KEYS * STEP) as u32));
for _ in 0..10 {
// Read all the blocks
for (blknum, last_lsn) in updated.iter().enumerate() {
test_key.field6 = (blknum * STEP) as u32;
assert_eq!(
tline.get(test_key, lsn, &ctx).await?,
test_img(&format!("{} at {}", blknum, last_lsn))
);
}
let mut cnt = 0;
for (key, value) in tline
.get_vectored_impl(
keyspace.clone(),
lsn,
ValuesReconstructState::default(),
&ctx,
)
.await?
{
let blknum = key.field6 as usize;
let value = value?;
assert!(blknum % STEP == 0);
let blknum = blknum / STEP;
assert_eq!(
value,
test_img(&format!("{} at {}", blknum, updated[blknum]))
);
cnt += 1;
}
assert_eq!(cnt, NUM_KEYS);
for _ in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = (blknum * STEP) as u32;
let mut writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
updated[blknum] = lsn;
}
// Perform a cycle of flush, compact, and GC
let cutoff = tline.get_last_record_lsn();
tline
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
tline.freeze_and_flush().await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}
Ok(())
}
}

View File

@@ -588,7 +588,7 @@ impl LayerMap {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push((kr, current_val.take()));
current_key = change_key;
current_val = change_val.clone();
current_val.clone_from(&change_val);
}
// Add the final interval
@@ -672,12 +672,12 @@ impl LayerMap {
// Loop through the delta coverage and recurse on each part
for (change_key, change_val) in version.delta_coverage.range(start..end) {
// If there's a relevant delta in this part, add 1 and recurse down
if let Some(val) = current_val {
if let Some(val) = &current_val {
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let base_count = Self::is_reimage_worthy(val, key) as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
max_stacked_deltas = std::cmp::max(
@@ -689,17 +689,17 @@ impl LayerMap {
}
current_key = change_key;
current_val = change_val.clone();
current_val.clone_from(&change_val);
}
// Consider the last part
if let Some(val) = current_val {
if let Some(val) = &current_val {
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(end);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let base_count = Self::is_reimage_worthy(val, key) as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
max_stacked_deltas = std::cmp::max(

View File

@@ -16,7 +16,10 @@ use enumset::EnumSet;
use fail::fail_point;
use once_cell::sync::Lazy;
use pageserver_api::{
key::{AUX_FILES_KEY, NON_INHERITED_RANGE},
key::{
AUX_FILES_KEY, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
NON_INHERITED_SPARSE_RANGE,
},
keyspace::{KeySpaceAccum, SparseKeyPartitioning},
models::{
CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
@@ -55,7 +58,6 @@ use std::{
ops::ControlFlow,
};
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
@@ -77,6 +79,9 @@ use crate::{
use crate::{
disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
};
use crate::{
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
};
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
use crate::{
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
@@ -885,16 +890,15 @@ impl Timeline {
value
}
}
None => {
error!(
"Expected {}, but singular vectored get returned nothing",
key
);
Err(PageReconstructError::Other(anyhow!(
"Singular vectored get did not return a value for {}",
key
)))
}
None => Err(PageReconstructError::MissingKey(MissingKeyError {
key,
shard: self.shard_identity.get_shard_number(&key),
cont_lsn: Lsn(0),
request_lsn: lsn,
ancestor_lsn: None,
traversal_path: Vec::new(),
backtrace: None,
})),
}
}
}
@@ -1044,6 +1048,70 @@ impl Timeline {
res
}
/// Scan the keyspace and return all existing key-values in it. This currently uses the vectored
/// get path underneath. A normal vectored get would return an error when a key in the keyspace is
/// not found during the search, but the scan interface returns all existing key-value pairs and
/// does not expect every key in the keyspace to be present. The semantics are closer to the RocksDB
/// scan iterator interface. We could later optimize this interface to avoid some of the checks in
/// the vectored get path that maintain and split the probed and to-be-probed keyspaces. We also need
/// to ensure that the scan operation will not cause OOM in the future.
#[allow(dead_code)]
pub(crate) async fn scan(
&self,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if !lsn.is_valid() {
return Err(GetVectoredError::InvalidLsn(lsn));
}
trace!(
"key-value scan request for {:?}@{} from task kind {:?}",
keyspace,
lsn,
ctx.task_kind()
);
// We should generalize this into Keyspace::contains in the future.
for range in &keyspace.ranges {
if range.start.field1 < METADATA_KEY_BEGIN_PREFIX
|| range.end.field1 >= METADATA_KEY_END_PREFIX
{
return Err(GetVectoredError::Other(anyhow::anyhow!(
"only metadata keyspace can be scanned"
)));
}
}
let start = crate::metrics::SCAN_LATENCY
.for_task_kind(ctx.task_kind())
.map(ScanLatencyOngoingRecording::start_recording);
// start counting after throttle so that throttle time
// is always less than observation time
let throttled = self
.timeline_get_throttle
// assume scan = 1 quota for now until we find a better way to process this
.throttle(ctx, 1)
.await;
let vectored_res = self
.get_vectored_impl(
keyspace.clone(),
lsn,
ValuesReconstructState::default(),
ctx,
)
.await;
if let Some(recording) = start {
recording.observe(throttled);
}
vectored_res
}
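// Editorial sketch, not part of this commit: a hedged example of calling `scan` over
// part of the metadata keyspace, mirroring the test added later in this commit (the
// base key and range length are illustrative).
async fn example_scan(tline: &Timeline, lsn: Lsn, ctx: &RequestContext) -> anyhow::Result<()> {
    let mut base_key = Key::from_hex("000000000033333333444444445500000000")?;
    base_key.field1 = AUX_KEY_PREFIX;
    let keyspace = KeySpace::single(base_key..base_key.add(100));
    for (_key, value) in tline.scan(keyspace, lsn, ctx).await? {
        // Only keys that actually exist are returned; missing keys are skipped.
        let _image = value?;
    }
    Ok(())
}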
/// Not subject to [`Self::timeline_get_throttle`].
pub(super) async fn get_vectored_sequential_impl(
&self,
@@ -1052,6 +1120,7 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut values = BTreeMap::new();
for range in keyspace.ranges {
let mut key = range.start;
while key != range.end {
@@ -1064,12 +1133,16 @@ impl Timeline {
Err(Cancelled | AncestorStopping(_)) => {
return Err(GetVectoredError::Cancelled)
}
// we only capture stuck_at_lsn=false now until we figure out https://github.com/neondatabase/neon/issues/7380
Err(MissingKey(err)) if !NON_INHERITED_RANGE.contains(&key) => {
// The vectored read path handles non inherited keys specially.
// If such a key cannot be reconstructed from the current timeline,
// the vectored read path returns a key level error as opposed to a top
// level error.
Err(MissingKey(_))
if NON_INHERITED_RANGE.contains(&key)
|| NON_INHERITED_SPARSE_RANGE.contains(&key) =>
{
// Ignore missing-key errors for the aux key range. TODO: currently, we assume non_inherited_range == aux_key_range.
// When we add more types of keys into the pageserver, we should revisit this part of the code and return
// errors accordingly.
key = key.next();
}
Err(MissingKey(err)) => {
return Err(GetVectoredError::MissingKey(err));
}
Err(Other(err))
@@ -1157,6 +1230,11 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) {
if keyspace.overlaps(&Key::metadata_key_range()) {
// skip validation for metadata key range
return;
}
let sequential_res = self
.get_vectored_sequential_impl(keyspace.clone(), lsn, ctx)
.await;
@@ -3209,36 +3287,12 @@ impl Timeline {
// Do not descend into the ancestor timeline for aux files.
// We don't return a blanket [`GetVectoredError::MissingKey`] to avoid
// stalling compaction.
// TODO(chi): this will need to be updated for aux files v2 storage
if keyspace.overlaps(&NON_INHERITED_RANGE) {
let removed = keyspace.remove_overlapping_with(&KeySpace {
ranges: vec![NON_INHERITED_RANGE],
});
for range in removed.ranges {
let mut key = range.start;
while key < range.end {
reconstruct_state.on_key_error(
key,
PageReconstructError::MissingKey(MissingKeyError {
key,
shard: self.shard_identity.get_shard_number(&key),
cont_lsn,
request_lsn,
ancestor_lsn: None,
traversal_path: Vec::default(),
backtrace: if cfg!(test) {
Some(std::backtrace::Backtrace::force_capture())
} else {
None
},
}),
);
key = key.next();
}
}
}
keyspace.remove_overlapping_with(&KeySpace {
ranges: vec![NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE],
});
// Keyspace is fully retrieved, no ancestor timeline, or metadata scan (where we do not look
// into ancestor timelines). TODO: is there any other metadata which we want to inherit?
if keyspace.total_raw_size() == 0 || timeline.ancestor_timeline.is_none() {
break;
}

View File

@@ -1535,7 +1535,7 @@ mod tests {
let harness = TenantHarness::create("switch_to_same_availability_zone")?;
let mut state = dummy_state(&harness).await;
state.conf.availability_zone = test_az.clone();
state.conf.availability_zone.clone_from(&test_az);
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
@@ -1568,7 +1568,7 @@ mod tests {
// We have another safekeeper with the same commit_lsn, and it has the same availability zone as
// the current pageserver.
let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
same_az_sk.timeline.availability_zone = test_az.clone();
same_az_sk.timeline.availability_zone.clone_from(&test_az);
state.wal_stream_candidates = HashMap::from([
(

View File

@@ -76,14 +76,11 @@ where
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn write_buffered<S: IoBuf>(
pub async fn write_buffered<S: IoBuf + Send>(
&mut self,
chunk: Slice<S>,
ctx: &RequestContext,
) -> std::io::Result<(usize, S)>
where
S: IoBuf + Send,
{
) -> std::io::Result<(usize, S)> {
let chunk_len = chunk.len();
// avoid memcpy for the middle of the chunk
if chunk.len() >= self.buf().cap() {

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.77.0"
channel = "1.78.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -114,6 +114,27 @@ impl PageserverClient {
)
}
pub(crate) async fn tenant_secondary_status(
&self,
tenant_shard_id: TenantShardId,
) -> Result<SecondaryProgress> {
measured_request!(
"tenant_secondary_status",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner.tenant_secondary_status(tenant_shard_id).await
)
}
pub(crate) async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
measured_request!(
"tenant_heatmap_upload",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.tenant_heatmap_upload(tenant_id).await
)
}
pub(crate) async fn location_config(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -13,7 +13,9 @@ use crate::{
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{ScheduleContext, ScheduleMode},
tenant_shard::ReconcileNeeded,
tenant_shard::{
MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
},
};
use anyhow::Context;
use control_plane::storage_controller::{
@@ -709,7 +711,7 @@ impl Service {
let reconciles_spawned = self.reconcile_all();
if reconciles_spawned == 0 {
// Run optimizer only when we didn't find any other work to do
self.optimize_all();
self.optimize_all().await;
}
}
_ = self.cancel.cancelled() => return
@@ -2639,6 +2641,45 @@ impl Service {
Ok(results)
}
/// Concurrently invoke a pageserver API call on many shards at once
pub(crate) async fn tenant_for_shards_api<T, O, F>(
&self,
locations: Vec<(TenantShardId, Node)>,
op: O,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
cancel: &CancellationToken,
) -> Vec<mgmt_api::Result<T>>
where
O: Fn(TenantShardId, PageserverClient) -> F + Copy,
F: std::future::Future<Output = mgmt_api::Result<T>>,
{
let mut futs = FuturesUnordered::new();
let mut results = Vec::with_capacity(locations.len());
for (tenant_shard_id, node) in locations {
futs.push(async move {
node.with_client_retries(
|client| op(tenant_shard_id, client),
&self.config.jwt_token,
warn_threshold,
max_retries,
timeout,
cancel,
)
.await
});
}
while let Some(r) = futs.next().await {
let r = r.unwrap_or(Err(mgmt_api::Error::Cancelled));
results.push(r);
}
results
}
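// Editorial sketch, not part of this commit: the intended call shape for the helper
// above, mirroring the heatmap-upload fan-out further down in this commit. The method
// name is hypothetical; `locations` is gathered under the state lock.
async fn example_heatmap_fanout(&self, locations: Vec<(TenantShardId, Node)>) {
    for result in self
        .tenant_for_shards_api(
            locations,
            |tenant_shard_id, client| async move {
                client.tenant_heatmap_upload(tenant_shard_id).await
            },
            1,                       // warn_threshold
            1,                       // max_retries
            SHORT_RECONCILE_TIMEOUT, // per-call timeout
            &self.cancel,
        )
        .await
    {
        // Each entry is an mgmt_api::Result<T>; cancelled calls surface as Error::Cancelled.
        if let Err(e) = result {
            tracing::warn!("heatmap upload failed on one shard: {e}");
        }
    }
}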
pub(crate) async fn tenant_timeline_delete(
&self,
tenant_id: TenantId,
@@ -3088,11 +3129,14 @@ impl Service {
) -> (
TenantShardSplitResponse,
Vec<(TenantShardId, NodeId, ShardStripeSize)>,
Vec<ReconcilerWaiter>,
) {
let mut response = TenantShardSplitResponse {
new_shards: Vec::new(),
};
let mut child_locations = Vec::new();
let mut waiters = Vec::new();
{
let mut locked = self.inner.write().unwrap();
@@ -3171,14 +3215,112 @@ impl Service {
tracing::warn!("Failed to schedule child shard {child}: {e}");
}
// In the background, attach secondary locations for the new shards
self.maybe_reconcile_shard(&mut child_state, nodes);
if let Some(waiter) = self.maybe_reconcile_shard(&mut child_state, nodes) {
waiters.push(waiter);
}
tenants.insert(child, child_state);
response.new_shards.push(child);
}
}
(response, child_locations, waiters)
}
}
(response, child_locations)
async fn tenant_shard_split_start_secondaries(
&self,
tenant_id: TenantId,
waiters: Vec<ReconcilerWaiter>,
) {
// Wait for initial reconcile of child shards, this creates the secondary locations
if let Err(e) = self.await_waiters(waiters, RECONCILE_TIMEOUT).await {
// This is not a failure to split: it's some issue reconciling the new child shards, perhaps
// their secondaries couldn't be attached.
tracing::warn!("Failed to reconcile after split: {e}");
return;
}
// Take the state lock to discover the attached & secondary intents for all shards
let (attached, secondary) = {
let locked = self.inner.read().unwrap();
let mut attached = Vec::new();
let mut secondary = Vec::new();
for (tenant_shard_id, shard) in
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
{
let Some(node_id) = shard.intent.get_attached() else {
// Unexpected. Race with a PlacementPolicy change?
tracing::warn!(
"No attached node on {tenant_shard_id} immediately after shard split!"
);
continue;
};
let Some(secondary_node_id) = shard.intent.get_secondary().first() else {
// No secondary location. Nothing for us to do.
continue;
};
let attached_node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
let secondary_node = locked
.nodes
.get(secondary_node_id)
.expect("Pageservers may not be deleted while referenced");
attached.push((*tenant_shard_id, attached_node.clone()));
secondary.push((*tenant_shard_id, secondary_node.clone()));
}
(attached, secondary)
};
if secondary.is_empty() {
// No secondary locations; nothing for us to do
return;
}
for result in self
.tenant_for_shards_api(
attached,
|tenant_shard_id, client| async move {
client.tenant_heatmap_upload(tenant_shard_id).await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await
{
if let Err(e) = result {
tracing::warn!("Error calling heatmap upload after shard split: {e}");
return;
}
}
for result in self
.tenant_for_shards_api(
secondary,
|tenant_shard_id, client| async move {
client
.tenant_secondary_download(tenant_shard_id, Some(Duration::ZERO))
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await
{
if let Err(e) = result {
tracing::warn!("Error calling secondary download after shard split: {e}");
return;
}
}
}
@@ -3212,8 +3354,8 @@ impl Service {
.do_tenant_shard_split(tenant_id, shard_split_params)
.await;
match r {
Ok(r) => Ok(r),
let (response, waiters) = match r {
Ok(r) => r,
Err(e) => {
// Split might be part-done, we must do work to abort it.
tracing::warn!("Enqueuing background abort of split on {tenant_id}");
@@ -3226,9 +3368,17 @@ impl Service {
})
// Ignore error sending: that just means we're shutting down: aborts are ephemeral so it's fine to drop it.
.ok();
Err(e)
return Err(e);
}
}
};
// The split is now complete. As an optimization, we will trigger all the child shards to upload
// a heatmap immediately, and all their secondary locations to start downloading: this avoids waiting
// for the background heatmap/download interval before secondaries get warm enough to migrate shards
// in [`Self::optimize_all`]
self.tenant_shard_split_start_secondaries(tenant_id, waiters)
.await;
Ok(response)
}
fn prepare_tenant_shard_split(
@@ -3378,7 +3528,7 @@ impl Service {
&self,
tenant_id: TenantId,
params: ShardSplitParams,
) -> Result<TenantShardSplitResponse, ApiError> {
) -> Result<(TenantShardSplitResponse, Vec<ReconcilerWaiter>), ApiError> {
// FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
// request could occur here, deleting or mutating the tenant. begin_shard_split checks that the
// parent shards exist as expected, but it would be neater to do the above pre-checks within the
@@ -3580,7 +3730,7 @@ impl Service {
));
// Replace all the shards we just split with their children: this phase is infallible.
let (response, child_locations) =
let (response, child_locations, waiters) =
self.tenant_shard_split_commit_inmem(tenant_id, new_shard_count, new_stripe_size);
// Send compute notifications for all the new shards
@@ -3607,7 +3757,7 @@ impl Service {
}
}
Ok(response)
Ok((response, waiters))
}
pub(crate) async fn tenant_shard_migrate(
@@ -4373,25 +4523,68 @@ impl Service {
/// To put it more briefly: whereas the scheduler respects soft constraints in a ScheduleContext at
/// the time of scheduling, this function looks for cases where a better-scoring location is available
/// according to those same soft constraints.
fn optimize_all(&self) -> usize {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let pageservers = nodes.clone();
let mut schedule_context = ScheduleContext::default();
let mut reconciles_spawned = 0;
let mut tenant_shards: Vec<&TenantShard> = Vec::new();
async fn optimize_all(&self) -> usize {
// Limit on how many shards' optimizations each call to this function will execute. Combined
// with the frequency of background calls, this acts as an implicit rate limit that runs a small
// trickle of optimizations in the background, rather than executing a large number in parallel
// when a change occurs.
const MAX_OPTIMIZATIONS_PER_PASS: usize = 2;
const MAX_OPTIMIZATIONS_EXEC_PER_PASS: usize = 2;
// Synchronous prepare: scan shards for possible scheduling optimizations
let candidate_work = self.optimize_all_plan();
let candidate_work_len = candidate_work.len();
// Asynchronous validate: I/O to pageservers to make sure shards are in a good state to apply the planned optimizations
let validated_work = self.optimize_all_validate(candidate_work).await;
let was_work_filtered = validated_work.len() != candidate_work_len;
// Synchronous apply: update the shards' intent states according to validated optimizations
let mut reconciles_spawned = 0;
let mut optimizations_applied = 0;
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
for (tenant_shard_id, optimization) in validated_work {
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
// Shard was dropped between planning and execution;
continue;
};
if shard.apply_optimization(scheduler, optimization) {
optimizations_applied += 1;
if self.maybe_reconcile_shard(shard, nodes).is_some() {
reconciles_spawned += 1;
}
}
if optimizations_applied >= MAX_OPTIMIZATIONS_EXEC_PER_PASS {
break;
}
}
if was_work_filtered {
// If we filtered any work out during validation, ensure we return a nonzero value to indicate
// to callers that the system is not in a truly quiet state, it's going to do some work as soon
// as these validations start passing.
reconciles_spawned = std::cmp::max(reconciles_spawned, 1);
}
reconciles_spawned
}
fn optimize_all_plan(&self) -> Vec<(TenantShardId, ScheduleOptimization)> {
let mut schedule_context = ScheduleContext::default();
let mut tenant_shards: Vec<&TenantShard> = Vec::new();
// How many candidate optimizations we will generate, before evaluating them for readiness: setting
// this higher than the execution limit gives us a chance to execute some work even if the first
// few optimizations we find are not ready.
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;
let mut work = Vec::new();
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
for (tenant_shard_id, shard) in tenants.iter() {
if tenant_shard_id.is_shard_zero() {
// Reset accumulators on the first shard in a tenant
@@ -4400,7 +4593,7 @@ impl Service {
tenant_shards.clear();
}
if work.len() >= MAX_OPTIMIZATIONS_PER_PASS {
if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
break;
}
@@ -4472,18 +4665,105 @@ impl Service {
}
}
for (tenant_shard_id, optimization) in work {
let shard = tenants
.get_mut(&tenant_shard_id)
.expect("We held lock from place we got this ID");
shard.apply_optimization(scheduler, optimization);
work
}
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
reconciles_spawned += 1;
async fn optimize_all_validate(
&self,
candidate_work: Vec<(TenantShardId, ScheduleOptimization)>,
) -> Vec<(TenantShardId, ScheduleOptimization)> {
// Take a clone of the node map to use outside the lock in async validation phase
let validation_nodes = { self.inner.read().unwrap().nodes.clone() };
let mut want_secondary_status = Vec::new();
// Validate our plans: this is an async phase where we may do I/O to pageservers to
// check that the state of locations is acceptable to run the optimization, such as
// checking that a secondary location is sufficiently warmed-up to cleanly cut over
// in a live migration.
let mut validated_work = Vec::new();
for (tenant_shard_id, optimization) in candidate_work {
match optimization.action {
ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: _,
new_attached_node_id,
}) => {
match validation_nodes.get(&new_attached_node_id) {
None => {
// Node was dropped between planning and validation
}
Some(node) => {
if !node.is_available() {
tracing::info!("Skipping optimization migration of {tenant_shard_id} to {new_attached_node_id} because node unavailable");
} else {
// Accumulate optimizations that require fetching secondary status, so that we can execute these
// remote API requests concurrently.
want_secondary_status.push((
tenant_shard_id,
node.clone(),
optimization,
));
}
}
}
}
ScheduleOptimizationAction::ReplaceSecondary(_) => {
// No extra checks needed to replace a secondary: this does not interrupt client access
validated_work.push((tenant_shard_id, optimization))
}
};
}
// Call into pageserver API to find out if the destination secondary location is warm enough for a reasonably smooth migration: we
// do this so that we avoid spawning a Reconciler that would have to wait minutes/hours for a destination to warm up: that reconciler
// would hold a precious reconcile semaphore unit the whole time it was waiting for the destination to warm up.
let results = self
.tenant_for_shards_api(
want_secondary_status
.iter()
.map(|i| (i.0, i.1.clone()))
.collect(),
|tenant_shard_id, client| async move {
client.tenant_secondary_status(tenant_shard_id).await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
for ((tenant_shard_id, node, optimization), secondary_status) in
want_secondary_status.into_iter().zip(results.into_iter())
{
match secondary_status {
Err(e) => {
tracing::info!("Skipping migration of {tenant_shard_id} to {node}, error querying secondary: {e}");
}
Ok(progress) => {
// We require secondary locations to have less than 10GiB of downloads pending before we will use
// them in an optimization
const DOWNLOAD_FRESHNESS_THRESHOLD: u64 = 10 * 1024 * 1024 * 1024;
if progress.bytes_total == 0
|| progress.bytes_total < DOWNLOAD_FRESHNESS_THRESHOLD
&& progress.bytes_downloaded != progress.bytes_total
|| progress.bytes_total - progress.bytes_downloaded
> DOWNLOAD_FRESHNESS_THRESHOLD
{
tracing::info!("Skipping migration of {tenant_shard_id} to {node} because secondary isn't ready: {progress:?}");
} else {
// Location looks ready: proceed
tracing::info!(
"{tenant_shard_id} secondary on {node} is warm enough for migration: {progress:?}"
);
validated_work.push((tenant_shard_id, optimization))
}
}
}
}
reconciles_spawned
validated_work
}
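// Editorial sketch, not part of this commit: the warm-enough condition above, rewritten
// as a standalone predicate for readability. SecondaryProgress's bytes_total and
// bytes_downloaded are used as above; the function name is hypothetical.
fn secondary_is_warm_enough(progress: &SecondaryProgress) -> bool {
    const DOWNLOAD_FRESHNESS_THRESHOLD: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB
    let nothing_downloaded_yet = progress.bytes_total == 0;
    // Small heatmaps must be fully downloaded; large ones may have at most 10 GiB left.
    let small_but_incomplete = progress.bytes_total < DOWNLOAD_FRESHNESS_THRESHOLD
        && progress.bytes_downloaded != progress.bytes_total;
    let too_much_remaining =
        progress.bytes_total - progress.bytes_downloaded > DOWNLOAD_FRESHNESS_THRESHOLD;
    !(nothing_downloaded_yet || small_but_incomplete || too_much_remaining)
}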
/// Useful for tests: run whatever work a background [`Self::reconcile_all`] would have done, but
@@ -4491,10 +4771,12 @@ impl Service {
/// put the system into a quiescent state where future background reconciliations won't do anything.
pub(crate) async fn reconcile_all_now(&self) -> Result<usize, ReconcileWaitError> {
let reconciles_spawned = self.reconcile_all();
if reconciles_spawned == 0 {
let reconciles_spawned = if reconciles_spawned == 0 {
// Only optimize when we are otherwise idle
self.optimize_all();
}
self.optimize_all().await
} else {
reconciles_spawned
};
let waiters = {
let mut waiters = Vec::new();

View File

@@ -325,18 +325,28 @@ pub(crate) struct ReplaceSecondary {
#[derive(Eq, PartialEq, Debug)]
pub(crate) struct MigrateAttachment {
old_attached_node_id: NodeId,
new_attached_node_id: NodeId,
pub(crate) old_attached_node_id: NodeId,
pub(crate) new_attached_node_id: NodeId,
}
#[derive(Eq, PartialEq, Debug)]
pub(crate) enum ScheduleOptimization {
pub(crate) enum ScheduleOptimizationAction {
// Replace one of our secondary locations with a different node
ReplaceSecondary(ReplaceSecondary),
// Migrate attachment to an existing secondary location
MigrateAttachment(MigrateAttachment),
}
#[derive(Eq, PartialEq, Debug)]
pub(crate) struct ScheduleOptimization {
// What was the reconcile sequence when we generated this optimization? The optimization
// should only be applied if the shard's sequence is still at this value, in case other changes
// happened between planning the optimization and applying it.
sequence: Sequence,
pub(crate) action: ScheduleOptimizationAction,
}
impl ReconcilerWaiter {
pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
tokio::select! {
@@ -675,10 +685,13 @@ impl TenantShard {
"Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
self.intent.get_secondary()
);
return Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
old_attached_node_id: attached,
new_attached_node_id: *preferred_node,
}));
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: attached,
new_attached_node_id: *preferred_node,
}),
});
}
} else {
tracing::debug!(
@@ -736,28 +749,37 @@ impl TenantShard {
"Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
self.intent.get_secondary()
);
return Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
old_node_id: *secondary,
new_node_id: candidate_node,
}));
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
old_node_id: *secondary,
new_node_id: candidate_node,
}),
});
}
}
None
}
/// Return true if the optimization was really applied: it will not be applied if the optimization's
/// sequence is behind this tenant shard's
pub(crate) fn apply_optimization(
&mut self,
scheduler: &mut Scheduler,
optimization: ScheduleOptimization,
) {
) -> bool {
if optimization.sequence != self.sequence {
return false;
}
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_schedule_optimization
.inc();
match optimization {
ScheduleOptimization::MigrateAttachment(MigrateAttachment {
match optimization.action {
ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id,
new_attached_node_id,
}) => {
@@ -765,7 +787,7 @@ impl TenantShard {
self.intent
.promote_attached(scheduler, new_attached_node_id);
}
ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
old_node_id,
new_node_id,
}) => {
@@ -773,6 +795,8 @@ impl TenantShard {
self.intent.push_secondary(scheduler, new_node_id);
}
}
true
}
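// Editorial sketch, not part of this commit: how the new `sequence` field guards
// against applying a stale plan. Planning captures the shard's current sequence;
// apply_optimization() returns false and does nothing if the shard has been
// reconciled since. The wrapper function and its parameter types are assumptions.
fn plan_and_apply(
    shard: &mut TenantShard,
    nodes: &HashMap<NodeId, Node>,
    schedule_context: &ScheduleContext,
    scheduler: &mut Scheduler,
) -> bool {
    match shard.optimize_attachment(nodes, schedule_context) {
        Some(optimization) => shard.apply_optimization(scheduler, optimization),
        None => false,
    }
}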
/// Query whether the tenant's observed state for attached node matches its intent state, and if so,
@@ -1428,10 +1452,13 @@ pub(crate) mod tests {
// would be no other shards from the same tenant, and request to do so.
assert_eq!(
optimization_a,
Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(1),
new_attached_node_id: NodeId(2)
}))
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(1),
new_attached_node_id: NodeId(2)
})
})
);
// Note that these optimizing two shards in the same tenant with the same ScheduleContext is
@@ -1442,10 +1469,13 @@ pub(crate) mod tests {
let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
assert_eq!(
optimization_b,
Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(1),
new_attached_node_id: NodeId(3)
}))
Some(ScheduleOptimization {
sequence: shard_b.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(1),
new_attached_node_id: NodeId(3)
})
})
);
// Applying these optimizations should result in the end state proposed
@@ -1489,10 +1519,13 @@ pub(crate) mod tests {
// same tenant should generate an optimization to move one away
assert_eq!(
optimization_a,
Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
old_node_id: NodeId(3),
new_node_id: NodeId(4)
}))
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
old_node_id: NodeId(3),
new_node_id: NodeId(4)
})
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());

View File

@@ -1104,7 +1104,6 @@ class NeonEnv:
self,
ps_id,
port=pageserver_port,
config_override=self.pageserver_config_override,
)
)
cfg["pageservers"].append(ps_cfg)
@@ -2373,15 +2372,12 @@ class NeonPageserver(PgProtocol, LogUtils):
TEMP_FILE_SUFFIX = "___temp"
def __init__(
self, env: NeonEnv, id: int, port: PageserverPort, config_override: Optional[str] = None
):
def __init__(self, env: NeonEnv, id: int, port: PageserverPort):
super().__init__(host="localhost", port=port.pg, user="cloud_admin")
self.env = env
self.id = id
self.running = False
self.service_port = port
self.config_override = config_override
self.version = env.get_binary_version("pageserver")
self.logfile = self.workdir / "pageserver.log"
# After a test finishes, we will scrape the log to see if there are any

View File

@@ -287,6 +287,11 @@ def test_sharding_split_smoke(
== shard_count
)
# Make secondary downloads slow: this exercises the storage controller logic for not migrating an attachment
# during post-split optimization until the secondary is ready
for ps in env.pageservers:
ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")])
env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)
post_split_pageserver_ids = [loc["node_id"] for loc in env.storage_controller.locate(tenant_id)]
@@ -300,7 +305,7 @@ def test_sharding_split_smoke(
# Enough background reconciliations should result in the shards being properly distributed.
# Run this before the workload, because its LSN-waiting code presumes stable locations.
env.storage_controller.reconcile_until_idle()
env.storage_controller.reconcile_until_idle(timeout_secs=60)
workload.validate()
@@ -342,6 +347,10 @@ def test_sharding_split_smoke(
assert cancelled_reconciles is not None and int(cancelled_reconciles) == 0
assert errored_reconciles is not None and int(errored_reconciles) == 0
# We should see that the migration of shards after the split waited for secondaries to warm up
# before happening
assert env.storage_controller.log_contains(".*Skipping.*because secondary isn't ready.*")
env.storage_controller.consistency_check()
def get_node_shard_counts(env: NeonEnv, tenant_ids):
@@ -1071,6 +1080,17 @@ def test_sharding_split_failures(
finish_split()
assert_split_done()
if isinstance(failure, StorageControllerFailpoint) and "post-complete" in failure.failpoint:
# On a post-complete failure, the controller will recover the post-split state
# after restart, but it will have missed the optimization part of the split function
# where secondary downloads are kicked off. This means that reconcile_until_idle
# will take a very long time if we wait for all optimizations to complete, because
# those optimizations will wait for secondary downloads.
#
# Avoid that by configuring the tenant into Essential scheduling mode, so that it will
# skip optimizations when we're exercising this particular failpoint.
env.storage_controller.tenant_policy_update(tenant_id, {"scheduling": "Essential"})
# Having completed the split, pump the background reconciles to ensure that
# the scheduler reaches an idle state
env.storage_controller.reconcile_until_idle(timeout_secs=30)