pageserver/controller: error handling for shard splitting (#7074)

## Problem

Shard splits worked, but were not yet safe against failures (e.g. a node
crashing mid-split).

Related: #6676 

## Summary of changes

- Introduce async rwlocks at the scope of Tenant and Node (a sketch of how
  these are used appears after this list):
  - the exclusive tenant lock is used to protect splits
  - the exclusive node lock is used to protect the new reconciliation process
    that happens when setting a node active
  - exclusive locks are used in both cases when doing persistent updates (e.g.
    node scheduling conf) where the update to DB & in-memory state needs to be
    atomic
- Add failpoints to shard splitting in control plane and pageserver
code.
- Implement error handling in the control plane for shard splits: this
detaches child shards and ensures parent shards are re-attached.
- Crash-safety for storage controller restarts requires little effort:
we already reconcile with nodes over a storage controller restart, so as
long as we reset any incomplete splits in the DB on restart (added in
this PR), things are implicitly cleaned up.
- Implement reconciliation with offline nodes before they transition to
  active:
  - (in this context, reconciliation means something like startup_reconcile,
    not literally the Reconciler)
  - This covers cases where split abort cannot reach a node to clean it up:
    the cleanup will eventually happen when the node is marked active, as part
    of reconciliation.
  - This also covers the case where a node was unavailable when the storage
    controller started, but becomes available later: previously this allowed
    it to skip the startup reconcile.
- Storage controller now terminates on panics. We only use panics for true
  "should never happen" assertions, and these cases can leave us in an
  unusable state if we keep running (e.g. panicking partway through a shard
  split). In the unlikely event that we get into a crashloop as a result,
  we'll rely on kubernetes to back us off.
- Add `test_sharding_split_failures` which exercises a variety of
failure cases during shard split.
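
As a rough sketch of how the tenant-scoped exclusive lock and the split error
handling combine (hypothetical names throughout; the real flow lives in the
storage controller service code, whose diff is suppressed below):

```rust
// Minimal sketch, not the real implementation: `Service`, `do_split` and
// `abort_split` are hypothetical stand-ins.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type TenantId = u64; // stand-in for utils::id::TenantId

#[derive(Default)]
struct TenantLocks {
    entities: Mutex<HashMap<TenantId, Arc<tokio::sync::RwLock<()>>>>,
}

impl TenantLocks {
    async fn exclusive(&self, key: TenantId) -> tokio::sync::OwnedRwLockWriteGuard<()> {
        let entry = self.entities.lock().unwrap().entry(key).or_default().clone();
        entry.write_owned().await
    }
}

struct Service {
    tenant_op_locks: TenantLocks,
}

impl Service {
    async fn do_split(&self, _t: TenantId) -> anyhow::Result<()> {
        Ok(()) // elided: the multi-phase split against pageservers
    }
    async fn abort_split(&self, _t: TenantId) -> anyhow::Result<()> {
        Ok(()) // elided: detach child shards, re-attach parents, revert DB rows
    }

    async fn tenant_shard_split(&self, t: TenantId) -> anyhow::Result<()> {
        // The exclusive tenant lock ensures no other split (or other
        // tenant-wide operation) interleaves with this one.
        let _guard = self.tenant_op_locks.exclusive(t).await;
        if let Err(e) = self.do_split(t).await {
            // Error handling from this PR: roll back, then surface the error.
            self.abort_split(t).await?;
            return Err(e);
        }
        Ok(())
    }
}
```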
Commit 44f42627dd by John Spray, 2024-03-14 09:11:57 +00:00 (committed via
GitHub; parent 3bd6551b36). 18 changed files with 1445 additions and 150
deletions.

Cargo.lock (generated)

@@ -282,8 +282,10 @@ dependencies = [
"control_plane",
"diesel",
"diesel_migrations",
"fail",
"futures",
"git-version",
"hex",
"humantime",
"hyper",
"metrics",


@@ -19,8 +19,10 @@ aws-config.workspace = true
aws-sdk-secretsmanager.workspace = true
camino.workspace = true
clap.workspace = true
fail.workspace = true
futures.workspace = true
git-version.workspace = true
hex.workspace = true
hyper.workspace = true
humantime.workspace = true
once_cell.workspace = true


@@ -10,7 +10,9 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::auth::{Scope, SwappableJwtAuth};
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
use utils::http::request::{must_get_query_param, parse_request_param};
use utils::id::{TenantId, TimelineId};
@@ -554,6 +556,9 @@ pub fn make_router(
.post("/debug/v1/consistency_check", |r| {
request_span(r, handle_consistency_check)
})
.put("/debug/v1/failpoints", |r| {
request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
})
.get("/control/v1/tenant/:tenant_id/locate", |r| {
tenant_service_handler(r, handle_tenant_locate)
})


@@ -0,0 +1,54 @@
use std::{collections::HashMap, sync::Arc};
/// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
/// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
/// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
/// is needed at a tenant-wide granularity.
pub(crate) struct IdLockMap<T>
where
T: Eq + PartialEq + std::hash::Hash,
{
/// A synchronous lock for getting/setting the async locks that our callers will wait on.
entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<()>>>>,
}
impl<T> IdLockMap<T>
where
T: Eq + PartialEq + std::hash::Hash,
{
pub(crate) fn shared(
&self,
key: T,
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<()>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default();
entry.clone().read_owned()
}
pub(crate) fn exclusive(
&self,
key: T,
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockWriteGuard<()>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default();
entry.clone().write_owned()
}
/// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
/// periodic housekeeping to avoid the map growing indefinitely
pub(crate) fn housekeeping(&self) {
let mut locked = self.entities.lock().unwrap();
locked.retain(|_k, lock| lock.try_write().is_err())
}
}
impl<T> Default for IdLockMap<T>
where
T: Eq + PartialEq + std::hash::Hash,
{
fn default() -> Self {
Self {
entities: std::sync::Mutex::new(HashMap::new()),
}
}
}
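
A brief usage sketch for `IdLockMap` (illustrative only, not part of this
diff):

```rust
// Illustrative only: exercise the three operations on a map keyed by u64.
async fn demo(locks: &IdLockMap<u64>) {
    // Any number of shared guards for the same key may be held concurrently.
    let read = locks.shared(1).await;
    drop(read);

    // An exclusive guard (e.g. for a shard split) blocks shared and
    // exclusive takers for key 1 until it is dropped.
    let write = locks.exclusive(1).await;
    drop(write);

    // Drop map entries whose locks are not currently held.
    locks.housekeeping();
}
```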


@@ -4,6 +4,7 @@ use utils::seqwait::MonotonicCounter;
mod auth;
mod compute_hook;
pub mod http;
mod id_lock_map;
pub mod metrics;
mod node;
pub mod persistence;


@@ -206,6 +206,12 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> {
}
fn main() -> anyhow::Result<()> {
let default_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
default_panic(info);
std::process::exit(1);
}));
tokio::runtime::Builder::new_current_thread()
// We use spawn_blocking for database operations, so require approximately
// as many blocking threads as we will open database connections.


@@ -83,29 +83,38 @@ impl Node {
}
}
pub(crate) fn set_availability(
&mut self,
availability: NodeAvailability,
) -> AvailabilityTransition {
use NodeAvailability::*;
let transition = match (self.availability, availability) {
(Offline, Active) => {
pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
match self.get_availability_transition(availability) {
AvailabilityTransition::ToActive => {
// Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
// users of previously-cloned copies of the node will still see the old cancellation
// state. For example, Reconcilers in flight will have to complete and be spawned
// again to realize that the node has become available.
self.cancel = CancellationToken::new();
AvailabilityTransition::ToActive
}
(Active, Offline) => {
AvailabilityTransition::ToOffline => {
// Fire the node's cancellation token to cancel any in-flight API requests to it
self.cancel.cancel();
AvailabilityTransition::ToOffline
}
_ => AvailabilityTransition::Unchanged,
};
AvailabilityTransition::Unchanged => {}
}
self.availability = availability;
transition
}
/// Without modifying the availability of the node, convert the intended availability
/// into a description of the transition.
pub(crate) fn get_availability_transition(
&self,
availability: NodeAvailability,
) -> AvailabilityTransition {
use AvailabilityTransition::*;
use NodeAvailability::*;
match (self.availability, availability) {
(Offline, Active) => ToActive,
(Active, Offline) => ToOffline,
_ => Unchanged,
}
}
/// Whether we may send API requests to this node.
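
Separating the transition query from the mutation lets a caller do its
pre-activation work in between. A hedged sketch of such a caller (hypothetical
function; the real one is in the suppressed service diff):

```rust
// Sketch: inspect the transition first, do the node-activation reconcile
// described in the summary, then commit the in-memory change.
async fn set_node_active(node: &mut Node) {
    if let AvailabilityTransition::ToActive =
        node.get_availability_transition(NodeAvailability::Active)
    {
        // ... reconcile with the node's actual state before it becomes
        // eligible for scheduling (this cleans up e.g. leftovers of an
        // aborted shard split that could not reach this node at the time) ...
    }
    node.set_availability(NodeAvailability::Active);
}
```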


@@ -11,6 +11,9 @@ use diesel::prelude::*;
use diesel::Connection;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::ShardConfigError;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
@@ -72,6 +75,14 @@ pub(crate) enum DatabaseError {
Logical(String),
}
#[must_use]
pub(crate) enum AbortShardSplitStatus {
/// We aborted the split in the database by reverting to the parent shards
Aborted,
/// The split had already been persisted.
Complete,
}
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
impl Persistence {
@@ -570,6 +581,51 @@ impl Persistence {
})
.await
}
/// Used when the remote part of a shard split failed: we will revert the database state to have only
/// the parent shards, with SplitState::Idle.
pub(crate) async fn abort_shard_split(
&self,
split_tenant_id: TenantId,
new_shard_count: ShardCount,
) -> DatabaseResult<AbortShardSplitStatus> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<AbortShardSplitStatus> {
let aborted = conn.transaction(|conn| -> DatabaseResult<AbortShardSplitStatus> {
// Clear the splitting state on parent shards
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.ne(new_shard_count.literal() as i32))
.set((splitting.eq(0),))
.execute(conn)?;
// Parent shards are already gone: we cannot abort.
if updated == 0 {
return Ok(AbortShardSplitStatus::Complete);
}
// Sanity check: if parent shards were present, their cardinality should
// be less than the number of child shards.
if updated >= new_shard_count.count() as usize {
return Err(DatabaseError::Logical(format!(
"Unexpected parent shard count {updated} while aborting split to \
count {new_shard_count:?} on tenant {split_tenant_id}"
)));
}
// Erase child shards
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(new_shard_count.literal() as i32))
.execute(conn)?;
Ok(AbortShardSplitStatus::Aborted)
})?;
Ok(aborted)
})
.await
}
}
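
A sketch of how the abort path might consume `AbortShardSplitStatus`
(hypothetical caller; the real one lives in the suppressed service diff):

```rust
// Hypothetical caller: the #[must_use] enum forces an explicit decision
// between rolling back and failing forward.
async fn on_split_failure(
    persistence: &Persistence,
    tenant_id: TenantId,
    new_shard_count: ShardCount,
) -> DatabaseResult<()> {
    match persistence
        .abort_shard_split(tenant_id, new_shard_count)
        .await?
    {
        AbortShardSplitStatus::Aborted => {
            // The DB again holds only the parent shards: detach any child
            // shards created on pageservers and re-attach the parents.
        }
        AbortShardSplitStatus::Complete => {
            // The split was already persisted as complete; we cannot roll
            // back, so we fail forward and treat the split as done.
        }
    }
    Ok(())
}
```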
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
@@ -604,6 +660,28 @@ pub(crate) struct TenantShardPersistence {
pub(crate) config: String,
}
impl TenantShardPersistence {
pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
if self.shard_count == 0 {
Ok(ShardIdentity::unsharded())
} else {
Ok(ShardIdentity::new(
ShardNumber(self.shard_number as u8),
ShardCount::new(self.shard_count as u8),
ShardStripeSize(self.shard_stripe_size as u32),
)?)
}
}
pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
Ok(TenantShardId {
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
shard_number: ShardNumber(self.shard_number as u8),
shard_count: ShardCount::new(self.shard_count as u8),
})
}
}
/// Parts of [`crate::node::Node`] that are stored durably
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
#[diesel(table_name = crate::schema::nodes)]


@@ -1,5 +1,6 @@
use crate::persistence::Persistence;
use crate::service;
use hyper::StatusCode;
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
@@ -18,6 +19,8 @@ use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
pub(super) struct Reconciler {
@@ -485,17 +488,29 @@ impl Reconciler {
)
.await
{
Some(Ok(observed)) => observed,
Some(Ok(observed)) => Some(observed),
Some(Err(mgmt_api::Error::ApiError(status, _msg)))
if status == StatusCode::NOT_FOUND =>
{
None
}
Some(Err(e)) => return Err(e.into()),
None => return Err(ReconcileError::Cancel),
};
tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
self.observed.locations.insert(
attached_node.get_id(),
ObservedStateLocation {
conf: observed_conf,
},
);
match observed_conf {
Some(conf) => {
// Pageserver returned a state: update it in observed. This may still be an indeterminate (None) state,
// if internally the pageserver's TenantSlot was being mutated (e.g. some long running API call is still running)
self.observed
.locations
.insert(attached_node.get_id(), ObservedStateLocation { conf });
}
None => {
// Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
self.observed.locations.remove(&attached_node.get_id());
}
}
}
Ok(())
@@ -525,7 +540,12 @@ impl Reconciler {
)));
};
let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
let mut wanted_conf = attached_location_conf(
generation,
&self.shard,
&self.config,
!self.intent.secondary.is_empty(),
);
match self.observed.locations.get(&node.get_id()) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
// Nothing to do
@@ -662,10 +682,26 @@ impl Reconciler {
}
}
/// We tweak the externally-set TenantConfig while configuring
/// locations, using our awareness of whether secondary locations
/// are in use to automatically enable/disable heatmap uploads.
fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
let mut config = config.clone();
if has_secondaries {
if config.heatmap_period.is_none() {
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
}
} else {
config.heatmap_period = None;
}
config
}
pub(crate) fn attached_location_conf(
generation: Generation,
shard: &ShardIdentity,
config: &TenantConfig,
has_secondaries: bool,
) -> LocationConfig {
LocationConfig {
mode: LocationConfigMode::AttachedSingle,
@@ -674,7 +710,7 @@ pub(crate) fn attached_location_conf(
shard_number: shard.number.0,
shard_count: shard.count.literal(),
shard_stripe_size: shard.stripe_size.0,
tenant_conf: config.clone(),
tenant_conf: ha_aware_config(config, has_secondaries),
}
}
@@ -689,6 +725,6 @@ pub(crate) fn secondary_location_conf(
shard_number: shard.number.0,
shard_count: shard.count.literal(),
shard_stripe_size: shard.stripe_size.0,
tenant_conf: config.clone(),
tenant_conf: ha_aware_config(config, true),
}
}
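
The effect of `ha_aware_config` can be illustrated with a unit-test-style
sketch (assumes `TenantConfig` implements `Default` with `heatmap_period`
unset, and that `heatmap_period` is an `Option<String>`, as the code above
suggests):

```rust
#[test]
fn heatmap_period_follows_secondaries() {
    // Assumption: TenantConfig::default() leaves heatmap_period unset.
    let base = TenantConfig::default();

    // Secondaries present: heatmap uploads get the default period unless
    // the user set one explicitly.
    let with_secondary = ha_aware_config(&base, true);
    assert_eq!(with_secondary.heatmap_period.as_deref(), Some("60s"));

    // No secondaries: nothing downloads heatmaps, so uploads are disabled
    // even if the user configured a period.
    let attached_only = ha_aware_config(&base, false);
    assert_eq!(attached_only.heatmap_period, None);
}
```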

(File diff suppressed because it is too large.)


@@ -577,7 +577,12 @@ impl TenantState {
.generation
.expect("Attempted to enter attached state without a generation");
let wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
let wanted_conf = attached_location_conf(
generation,
&self.shard,
&self.config,
!self.intent.secondary.is_empty(),
);
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {


@@ -2108,6 +2108,16 @@ where
R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
{
if request.uri() != &"/v1/failpoints".parse::<Uri>().unwrap() {
fail::fail_point!("api-503", |_| Err(ApiError::ResourceUnavailable(
"failpoint".into()
)));
fail::fail_point!("api-500", |_| Err(ApiError::InternalServerError(
anyhow::anyhow!("failpoint")
)));
}
// Spawn a new task to handle the request, to protect the handler from unexpected
// async cancellations. Most pageserver functions are not async cancellation safe.
// We arm a drop-guard, so that if Hyper drops the Future, we signal the task


@@ -1443,6 +1443,35 @@ impl TenantManager {
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let r = self
.do_shard_split(tenant_shard_id, new_shard_count, new_stripe_size, ctx)
.await;
if r.is_err() {
// Shard splitting might have left the original shard in a partially shut down state (it
// stops the shard's remote timeline client). Reset it to ensure we leave things in
// a working state.
if self.get(tenant_shard_id).is_some() {
tracing::warn!("Resetting {tenant_shard_id} after shard split failure");
if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
// Log this error because our return value will still be the original error, not this one. This is
// a severe error: if this happens, we might be leaving behind a tenant that is not fully functional
// (e.g. has uploads disabled). We can't do anything else: if reset fails then shutting the tenant down or
// setting it broken probably won't help either.
tracing::error!("Failed to reset {tenant_shard_id}: {e}");
}
}
}
r
}
pub(crate) async fn do_shard_split(
&self,
tenant_shard_id: TenantShardId,
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let tenant = get_tenant(tenant_shard_id, true)?;
@@ -1477,6 +1506,10 @@ impl TenantManager {
.join(",")
);
fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
"failpoint"
)));
let parent_shard_identity = tenant.shard_identity;
let parent_tenant_conf = tenant.get_tenant_conf();
let parent_generation = tenant.generation;
@@ -1490,6 +1523,10 @@ impl TenantManager {
return Err(e);
}
fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
"failpoint"
)));
self.resources.deletion_queue_client.flush_advisory();
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
@@ -1511,11 +1548,16 @@ impl TenantManager {
anyhow::bail!("Detached parent shard in the middle of split!")
}
};
fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Optimization: hardlink layers from the parent into the children, so that they don't have to
// re-download & duplicate the data referenced in their initial IndexPart
self.shard_split_hardlink(parent, child_shards.clone())
.await?;
fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Take a snapshot of where the parent's WAL ingest had got to: we will wait for
// child shards to reach this point.
@@ -1555,6 +1597,10 @@ impl TenantManager {
.await?;
}
fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Phase 4: wait for child shards' WAL ingest to catch up to target LSN
for child_shard_id in &child_shards {
let child_shard_id = *child_shard_id;
@@ -1587,6 +1633,10 @@ impl TenantManager {
timeline.timeline_id,
target_lsn
);
fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
"failpoint"
)));
if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
// Failure here might mean shutdown, in any case this part is an optimization
// and we shouldn't hold up the split operation.
@@ -1632,6 +1682,10 @@ impl TenantManager {
},
);
fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
"failpoint"
)));
parent_slot_guard.drop_old_value()?;
// Phase 6: Release the InProgress on the parent shard


@@ -2,6 +2,7 @@ pytest_plugins = (
"fixtures.pg_version",
"fixtures.parametrize",
"fixtures.httpserver",
"fixtures.compute_reconfigure",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",


@@ -0,0 +1,62 @@
import concurrent.futures
from typing import Any
import pytest
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
from fixtures.log_helper import log
from fixtures.types import TenantId
class ComputeReconfigure:
def __init__(self, server):
self.server = server
self.control_plane_compute_hook_api = f"http://{server.host}:{server.port}/notify-attach"
self.workloads = {}
def register_workload(self, workload):
self.workloads[workload.tenant_id] = workload
@pytest.fixture(scope="function")
def compute_reconfigure_listener(make_httpserver):
"""
This fixture exposes an HTTP listener for the storage controller to submit
compute notifications to us, instead of updating neon_local endpoints itself.
Although the storage controller can use neon_local directly, this causes problems when
the test is also concurrently modifying endpoints. Instead, we configure the storage
controller to send notifications up to this test code, which routes all endpoint updates
through Workload, which has a mutex to make concurrent updates safe.
"""
server = make_httpserver
self = ComputeReconfigure(server)
# Do neon_local endpoint reconfiguration in the background so that we can
# accept a healthy rate of calls into notify-attach.
reconfigure_threads = concurrent.futures.ThreadPoolExecutor(max_workers=1)
def handler(request: Request):
assert request.json is not None
body: dict[str, Any] = request.json
log.info(f"notify-attach request: {body}")
try:
workload = self.workloads[TenantId(body["tenant_id"])]
except KeyError:
pass
else:
# This causes the endpoint to query the storage controller for its location, which
# is redundant since we already have it here, but this avoids extending the
# neon_local CLI to take full lists of locations
reconfigure_threads.submit(lambda workload=workload: workload.reconfigure()) # type: ignore[no-any-return]
return Response(status=200)
self.server.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
yield self
reconfigure_threads.shutdown()
server.clear()


@@ -2177,6 +2177,23 @@ class NeonStorageController(MetricsGetter):
)
log.info("storage controller passed consistency check")
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.request(
"PUT",
f"{self.env.storage_controller_api}/debug/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
headers=self.headers(TokenScope.ADMIN),
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
def __enter__(self) -> "NeonStorageController":
return self


@@ -1,3 +1,4 @@
import threading
from typing import Optional
from fixtures.log_helper import log
@@ -11,6 +12,10 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import TenantId, TimelineId
# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
ENDPOINT_LOCK = threading.Lock()
class Workload:
"""
@@ -41,13 +46,26 @@ class Workload:
self._endpoint: Optional[Endpoint] = None
def reconfigure(self):
"""
Request the endpoint to reconfigure based on location reported by storage controller
"""
if self._endpoint is not None:
with ENDPOINT_LOCK:
self._endpoint.reconfigure()
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
# We may be running alongside other Workloads for different tenants. Full TTID is
# obnoxiously long for use here, but a cut-down version is still unique enough for tests.
endpoint_id = f"ep-workload-{str(self.tenant_id)[0:4]}-{str(self.timeline_id)[0:4]}"
with ENDPOINT_LOCK:
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
self.branch_name,
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id="ep-workload",
endpoint_id=endpoint_id,
)
self._endpoint.start(pageserver_id=pageserver_id)
else:
@@ -94,7 +112,7 @@ class Workload:
else:
return False
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True, ingest=True):
assert self.expect_rows >= n
max_iters = 10
@@ -132,17 +150,23 @@ class Workload:
]
)
if ingest:
# Wait for written data to be ingested by the pageserver
for tenant_shard_id, pageserver in tenant_get_shards(
self.env, self.tenant_id, pageserver_id
):
last_flush_lsn = wait_for_last_flush_lsn(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
self.env,
endpoint,
self.tenant_id,
self.timeline_id,
pageserver_id=pageserver_id,
)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
if upload:
# force a checkpoint to trigger upload
# Wait for written data to be uploaded to S3 (force a checkpoint to trigger upload)
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")


@@ -1,10 +1,14 @@
import os
from typing import Dict, List, Union
from typing import Dict, List, Optional, Union
import pytest
import requests
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
StorageControllerApiException,
tenant_get_shards,
)
from fixtures.remote_storage import s3_storage
@@ -495,3 +499,337 @@ def test_sharding_ingest(
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
assert huge_layer_count <= shard_count
class Failure:
pageserver_id: Optional[int]
def apply(self, env: NeonEnv):
raise NotImplementedError()
def clear(self, env: NeonEnv):
"""
Clear the failure, in a way that should enable the system to proceed
to a totally clean state (all nodes online and reconciled)
"""
raise NotImplementedError()
def expect_available(self):
raise NotImplementedError()
def can_mitigate(self):
"""Whether Self.mitigate is available for use"""
return False
def mitigate(self, env: NeonEnv):
"""
Mitigate the failure in a way that should allow shard split to
complete and service to resume, but does not guarantee to leave
the whole world in a clean state (e.g. an Offline node might have
junk LocationConfigs on it)
"""
raise NotImplementedError()
def fails_forward(self, env: NeonEnv):
"""
If true, this failure results in a state that eventually completes the split.
"""
return False
def expect_exception(self):
"""
How do we expect a call to the split API to fail?
"""
return StorageControllerApiException
class PageserverFailpoint(Failure):
def __init__(self, failpoint, pageserver_id, mitigate):
self.failpoint = failpoint
self.pageserver_id = pageserver_id
self._mitigate = mitigate
def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.allowed_errors.extend(
[".*failpoint.*", ".*Resetting.*after shard split failure.*"]
)
pageserver.http_client().configure_failpoints((self.failpoint, "return(1)"))
def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
if self._mitigate:
env.storage_controller.node_configure(self.pageserver_id, {"availability": "Active"})
def expect_available(self):
return True
def can_mitigate(self):
return self._mitigate
def mitigate(self, env):
env.storage_controller.node_configure(self.pageserver_id, {"availability": "Offline"})
class StorageControllerFailpoint(Failure):
def __init__(self, failpoint, action):
self.failpoint = failpoint
self.pageserver_id = None
self.action = action
def apply(self, env: NeonEnv):
env.storage_controller.configure_failpoints((self.failpoint, self.action))
def clear(self, env: NeonEnv):
if "panic" in self.action:
log.info("Restarting storage controller after panic")
env.storage_controller.stop()
env.storage_controller.start()
else:
env.storage_controller.configure_failpoints((self.failpoint, "off"))
def expect_available(self):
# Controller panics _do_ leave pageservers available, but our test code relies
# on using the locate API to update configurations in Workload, so we must skip
# these actions when the controller has been panicked.
return "panic" not in self.action
def can_mitigate(self):
return False
def fails_forward(self, env):
# Edge case: the very last failpoint that simulates a DB connection error, where
# the abort path will fail-forward and result in a complete split.
fail_forward = self.failpoint == "shard-split-post-complete"
# If the failure was a panic, then if we expect split to eventually (after restart)
# complete, we must restart before checking that.
if fail_forward and "panic" in self.action:
log.info("Restarting storage controller after panic")
env.storage_controller.stop()
env.storage_controller.start()
return fail_forward
def expect_exception(self):
if "panic" in self.action:
return requests.exceptions.ConnectionError
else:
return StorageControllerApiException
class NodeKill(Failure):
def __init__(self, pageserver_id, mitigate):
self.pageserver_id = pageserver_id
self._mitigate = mitigate
def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.stop(immediate=True)
def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.start()
def expect_available(self):
return False
def mitigate(self, env):
env.storage_controller.node_configure(self.pageserver_id, {"availability": "Offline"})
class CompositeFailure(Failure):
"""
Wrapper for failures in multiple components (e.g. a failpoint in the storage controller, *and*
stop a pageserver to interfere with rollback)
"""
def __init__(self, failures: list[Failure]):
self.failures = failures
self.pageserver_id = None
for f in failures:
if f.pageserver_id is not None:
self.pageserver_id = f.pageserver_id
break
def apply(self, env: NeonEnv):
for f in self.failures:
f.apply(env)
def clear(self, env):
for f in self.failures:
f.clear(env)
def expect_available(self):
return all(f.expect_available() for f in self.failures)
def mitigate(self, env):
for f in self.failures:
f.mitigate(env)
def expect_exception(self):
expect = set(f.expect_exception() for f in self.failures)
# We can't give a sensible response if our failures have different expectations
assert len(expect) == 1
return list(expect)[0]
@pytest.mark.parametrize(
"failure",
[
PageserverFailpoint("api-500", 1, False),
NodeKill(1, False),
PageserverFailpoint("api-500", 1, True),
NodeKill(1, True),
PageserverFailpoint("shard-split-pre-prepare", 1, False),
PageserverFailpoint("shard-split-post-prepare", 1, False),
PageserverFailpoint("shard-split-pre-hardlink", 1, False),
PageserverFailpoint("shard-split-post-hardlink", 1, False),
PageserverFailpoint("shard-split-post-child-conf", 1, False),
PageserverFailpoint("shard-split-lsn-wait", 1, False),
PageserverFailpoint("shard-split-pre-finish", 1, False),
StorageControllerFailpoint("shard-split-validation", "return(1)"),
StorageControllerFailpoint("shard-split-post-begin", "return(1)"),
StorageControllerFailpoint("shard-split-post-remote", "return(1)"),
StorageControllerFailpoint("shard-split-post-complete", "return(1)"),
StorageControllerFailpoint("shard-split-validation", "panic(failpoint)"),
StorageControllerFailpoint("shard-split-post-begin", "panic(failpoint)"),
StorageControllerFailpoint("shard-split-post-remote", "panic(failpoint)"),
StorageControllerFailpoint("shard-split-post-complete", "panic(failpoint)"),
CompositeFailure(
[NodeKill(1, True), StorageControllerFailpoint("shard-split-post-begin", "return(1)")]
),
CompositeFailure(
[NodeKill(1, False), StorageControllerFailpoint("shard-split-post-begin", "return(1)")]
),
],
)
def test_sharding_split_failures(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
failure: Failure,
):
neon_env_builder.num_pageservers = 4
neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
)
initial_shard_count = 2
split_shard_count = 4
env = neon_env_builder.init_start(initial_tenant_shard_count=initial_shard_count)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
for ps in env.pageservers:
# When we do node failures and abandon a shard, it will de facto have an old generation and
# thereby be unable to publish remote consistent LSN updates
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
# Make sure the node we're failing has a shard on it, otherwise the test isn't testing anything
assert (
failure.pageserver_id is None
or len(
env.get_pageserver(failure.pageserver_id)
.http_client()
.tenant_list_locations()["tenant_shards"]
)
> 0
)
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(100)
# Put the environment into a failing state (exact meaning depends on `failure`)
failure.apply(env)
with pytest.raises(failure.expect_exception()):
env.storage_controller.tenant_shard_split(tenant_id, shard_count=4)
# We expect that the overall operation will fail, but some split requests
# will have succeeded: the net result should be to return to a clean state, including
# detaching any child shards.
def assert_rolled_back(exclude_ps_id=None) -> None:
count = 0
for ps in env.pageservers:
if exclude_ps_id is not None and ps.id == exclude_ps_id:
continue
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
assert tenant_shard_id.shard_count == initial_shard_count
count += 1
assert count == initial_shard_count
def assert_split_done(exclude_ps_id=None) -> None:
count = 0
for ps in env.pageservers:
if exclude_ps_id is not None and ps.id == exclude_ps_id:
continue
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
assert tenant_shard_id.shard_count == split_shard_count
count += 1
assert count == split_shard_count
def finish_split():
# Having failed+rolled back, we should be able to split again
# No failures this time; it will succeed
env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)
workload.churn_rows(10)
workload.validate()
if failure.expect_available():
# Even though the split failed partway through, this should not have interrupted
# clients. Disable waiting for pageservers in the workload helper, because our
# failpoints may prevent API access.
# This only applies for failure modes that leave pageserver page_service API available.
workload.churn_rows(10, upload=False, ingest=False)
workload.validate()
if failure.fails_forward(env):
log.info("Fail-forward failure, checking split eventually completes...")
# A failure type which results in eventual completion of the split
wait_until(30, 1, assert_split_done)
elif failure.can_mitigate():
log.info("Mitigating failure...")
# Mitigation phase: we expect to be able to proceed with a successful shard split
failure.mitigate(env)
# The split should appear to be rolled back from the point of view of all pageservers
# apart from the one that is offline
wait_until(30, 1, lambda: assert_rolled_back(exclude_ps_id=failure.pageserver_id))
finish_split()
wait_until(30, 1, lambda: assert_split_done(exclude_ps_id=failure.pageserver_id))
# Having cleared the failure, everything should converge to a pristine state
failure.clear(env)
wait_until(30, 1, assert_split_done)
else:
# Once we restore the faulty pageserver's API to good health, rollback should
# eventually complete.
log.info("Clearing failure...")
failure.clear(env)
wait_until(30, 1, assert_rolled_back)
# Having rolled back, the tenant should be working
workload.churn_rows(10)
workload.validate()
# Splitting again should work, since we cleared the failure
finish_split()
assert_split_done()
env.storage_controller.consistency_check()