mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-10 03:40:37 +00:00
Compare commits
19 Commits
conrad/pro
...
jcsp/ha-te
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
151815c2d3 | ||
|
|
b04cf3ea8b | ||
|
|
3d228968ef | ||
|
|
f812904eff | ||
|
|
8a3b1e0876 | ||
|
|
ef994de66b | ||
|
|
665cb8c398 | ||
|
|
1f9e32734e | ||
|
|
0f0606da3c | ||
|
|
9bdbe2d630 | ||
|
|
acd397a5f4 | ||
|
|
25bd74fd6a | ||
|
|
223810fd79 | ||
|
|
2baac6f6e6 | ||
|
|
27815678e7 | ||
|
|
59523444cc | ||
|
|
7a1f83854f | ||
|
|
da22557383 | ||
|
|
7ee3b59ba8 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -282,8 +282,10 @@ dependencies = [
|
|||||||
"control_plane",
|
"control_plane",
|
||||||
"diesel",
|
"diesel",
|
||||||
"diesel_migrations",
|
"diesel_migrations",
|
||||||
|
"fail",
|
||||||
"futures",
|
"futures",
|
||||||
"git-version",
|
"git-version",
|
||||||
|
"hex",
|
||||||
"humantime",
|
"humantime",
|
||||||
"hyper",
|
"hyper",
|
||||||
"metrics",
|
"metrics",
|
||||||
|
|||||||
@@ -19,8 +19,10 @@ aws-config.workspace = true
|
|||||||
aws-sdk-secretsmanager.workspace = true
|
aws-sdk-secretsmanager.workspace = true
|
||||||
camino.workspace = true
|
camino.workspace = true
|
||||||
clap.workspace = true
|
clap.workspace = true
|
||||||
|
fail.workspace = true
|
||||||
futures.workspace = true
|
futures.workspace = true
|
||||||
git-version.workspace = true
|
git-version.workspace = true
|
||||||
|
hex.workspace = true
|
||||||
hyper.workspace = true
|
hyper.workspace = true
|
||||||
humantime.workspace = true
|
humantime.workspace = true
|
||||||
once_cell.workspace = true
|
once_cell.workspace = true
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
use std::{collections::HashMap, time::Duration};
|
use std::{collections::HashMap, time::Duration};
|
||||||
|
|
||||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
||||||
@@ -23,10 +24,13 @@ struct ShardedComputeHookTenant {
|
|||||||
stripe_size: ShardStripeSize,
|
stripe_size: ShardStripeSize,
|
||||||
shard_count: ShardCount,
|
shard_count: ShardCount,
|
||||||
shards: Vec<(ShardNumber, NodeId)>,
|
shards: Vec<(ShardNumber, NodeId)>,
|
||||||
|
|
||||||
|
// Async lock used for ensuring that remote compute hook calls are ordered identically to updates to this structure
|
||||||
|
lock: Arc<tokio::sync::Mutex<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum ComputeHookTenant {
|
enum ComputeHookTenant {
|
||||||
Unsharded(NodeId),
|
Unsharded((NodeId, Arc<tokio::sync::Mutex<()>>)),
|
||||||
Sharded(ShardedComputeHookTenant),
|
Sharded(ShardedComputeHookTenant),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -38,9 +42,17 @@ impl ComputeHookTenant {
|
|||||||
shards: vec![(tenant_shard_id.shard_number, node_id)],
|
shards: vec![(tenant_shard_id.shard_number, node_id)],
|
||||||
stripe_size,
|
stripe_size,
|
||||||
shard_count: tenant_shard_id.shard_count,
|
shard_count: tenant_shard_id.shard_count,
|
||||||
|
lock: Arc::default(),
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
Self::Unsharded(node_id)
|
Self::Unsharded((node_id, Arc::default()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_lock(&self) -> &Arc<tokio::sync::Mutex<()>> {
|
||||||
|
match self {
|
||||||
|
Self::Unsharded((_node_id, lock)) => lock,
|
||||||
|
Self::Sharded(sharded_tenant) => &sharded_tenant.lock,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -53,7 +65,9 @@ impl ComputeHookTenant {
|
|||||||
node_id: NodeId,
|
node_id: NodeId,
|
||||||
) {
|
) {
|
||||||
match self {
|
match self {
|
||||||
Self::Unsharded(existing_node_id) if tenant_shard_id.shard_count.count() == 1 => {
|
Self::Unsharded((existing_node_id, _lock))
|
||||||
|
if tenant_shard_id.shard_count.count() == 1 =>
|
||||||
|
{
|
||||||
*existing_node_id = node_id
|
*existing_node_id = node_id
|
||||||
}
|
}
|
||||||
Self::Sharded(sharded_tenant)
|
Self::Sharded(sharded_tenant)
|
||||||
@@ -122,9 +136,15 @@ pub(crate) enum NotifyError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ComputeHookTenant {
|
impl ComputeHookTenant {
|
||||||
fn maybe_reconfigure(&self, tenant_id: TenantId) -> Option<ComputeHookNotifyRequest> {
|
fn maybe_reconfigure(
|
||||||
match self {
|
&self,
|
||||||
Self::Unsharded(node_id) => Some(ComputeHookNotifyRequest {
|
tenant_id: TenantId,
|
||||||
|
) -> Option<(
|
||||||
|
ComputeHookNotifyRequest,
|
||||||
|
impl std::future::Future<Output = tokio::sync::OwnedMutexGuard<()>>,
|
||||||
|
)> {
|
||||||
|
let request = match self {
|
||||||
|
Self::Unsharded((node_id, _lock)) => Some(ComputeHookNotifyRequest {
|
||||||
tenant_id,
|
tenant_id,
|
||||||
shards: vec![ComputeHookNotifyRequestShard {
|
shards: vec![ComputeHookNotifyRequestShard {
|
||||||
shard_number: ShardNumber(0),
|
shard_number: ShardNumber(0),
|
||||||
@@ -158,7 +178,9 @@ impl ComputeHookTenant {
|
|||||||
);
|
);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
|
request.map(|r| (r, self.get_lock().clone().lock_owned()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -167,8 +189,11 @@ impl ComputeHookTenant {
|
|||||||
/// the compute connection string.
|
/// the compute connection string.
|
||||||
pub(super) struct ComputeHook {
|
pub(super) struct ComputeHook {
|
||||||
config: Config,
|
config: Config,
|
||||||
state: tokio::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
|
state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
|
||||||
authorization_header: Option<String>,
|
authorization_header: Option<String>,
|
||||||
|
|
||||||
|
// This lock is only used in testing enviroments, to serialize calls into neon_lock
|
||||||
|
neon_local_lock: tokio::sync::Mutex<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ComputeHook {
|
impl ComputeHook {
|
||||||
@@ -182,6 +207,7 @@ impl ComputeHook {
|
|||||||
state: Default::default(),
|
state: Default::default(),
|
||||||
config,
|
config,
|
||||||
authorization_header,
|
authorization_header,
|
||||||
|
neon_local_lock: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -190,6 +216,10 @@ impl ComputeHook {
|
|||||||
&self,
|
&self,
|
||||||
reconfigure_request: ComputeHookNotifyRequest,
|
reconfigure_request: ComputeHookNotifyRequest,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
// neon_local updates are not safe to call concurrently, use a lock to serialize
|
||||||
|
// all calls to this function
|
||||||
|
let _locked = self.neon_local_lock.lock().await;
|
||||||
|
|
||||||
let env = match LocalEnv::load_config() {
|
let env = match LocalEnv::load_config() {
|
||||||
Ok(e) => e,
|
Ok(e) => e,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -340,30 +370,38 @@ impl ComputeHook {
|
|||||||
stripe_size: ShardStripeSize,
|
stripe_size: ShardStripeSize,
|
||||||
cancel: &CancellationToken,
|
cancel: &CancellationToken,
|
||||||
) -> Result<(), NotifyError> {
|
) -> Result<(), NotifyError> {
|
||||||
let mut locked = self.state.lock().await;
|
let reconfigure_request = {
|
||||||
|
let mut locked = self.state.lock().unwrap();
|
||||||
|
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
let tenant = match locked.entry(tenant_shard_id.tenant_id) {
|
let tenant = match locked.entry(tenant_shard_id.tenant_id) {
|
||||||
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
|
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
|
||||||
tenant_shard_id,
|
tenant_shard_id,
|
||||||
stripe_size,
|
stripe_size,
|
||||||
node_id,
|
node_id,
|
||||||
)),
|
)),
|
||||||
Entry::Occupied(e) => {
|
Entry::Occupied(e) => {
|
||||||
let tenant = e.into_mut();
|
let tenant = e.into_mut();
|
||||||
tenant.update(tenant_shard_id, stripe_size, node_id);
|
tenant.update(tenant_shard_id, stripe_size, node_id);
|
||||||
tenant
|
tenant
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tenant.maybe_reconfigure(tenant_shard_id.tenant_id)
|
||||||
};
|
};
|
||||||
|
let Some((reconfigure_request, lock_fut)) = reconfigure_request else {
|
||||||
let reconfigure_request = tenant.maybe_reconfigure(tenant_shard_id.tenant_id);
|
|
||||||
let Some(reconfigure_request) = reconfigure_request else {
|
|
||||||
// The tenant doesn't yet have pageservers for all its shards: we won't notify anything
|
// The tenant doesn't yet have pageservers for all its shards: we won't notify anything
|
||||||
// until it does.
|
// until it does.
|
||||||
tracing::info!("Tenant isn't yet ready to emit a notification");
|
tracing::info!("Tenant isn't yet ready to emit a notification");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Finish acquiring the tenant's async lock: this future was created inside the self.state
|
||||||
|
// lock above, so we are guaranteed to get this lock in the same order as callers took
|
||||||
|
// that lock. This ordering is essential: the cloud control plane must end up with the
|
||||||
|
// same end state for the tenant that we see.
|
||||||
|
let _guard = lock_fut.await;
|
||||||
|
|
||||||
if let Some(notify_url) = &self.config.compute_hook_url {
|
if let Some(notify_url) = &self.config.compute_hook_url {
|
||||||
self.do_notify(notify_url, reconfigure_request, cancel)
|
self.do_notify(notify_url, reconfigure_request, cancel)
|
||||||
.await
|
.await
|
||||||
@@ -405,6 +443,7 @@ pub(crate) mod tests {
|
|||||||
tenant_state
|
tenant_state
|
||||||
.maybe_reconfigure(tenant_id)
|
.maybe_reconfigure(tenant_id)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
.0
|
||||||
.shards
|
.shards
|
||||||
.len(),
|
.len(),
|
||||||
1
|
1
|
||||||
@@ -412,6 +451,7 @@ pub(crate) mod tests {
|
|||||||
assert!(tenant_state
|
assert!(tenant_state
|
||||||
.maybe_reconfigure(tenant_id)
|
.maybe_reconfigure(tenant_id)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
.0
|
||||||
.stripe_size
|
.stripe_size
|
||||||
.is_none());
|
.is_none());
|
||||||
|
|
||||||
@@ -445,6 +485,7 @@ pub(crate) mod tests {
|
|||||||
tenant_state
|
tenant_state
|
||||||
.maybe_reconfigure(tenant_id)
|
.maybe_reconfigure(tenant_id)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
.0
|
||||||
.shards
|
.shards
|
||||||
.len(),
|
.len(),
|
||||||
2
|
2
|
||||||
@@ -453,6 +494,7 @@ pub(crate) mod tests {
|
|||||||
tenant_state
|
tenant_state
|
||||||
.maybe_reconfigure(tenant_id)
|
.maybe_reconfigure(tenant_id)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
.0
|
||||||
.stripe_size,
|
.stripe_size,
|
||||||
Some(ShardStripeSize(32768))
|
Some(ShardStripeSize(32768))
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -10,7 +10,9 @@ use pageserver_api::shard::TenantShardId;
|
|||||||
use pageserver_client::mgmt_api;
|
use pageserver_client::mgmt_api;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
use utils::auth::{Scope, SwappableJwtAuth};
|
use utils::auth::{Scope, SwappableJwtAuth};
|
||||||
|
use utils::failpoint_support::failpoints_handler;
|
||||||
use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
|
use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
|
||||||
use utils::http::request::{must_get_query_param, parse_request_param};
|
use utils::http::request::{must_get_query_param, parse_request_param};
|
||||||
use utils::id::{TenantId, TimelineId};
|
use utils::id::{TenantId, TimelineId};
|
||||||
@@ -438,6 +440,24 @@ async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiEr
|
|||||||
state.service.tenants_dump()
|
state.service.tenants_dump()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_balance_all(
|
||||||
|
service: Arc<Service>,
|
||||||
|
req: Request<Body>,
|
||||||
|
) -> Result<Response<Body>, ApiError> {
|
||||||
|
check_permissions(&req, Scope::Admin)?;
|
||||||
|
service.balance_all()?;
|
||||||
|
json_response(StatusCode::OK, ())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_balance_attached(
|
||||||
|
service: Arc<Service>,
|
||||||
|
req: Request<Body>,
|
||||||
|
) -> Result<Response<Body>, ApiError> {
|
||||||
|
check_permissions(&req, Scope::Admin)?;
|
||||||
|
service.balance_attached()?;
|
||||||
|
json_response(StatusCode::OK, ())
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||||
check_permissions(&req, Scope::Admin)?;
|
check_permissions(&req, Scope::Admin)?;
|
||||||
|
|
||||||
@@ -554,6 +574,9 @@ pub fn make_router(
|
|||||||
.post("/debug/v1/consistency_check", |r| {
|
.post("/debug/v1/consistency_check", |r| {
|
||||||
request_span(r, handle_consistency_check)
|
request_span(r, handle_consistency_check)
|
||||||
})
|
})
|
||||||
|
.put("/debug/v1/failpoints", |r| {
|
||||||
|
request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
|
||||||
|
})
|
||||||
.get("/control/v1/tenant/:tenant_id/locate", |r| {
|
.get("/control/v1/tenant/:tenant_id/locate", |r| {
|
||||||
tenant_service_handler(r, handle_tenant_locate)
|
tenant_service_handler(r, handle_tenant_locate)
|
||||||
})
|
})
|
||||||
@@ -572,6 +595,12 @@ pub fn make_router(
|
|||||||
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
|
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
|
||||||
tenant_service_handler(r, handle_tenant_shard_split)
|
tenant_service_handler(r, handle_tenant_shard_split)
|
||||||
})
|
})
|
||||||
|
.post("/control/v1/balance/all", |r| {
|
||||||
|
tenant_service_handler(r, handle_balance_all)
|
||||||
|
})
|
||||||
|
.post("/control/v1/balance/attached", |r| {
|
||||||
|
tenant_service_handler(r, handle_balance_attached)
|
||||||
|
})
|
||||||
// Tenant operations
|
// Tenant operations
|
||||||
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
|
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
|
||||||
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
|
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
|
||||||
|
|||||||
54
control_plane/attachment_service/src/id_lock_map.rs
Normal file
54
control_plane/attachment_service/src/id_lock_map.rs
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
|
||||||
|
/// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
|
||||||
|
/// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
|
||||||
|
/// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
|
||||||
|
/// is needed at a tenant-wide granularity.
|
||||||
|
pub(crate) struct IdLockMap<T>
|
||||||
|
where
|
||||||
|
T: Eq + PartialEq + std::hash::Hash,
|
||||||
|
{
|
||||||
|
/// A synchronous lock for getting/setting the async locks that our callers will wait on.
|
||||||
|
entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<()>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> IdLockMap<T>
|
||||||
|
where
|
||||||
|
T: Eq + PartialEq + std::hash::Hash,
|
||||||
|
{
|
||||||
|
pub(crate) fn shared(
|
||||||
|
&self,
|
||||||
|
key: T,
|
||||||
|
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<()>> {
|
||||||
|
let mut locked = self.entities.lock().unwrap();
|
||||||
|
let entry = locked.entry(key).or_default();
|
||||||
|
entry.clone().read_owned()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn exclusive(
|
||||||
|
&self,
|
||||||
|
key: T,
|
||||||
|
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockWriteGuard<()>> {
|
||||||
|
let mut locked = self.entities.lock().unwrap();
|
||||||
|
let entry = locked.entry(key).or_default();
|
||||||
|
entry.clone().write_owned()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
|
||||||
|
/// periodic housekeeping to avoid the map growing indefinitely
|
||||||
|
pub(crate) fn housekeeping(&self) {
|
||||||
|
let mut locked = self.entities.lock().unwrap();
|
||||||
|
locked.retain(|_k, lock| lock.try_write().is_err())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Default for IdLockMap<T>
|
||||||
|
where
|
||||||
|
T: Eq + PartialEq + std::hash::Hash,
|
||||||
|
{
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
entities: std::sync::Mutex::new(HashMap::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ use utils::seqwait::MonotonicCounter;
|
|||||||
mod auth;
|
mod auth;
|
||||||
mod compute_hook;
|
mod compute_hook;
|
||||||
pub mod http;
|
pub mod http;
|
||||||
|
mod id_lock_map;
|
||||||
pub mod metrics;
|
pub mod metrics;
|
||||||
mod node;
|
mod node;
|
||||||
pub mod persistence;
|
pub mod persistence;
|
||||||
|
|||||||
@@ -11,6 +11,9 @@ use diesel::prelude::*;
|
|||||||
use diesel::Connection;
|
use diesel::Connection;
|
||||||
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
|
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
|
||||||
use pageserver_api::models::TenantConfig;
|
use pageserver_api::models::TenantConfig;
|
||||||
|
use pageserver_api::shard::ShardConfigError;
|
||||||
|
use pageserver_api::shard::ShardIdentity;
|
||||||
|
use pageserver_api::shard::ShardStripeSize;
|
||||||
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
|
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use utils::generation::Generation;
|
use utils::generation::Generation;
|
||||||
@@ -72,6 +75,14 @@ pub(crate) enum DatabaseError {
|
|||||||
Logical(String),
|
Logical(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub(crate) enum AbortShardSplitStatus {
|
||||||
|
/// We aborted the split in the database by reverting to the parent shards
|
||||||
|
Aborted,
|
||||||
|
/// The split had already been persisted.
|
||||||
|
Complete,
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
|
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
|
||||||
|
|
||||||
impl Persistence {
|
impl Persistence {
|
||||||
@@ -570,6 +581,42 @@ impl Persistence {
|
|||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Used when the remote part of a shard split failed: we will revert the database state to have only
|
||||||
|
/// the parent shards, with SplitState::Idle.
|
||||||
|
pub(crate) async fn abort_shard_split(
|
||||||
|
&self,
|
||||||
|
split_tenant_id: TenantId,
|
||||||
|
new_shard_count: ShardCount,
|
||||||
|
) -> DatabaseResult<AbortShardSplitStatus> {
|
||||||
|
use crate::schema::tenant_shards::dsl::*;
|
||||||
|
self.with_conn(move |conn| -> DatabaseResult<AbortShardSplitStatus> {
|
||||||
|
let aborted = conn.transaction(|conn| -> QueryResult<AbortShardSplitStatus> {
|
||||||
|
// Clear the splitting state on parent shards
|
||||||
|
let updated = diesel::update(tenant_shards)
|
||||||
|
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||||
|
.filter(shard_count.ne(new_shard_count.literal() as i32))
|
||||||
|
.set((splitting.eq(0),))
|
||||||
|
.execute(conn)?;
|
||||||
|
|
||||||
|
// Parent shards are already gone: we cannot abort.
|
||||||
|
if updated == 0 {
|
||||||
|
return Ok(AbortShardSplitStatus::Complete);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Erase child shards
|
||||||
|
diesel::delete(tenant_shards)
|
||||||
|
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||||
|
.filter(shard_count.eq(new_shard_count.literal() as i32))
|
||||||
|
.execute(conn)?;
|
||||||
|
|
||||||
|
Ok(AbortShardSplitStatus::Aborted)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(aborted)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
|
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
|
||||||
@@ -604,6 +651,28 @@ pub(crate) struct TenantShardPersistence {
|
|||||||
pub(crate) config: String,
|
pub(crate) config: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl TenantShardPersistence {
|
||||||
|
pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
|
||||||
|
if self.shard_count == 0 {
|
||||||
|
Ok(ShardIdentity::unsharded())
|
||||||
|
} else {
|
||||||
|
Ok(ShardIdentity::new(
|
||||||
|
ShardNumber(self.shard_number as u8),
|
||||||
|
ShardCount::new(self.shard_count as u8),
|
||||||
|
ShardStripeSize(self.shard_stripe_size as u32),
|
||||||
|
)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
|
||||||
|
Ok(TenantShardId {
|
||||||
|
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
|
||||||
|
shard_number: ShardNumber(self.shard_number as u8),
|
||||||
|
shard_count: ShardCount::new(self.shard_count as u8),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Parts of [`crate::node::Node`] that are stored durably
|
/// Parts of [`crate::node::Node`] that are stored durably
|
||||||
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
|
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
|
||||||
#[diesel(table_name = crate::schema::nodes)]
|
#[diesel(table_name = crate::schema::nodes)]
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
use crate::persistence::Persistence;
|
use crate::persistence::Persistence;
|
||||||
use crate::service;
|
use crate::service;
|
||||||
|
use hyper::StatusCode;
|
||||||
use pageserver_api::models::{
|
use pageserver_api::models::{
|
||||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
|
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
|
||||||
};
|
};
|
||||||
@@ -18,6 +19,8 @@ use crate::compute_hook::{ComputeHook, NotifyError};
|
|||||||
use crate::node::Node;
|
use crate::node::Node;
|
||||||
use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
|
use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
|
||||||
|
|
||||||
|
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
|
||||||
|
|
||||||
/// Object with the lifetime of the background reconcile task that is created
|
/// Object with the lifetime of the background reconcile task that is created
|
||||||
/// for tenants which have a difference between their intent and observed states.
|
/// for tenants which have a difference between their intent and observed states.
|
||||||
pub(super) struct Reconciler {
|
pub(super) struct Reconciler {
|
||||||
@@ -485,17 +488,29 @@ impl Reconciler {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Some(Ok(observed)) => observed,
|
Some(Ok(observed)) => Some(observed),
|
||||||
|
Some(Err(mgmt_api::Error::ApiError(status, _msg)))
|
||||||
|
if status == StatusCode::NOT_FOUND =>
|
||||||
|
{
|
||||||
|
None
|
||||||
|
}
|
||||||
Some(Err(e)) => return Err(e.into()),
|
Some(Err(e)) => return Err(e.into()),
|
||||||
None => return Err(ReconcileError::Cancel),
|
None => return Err(ReconcileError::Cancel),
|
||||||
};
|
};
|
||||||
tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
|
tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
|
||||||
self.observed.locations.insert(
|
match observed_conf {
|
||||||
attached_node.get_id(),
|
Some(conf) => {
|
||||||
ObservedStateLocation {
|
// Pageserver returned a state: update it in observed. This may still be an indeterminate (None) state,
|
||||||
conf: observed_conf,
|
// if internally the pageserver's TenantSlot was being mutated (e.g. some long running API call is still running)
|
||||||
},
|
self.observed
|
||||||
);
|
.locations
|
||||||
|
.insert(attached_node.get_id(), ObservedStateLocation { conf });
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
|
||||||
|
self.observed.locations.remove(&attached_node.get_id());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -525,7 +540,12 @@ impl Reconciler {
|
|||||||
)));
|
)));
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
|
let mut wanted_conf = attached_location_conf(
|
||||||
|
generation,
|
||||||
|
&self.shard,
|
||||||
|
&self.config,
|
||||||
|
!self.intent.secondary.is_empty(),
|
||||||
|
);
|
||||||
match self.observed.locations.get(&node.get_id()) {
|
match self.observed.locations.get(&node.get_id()) {
|
||||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
||||||
// Nothing to do
|
// Nothing to do
|
||||||
@@ -662,10 +682,26 @@ impl Reconciler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// We tweak the externally-set TenantConfig while configuring
|
||||||
|
/// locations, using our awareness of whether secondary locations
|
||||||
|
/// are in use to automatically enable/disable heatmap uploads.
|
||||||
|
fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
|
||||||
|
let mut config = config.clone();
|
||||||
|
if has_secondaries {
|
||||||
|
if config.heatmap_period.is_none() {
|
||||||
|
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
config.heatmap_period = None;
|
||||||
|
}
|
||||||
|
config
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn attached_location_conf(
|
pub(crate) fn attached_location_conf(
|
||||||
generation: Generation,
|
generation: Generation,
|
||||||
shard: &ShardIdentity,
|
shard: &ShardIdentity,
|
||||||
config: &TenantConfig,
|
config: &TenantConfig,
|
||||||
|
has_secondaries: bool,
|
||||||
) -> LocationConfig {
|
) -> LocationConfig {
|
||||||
LocationConfig {
|
LocationConfig {
|
||||||
mode: LocationConfigMode::AttachedSingle,
|
mode: LocationConfigMode::AttachedSingle,
|
||||||
@@ -674,7 +710,7 @@ pub(crate) fn attached_location_conf(
|
|||||||
shard_number: shard.number.0,
|
shard_number: shard.number.0,
|
||||||
shard_count: shard.count.literal(),
|
shard_count: shard.count.literal(),
|
||||||
shard_stripe_size: shard.stripe_size.0,
|
shard_stripe_size: shard.stripe_size.0,
|
||||||
tenant_conf: config.clone(),
|
tenant_conf: ha_aware_config(config, has_secondaries),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -689,6 +725,6 @@ pub(crate) fn secondary_location_conf(
|
|||||||
shard_number: shard.number.0,
|
shard_number: shard.number.0,
|
||||||
shard_count: shard.count.literal(),
|
shard_count: shard.count.literal(),
|
||||||
shard_stripe_size: shard.stripe_size.0,
|
shard_stripe_size: shard.stripe_size.0,
|
||||||
tenant_conf: config.clone(),
|
tenant_conf: ha_aware_config(config, true),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -577,7 +577,12 @@ impl TenantState {
|
|||||||
.generation
|
.generation
|
||||||
.expect("Attempted to enter attached state without a generation");
|
.expect("Attempted to enter attached state without a generation");
|
||||||
|
|
||||||
let wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
|
let wanted_conf = attached_location_conf(
|
||||||
|
generation,
|
||||||
|
&self.shard,
|
||||||
|
&self.config,
|
||||||
|
!self.intent.secondary.is_empty(),
|
||||||
|
);
|
||||||
match self.observed.locations.get(&node_id) {
|
match self.observed.locations.get(&node_id) {
|
||||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
|
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
|
||||||
Some(_) | None => {
|
Some(_) | None => {
|
||||||
|
|||||||
@@ -774,7 +774,10 @@ impl Endpoint {
|
|||||||
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
|
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
|
||||||
}
|
}
|
||||||
|
|
||||||
let client = reqwest::Client::new();
|
let client = reqwest::Client::builder()
|
||||||
|
.timeout(Duration::from_secs(30))
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
let response = client
|
let response = client
|
||||||
.post(format!(
|
.post(format!(
|
||||||
"http://{}:{}/configure",
|
"http://{}:{}/configure",
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ use std::time::Duration;
|
|||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use camino::Utf8PathBuf;
|
use camino::Utf8PathBuf;
|
||||||
use futures::SinkExt;
|
use futures::SinkExt;
|
||||||
|
use hyper::StatusCode;
|
||||||
use pageserver_api::controller_api::NodeRegisterRequest;
|
use pageserver_api::controller_api::NodeRegisterRequest;
|
||||||
use pageserver_api::models::{
|
use pageserver_api::models::{
|
||||||
self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo,
|
self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo,
|
||||||
@@ -262,6 +263,11 @@ impl PageServerNode {
|
|||||||
match st {
|
match st {
|
||||||
Ok(()) => Ok(true),
|
Ok(()) => Ok(true),
|
||||||
Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false),
|
Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false),
|
||||||
|
Err(mgmt_api::Error::ApiError(status, _msg))
|
||||||
|
if status == StatusCode::SERVICE_UNAVAILABLE =>
|
||||||
|
{
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
|
Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -2103,6 +2103,16 @@ where
|
|||||||
R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
|
R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
|
||||||
H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
|
H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
|
if request.uri() != &"/v1/failpoints".parse::<Uri>().unwrap() {
|
||||||
|
fail::fail_point!("api-503", |_| Err(ApiError::ResourceUnavailable(
|
||||||
|
"failpoint".into()
|
||||||
|
)));
|
||||||
|
|
||||||
|
fail::fail_point!("api-500", |_| Err(ApiError::InternalServerError(
|
||||||
|
anyhow::anyhow!("failpoint")
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
// Spawn a new task to handle the request, to protect the handler from unexpected
|
// Spawn a new task to handle the request, to protect the handler from unexpected
|
||||||
// async cancellations. Most pageserver functions are not async cancellation safe.
|
// async cancellations. Most pageserver functions are not async cancellation safe.
|
||||||
// We arm a drop-guard, so that if Hyper drops the Future, we signal the task
|
// We arm a drop-guard, so that if Hyper drops the Future, we signal the task
|
||||||
@@ -2247,7 +2257,7 @@ pub fn make_router(
|
|||||||
.get("/v1/location_config", |r| {
|
.get("/v1/location_config", |r| {
|
||||||
api_handler(r, list_location_config_handler)
|
api_handler(r, list_location_config_handler)
|
||||||
})
|
})
|
||||||
.get("/v1/location_config/:tenant_id", |r| {
|
.get("/v1/location_config/:tenant_shard_id", |r| {
|
||||||
api_handler(r, get_location_config_handler)
|
api_handler(r, get_location_config_handler)
|
||||||
})
|
})
|
||||||
.put(
|
.put(
|
||||||
|
|||||||
@@ -1440,6 +1440,31 @@ impl TenantManager {
|
|||||||
tenant_shard_id: TenantShardId,
|
tenant_shard_id: TenantShardId,
|
||||||
new_shard_count: ShardCount,
|
new_shard_count: ShardCount,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
|
) -> anyhow::Result<Vec<TenantShardId>> {
|
||||||
|
let r = self
|
||||||
|
.do_shard_split(tenant_shard_id, new_shard_count, ctx)
|
||||||
|
.await;
|
||||||
|
if r.is_err() {
|
||||||
|
// Shard splitting might have left the original shard in a partially shut down state (it
|
||||||
|
// stops the shard's remote timeline client). Reset it to ensure we leave things in
|
||||||
|
// a working state.
|
||||||
|
if self.get(tenant_shard_id).is_some() {
|
||||||
|
tracing::warn!("Resetting {tenant_shard_id} after shard split failure");
|
||||||
|
if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
|
||||||
|
// Log this error because our return value will still be the original error, not this one.
|
||||||
|
tracing::warn!("Failed to reset {tenant_shard_id}: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
r
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn do_shard_split(
|
||||||
|
&self,
|
||||||
|
tenant_shard_id: TenantShardId,
|
||||||
|
new_shard_count: ShardCount,
|
||||||
|
ctx: &RequestContext,
|
||||||
) -> anyhow::Result<Vec<TenantShardId>> {
|
) -> anyhow::Result<Vec<TenantShardId>> {
|
||||||
let tenant = get_tenant(tenant_shard_id, true)?;
|
let tenant = get_tenant(tenant_shard_id, true)?;
|
||||||
|
|
||||||
@@ -1466,6 +1491,10 @@ impl TenantManager {
|
|||||||
.join(",")
|
.join(",")
|
||||||
);
|
);
|
||||||
|
|
||||||
|
fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
|
||||||
|
"failpoint"
|
||||||
|
)));
|
||||||
|
|
||||||
// Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
|
// Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
|
||||||
if let Err(e) = tenant.split_prepare(&child_shards).await {
|
if let Err(e) = tenant.split_prepare(&child_shards).await {
|
||||||
// If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
|
// If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
|
||||||
@@ -1475,6 +1504,10 @@ impl TenantManager {
|
|||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
|
||||||
|
"failpoint"
|
||||||
|
)));
|
||||||
|
|
||||||
self.resources.deletion_queue_client.flush_advisory();
|
self.resources.deletion_queue_client.flush_advisory();
|
||||||
|
|
||||||
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
|
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
|
||||||
@@ -1496,11 +1529,16 @@ impl TenantManager {
|
|||||||
anyhow::bail!("Detached parent shard in the middle of split!")
|
anyhow::bail!("Detached parent shard in the middle of split!")
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
|
||||||
|
"failpoint"
|
||||||
|
)));
|
||||||
// Optimization: hardlink layers from the parent into the children, so that they don't have to
|
// Optimization: hardlink layers from the parent into the children, so that they don't have to
|
||||||
// re-download & duplicate the data referenced in their initial IndexPart
|
// re-download & duplicate the data referenced in their initial IndexPart
|
||||||
self.shard_split_hardlink(parent, child_shards.clone())
|
self.shard_split_hardlink(parent, child_shards.clone())
|
||||||
.await?;
|
.await?;
|
||||||
|
fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
|
||||||
|
"failpoint"
|
||||||
|
)));
|
||||||
|
|
||||||
// Take a snapshot of where the parent's WAL ingest had got to: we will wait for
|
// Take a snapshot of where the parent's WAL ingest had got to: we will wait for
|
||||||
// child shards to reach this point.
|
// child shards to reach this point.
|
||||||
@@ -1537,6 +1575,10 @@ impl TenantManager {
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
|
||||||
|
"failpoint"
|
||||||
|
)));
|
||||||
|
|
||||||
// Phase 4: wait for child chards WAL ingest to catch up to target LSN
|
// Phase 4: wait for child chards WAL ingest to catch up to target LSN
|
||||||
for child_shard_id in &child_shards {
|
for child_shard_id in &child_shards {
|
||||||
let child_shard_id = *child_shard_id;
|
let child_shard_id = *child_shard_id;
|
||||||
@@ -1569,6 +1611,10 @@ impl TenantManager {
|
|||||||
timeline.timeline_id,
|
timeline.timeline_id,
|
||||||
target_lsn
|
target_lsn
|
||||||
);
|
);
|
||||||
|
|
||||||
|
fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
|
||||||
|
"failpoint"
|
||||||
|
)));
|
||||||
if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
|
if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
|
||||||
// Failure here might mean shutdown, in any case this part is an optimization
|
// Failure here might mean shutdown, in any case this part is an optimization
|
||||||
// and we shouldn't hold up the split operation.
|
// and we shouldn't hold up the split operation.
|
||||||
@@ -1614,6 +1660,10 @@ impl TenantManager {
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
|
||||||
|
"failpoint"
|
||||||
|
)));
|
||||||
|
|
||||||
parent_slot_guard.drop_old_value()?;
|
parent_slot_guard.drop_old_value()?;
|
||||||
|
|
||||||
// Phase 6: Release the InProgress on the parent shard
|
// Phase 6: Release the InProgress on the parent shard
|
||||||
|
|||||||
@@ -1518,6 +1518,7 @@ class NeonCli(AbstractNeonCli):
|
|||||||
conf: Optional[Dict[str, Any]] = None,
|
conf: Optional[Dict[str, Any]] = None,
|
||||||
shard_count: Optional[int] = None,
|
shard_count: Optional[int] = None,
|
||||||
shard_stripe_size: Optional[int] = None,
|
shard_stripe_size: Optional[int] = None,
|
||||||
|
placement_policy: Optional[str] = None,
|
||||||
set_default: bool = False,
|
set_default: bool = False,
|
||||||
) -> Tuple[TenantId, TimelineId]:
|
) -> Tuple[TenantId, TimelineId]:
|
||||||
"""
|
"""
|
||||||
@@ -1551,6 +1552,9 @@ class NeonCli(AbstractNeonCli):
|
|||||||
if shard_stripe_size is not None:
|
if shard_stripe_size is not None:
|
||||||
args.extend(["--shard-stripe-size", str(shard_stripe_size)])
|
args.extend(["--shard-stripe-size", str(shard_stripe_size)])
|
||||||
|
|
||||||
|
if placement_policy is not None:
|
||||||
|
args.extend(["--placement-policy", str(placement_policy)])
|
||||||
|
|
||||||
res = self.raw_cli(args)
|
res = self.raw_cli(args)
|
||||||
res.check_returncode()
|
res.check_returncode()
|
||||||
return tenant_id, timeline_id
|
return tenant_id, timeline_id
|
||||||
@@ -2168,6 +2172,37 @@ class NeonAttachmentService(MetricsGetter):
|
|||||||
)
|
)
|
||||||
log.info("Attachment service passed consistency check")
|
log.info("Attachment service passed consistency check")
|
||||||
|
|
||||||
|
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
|
||||||
|
if isinstance(config_strings, tuple):
|
||||||
|
pairs = [config_strings]
|
||||||
|
else:
|
||||||
|
pairs = config_strings
|
||||||
|
|
||||||
|
log.info(f"Requesting config failpoints: {repr(pairs)}")
|
||||||
|
|
||||||
|
res = self.request(
|
||||||
|
"PUT",
|
||||||
|
f"{self.env.attachment_service_api}/debug/v1/failpoints",
|
||||||
|
json=[{"name": name, "actions": actions} for name, actions in pairs],
|
||||||
|
headers=self.headers(TokenScope.ADMIN),
|
||||||
|
)
|
||||||
|
log.info(f"Got failpoints request response code {res.status_code}")
|
||||||
|
res.raise_for_status()
|
||||||
|
|
||||||
|
def balance_all(self):
|
||||||
|
self.request(
|
||||||
|
"POST",
|
||||||
|
f"{self.env.attachment_service_api}/control/v1/balance/all",
|
||||||
|
headers=self.headers(TokenScope.ADMIN),
|
||||||
|
)
|
||||||
|
|
||||||
|
def balance_attached(self):
|
||||||
|
self.request(
|
||||||
|
"POST",
|
||||||
|
f"{self.env.attachment_service_api}/control/v1/balance/attached",
|
||||||
|
headers=self.headers(TokenScope.ADMIN),
|
||||||
|
)
|
||||||
|
|
||||||
def __enter__(self) -> "NeonAttachmentService":
|
def __enter__(self) -> "NeonAttachmentService":
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@@ -2322,16 +2357,16 @@ class NeonPageserver(PgProtocol):
|
|||||||
def assert_no_errors(self):
|
def assert_no_errors(self):
|
||||||
logfile = self.workdir / "pageserver.log"
|
logfile = self.workdir / "pageserver.log"
|
||||||
if not logfile.exists():
|
if not logfile.exists():
|
||||||
log.warning(f"Skipping log check: {logfile} does not exist")
|
log.warning(f"Skipping log check on pageserver {self.id}: {logfile} does not exist")
|
||||||
return
|
return
|
||||||
|
|
||||||
with logfile.open("r") as f:
|
with logfile.open("r") as f:
|
||||||
errors = scan_pageserver_log_for_errors(f, self.allowed_errors)
|
errors = scan_pageserver_log_for_errors(f, self.allowed_errors)
|
||||||
|
|
||||||
for _lineno, error in errors:
|
for _lineno, error in errors:
|
||||||
log.info(f"not allowed error: {error.strip()}")
|
log.info(f"not allowed error (pageserver {self.id}): {error.strip()}")
|
||||||
|
|
||||||
assert not errors
|
assert not errors, f"Pageserver {self.id}: {errors}"
|
||||||
|
|
||||||
def assert_no_metric_errors(self):
|
def assert_no_metric_errors(self):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import threading
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from fixtures.log_helper import log
|
from fixtures.log_helper import log
|
||||||
@@ -11,6 +12,10 @@ from fixtures.neon_fixtures import (
|
|||||||
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
||||||
from fixtures.types import TenantId, TimelineId
|
from fixtures.types import TenantId, TimelineId
|
||||||
|
|
||||||
|
# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
|
||||||
|
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
|
||||||
|
ENDPOINT_LOCK = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
class Workload:
|
class Workload:
|
||||||
"""
|
"""
|
||||||
@@ -41,17 +46,30 @@ class Workload:
|
|||||||
|
|
||||||
self._endpoint: Optional[Endpoint] = None
|
self._endpoint: Optional[Endpoint] = None
|
||||||
|
|
||||||
|
def reconfigure(self):
|
||||||
|
"""
|
||||||
|
Request the endpoint to reconfigure based on location reported by storage controller
|
||||||
|
"""
|
||||||
|
if self._endpoint is not None:
|
||||||
|
with ENDPOINT_LOCK:
|
||||||
|
self._endpoint.reconfigure()
|
||||||
|
|
||||||
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
|
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
|
||||||
if self._endpoint is None:
|
# We may be running alongside other Workloads for different tenants. Full TTID is
|
||||||
self._endpoint = self.env.endpoints.create(
|
# obnoxiously long for use here, but a cut-down version is still unique enough for tests.
|
||||||
self.branch_name,
|
endpoint_id = f"ep-workload-{str(self.tenant_id)[0:4]}-{str(self.timeline_id)[0:4]}"
|
||||||
tenant_id=self.tenant_id,
|
|
||||||
pageserver_id=pageserver_id,
|
with ENDPOINT_LOCK:
|
||||||
endpoint_id="ep-workload",
|
if self._endpoint is None:
|
||||||
)
|
self._endpoint = self.env.endpoints.create(
|
||||||
self._endpoint.start(pageserver_id=pageserver_id)
|
self.branch_name,
|
||||||
else:
|
tenant_id=self.tenant_id,
|
||||||
self._endpoint.reconfigure(pageserver_id=pageserver_id)
|
pageserver_id=pageserver_id,
|
||||||
|
endpoint_id=endpoint_id,
|
||||||
|
)
|
||||||
|
self._endpoint.start(pageserver_id=pageserver_id)
|
||||||
|
else:
|
||||||
|
self._endpoint.reconfigure(pageserver_id=pageserver_id)
|
||||||
|
|
||||||
connstring = self._endpoint.safe_psql(
|
connstring = self._endpoint.safe_psql(
|
||||||
"SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
|
"SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
|
||||||
@@ -94,7 +112,7 @@ class Workload:
|
|||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
|
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True, ingest=True):
|
||||||
assert self.expect_rows >= n
|
assert self.expect_rows >= n
|
||||||
|
|
||||||
max_iters = 10
|
max_iters = 10
|
||||||
@@ -132,22 +150,28 @@ class Workload:
|
|||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
for tenant_shard_id, pageserver in tenant_get_shards(
|
if ingest:
|
||||||
self.env, self.tenant_id, pageserver_id
|
# Wait for written data to be ingested by the pageserver
|
||||||
):
|
for tenant_shard_id, pageserver in tenant_get_shards(
|
||||||
last_flush_lsn = wait_for_last_flush_lsn(
|
self.env, self.tenant_id, pageserver_id
|
||||||
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
|
):
|
||||||
)
|
last_flush_lsn = wait_for_last_flush_lsn(
|
||||||
ps_http = pageserver.http_client()
|
self.env,
|
||||||
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
|
endpoint,
|
||||||
|
self.tenant_id,
|
||||||
|
self.timeline_id,
|
||||||
|
pageserver_id=pageserver_id,
|
||||||
|
)
|
||||||
|
ps_http = pageserver.http_client()
|
||||||
|
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
|
||||||
|
|
||||||
if upload:
|
if upload:
|
||||||
# force a checkpoint to trigger upload
|
# Wait for written data to be uploaded to S3 (force a checkpoint to trigger upload)
|
||||||
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
|
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
|
||||||
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
|
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
|
||||||
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
|
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
|
||||||
else:
|
else:
|
||||||
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
|
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
|
||||||
|
|
||||||
def validate(self, pageserver_id: Optional[int] = None):
|
def validate(self, pageserver_id: Optional[int] = None):
|
||||||
endpoint = self.endpoint(pageserver_id)
|
endpoint = self.endpoint(pageserver_id)
|
||||||
|
|||||||
@@ -1,13 +1,17 @@
|
|||||||
import os
|
import os
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from fixtures.log_helper import log
|
from fixtures.log_helper import log
|
||||||
from fixtures.neon_fixtures import (
|
from fixtures.neon_fixtures import (
|
||||||
|
AttachmentServiceApiException,
|
||||||
|
NeonEnv,
|
||||||
NeonEnvBuilder,
|
NeonEnvBuilder,
|
||||||
tenant_get_shards,
|
tenant_get_shards,
|
||||||
)
|
)
|
||||||
from fixtures.remote_storage import s3_storage
|
from fixtures.remote_storage import s3_storage
|
||||||
from fixtures.types import Lsn, TenantShardId, TimelineId
|
from fixtures.types import Lsn, TenantShardId, TimelineId
|
||||||
|
from fixtures.utils import wait_until
|
||||||
from fixtures.workload import Workload
|
from fixtures.workload import Workload
|
||||||
|
|
||||||
|
|
||||||
@@ -400,3 +404,245 @@ def test_sharding_ingest(
|
|||||||
|
|
||||||
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
|
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
|
||||||
assert huge_layer_count <= shard_count
|
assert huge_layer_count <= shard_count
|
||||||
|
|
||||||
|
|
||||||
|
class Failure:
|
||||||
|
pageserver_id: Optional[int]
|
||||||
|
|
||||||
|
def apply(self, env: NeonEnv):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def clear(self, env: NeonEnv):
|
||||||
|
"""
|
||||||
|
Clear the failure, in a way that should enable the system to proceed
|
||||||
|
to a totally clean state (all nodes online and reconciled)
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def expect_available(self):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def can_mitigate(self):
|
||||||
|
"""Whether Self.mitigate is available for use"""
|
||||||
|
return False
|
||||||
|
|
||||||
|
def mitigate(self, env: NeonEnv):
|
||||||
|
"""
|
||||||
|
Mitigate the failure in a way that should allow shard split to
|
||||||
|
complete and service to resume, but does not guarantee to leave
|
||||||
|
the whole world in a clean state (e.g. an Offline node might have
|
||||||
|
junk LocationConfigs on it)
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def fails_forward(self):
|
||||||
|
"""
|
||||||
|
If true, this failure results in a state that eventualy completes the split.
|
||||||
|
"""
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class PageserverFailpoint(Failure):
|
||||||
|
def __init__(self, failpoint, pageserver_id, mitigate):
|
||||||
|
self.failpoint = failpoint
|
||||||
|
self.pageserver_id = pageserver_id
|
||||||
|
self._mitigate = mitigate
|
||||||
|
|
||||||
|
def apply(self, env: NeonEnv):
|
||||||
|
pageserver = env.get_pageserver(self.pageserver_id)
|
||||||
|
pageserver.allowed_errors.extend(
|
||||||
|
[".*failpoint.*", ".*Resetting.*after shard split failure.*"]
|
||||||
|
)
|
||||||
|
pageserver.http_client().configure_failpoints((self.failpoint, "return(1)"))
|
||||||
|
|
||||||
|
def clear(self, env: NeonEnv):
|
||||||
|
pageserver = env.get_pageserver(self.pageserver_id)
|
||||||
|
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
|
||||||
|
if self._mitigate:
|
||||||
|
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Active"})
|
||||||
|
|
||||||
|
def expect_available(self):
|
||||||
|
return True
|
||||||
|
|
||||||
|
def can_mitigate(self):
|
||||||
|
return self._mitigate
|
||||||
|
|
||||||
|
def mitigate(self, env):
|
||||||
|
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Offline"})
|
||||||
|
|
||||||
|
|
||||||
|
class StorageControllerFailpoint(Failure):
|
||||||
|
def __init__(self, failpoint):
|
||||||
|
self.failpoint = failpoint
|
||||||
|
self.pageserver_id = None
|
||||||
|
|
||||||
|
def apply(self, env: NeonEnv):
|
||||||
|
env.attachment_service.configure_failpoints((self.failpoint, "return(1)"))
|
||||||
|
|
||||||
|
def clear(self, env: NeonEnv):
|
||||||
|
env.attachment_service.configure_failpoints((self.failpoint, "off"))
|
||||||
|
|
||||||
|
def expect_available(self):
|
||||||
|
return True
|
||||||
|
|
||||||
|
def can_mitigate(self):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def fails_forward(self):
|
||||||
|
# Edge case: the very last failpoint that simulates a DB connection error, where
|
||||||
|
# the abort path will fail-forward and result in a complete split.
|
||||||
|
return self.failpoint == "shard-split-post-complete"
|
||||||
|
|
||||||
|
|
||||||
|
class NodeKill(Failure):
|
||||||
|
def __init__(self, pageserver_id, mitigate):
|
||||||
|
self.pageserver_id = pageserver_id
|
||||||
|
self._mitigate = mitigate
|
||||||
|
|
||||||
|
def apply(self, env: NeonEnv):
|
||||||
|
pageserver = env.get_pageserver(self.pageserver_id)
|
||||||
|
pageserver.stop(immediate=True)
|
||||||
|
|
||||||
|
def clear(self, env: NeonEnv):
|
||||||
|
pageserver = env.get_pageserver(self.pageserver_id)
|
||||||
|
pageserver.start()
|
||||||
|
|
||||||
|
def expect_available(self):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def mitigate(self, env):
|
||||||
|
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Offline"})
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"failure",
|
||||||
|
[
|
||||||
|
PageserverFailpoint("api-500", 1, False),
|
||||||
|
NodeKill(1, False),
|
||||||
|
PageserverFailpoint("api-500", 1, True),
|
||||||
|
NodeKill(1, True),
|
||||||
|
PageserverFailpoint("shard-split-pre-prepare", 1, False),
|
||||||
|
PageserverFailpoint("shard-split-post-prepare", 1, False),
|
||||||
|
PageserverFailpoint("shard-split-pre-hardlink", 1, False),
|
||||||
|
PageserverFailpoint("shard-split-post-hardlink", 1, False),
|
||||||
|
PageserverFailpoint("shard-split-post-child-conf", 1, False),
|
||||||
|
PageserverFailpoint("shard-split-lsn-wait", 1, False),
|
||||||
|
PageserverFailpoint("shard-split-pre-finish", 1, False),
|
||||||
|
StorageControllerFailpoint("shard-split-validation"),
|
||||||
|
StorageControllerFailpoint("shard-split-post-begin"),
|
||||||
|
StorageControllerFailpoint("shard-split-post-remote"),
|
||||||
|
StorageControllerFailpoint("shard-split-post-complete"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_sharding_split_failures(neon_env_builder: NeonEnvBuilder, failure: Failure):
|
||||||
|
neon_env_builder.num_pageservers = 4
|
||||||
|
initial_shard_count = 2
|
||||||
|
split_shard_count = 4
|
||||||
|
|
||||||
|
env = neon_env_builder.init_start(initial_tenant_shard_count=initial_shard_count)
|
||||||
|
tenant_id = env.initial_tenant
|
||||||
|
timeline_id = env.initial_timeline
|
||||||
|
|
||||||
|
# Make sure the node we're failing has a shard on it, otherwise the test isn't testing anything
|
||||||
|
assert (
|
||||||
|
failure.pageserver_id is None
|
||||||
|
or len(
|
||||||
|
env.get_pageserver(failure.pageserver_id)
|
||||||
|
.http_client()
|
||||||
|
.tenant_list_locations()["tenant_shards"]
|
||||||
|
)
|
||||||
|
> 0
|
||||||
|
)
|
||||||
|
|
||||||
|
workload = Workload(env, tenant_id, timeline_id)
|
||||||
|
workload.init()
|
||||||
|
workload.write_rows(100)
|
||||||
|
|
||||||
|
# Set one pageserver to 500 all requests, then do a split
|
||||||
|
# TODO: also test with a long-blocking failure: controller should time out its request and then
|
||||||
|
# clean up in a well defined way.
|
||||||
|
failure.apply(env)
|
||||||
|
|
||||||
|
with pytest.raises(AttachmentServiceApiException):
|
||||||
|
env.attachment_service.tenant_shard_split(tenant_id, shard_count=4)
|
||||||
|
|
||||||
|
# We expect that the overall operation will fail, but some split requests
|
||||||
|
# will have succeeded: the net result should be to return to a clean state, including
|
||||||
|
# detaching any child shards.
|
||||||
|
def assert_rolled_back(exclude_ps_id=None) -> None:
|
||||||
|
count = 0
|
||||||
|
for ps in env.pageservers:
|
||||||
|
if exclude_ps_id is not None and ps.id == exclude_ps_id:
|
||||||
|
continue
|
||||||
|
|
||||||
|
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
|
||||||
|
for loc in locations:
|
||||||
|
tenant_shard_id = TenantShardId.parse(loc[0])
|
||||||
|
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
|
||||||
|
assert tenant_shard_id.shard_count == initial_shard_count
|
||||||
|
count += 1
|
||||||
|
assert count == initial_shard_count
|
||||||
|
|
||||||
|
def assert_split_done(exclude_ps_id=None) -> None:
|
||||||
|
count = 0
|
||||||
|
for ps in env.pageservers:
|
||||||
|
if exclude_ps_id is not None and ps.id == exclude_ps_id:
|
||||||
|
continue
|
||||||
|
|
||||||
|
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
|
||||||
|
for loc in locations:
|
||||||
|
tenant_shard_id = TenantShardId.parse(loc[0])
|
||||||
|
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
|
||||||
|
assert tenant_shard_id.shard_count == split_shard_count
|
||||||
|
count += 1
|
||||||
|
assert count == split_shard_count
|
||||||
|
|
||||||
|
def finish_split():
|
||||||
|
# Having failed+rolled back, we should be able to split again
|
||||||
|
# No failures this time; it will succeed
|
||||||
|
env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count)
|
||||||
|
|
||||||
|
workload.churn_rows(10)
|
||||||
|
workload.validate()
|
||||||
|
|
||||||
|
if failure.expect_available():
|
||||||
|
# Even though the split failed partway through, this should not have interrupted
|
||||||
|
# clients. Disable waiting for pageservers in the workload helper, because our
|
||||||
|
# failpoints may prevent API access.
|
||||||
|
# This only applies for failure modes that leave pageserver page_service API available.
|
||||||
|
workload.churn_rows(10, upload=False, ingest=False)
|
||||||
|
workload.validate()
|
||||||
|
|
||||||
|
if failure.fails_forward():
|
||||||
|
# A failure type which results in eventual completion of the split
|
||||||
|
wait_until(30, 1, assert_split_done)
|
||||||
|
elif failure.can_mitigate():
|
||||||
|
# Mitigation phase: we expect to be able to proceed with a successful shard split
|
||||||
|
failure.mitigate(env)
|
||||||
|
|
||||||
|
# The split should appear to be rolled back from the point of view of all pageservers
|
||||||
|
# apart from the one that is offline
|
||||||
|
wait_until(30, 1, lambda: assert_rolled_back(exclude_ps_id=failure.pageserver_id))
|
||||||
|
|
||||||
|
finish_split()
|
||||||
|
wait_until(30, 1, lambda: assert_split_done(exclude_ps_id=failure.pageserver_id))
|
||||||
|
|
||||||
|
# Having cleared the failure, everything should converge to a pristine state
|
||||||
|
failure.clear(env)
|
||||||
|
wait_until(30, 1, assert_split_done)
|
||||||
|
else:
|
||||||
|
# Once we restore the faulty pageserver's API to good health, rollback should
|
||||||
|
# eventually complete.
|
||||||
|
failure.clear(env)
|
||||||
|
|
||||||
|
wait_until(30, 1, assert_rolled_back)
|
||||||
|
|
||||||
|
# Having rolled back, the tenant should be working
|
||||||
|
workload.churn_rows(10)
|
||||||
|
workload.validate()
|
||||||
|
|
||||||
|
# Splitting again should work, since we cleared the failure
|
||||||
|
finish_split()
|
||||||
|
assert_split_done()
|
||||||
|
|
||||||
|
env.attachment_service.consistency_check()
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import concurrent.futures
|
||||||
|
import random
|
||||||
import time
|
import time
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
@@ -23,8 +25,9 @@ from fixtures.pageserver.utils import (
|
|||||||
)
|
)
|
||||||
from fixtures.pg_version import PgVersion
|
from fixtures.pg_version import PgVersion
|
||||||
from fixtures.remote_storage import RemoteStorageKind, s3_storage
|
from fixtures.remote_storage import RemoteStorageKind, s3_storage
|
||||||
from fixtures.types import TenantId, TimelineId
|
from fixtures.types import TenantId, TenantShardId, TimelineId
|
||||||
from fixtures.utils import run_pg_bench_small, wait_until
|
from fixtures.utils import run_pg_bench_small, wait_until
|
||||||
|
from fixtures.workload import Workload
|
||||||
from mypy_boto3_s3.type_defs import (
|
from mypy_boto3_s3.type_defs import (
|
||||||
ObjectTypeDef,
|
ObjectTypeDef,
|
||||||
)
|
)
|
||||||
@@ -770,3 +773,186 @@ def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder):
|
|||||||
assert "pitr_interval" not in readback_ps.tenant_specific_overrides
|
assert "pitr_interval" not in readback_ps.tenant_specific_overrides
|
||||||
|
|
||||||
env.attachment_service.consistency_check()
|
env.attachment_service.consistency_check()
|
||||||
|
|
||||||
|
|
||||||
|
def test_storcon_rolling_failures(
|
||||||
|
neon_env_builder: NeonEnvBuilder, httpserver: HTTPServer, httpserver_listen_address
|
||||||
|
):
|
||||||
|
neon_env_builder.num_pageservers = 8
|
||||||
|
|
||||||
|
(host, port) = httpserver_listen_address
|
||||||
|
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify-attach"
|
||||||
|
|
||||||
|
workloads: dict[TenantId, Workload] = {}
|
||||||
|
|
||||||
|
# Do neon_local endpoint reconfiguration in the background so that we can
|
||||||
|
# accept a healthy rate of calls into notify-attach.
|
||||||
|
reconfigure_threads = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||||
|
|
||||||
|
def handler(request: Request):
|
||||||
|
"""
|
||||||
|
Although storage controller can use neon_local directly, this causes problems when
|
||||||
|
the test is also concurrently modifying endpoints. Instead, configure storage controller
|
||||||
|
to send notifications up to this test code, which will route all endpoint updates
|
||||||
|
through Workload, which has a mutex to make it all safe.
|
||||||
|
"""
|
||||||
|
assert request.json is not None
|
||||||
|
body: dict[str, Any] = request.json
|
||||||
|
log.info(f"notify-attach request: {body}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
workload = workloads[TenantId(body["tenant_id"])]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# This causes the endpoint to query storage controller for its location, which
|
||||||
|
# is redundant since we already have it here, but this avoids extending the
|
||||||
|
# neon_local CLI to take full lists of locations
|
||||||
|
reconfigure_threads.submit(lambda workload=workload: workload.reconfigure()) # type: ignore[no-any-return]
|
||||||
|
|
||||||
|
return Response(status=200)
|
||||||
|
|
||||||
|
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
|
||||||
|
|
||||||
|
env = neon_env_builder.init_start()
|
||||||
|
|
||||||
|
for ps in env.pageservers:
|
||||||
|
# We will do unclean detaches
|
||||||
|
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
|
||||||
|
|
||||||
|
n_tenants = 32
|
||||||
|
tenants = [(env.initial_tenant, env.initial_timeline)]
|
||||||
|
for i in range(0, n_tenants - 1):
|
||||||
|
tenant_id = TenantId.generate()
|
||||||
|
timeline_id = TimelineId.generate()
|
||||||
|
shard_count = [1, 2, 4][i % 3]
|
||||||
|
env.neon_cli.create_tenant(
|
||||||
|
tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Double":1}'
|
||||||
|
)
|
||||||
|
tenants.append((tenant_id, timeline_id))
|
||||||
|
|
||||||
|
# Background pain:
|
||||||
|
# - TODO: some fraction of pageserver API requests hang
|
||||||
|
# (this requires implementing wrap of location_conf calls with proper timeline/cancel)
|
||||||
|
# - TODO: continuous tenant/timeline creation/destruction over a different ID range than
|
||||||
|
# the ones we're using for availability checks.
|
||||||
|
|
||||||
|
rng = random.Random(0xDEADBEEF)
|
||||||
|
|
||||||
|
for tenant_id, timeline_id in tenants:
|
||||||
|
workload = Workload(env, tenant_id, timeline_id)
|
||||||
|
workloads[tenant_id] = workload
|
||||||
|
|
||||||
|
def node_evacuated(node_id: int):
|
||||||
|
counts = get_node_shard_counts(env, [t[0] for t in tenants])
|
||||||
|
assert counts[node_id] == 0
|
||||||
|
|
||||||
|
def attachments_active():
|
||||||
|
for tid, _tlid in tenants:
|
||||||
|
for shard in env.attachment_service.locate(tid):
|
||||||
|
psid = shard["node_id"]
|
||||||
|
tsid = TenantShardId.parse(shard["shard_id"])
|
||||||
|
status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid)
|
||||||
|
assert status["state"]["slug"] == "Active"
|
||||||
|
log.info(f"Shard {tsid} active on node {psid}")
|
||||||
|
|
||||||
|
failpoints = ("api-503", "5%1000*return(1)")
|
||||||
|
failpoints_str = f"{failpoints[0]}={failpoints[1]}"
|
||||||
|
for ps in env.pageservers:
|
||||||
|
ps.http_client().configure_failpoints(failpoints)
|
||||||
|
|
||||||
|
def for_all_workloads(callback, timeout=60):
|
||||||
|
futs = []
|
||||||
|
with concurrent.futures.ThreadPoolExecutor() as pool:
|
||||||
|
for _tenant_id, workload in workloads.items():
|
||||||
|
futs.append(pool.submit(callback, workload))
|
||||||
|
|
||||||
|
for f in futs:
|
||||||
|
f.result(timeout=timeout)
|
||||||
|
|
||||||
|
def clean_fail_restore():
|
||||||
|
"""
|
||||||
|
Clean shutdown of a node: mark it offline in storage controller, wait for new attachment
|
||||||
|
locations to activate, then SIGTERM it.
|
||||||
|
- Endpoints should not fail any queries
|
||||||
|
- New attach locations should activate within bounded time.
|
||||||
|
"""
|
||||||
|
victim = rng.choice(env.pageservers)
|
||||||
|
env.attachment_service.node_configure(victim.id, {"availability": "Offline"})
|
||||||
|
|
||||||
|
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
|
||||||
|
wait_until(10, 1, attachments_active)
|
||||||
|
|
||||||
|
victim.stop(immediate=False)
|
||||||
|
|
||||||
|
traffic()
|
||||||
|
|
||||||
|
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
|
||||||
|
|
||||||
|
# Revert shards to attach at their original locations
|
||||||
|
env.attachment_service.balance_attached()
|
||||||
|
wait_until(10, 1, attachments_active)
|
||||||
|
|
||||||
|
def hard_fail_restore():
|
||||||
|
"""
|
||||||
|
Simulate an unexpected death of a pageserver node
|
||||||
|
"""
|
||||||
|
victim = rng.choice(env.pageservers)
|
||||||
|
victim.stop(immediate=True)
|
||||||
|
# TODO: once we implement heartbeats detecting node failures, remove this
|
||||||
|
# explicit marking offline and rely on storage controller to detect it itself.
|
||||||
|
env.attachment_service.node_configure(victim.id, {"availability": "Offline"})
|
||||||
|
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
|
||||||
|
wait_until(10, 1, attachments_active)
|
||||||
|
traffic()
|
||||||
|
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
|
||||||
|
env.attachment_service.balance_attached()
|
||||||
|
wait_until(10, 1, attachments_active)
|
||||||
|
|
||||||
|
def traffic():
|
||||||
|
"""
|
||||||
|
Check that all tenants are working for postgres clients
|
||||||
|
"""
|
||||||
|
|
||||||
|
def exercise_one(workload):
|
||||||
|
workload.churn_rows(100)
|
||||||
|
workload.validate()
|
||||||
|
|
||||||
|
for_all_workloads(exercise_one)
|
||||||
|
|
||||||
|
def init_one(workload):
|
||||||
|
workload.init()
|
||||||
|
workload.write_rows(100)
|
||||||
|
|
||||||
|
for_all_workloads(init_one, timeout=60)
|
||||||
|
|
||||||
|
for i in range(0, 20):
|
||||||
|
mode = rng.choice([0, 1, 2])
|
||||||
|
log.info(f"Iteration {i}, mode {mode}")
|
||||||
|
if mode == 0:
|
||||||
|
# Traffic interval: sometimes, instead of a failure, just let the clients
|
||||||
|
# write a load of data. This avoids chaos tests ending up with unrealistically
|
||||||
|
# small quantities of data in flight.
|
||||||
|
traffic()
|
||||||
|
elif mode == 1:
|
||||||
|
clean_fail_restore()
|
||||||
|
elif mode == 2:
|
||||||
|
hard_fail_restore()
|
||||||
|
|
||||||
|
# Fail and restart: hard-kill one node. Notify the storage controller that it is offline.
|
||||||
|
# Success criteria:
|
||||||
|
# - New attach locations should activate within bounded time
|
||||||
|
# - TODO: once we do heartbeating, we should not have to explicitly mark the node offline
|
||||||
|
|
||||||
|
# TODO: fail and remove: fail a node, and remove it from the cluster.
|
||||||
|
# Success criteria:
|
||||||
|
# - Endpoints should not fail any queries
|
||||||
|
# - New attach locations should activate within bounded time
|
||||||
|
# - New secondary locations should fill up with data within bounded time
|
||||||
|
|
||||||
|
# TODO: somehow need to wait for reconciles to complete before doing consistency check
|
||||||
|
# (or make the check wait).
|
||||||
|
|
||||||
|
# Do consistency check on every iteration, not just at the end: this makes it more obvious
|
||||||
|
# which change caused an issue.
|
||||||
|
env.attachment_service.consistency_check()
|
||||||
|
|||||||
Reference in New Issue
Block a user