mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-29 00:00:38 +00:00
Compare commits
19 Commits
jcsp/ha-te
...
jcsp/paths
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4363132305 | ||
|
|
621ea2ec44 | ||
|
|
74d09b78c7 | ||
|
|
0cf0731d8b | ||
|
|
98723844ee | ||
|
|
73a8c97ac8 | ||
|
|
17a3c9036e | ||
|
|
8c5b310090 | ||
|
|
8224580f3e | ||
|
|
2b0f3549f7 | ||
|
|
b4972d07d4 | ||
|
|
26ae7b0b3e | ||
|
|
f8483cc4a3 | ||
|
|
cc5d6c66b3 | ||
|
|
d894d2b450 | ||
|
|
b09d686335 | ||
|
|
74d24582cf | ||
|
|
4834d22d2d | ||
|
|
86e8c43ddf |
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -474,7 +474,7 @@ jobs:
|
||||
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
|
||||
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
|
||||
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: std-fs
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
PAGESERVER_GET_VECTORED_IMPL: vectored
|
||||
|
||||
# Temporary disable this step until we figure out why it's so flaky
|
||||
@@ -554,7 +554,7 @@ jobs:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: std-fs
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
# XXX: no coverage data handling here, since benchmarks are run on release builds,
|
||||
# while coverage is currently collected for the debug ones
|
||||
|
||||
|
||||
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -282,10 +282,8 @@ dependencies = [
|
||||
"control_plane",
|
||||
"diesel",
|
||||
"diesel_migrations",
|
||||
"fail",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex",
|
||||
"humantime",
|
||||
"hyper",
|
||||
"metrics",
|
||||
|
||||
@@ -17,6 +17,7 @@ use chrono::{DateTime, Utc};
|
||||
use futures::future::join_all;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use nix::unistd::Pid;
|
||||
use postgres::error::SqlState;
|
||||
use postgres::{Client, NoTls};
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
@@ -722,8 +723,12 @@ impl ComputeNode {
|
||||
// Stop it when it's ready
|
||||
info!("waiting for postgres");
|
||||
wait_for_postgres(&mut pg, Path::new(pgdata))?;
|
||||
pg.kill()?;
|
||||
info!("sent kill signal");
|
||||
// SIGQUIT orders postgres to exit immediately. We don't want to SIGKILL
|
||||
// it to avoid orphaned processes prowling around while datadir is
|
||||
// wiped.
|
||||
let pm_pid = Pid::from_raw(pg.id() as i32);
|
||||
kill(pm_pid, Signal::SIGQUIT)?;
|
||||
info!("sent SIGQUIT signal");
|
||||
pg.wait()?;
|
||||
info!("done prewarming");
|
||||
|
||||
|
||||
@@ -302,9 +302,9 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
RoleAction::Create => {
|
||||
// This branch only runs when roles are created through the console, so it is
|
||||
// safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
|
||||
// from neon_superuser.
|
||||
// from neon_superuser. (NOTE: REPLICATION has been removed from here for now).
|
||||
let mut query: String = format!(
|
||||
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
|
||||
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS IN ROLE neon_superuser",
|
||||
name.pg_quote()
|
||||
);
|
||||
info!("running role create query: '{}'", &query);
|
||||
@@ -805,6 +805,18 @@ $$;"#,
|
||||
"",
|
||||
"",
|
||||
// Add new migrations below.
|
||||
r#"
|
||||
DO $$
|
||||
DECLARE
|
||||
role_name TEXT;
|
||||
BEGIN
|
||||
FOR role_name IN SELECT rolname FROM pg_roles WHERE rolreplication IS TRUE
|
||||
LOOP
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', quote_ident(role_name);
|
||||
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOREPLICATION';
|
||||
END LOOP;
|
||||
END
|
||||
$$;"#,
|
||||
];
|
||||
|
||||
let mut query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
|
||||
|
||||
@@ -19,10 +19,8 @@ aws-config.workspace = true
|
||||
aws-sdk-secretsmanager.workspace = true
|
||||
camino.workspace = true
|
||||
clap.workspace = true
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
hex.workspace = true
|
||||
hyper.workspace = true
|
||||
humantime.workspace = true
|
||||
once_cell.workspace = true
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use std::sync::Arc;
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
||||
@@ -24,13 +23,10 @@ struct ShardedComputeHookTenant {
|
||||
stripe_size: ShardStripeSize,
|
||||
shard_count: ShardCount,
|
||||
shards: Vec<(ShardNumber, NodeId)>,
|
||||
|
||||
// Async lock used for ensuring that remote compute hook calls are ordered identically to updates to this structure
|
||||
lock: Arc<tokio::sync::Mutex<()>>,
|
||||
}
|
||||
|
||||
enum ComputeHookTenant {
|
||||
Unsharded((NodeId, Arc<tokio::sync::Mutex<()>>)),
|
||||
Unsharded(NodeId),
|
||||
Sharded(ShardedComputeHookTenant),
|
||||
}
|
||||
|
||||
@@ -42,17 +38,9 @@ impl ComputeHookTenant {
|
||||
shards: vec![(tenant_shard_id.shard_number, node_id)],
|
||||
stripe_size,
|
||||
shard_count: tenant_shard_id.shard_count,
|
||||
lock: Arc::default(),
|
||||
})
|
||||
} else {
|
||||
Self::Unsharded((node_id, Arc::default()))
|
||||
}
|
||||
}
|
||||
|
||||
fn get_lock(&self) -> &Arc<tokio::sync::Mutex<()>> {
|
||||
match self {
|
||||
Self::Unsharded((_node_id, lock)) => lock,
|
||||
Self::Sharded(sharded_tenant) => &sharded_tenant.lock,
|
||||
Self::Unsharded(node_id)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,9 +53,7 @@ impl ComputeHookTenant {
|
||||
node_id: NodeId,
|
||||
) {
|
||||
match self {
|
||||
Self::Unsharded((existing_node_id, _lock))
|
||||
if tenant_shard_id.shard_count.count() == 1 =>
|
||||
{
|
||||
Self::Unsharded(existing_node_id) if tenant_shard_id.shard_count.count() == 1 => {
|
||||
*existing_node_id = node_id
|
||||
}
|
||||
Self::Sharded(sharded_tenant)
|
||||
@@ -136,15 +122,9 @@ pub(crate) enum NotifyError {
|
||||
}
|
||||
|
||||
impl ComputeHookTenant {
|
||||
fn maybe_reconfigure(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Option<(
|
||||
ComputeHookNotifyRequest,
|
||||
impl std::future::Future<Output = tokio::sync::OwnedMutexGuard<()>>,
|
||||
)> {
|
||||
let request = match self {
|
||||
Self::Unsharded((node_id, _lock)) => Some(ComputeHookNotifyRequest {
|
||||
fn maybe_reconfigure(&self, tenant_id: TenantId) -> Option<ComputeHookNotifyRequest> {
|
||||
match self {
|
||||
Self::Unsharded(node_id) => Some(ComputeHookNotifyRequest {
|
||||
tenant_id,
|
||||
shards: vec![ComputeHookNotifyRequestShard {
|
||||
shard_number: ShardNumber(0),
|
||||
@@ -178,9 +158,7 @@ impl ComputeHookTenant {
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
request.map(|r| (r, self.get_lock().clone().lock_owned()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,11 +167,8 @@ impl ComputeHookTenant {
|
||||
/// the compute connection string.
|
||||
pub(super) struct ComputeHook {
|
||||
config: Config,
|
||||
state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
|
||||
state: tokio::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
|
||||
authorization_header: Option<String>,
|
||||
|
||||
// This lock is only used in testing enviroments, to serialize calls into neon_lock
|
||||
neon_local_lock: tokio::sync::Mutex<()>,
|
||||
}
|
||||
|
||||
impl ComputeHook {
|
||||
@@ -207,7 +182,6 @@ impl ComputeHook {
|
||||
state: Default::default(),
|
||||
config,
|
||||
authorization_header,
|
||||
neon_local_lock: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,10 +190,6 @@ impl ComputeHook {
|
||||
&self,
|
||||
reconfigure_request: ComputeHookNotifyRequest,
|
||||
) -> anyhow::Result<()> {
|
||||
// neon_local updates are not safe to call concurrently, use a lock to serialize
|
||||
// all calls to this function
|
||||
let _locked = self.neon_local_lock.lock().await;
|
||||
|
||||
let env = match LocalEnv::load_config() {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
@@ -370,38 +340,30 @@ impl ComputeHook {
|
||||
stripe_size: ShardStripeSize,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), NotifyError> {
|
||||
let reconfigure_request = {
|
||||
let mut locked = self.state.lock().unwrap();
|
||||
let mut locked = self.state.lock().await;
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
let tenant = match locked.entry(tenant_shard_id.tenant_id) {
|
||||
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
|
||||
tenant_shard_id,
|
||||
stripe_size,
|
||||
node_id,
|
||||
)),
|
||||
Entry::Occupied(e) => {
|
||||
let tenant = e.into_mut();
|
||||
tenant.update(tenant_shard_id, stripe_size, node_id);
|
||||
tenant
|
||||
}
|
||||
};
|
||||
|
||||
tenant.maybe_reconfigure(tenant_shard_id.tenant_id)
|
||||
use std::collections::hash_map::Entry;
|
||||
let tenant = match locked.entry(tenant_shard_id.tenant_id) {
|
||||
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
|
||||
tenant_shard_id,
|
||||
stripe_size,
|
||||
node_id,
|
||||
)),
|
||||
Entry::Occupied(e) => {
|
||||
let tenant = e.into_mut();
|
||||
tenant.update(tenant_shard_id, stripe_size, node_id);
|
||||
tenant
|
||||
}
|
||||
};
|
||||
let Some((reconfigure_request, lock_fut)) = reconfigure_request else {
|
||||
|
||||
let reconfigure_request = tenant.maybe_reconfigure(tenant_shard_id.tenant_id);
|
||||
let Some(reconfigure_request) = reconfigure_request else {
|
||||
// The tenant doesn't yet have pageservers for all its shards: we won't notify anything
|
||||
// until it does.
|
||||
tracing::info!("Tenant isn't yet ready to emit a notification");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Finish acquiring the tenant's async lock: this future was created inside the self.state
|
||||
// lock above, so we are guaranteed to get this lock in the same order as callers took
|
||||
// that lock. This ordering is essential: the cloud control plane must end up with the
|
||||
// same end state for the tenant that we see.
|
||||
let _guard = lock_fut.await;
|
||||
|
||||
if let Some(notify_url) = &self.config.compute_hook_url {
|
||||
self.do_notify(notify_url, reconfigure_request, cancel)
|
||||
.await
|
||||
@@ -443,7 +405,6 @@ pub(crate) mod tests {
|
||||
tenant_state
|
||||
.maybe_reconfigure(tenant_id)
|
||||
.unwrap()
|
||||
.0
|
||||
.shards
|
||||
.len(),
|
||||
1
|
||||
@@ -451,7 +412,6 @@ pub(crate) mod tests {
|
||||
assert!(tenant_state
|
||||
.maybe_reconfigure(tenant_id)
|
||||
.unwrap()
|
||||
.0
|
||||
.stripe_size
|
||||
.is_none());
|
||||
|
||||
@@ -485,7 +445,6 @@ pub(crate) mod tests {
|
||||
tenant_state
|
||||
.maybe_reconfigure(tenant_id)
|
||||
.unwrap()
|
||||
.0
|
||||
.shards
|
||||
.len(),
|
||||
2
|
||||
@@ -494,7 +453,6 @@ pub(crate) mod tests {
|
||||
tenant_state
|
||||
.maybe_reconfigure(tenant_id)
|
||||
.unwrap()
|
||||
.0
|
||||
.stripe_size,
|
||||
Some(ShardStripeSize(32768))
|
||||
);
|
||||
|
||||
@@ -10,9 +10,7 @@ use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::auth::{Scope, SwappableJwtAuth};
|
||||
use utils::failpoint_support::failpoints_handler;
|
||||
use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
|
||||
use utils::http::request::{must_get_query_param, parse_request_param};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -440,24 +438,6 @@ async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiEr
|
||||
state.service.tenants_dump()
|
||||
}
|
||||
|
||||
async fn handle_balance_all(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
service.balance_all()?;
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_balance_attached(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
service.balance_attached()?;
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
@@ -574,9 +554,6 @@ pub fn make_router(
|
||||
.post("/debug/v1/consistency_check", |r| {
|
||||
request_span(r, handle_consistency_check)
|
||||
})
|
||||
.put("/debug/v1/failpoints", |r| {
|
||||
request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
|
||||
})
|
||||
.get("/control/v1/tenant/:tenant_id/locate", |r| {
|
||||
tenant_service_handler(r, handle_tenant_locate)
|
||||
})
|
||||
@@ -595,12 +572,6 @@ pub fn make_router(
|
||||
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
|
||||
tenant_service_handler(r, handle_tenant_shard_split)
|
||||
})
|
||||
.post("/control/v1/balance/all", |r| {
|
||||
tenant_service_handler(r, handle_balance_all)
|
||||
})
|
||||
.post("/control/v1/balance/attached", |r| {
|
||||
tenant_service_handler(r, handle_balance_attached)
|
||||
})
|
||||
// Tenant operations
|
||||
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
|
||||
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
|
||||
|
||||
@@ -1,54 +0,0 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
/// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
|
||||
/// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
|
||||
/// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
|
||||
/// is needed at a tenant-wide granularity.
|
||||
pub(crate) struct IdLockMap<T>
|
||||
where
|
||||
T: Eq + PartialEq + std::hash::Hash,
|
||||
{
|
||||
/// A synchronous lock for getting/setting the async locks that our callers will wait on.
|
||||
entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<()>>>>,
|
||||
}
|
||||
|
||||
impl<T> IdLockMap<T>
|
||||
where
|
||||
T: Eq + PartialEq + std::hash::Hash,
|
||||
{
|
||||
pub(crate) fn shared(
|
||||
&self,
|
||||
key: T,
|
||||
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<()>> {
|
||||
let mut locked = self.entities.lock().unwrap();
|
||||
let entry = locked.entry(key).or_default();
|
||||
entry.clone().read_owned()
|
||||
}
|
||||
|
||||
pub(crate) fn exclusive(
|
||||
&self,
|
||||
key: T,
|
||||
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockWriteGuard<()>> {
|
||||
let mut locked = self.entities.lock().unwrap();
|
||||
let entry = locked.entry(key).or_default();
|
||||
entry.clone().write_owned()
|
||||
}
|
||||
|
||||
/// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
|
||||
/// periodic housekeeping to avoid the map growing indefinitely
|
||||
pub(crate) fn housekeeping(&self) {
|
||||
let mut locked = self.entities.lock().unwrap();
|
||||
locked.retain(|_k, lock| lock.try_write().is_err())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for IdLockMap<T>
|
||||
where
|
||||
T: Eq + PartialEq + std::hash::Hash,
|
||||
{
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
entities: std::sync::Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,6 @@ use utils::seqwait::MonotonicCounter;
|
||||
mod auth;
|
||||
mod compute_hook;
|
||||
pub mod http;
|
||||
mod id_lock_map;
|
||||
pub mod metrics;
|
||||
mod node;
|
||||
pub mod persistence;
|
||||
|
||||
@@ -11,9 +11,6 @@ use diesel::prelude::*;
|
||||
use diesel::Connection;
|
||||
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
|
||||
use pageserver_api::models::TenantConfig;
|
||||
use pageserver_api::shard::ShardConfigError;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_api::shard::ShardStripeSize;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::generation::Generation;
|
||||
@@ -75,14 +72,6 @@ pub(crate) enum DatabaseError {
|
||||
Logical(String),
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub(crate) enum AbortShardSplitStatus {
|
||||
/// We aborted the split in the database by reverting to the parent shards
|
||||
Aborted,
|
||||
/// The split had already been persisted.
|
||||
Complete,
|
||||
}
|
||||
|
||||
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
|
||||
|
||||
impl Persistence {
|
||||
@@ -581,42 +570,6 @@ impl Persistence {
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Used when the remote part of a shard split failed: we will revert the database state to have only
|
||||
/// the parent shards, with SplitState::Idle.
|
||||
pub(crate) async fn abort_shard_split(
|
||||
&self,
|
||||
split_tenant_id: TenantId,
|
||||
new_shard_count: ShardCount,
|
||||
) -> DatabaseResult<AbortShardSplitStatus> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
self.with_conn(move |conn| -> DatabaseResult<AbortShardSplitStatus> {
|
||||
let aborted = conn.transaction(|conn| -> QueryResult<AbortShardSplitStatus> {
|
||||
// Clear the splitting state on parent shards
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.ne(new_shard_count.literal() as i32))
|
||||
.set((splitting.eq(0),))
|
||||
.execute(conn)?;
|
||||
|
||||
// Parent shards are already gone: we cannot abort.
|
||||
if updated == 0 {
|
||||
return Ok(AbortShardSplitStatus::Complete);
|
||||
}
|
||||
|
||||
// Erase child shards
|
||||
diesel::delete(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.eq(new_shard_count.literal() as i32))
|
||||
.execute(conn)?;
|
||||
|
||||
Ok(AbortShardSplitStatus::Aborted)
|
||||
})?;
|
||||
|
||||
Ok(aborted)
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
|
||||
@@ -651,28 +604,6 @@ pub(crate) struct TenantShardPersistence {
|
||||
pub(crate) config: String,
|
||||
}
|
||||
|
||||
impl TenantShardPersistence {
|
||||
pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
|
||||
if self.shard_count == 0 {
|
||||
Ok(ShardIdentity::unsharded())
|
||||
} else {
|
||||
Ok(ShardIdentity::new(
|
||||
ShardNumber(self.shard_number as u8),
|
||||
ShardCount::new(self.shard_count as u8),
|
||||
ShardStripeSize(self.shard_stripe_size as u32),
|
||||
)?)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
|
||||
Ok(TenantShardId {
|
||||
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
|
||||
shard_number: ShardNumber(self.shard_number as u8),
|
||||
shard_count: ShardCount::new(self.shard_count as u8),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Parts of [`crate::node::Node`] that are stored durably
|
||||
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
|
||||
#[diesel(table_name = crate::schema::nodes)]
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use crate::persistence::Persistence;
|
||||
use crate::service;
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
|
||||
};
|
||||
@@ -19,8 +18,6 @@ use crate::compute_hook::{ComputeHook, NotifyError};
|
||||
use crate::node::Node;
|
||||
use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
|
||||
|
||||
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
|
||||
|
||||
/// Object with the lifetime of the background reconcile task that is created
|
||||
/// for tenants which have a difference between their intent and observed states.
|
||||
pub(super) struct Reconciler {
|
||||
@@ -488,29 +485,17 @@ impl Reconciler {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Some(Ok(observed)) => Some(observed),
|
||||
Some(Err(mgmt_api::Error::ApiError(status, _msg)))
|
||||
if status == StatusCode::NOT_FOUND =>
|
||||
{
|
||||
None
|
||||
}
|
||||
Some(Ok(observed)) => observed,
|
||||
Some(Err(e)) => return Err(e.into()),
|
||||
None => return Err(ReconcileError::Cancel),
|
||||
};
|
||||
tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
|
||||
match observed_conf {
|
||||
Some(conf) => {
|
||||
// Pageserver returned a state: update it in observed. This may still be an indeterminate (None) state,
|
||||
// if internally the pageserver's TenantSlot was being mutated (e.g. some long running API call is still running)
|
||||
self.observed
|
||||
.locations
|
||||
.insert(attached_node.get_id(), ObservedStateLocation { conf });
|
||||
}
|
||||
None => {
|
||||
// Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
|
||||
self.observed.locations.remove(&attached_node.get_id());
|
||||
}
|
||||
}
|
||||
self.observed.locations.insert(
|
||||
attached_node.get_id(),
|
||||
ObservedStateLocation {
|
||||
conf: observed_conf,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -540,12 +525,7 @@ impl Reconciler {
|
||||
)));
|
||||
};
|
||||
|
||||
let mut wanted_conf = attached_location_conf(
|
||||
generation,
|
||||
&self.shard,
|
||||
&self.config,
|
||||
!self.intent.secondary.is_empty(),
|
||||
);
|
||||
let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
|
||||
match self.observed.locations.get(&node.get_id()) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
||||
// Nothing to do
|
||||
@@ -682,26 +662,10 @@ impl Reconciler {
|
||||
}
|
||||
}
|
||||
|
||||
/// We tweak the externally-set TenantConfig while configuring
|
||||
/// locations, using our awareness of whether secondary locations
|
||||
/// are in use to automatically enable/disable heatmap uploads.
|
||||
fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
|
||||
let mut config = config.clone();
|
||||
if has_secondaries {
|
||||
if config.heatmap_period.is_none() {
|
||||
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
|
||||
}
|
||||
} else {
|
||||
config.heatmap_period = None;
|
||||
}
|
||||
config
|
||||
}
|
||||
|
||||
pub(crate) fn attached_location_conf(
|
||||
generation: Generation,
|
||||
shard: &ShardIdentity,
|
||||
config: &TenantConfig,
|
||||
has_secondaries: bool,
|
||||
) -> LocationConfig {
|
||||
LocationConfig {
|
||||
mode: LocationConfigMode::AttachedSingle,
|
||||
@@ -710,7 +674,7 @@ pub(crate) fn attached_location_conf(
|
||||
shard_number: shard.number.0,
|
||||
shard_count: shard.count.literal(),
|
||||
shard_stripe_size: shard.stripe_size.0,
|
||||
tenant_conf: ha_aware_config(config, has_secondaries),
|
||||
tenant_conf: config.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -725,6 +689,6 @@ pub(crate) fn secondary_location_conf(
|
||||
shard_number: shard.number.0,
|
||||
shard_count: shard.count.literal(),
|
||||
shard_stripe_size: shard.stripe_size.0,
|
||||
tenant_conf: ha_aware_config(config, true),
|
||||
tenant_conf: config.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -577,12 +577,7 @@ impl TenantState {
|
||||
.generation
|
||||
.expect("Attempted to enter attached state without a generation");
|
||||
|
||||
let wanted_conf = attached_location_conf(
|
||||
generation,
|
||||
&self.shard,
|
||||
&self.config,
|
||||
!self.intent.secondary.is_empty(),
|
||||
);
|
||||
let wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
|
||||
match self.observed.locations.get(&node_id) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
|
||||
Some(_) | None => {
|
||||
@@ -622,7 +617,7 @@ impl TenantState {
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
||||
pub(crate) fn maybe_reconcile(
|
||||
&mut self,
|
||||
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
|
||||
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
|
||||
pageservers: &Arc<HashMap<NodeId, Node>>,
|
||||
compute_hook: &Arc<ComputeHook>,
|
||||
service_config: &service::Config,
|
||||
@@ -734,6 +729,7 @@ impl TenantState {
|
||||
tenant_id=%reconciler.tenant_shard_id.tenant_id,
|
||||
shard_id=%reconciler.tenant_shard_id.shard_slug());
|
||||
metrics::RECONCILER.spawned.inc();
|
||||
let result_tx = result_tx.clone();
|
||||
let join_handle = tokio::task::spawn(
|
||||
async move {
|
||||
// Wait for any previous reconcile task to complete before we start
|
||||
|
||||
@@ -774,10 +774,7 @@ impl Endpoint {
|
||||
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
|
||||
}
|
||||
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(30))
|
||||
.build()
|
||||
.unwrap();
|
||||
let client = reqwest::Client::new();
|
||||
let response = client
|
||||
.post(format!(
|
||||
"http://{}:{}/configure",
|
||||
|
||||
@@ -17,7 +17,6 @@ use std::time::Duration;
|
||||
use anyhow::{bail, Context};
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::SinkExt;
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::controller_api::NodeRegisterRequest;
|
||||
use pageserver_api::models::{
|
||||
self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo,
|
||||
@@ -263,11 +262,6 @@ impl PageServerNode {
|
||||
match st {
|
||||
Ok(()) => Ok(true),
|
||||
Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false),
|
||||
Err(mgmt_api::Error::ApiError(status, _msg))
|
||||
if status == StatusCode::SERVICE_UNAVAILABLE =>
|
||||
{
|
||||
Ok(false)
|
||||
}
|
||||
Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
|
||||
}
|
||||
},
|
||||
|
||||
@@ -29,7 +29,6 @@ pub mod launch_timestamp;
|
||||
mod wrappers;
|
||||
pub use wrappers::{CountedReader, CountedWriter};
|
||||
mod hll;
|
||||
pub mod metric_vec_duration;
|
||||
pub use hll::{HyperLogLog, HyperLogLogVec};
|
||||
#[cfg(target_os = "linux")]
|
||||
pub mod more_process_metrics;
|
||||
|
||||
@@ -1,23 +0,0 @@
|
||||
//! Helpers for observing duration on `HistogramVec` / `CounterVec` / `GaugeVec` / `MetricVec<T>`.
|
||||
|
||||
use std::{future::Future, time::Instant};
|
||||
|
||||
pub trait DurationResultObserver {
|
||||
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration);
|
||||
}
|
||||
|
||||
pub async fn observe_async_block_duration_by_result<
|
||||
T,
|
||||
E,
|
||||
F: Future<Output = Result<T, E>>,
|
||||
O: DurationResultObserver,
|
||||
>(
|
||||
observer: &O,
|
||||
block: F,
|
||||
) -> Result<T, E> {
|
||||
let start = Instant::now();
|
||||
let result = block.await;
|
||||
let duration = start.elapsed();
|
||||
observer.observe_result(&result, duration);
|
||||
result
|
||||
}
|
||||
@@ -17,6 +17,7 @@ use remote_storage::{
|
||||
};
|
||||
use test_context::test_context;
|
||||
use test_context::AsyncTestContext;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
|
||||
@@ -484,32 +485,33 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
|
||||
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
|
||||
|
||||
{
|
||||
let mut stream = ctx
|
||||
let stream = ctx
|
||||
.client
|
||||
.download(&path, &cancel)
|
||||
.await
|
||||
.expect("download succeeds")
|
||||
.download_stream;
|
||||
|
||||
let first = stream
|
||||
.next()
|
||||
.await
|
||||
.expect("should have the first blob")
|
||||
.expect("should have succeeded");
|
||||
let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));
|
||||
|
||||
tracing::info!(len = first.len(), "downloaded first chunk");
|
||||
let first = reader.fill_buf().await.expect("should have the first blob");
|
||||
|
||||
let len = first.len();
|
||||
tracing::info!(len, "downloaded first chunk");
|
||||
|
||||
assert!(
|
||||
first.len() < len,
|
||||
first.len() < file_len,
|
||||
"uploaded file is too small, we downloaded all on first chunk"
|
||||
);
|
||||
|
||||
reader.consume(len);
|
||||
|
||||
cancel.cancel();
|
||||
|
||||
let next = stream.next().await.expect("stream should have more");
|
||||
let next = reader.fill_buf().await;
|
||||
|
||||
let e = next.expect_err("expected an error, but got a chunk?");
|
||||
|
||||
@@ -520,6 +522,10 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
|
||||
.is_some_and(|e| matches!(e, DownloadError::Cancelled)),
|
||||
"{inner:?}"
|
||||
);
|
||||
|
||||
let e = DownloadError::from(e);
|
||||
|
||||
assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
|
||||
}
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
@@ -84,6 +84,9 @@ where
|
||||
info!("Handling request");
|
||||
}
|
||||
|
||||
// Take a copy of the path for error logging
|
||||
let path = request.uri().path().to_string();
|
||||
|
||||
// No special handling for panics here. There's a `tracing_panic_hook` from another
|
||||
// module to do that globally.
|
||||
let res = handler(request).await;
|
||||
@@ -110,7 +113,7 @@ where
|
||||
}
|
||||
Ok(response)
|
||||
}
|
||||
Err(err) => Ok(api_error_handler(err)),
|
||||
Err(err) => Ok(api_error_handler(err, Some(&path))),
|
||||
}
|
||||
}
|
||||
.instrument(request_span)
|
||||
|
||||
@@ -108,7 +108,7 @@ impl HttpErrorBody {
|
||||
|
||||
pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
|
||||
match err.downcast::<ApiError>() {
|
||||
Ok(api_error) => api_error_handler(*api_error),
|
||||
Ok(api_error) => api_error_handler(*api_error, None),
|
||||
Err(other_error) => {
|
||||
// We expect all the request handlers to return an ApiError, so this should
|
||||
// not be reached. But just in case.
|
||||
@@ -121,12 +121,16 @@ pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
|
||||
pub fn api_error_handler(api_error: ApiError, path: Option<&str>) -> Response<Body> {
|
||||
// Print a stack trace for Internal Server errors
|
||||
|
||||
match api_error {
|
||||
ApiError::Forbidden(_) | ApiError::Unauthorized(_) => {
|
||||
warn!("Error processing HTTP request: {api_error:#}")
|
||||
warn!(
|
||||
"Error processing HTTP request: {api_error:#} {}{}",
|
||||
path.as_ref().map(|_| "at").unwrap_or(""),
|
||||
path.unwrap_or("")
|
||||
)
|
||||
}
|
||||
ApiError::ResourceUnavailable(_) => info!("Error processing HTTP request: {api_error:#}"),
|
||||
ApiError::NotFound(_) => info!("Error processing HTTP request: {api_error:#}"),
|
||||
|
||||
@@ -324,11 +324,11 @@ extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, commit_lsn: XLogRecPtr) {
|
||||
extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer) {
|
||||
unsafe {
|
||||
let callback_data = (*(*wp).config).callback_data;
|
||||
let api = callback_data as *mut Box<dyn ApiImpl>;
|
||||
(*api).process_safekeeper_feedback(&mut (*wp), commit_lsn)
|
||||
(*api).process_safekeeper_feedback(&mut (*wp))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -142,7 +142,7 @@ pub trait ApiImpl {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn process_safekeeper_feedback(&self, _wp: &mut WalProposer, _commit_lsn: u64) {
|
||||
fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
|
||||
@@ -83,6 +83,10 @@ pub mod defaults {
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "tokio-epoll-uring";
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
|
||||
|
||||
pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
|
||||
|
||||
@@ -932,6 +932,59 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
/v1/tenant/{tenant_shard_id}/heatmap_upload:
|
||||
parameters:
|
||||
- name: tenant_shard_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
post:
|
||||
description: |
|
||||
If the location is in an attached mode, upload the current state to the remote heatmap
|
||||
responses:
|
||||
"200":
|
||||
description: Success
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
/v1/tenant/{tenant_shard_id}/secondary/download:
|
||||
parameters:
|
||||
- name: tenant_shard_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
post:
|
||||
description: |
|
||||
If the location is in secondary mode, download latest heatmap and layers
|
||||
responses:
|
||||
"200":
|
||||
description: Success
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/:
|
||||
parameters:
|
||||
@@ -1391,7 +1444,7 @@ components:
|
||||
trace_read_requests:
|
||||
type: boolean
|
||||
heatmap_period:
|
||||
type: integer
|
||||
type: string
|
||||
TenantConfigResponse:
|
||||
type: object
|
||||
properties:
|
||||
|
||||
@@ -2103,16 +2103,6 @@ where
|
||||
R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
|
||||
H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
|
||||
{
|
||||
if request.uri() != &"/v1/failpoints".parse::<Uri>().unwrap() {
|
||||
fail::fail_point!("api-503", |_| Err(ApiError::ResourceUnavailable(
|
||||
"failpoint".into()
|
||||
)));
|
||||
|
||||
fail::fail_point!("api-500", |_| Err(ApiError::InternalServerError(
|
||||
anyhow::anyhow!("failpoint")
|
||||
)));
|
||||
}
|
||||
|
||||
// Spawn a new task to handle the request, to protect the handler from unexpected
|
||||
// async cancellations. Most pageserver functions are not async cancellation safe.
|
||||
// We arm a drop-guard, so that if Hyper drops the Future, we signal the task
|
||||
@@ -2257,7 +2247,7 @@ pub fn make_router(
|
||||
.get("/v1/location_config", |r| {
|
||||
api_handler(r, list_location_config_handler)
|
||||
})
|
||||
.get("/v1/location_config/:tenant_shard_id", |r| {
|
||||
.get("/v1/location_config/:tenant_id", |r| {
|
||||
api_handler(r, get_location_config_handler)
|
||||
})
|
||||
.put(
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use enum_map::EnumMap;
|
||||
use metrics::metric_vec_duration::DurationResultObserver;
|
||||
use metrics::{
|
||||
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
|
||||
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
|
||||
@@ -1283,11 +1282,65 @@ pub(crate) static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|
|
||||
})
|
||||
});
|
||||
|
||||
impl DurationResultObserver for BasebackupQueryTime {
|
||||
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration) {
|
||||
pub(crate) struct BasebackupQueryTimeOngoingRecording<'a, 'c> {
|
||||
parent: &'a BasebackupQueryTime,
|
||||
ctx: &'c RequestContext,
|
||||
start: std::time::Instant,
|
||||
}
|
||||
|
||||
impl BasebackupQueryTime {
|
||||
pub(crate) fn start_recording<'c: 'a, 'a>(
|
||||
&'a self,
|
||||
ctx: &'c RequestContext,
|
||||
) -> BasebackupQueryTimeOngoingRecording<'_, '_> {
|
||||
let start = Instant::now();
|
||||
match ctx.micros_spent_throttled.open() {
|
||||
Ok(()) => (),
|
||||
Err(error) => {
|
||||
use utils::rate_limit::RateLimit;
|
||||
static LOGGED: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
|
||||
let mut rate_limit = LOGGED.lock().unwrap();
|
||||
rate_limit.call(|| {
|
||||
warn!(error, "error opening micros_spent_throttled; this message is logged at a global rate limit");
|
||||
});
|
||||
}
|
||||
}
|
||||
BasebackupQueryTimeOngoingRecording {
|
||||
parent: self,
|
||||
ctx,
|
||||
start,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
|
||||
pub(crate) fn observe<T, E>(self, res: &Result<T, E>) {
|
||||
let elapsed = self.start.elapsed();
|
||||
let ex_throttled = self
|
||||
.ctx
|
||||
.micros_spent_throttled
|
||||
.close_and_checked_sub_from(elapsed);
|
||||
let ex_throttled = match ex_throttled {
|
||||
Ok(ex_throttled) => ex_throttled,
|
||||
Err(error) => {
|
||||
use utils::rate_limit::RateLimit;
|
||||
static LOGGED: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
|
||||
let mut rate_limit = LOGGED.lock().unwrap();
|
||||
rate_limit.call(|| {
|
||||
warn!(error, "error deducting time spent throttled; this message is logged at a global rate limit");
|
||||
});
|
||||
elapsed
|
||||
}
|
||||
};
|
||||
let label_value = if res.is_ok() { "ok" } else { "error" };
|
||||
let metric = self.0.get_metric_with_label_values(&[label_value]).unwrap();
|
||||
metric.observe(duration.as_secs_f64());
|
||||
let metric = self
|
||||
.parent
|
||||
.0
|
||||
.get_metric_with_label_values(&[label_value])
|
||||
.unwrap();
|
||||
metric.observe(ex_throttled.as_secs_f64());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1964,10 +2017,8 @@ impl TimelineMetrics {
|
||||
pub(crate) fn resident_physical_size_get(&self) -> u64 {
|
||||
self.resident_physical_size_gauge.get()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimelineMetrics {
|
||||
fn drop(&mut self) {
|
||||
pub(crate) fn shutdown(&self) {
|
||||
let tenant_id = &self.tenant_id;
|
||||
let timeline_id = &self.timeline_id;
|
||||
let shard_id = &self.shard_id;
|
||||
@@ -2623,6 +2674,12 @@ pub fn preinitialize_metrics() {
|
||||
Lazy::force(&crate::tenant::storage_layer::layer::LAYER_IMPL_METRICS);
|
||||
Lazy::force(&disk_usage_based_eviction::METRICS);
|
||||
|
||||
for state_name in pageserver_api::models::TenantState::VARIANTS {
|
||||
// initialize the metric for all gauges, otherwise the time series might seemingly show
|
||||
// values from last restart.
|
||||
TENANT_STATE_METRIC.with_label_values(&[state_name]).set(0);
|
||||
}
|
||||
|
||||
// countervecs
|
||||
[&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT]
|
||||
.into_iter()
|
||||
|
||||
@@ -1199,7 +1199,7 @@ impl PageServerHandler {
|
||||
prev_lsn: Option<Lsn>,
|
||||
full_backup: bool,
|
||||
gzip: bool,
|
||||
ctx: RequestContext,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), QueryError>
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
@@ -1214,7 +1214,7 @@ impl PageServerHandler {
|
||||
if let Some(lsn) = lsn {
|
||||
// Backup was requested at a particular LSN. Wait for it to arrive.
|
||||
info!("waiting for {}", lsn);
|
||||
timeline.wait_lsn(lsn, &ctx).await?;
|
||||
timeline.wait_lsn(lsn, ctx).await?;
|
||||
timeline
|
||||
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
|
||||
.context("invalid basebackup lsn")?;
|
||||
@@ -1236,7 +1236,7 @@ impl PageServerHandler {
|
||||
lsn,
|
||||
prev_lsn,
|
||||
full_backup,
|
||||
&ctx,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
@@ -1257,7 +1257,7 @@ impl PageServerHandler {
|
||||
lsn,
|
||||
prev_lsn,
|
||||
full_backup,
|
||||
&ctx,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
// shutdown the encoder to ensure the gzip footer is written
|
||||
@@ -1269,7 +1269,7 @@ impl PageServerHandler {
|
||||
lsn,
|
||||
prev_lsn,
|
||||
full_backup,
|
||||
&ctx,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -1449,25 +1449,25 @@ where
|
||||
false
|
||||
};
|
||||
|
||||
::metrics::metric_vec_duration::observe_async_block_duration_by_result(
|
||||
&*metrics::BASEBACKUP_QUERY_TIME,
|
||||
async move {
|
||||
self.handle_basebackup_request(
|
||||
pgb,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
None,
|
||||
false,
|
||||
gzip,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
Result::<(), QueryError>::Ok(())
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
|
||||
let res = async {
|
||||
self.handle_basebackup_request(
|
||||
pgb,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
None,
|
||||
false,
|
||||
gzip,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
Result::<(), QueryError>::Ok(())
|
||||
}
|
||||
.await;
|
||||
metric_recording.observe(&res);
|
||||
res?;
|
||||
}
|
||||
// return pair of prev_lsn and last_lsn
|
||||
else if query_string.starts_with("get_last_record_rlsn ") {
|
||||
@@ -1563,7 +1563,7 @@ where
|
||||
prev_lsn,
|
||||
true,
|
||||
false,
|
||||
ctx,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
|
||||
@@ -272,9 +272,6 @@ pub enum TaskKind {
|
||||
// Task that uploads a file to remote storage
|
||||
RemoteUploadTask,
|
||||
|
||||
// Task that downloads a file from remote storage
|
||||
RemoteDownloadTask,
|
||||
|
||||
// task that handles the initial downloading of all tenants
|
||||
InitialLoad,
|
||||
|
||||
|
||||
@@ -1846,6 +1846,8 @@ impl Tenant {
|
||||
// Wait for any in-flight operations to complete
|
||||
self.gate.close().await;
|
||||
|
||||
remove_tenant_metrics(&self.tenant_shard_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -3557,11 +3559,6 @@ async fn run_initdb(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl Drop for Tenant {
|
||||
fn drop(&mut self) {
|
||||
remove_tenant_metrics(&self.tenant_shard_id);
|
||||
}
|
||||
}
|
||||
/// Dump contents of a layer file to stdout.
|
||||
pub async fn dump_layerfile_from_path(
|
||||
path: &Utf8Path,
|
||||
|
||||
@@ -354,6 +354,7 @@ pub struct TenantConf {
|
||||
/// If non-zero, the period between uploads of a heatmap from attached tenants. This
|
||||
/// may be disabled if a Tenant will not have secondary locations: only secondary
|
||||
/// locations will use the heatmap uploaded by attached locations.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub heatmap_period: Duration,
|
||||
|
||||
/// If true then SLRU segments are dowloaded on demand, if false SLRU segments are included in basebackup
|
||||
|
||||
@@ -1440,31 +1440,6 @@ impl TenantManager {
|
||||
tenant_shard_id: TenantShardId,
|
||||
new_shard_count: ShardCount,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<TenantShardId>> {
|
||||
let r = self
|
||||
.do_shard_split(tenant_shard_id, new_shard_count, ctx)
|
||||
.await;
|
||||
if r.is_err() {
|
||||
// Shard splitting might have left the original shard in a partially shut down state (it
|
||||
// stops the shard's remote timeline client). Reset it to ensure we leave things in
|
||||
// a working state.
|
||||
if self.get(tenant_shard_id).is_some() {
|
||||
tracing::warn!("Resetting {tenant_shard_id} after shard split failure");
|
||||
if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
|
||||
// Log this error because our return value will still be the original error, not this one.
|
||||
tracing::warn!("Failed to reset {tenant_shard_id}: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
r
|
||||
}
|
||||
|
||||
pub(crate) async fn do_shard_split(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
new_shard_count: ShardCount,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<TenantShardId>> {
|
||||
let tenant = get_tenant(tenant_shard_id, true)?;
|
||||
|
||||
@@ -1491,10 +1466,6 @@ impl TenantManager {
|
||||
.join(",")
|
||||
);
|
||||
|
||||
fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
|
||||
// Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
|
||||
if let Err(e) = tenant.split_prepare(&child_shards).await {
|
||||
// If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
|
||||
@@ -1504,10 +1475,6 @@ impl TenantManager {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
|
||||
self.resources.deletion_queue_client.flush_advisory();
|
||||
|
||||
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
|
||||
@@ -1529,16 +1496,11 @@ impl TenantManager {
|
||||
anyhow::bail!("Detached parent shard in the middle of split!")
|
||||
}
|
||||
};
|
||||
fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
|
||||
// Optimization: hardlink layers from the parent into the children, so that they don't have to
|
||||
// re-download & duplicate the data referenced in their initial IndexPart
|
||||
self.shard_split_hardlink(parent, child_shards.clone())
|
||||
.await?;
|
||||
fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
|
||||
// Take a snapshot of where the parent's WAL ingest had got to: we will wait for
|
||||
// child shards to reach this point.
|
||||
@@ -1575,10 +1537,6 @@ impl TenantManager {
|
||||
.await?;
|
||||
}
|
||||
|
||||
fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
|
||||
// Phase 4: wait for child chards WAL ingest to catch up to target LSN
|
||||
for child_shard_id in &child_shards {
|
||||
let child_shard_id = *child_shard_id;
|
||||
@@ -1611,10 +1569,6 @@ impl TenantManager {
|
||||
timeline.timeline_id,
|
||||
target_lsn
|
||||
);
|
||||
|
||||
fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
|
||||
// Failure here might mean shutdown, in any case this part is an optimization
|
||||
// and we shouldn't hold up the split operation.
|
||||
@@ -1660,10 +1614,6 @@ impl TenantManager {
|
||||
},
|
||||
);
|
||||
|
||||
fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
|
||||
parent_slot_guard.drop_old_value()?;
|
||||
|
||||
// Phase 6: Release the InProgress on the parent shard
|
||||
|
||||
@@ -536,6 +536,18 @@ impl Drop for LayerInner {
|
||||
// carry this until we are finished for [`Layer::wait_drop`] support
|
||||
let _status = status;
|
||||
|
||||
let Some(timeline) = timeline.upgrade() else {
|
||||
// no need to nag that timeline is gone: under normal situation on
|
||||
// task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
|
||||
return;
|
||||
};
|
||||
|
||||
let Ok(_guard) = timeline.gate.enter() else {
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
|
||||
return;
|
||||
};
|
||||
|
||||
let removed = match std::fs::remove_file(path) {
|
||||
Ok(()) => true,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
@@ -554,32 +566,26 @@ impl Drop for LayerInner {
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(timeline) = timeline.upgrade() {
|
||||
if removed {
|
||||
timeline.metrics.resident_physical_size_sub(file_size);
|
||||
}
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
|
||||
if removed {
|
||||
timeline.metrics.resident_physical_size_sub(file_size);
|
||||
}
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
|
||||
|
||||
if let Err(e) = res {
|
||||
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
|
||||
// demonstrating this deadlock (without spawn_blocking): stop will drop
|
||||
// queued items, which will have ResidentLayer's, and those drops would try
|
||||
// to re-entrantly lock the RemoteTimelineClient inner state.
|
||||
if !timeline.is_active() {
|
||||
tracing::info!("scheduling deletion on drop failed: {e:#}");
|
||||
} else {
|
||||
tracing::warn!("scheduling deletion on drop failed: {e:#}");
|
||||
}
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
|
||||
if let Err(e) = res {
|
||||
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
|
||||
// demonstrating this deadlock (without spawn_blocking): stop will drop
|
||||
// queued items, which will have ResidentLayer's, and those drops would try
|
||||
// to re-entrantly lock the RemoteTimelineClient inner state.
|
||||
if !timeline.is_active() {
|
||||
tracing::info!("scheduling deletion on drop failed: {e:#}");
|
||||
} else {
|
||||
LAYER_IMPL_METRICS.inc_completed_deletes();
|
||||
tracing::warn!("scheduling deletion on drop failed: {e:#}");
|
||||
}
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
|
||||
} else {
|
||||
LAYER_IMPL_METRICS.inc_completed_deletes();
|
||||
}
|
||||
} else {
|
||||
// no need to nag that timeline is gone: under normal situation on
|
||||
// task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -880,23 +886,18 @@ impl LayerInner {
|
||||
) -> Result<heavier_once_cell::InitPermit, DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let task_name = format!("download layer {}", self);
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
|
||||
// this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
|
||||
// block tenant::mgr::remove_tenant_from_memory.
|
||||
|
||||
let this: Arc<Self> = self.clone();
|
||||
|
||||
crate::task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
crate::task_mgr::TaskKind::RemoteDownloadTask,
|
||||
Some(self.desc.tenant_shard_id),
|
||||
Some(self.desc.timeline_id),
|
||||
&task_name,
|
||||
false,
|
||||
async move {
|
||||
let guard = timeline
|
||||
.gate
|
||||
.enter()
|
||||
.map_err(|_| DownloadError::DownloadCancelled)?;
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
|
||||
let _guard = guard;
|
||||
|
||||
let client = timeline
|
||||
.remote_client
|
||||
@@ -906,7 +907,7 @@ impl LayerInner {
|
||||
let result = client.download_layer_file(
|
||||
&this.desc.filename(),
|
||||
&this.metadata(),
|
||||
&crate::task_mgr::shutdown_token()
|
||||
&timeline.cancel
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -929,7 +930,6 @@ impl LayerInner {
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(backoff) => {},
|
||||
_ = crate::task_mgr::shutdown_token().cancelled_owned() => {},
|
||||
_ = timeline.cancel.cancelled() => {},
|
||||
};
|
||||
|
||||
@@ -959,11 +959,10 @@ impl LayerInner {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
match rx.await {
|
||||
Ok((Ok(()), permit)) => {
|
||||
if let Some(reason) = self
|
||||
@@ -1102,6 +1101,10 @@ impl LayerInner {
|
||||
return Err(EvictionCancelled::TimelineGone);
|
||||
};
|
||||
|
||||
let Ok(_gate) = timeline.gate.enter() else {
|
||||
return Err(EvictionCancelled::TimelineGone);
|
||||
};
|
||||
|
||||
// to avoid starting a new download while we evict, keep holding on to the
|
||||
// permit.
|
||||
let _permit = {
|
||||
|
||||
@@ -1257,6 +1257,8 @@ impl Timeline {
|
||||
|
||||
// Finally wait until any gate-holders are complete
|
||||
self.gate.close().await;
|
||||
|
||||
self.metrics.shutdown();
|
||||
}
|
||||
|
||||
pub(crate) fn set_state(&self, new_state: TimelineState) {
|
||||
|
||||
@@ -149,7 +149,7 @@ hnsw_check_available_memory(Size requested)
|
||||
struct sysinfo si;
|
||||
Size total;
|
||||
if (sysinfo(&si) < 0)
|
||||
elog(ERROR, "Failed to get amount of RAM: %n");
|
||||
elog(ERROR, "Failed to get amount of RAM: %m");
|
||||
|
||||
total = si.totalram*si.mem_unit;
|
||||
if ((Size)NBuffers*BLCKSZ + requested >= total)
|
||||
|
||||
@@ -21,7 +21,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql
|
||||
PGFILEDESC = "neon - cloud storage for PostgreSQL"
|
||||
|
||||
EXTRA_CLEAN = \
|
||||
|
||||
@@ -38,7 +38,6 @@ neon_download_extension_file_http(const char *filename, bool is_library)
|
||||
|
||||
CURLcode res;
|
||||
char *compute_ctl_url;
|
||||
char *postdata;
|
||||
bool ret = false;
|
||||
|
||||
if (handle == NULL)
|
||||
|
||||
@@ -316,6 +316,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
static uint64_t delay_us = MIN_RECONNECT_INTERVAL_USEC;
|
||||
TimestampTz now;
|
||||
uint64_t us_since_last_connect;
|
||||
bool broke_from_loop = false;
|
||||
|
||||
Assert(page_servers[shard_no].conn == NULL);
|
||||
|
||||
@@ -418,7 +419,9 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
|
||||
neon_shard_log(shard_no, elevel, "could not complete handshake with pageserver: %s",
|
||||
msg);
|
||||
return false;
|
||||
/* Returning from inside PG_TRY is bad, so we break/return later */
|
||||
broke_from_loop = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -431,6 +434,11 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
if (broke_from_loop)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
neon_shard_log(shard_no, LOG, "libpagestore: connected to '%s'", connstr);
|
||||
page_servers[shard_no].conn = conn;
|
||||
page_servers[shard_no].wes = wes;
|
||||
|
||||
6
pgxn/neon/neon--1.1--1.0.sql
Normal file
6
pgxn/neon/neon--1.1--1.0.sql
Normal file
@@ -0,0 +1,6 @@
|
||||
-- the order of operations is important here
|
||||
-- because the view depends on the function
|
||||
|
||||
DROP VIEW IF EXISTS neon_lfc_stats CASCADE;
|
||||
|
||||
DROP FUNCTION IF EXISTS neon_get_lfc_stats CASCADE;
|
||||
1
pgxn/neon/neon--1.2--1.1.sql
Normal file
1
pgxn/neon/neon--1.2--1.1.sql
Normal file
@@ -0,0 +1 @@
|
||||
DROP VIEW IF EXISTS NEON_STAT_FILE_CACHE CASCADE;
|
||||
1
pgxn/neon/neon--1.3--1.2.sql
Normal file
1
pgxn/neon/neon--1.3--1.2.sql
Normal file
@@ -0,0 +1 @@
|
||||
DROP FUNCTION IF EXISTS approximate_working_set_size(bool) CASCADE;
|
||||
@@ -95,7 +95,6 @@ get_num_snap_files_lsn_threshold(void)
|
||||
DIR *dirdesc;
|
||||
struct dirent *de;
|
||||
char *snap_path = "pg_logical/snapshots/";
|
||||
int cnt = 0;
|
||||
int lsns_allocated = 1024;
|
||||
int lsns_num = 0;
|
||||
XLogRecPtr *lsns;
|
||||
@@ -161,9 +160,6 @@ get_num_snap_files_lsn_threshold(void)
|
||||
PGDLLEXPORT void
|
||||
LogicalSlotsMonitorMain(Datum main_arg)
|
||||
{
|
||||
TimestampTz now,
|
||||
last_checked;
|
||||
|
||||
/* Establish signal handlers. */
|
||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||
pqsignal(SIGHUP, SignalHandlerForConfigReload);
|
||||
|
||||
@@ -1888,7 +1888,6 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
|
||||
int nblocks, bool skipFsync)
|
||||
{
|
||||
const PGAlignedBlock buffer = {0};
|
||||
BlockNumber curblocknum = blocknum;
|
||||
int remblocks = nblocks;
|
||||
XLogRecPtr lsn = 0;
|
||||
|
||||
|
||||
@@ -1220,7 +1220,7 @@ PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr begin
|
||||
req->epochStartLsn = wp->propEpochStartLsn;
|
||||
req->beginLsn = beginLsn;
|
||||
req->endLsn = endLsn;
|
||||
req->commitLsn = GetAcknowledgedByQuorumWALPosition(wp);
|
||||
req->commitLsn = wp->commitLsn;
|
||||
req->truncateLsn = wp->truncateLsn;
|
||||
req->proposerId = wp->greetRequest.proposerId;
|
||||
}
|
||||
@@ -1405,7 +1405,7 @@ static bool
|
||||
RecvAppendResponses(Safekeeper *sk)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
XLogRecPtr minQuorumLsn;
|
||||
XLogRecPtr newCommitLsn;
|
||||
bool readAnything = false;
|
||||
|
||||
while (true)
|
||||
@@ -1444,18 +1444,19 @@ RecvAppendResponses(Safekeeper *sk)
|
||||
if (!readAnything)
|
||||
return sk->state == SS_ACTIVE;
|
||||
|
||||
HandleSafekeeperResponse(wp);
|
||||
|
||||
/* update commit_lsn */
|
||||
newCommitLsn = GetAcknowledgedByQuorumWALPosition(wp);
|
||||
/*
|
||||
* Also send the new commit lsn to all the safekeepers.
|
||||
* Send the new value to all safekeepers.
|
||||
*/
|
||||
minQuorumLsn = GetAcknowledgedByQuorumWALPosition(wp);
|
||||
if (minQuorumLsn > wp->lastSentCommitLsn)
|
||||
if (newCommitLsn > wp->commitLsn)
|
||||
{
|
||||
wp->commitLsn = newCommitLsn;
|
||||
BroadcastAppendRequest(wp);
|
||||
wp->lastSentCommitLsn = minQuorumLsn;
|
||||
}
|
||||
|
||||
HandleSafekeeperResponse(wp);
|
||||
|
||||
return sk->state == SS_ACTIVE;
|
||||
}
|
||||
|
||||
@@ -1632,11 +1633,9 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
|
||||
static void
|
||||
HandleSafekeeperResponse(WalProposer *wp)
|
||||
{
|
||||
XLogRecPtr minQuorumLsn;
|
||||
XLogRecPtr candidateTruncateLsn;
|
||||
|
||||
minQuorumLsn = GetAcknowledgedByQuorumWALPosition(wp);
|
||||
wp->api.process_safekeeper_feedback(wp, minQuorumLsn);
|
||||
wp->api.process_safekeeper_feedback(wp);
|
||||
|
||||
/*
|
||||
* Try to advance truncateLsn -- the last record flushed to all
|
||||
@@ -1649,7 +1648,7 @@ HandleSafekeeperResponse(WalProposer *wp)
|
||||
* can't commit entries from previous term' in Raft); 2)
|
||||
*/
|
||||
candidateTruncateLsn = CalculateMinFlushLsn(wp);
|
||||
candidateTruncateLsn = Min(candidateTruncateLsn, minQuorumLsn);
|
||||
candidateTruncateLsn = Min(candidateTruncateLsn, wp->commitLsn);
|
||||
if (candidateTruncateLsn > wp->truncateLsn)
|
||||
{
|
||||
wp->truncateLsn = candidateTruncateLsn;
|
||||
|
||||
@@ -564,7 +564,7 @@ typedef struct walproposer_api
|
||||
* backpressure feedback and to confirm WAL persistence (has been commited
|
||||
* on the quorum of safekeepers).
|
||||
*/
|
||||
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
|
||||
void (*process_safekeeper_feedback) (WalProposer *wp);
|
||||
|
||||
/*
|
||||
* Write a log message to the internal log processor. This is used only
|
||||
@@ -646,8 +646,8 @@ typedef struct WalProposer
|
||||
/* WAL has been generated up to this point */
|
||||
XLogRecPtr availableLsn;
|
||||
|
||||
/* last commitLsn broadcasted to safekeepers */
|
||||
XLogRecPtr lastSentCommitLsn;
|
||||
/* cached GetAcknowledgedByQuorumWALPosition result */
|
||||
XLogRecPtr commitLsn;
|
||||
|
||||
ProposerGreeting greetRequest;
|
||||
|
||||
|
||||
@@ -68,6 +68,8 @@ static WalproposerShmemState *walprop_shared;
|
||||
static WalProposerConfig walprop_config;
|
||||
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
|
||||
static const walproposer_api walprop_pg;
|
||||
static volatile sig_atomic_t got_SIGUSR2 = false;
|
||||
static bool reported_sigusr2 = false;
|
||||
|
||||
static void nwp_shmem_startup_hook(void);
|
||||
static void nwp_register_gucs(void);
|
||||
@@ -101,6 +103,8 @@ static void add_nwr_event_set(Safekeeper *sk, uint32 events);
|
||||
static void update_nwr_event_set(Safekeeper *sk, uint32 events);
|
||||
static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
|
||||
|
||||
static void CheckGracefulShutdown(WalProposer *wp);
|
||||
|
||||
static XLogRecPtr GetLogRepRestartLSN(WalProposer *wp);
|
||||
|
||||
static void
|
||||
@@ -492,6 +496,24 @@ walprop_pg_init_standalone_sync_safekeepers(void)
|
||||
BackgroundWorkerUnblockSignals();
|
||||
}
|
||||
|
||||
/*
|
||||
* We pretend to be a walsender process, and the lifecycle of a walsender is
|
||||
* slightly different than other procesess. At shutdown, walsender processes
|
||||
* stay alive until the very end, after the checkpointer has written the
|
||||
* shutdown checkpoint. When the checkpointer exits, the postmaster sends all
|
||||
* remaining walsender processes SIGUSR2. On receiving SIGUSR2, we try to send
|
||||
* the remaining WAL, and then exit. This ensures that the checkpoint record
|
||||
* reaches durable storage (in safekeepers), before the server shuts down
|
||||
* completely.
|
||||
*/
|
||||
static void
|
||||
walprop_sigusr2(SIGNAL_ARGS)
|
||||
{
|
||||
got_SIGUSR2 = true;
|
||||
|
||||
SetLatch(MyLatch);
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_pg_init_bgworker(void)
|
||||
{
|
||||
@@ -503,6 +525,7 @@ walprop_pg_init_bgworker(void)
|
||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||
pqsignal(SIGHUP, SignalHandlerForConfigReload);
|
||||
pqsignal(SIGTERM, die);
|
||||
pqsignal(SIGUSR2, walprop_sigusr2);
|
||||
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
@@ -1026,7 +1049,7 @@ static void
|
||||
StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
|
||||
{
|
||||
XLogRecPtr FlushPtr;
|
||||
TimeLineID currTLI;
|
||||
__attribute__((unused)) TimeLineID currTLI;
|
||||
|
||||
#if PG_VERSION_NUM < 150000
|
||||
if (ThisTimeLineID == 0)
|
||||
@@ -1075,14 +1098,26 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
|
||||
#endif
|
||||
|
||||
/*
|
||||
* When we first start replication the standby will be behind the primary.
|
||||
* For some applications, for example synchronous replication, it is
|
||||
* important to have a clear state for this initial catchup mode, so we
|
||||
* can trigger actions when we change streaming state later. We may stay
|
||||
* in this state for a long time, which is exactly why we want to be able
|
||||
* to monitor whether or not we are still here.
|
||||
* XXX: Move straight to STOPPING state, skipping the STREAMING state.
|
||||
*
|
||||
* This is a bit weird. Normal walsenders stay in STREAMING state, until
|
||||
* the checkpointer signals them that it is about to start writing the
|
||||
* shutdown checkpoint. The walsenders acknowledge that they have received
|
||||
* that signal by switching to STOPPING state. That tells the walsenders
|
||||
* that they must not write any new WAL.
|
||||
*
|
||||
* However, we cannot easily intercept that signal from the checkpointer.
|
||||
* It's sent by WalSndInitStopping(), using
|
||||
* SendProcSignal(PROCSIGNAL_WALSND_INIT_STOPPING). It's received by
|
||||
* HandleWalSndInitStopping, which sets a process-local got_STOPPING flag.
|
||||
* However, that's all private to walsender.c.
|
||||
*
|
||||
* We don't need to do anything special upon receiving the signal, the
|
||||
* walproposer doesn't write any WAL anyway, so we skip the STREAMING
|
||||
* state and go directly to STOPPING mode. That way, the checkpointer
|
||||
* won't wait for us.
|
||||
*/
|
||||
WalSndSetState(WALSNDSTATE_CATCHUP);
|
||||
WalSndSetState(WALSNDSTATE_STOPPING);
|
||||
|
||||
/*
|
||||
* Don't allow a request to stream from a future point in WAL that hasn't
|
||||
@@ -1122,6 +1157,8 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
|
||||
static void
|
||||
WalSndLoop(WalProposer *wp)
|
||||
{
|
||||
XLogRecPtr flushPtr;
|
||||
|
||||
/* Clear any already-pending wakeups */
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
@@ -1130,9 +1167,6 @@ WalSndLoop(WalProposer *wp)
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
XLogBroadcastWalProposer(wp);
|
||||
|
||||
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
|
||||
WalSndSetState(WALSNDSTATE_STREAMING);
|
||||
WalProposerPoll(wp);
|
||||
}
|
||||
}
|
||||
@@ -1230,7 +1264,6 @@ WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
|
||||
TimeLineID timeline;
|
||||
XLogRecPtr startpos;
|
||||
XLogRecPtr endpos;
|
||||
uint64 download_range_mb;
|
||||
|
||||
startpos = GetLogRepRestartLSN(wp);
|
||||
if (startpos == InvalidXLogRecPtr)
|
||||
@@ -1745,6 +1778,9 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32
|
||||
{
|
||||
ConditionVariableCancelSleep();
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CheckGracefulShutdown(wp);
|
||||
|
||||
*events = WL_LATCH_SET;
|
||||
return 1;
|
||||
}
|
||||
@@ -1798,6 +1834,41 @@ walprop_pg_finish_sync_safekeepers(WalProposer *wp, XLogRecPtr lsn)
|
||||
exit(0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Like vanilla walsender, on sigusr2 send all remaining WAL and exit.
|
||||
*
|
||||
* Note that unlike sync-safekeepers waiting here is not reliable: we
|
||||
* don't check that majority of safekeepers received and persisted
|
||||
* commit_lsn -- only that walproposer reached it (which immediately
|
||||
* broadcasts new value). Doing that without incurring redundant control
|
||||
* file syncing would need wp -> sk protocol change. OTOH unlike
|
||||
* sync-safekeepers which must bump commit_lsn or basebackup will fail,
|
||||
* this catchup is important only for tests where safekeepers/network
|
||||
* don't crash on their own.
|
||||
*/
|
||||
static void
|
||||
CheckGracefulShutdown(WalProposer *wp)
|
||||
{
|
||||
if (got_SIGUSR2)
|
||||
{
|
||||
if (!reported_sigusr2)
|
||||
{
|
||||
XLogRecPtr flushPtr = walprop_pg_get_flush_rec_ptr(wp);
|
||||
|
||||
wpg_log(LOG, "walproposer will send and wait for remaining WAL between %X/%X and %X/%X",
|
||||
LSN_FORMAT_ARGS(wp->commitLsn), LSN_FORMAT_ARGS(flushPtr));
|
||||
reported_sigusr2 = true;
|
||||
}
|
||||
|
||||
if (wp->commitLsn >= walprop_pg_get_flush_rec_ptr(wp))
|
||||
{
|
||||
wpg_log(LOG, "walproposer sent all WAL up to %X/%X, exiting",
|
||||
LSN_FORMAT_ARGS(wp->commitLsn));
|
||||
proc_exit(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Choose most advanced PageserverFeedback and set it to *rf.
|
||||
*/
|
||||
@@ -1878,7 +1949,7 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp)
|
||||
* None of that is functional in sync-safekeepers.
|
||||
*/
|
||||
static void
|
||||
walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
|
||||
walprop_pg_process_safekeeper_feedback(WalProposer *wp)
|
||||
{
|
||||
HotStandbyFeedback hsFeedback;
|
||||
XLogRecPtr oldDiskConsistentLsn;
|
||||
@@ -1893,10 +1964,10 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
|
||||
replication_feedback_set(&quorumFeedback.rf);
|
||||
SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize);
|
||||
|
||||
if (commitLsn > quorumFeedback.flushLsn || oldDiskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn)
|
||||
if (wp->commitLsn > quorumFeedback.flushLsn || oldDiskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn)
|
||||
{
|
||||
if (commitLsn > quorumFeedback.flushLsn)
|
||||
quorumFeedback.flushLsn = commitLsn;
|
||||
if (wp->commitLsn > quorumFeedback.flushLsn)
|
||||
quorumFeedback.flushLsn = wp->commitLsn;
|
||||
|
||||
/*
|
||||
* Advance the replication slot to commitLsn. WAL before it is
|
||||
@@ -1929,6 +2000,8 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
|
||||
XidFromFullTransactionId(hsFeedback.catalog_xmin),
|
||||
EpochFromFullTransactionId(hsFeedback.catalog_xmin));
|
||||
}
|
||||
|
||||
CheckGracefulShutdown(wp);
|
||||
}
|
||||
|
||||
static XLogRecPtr
|
||||
|
||||
@@ -182,8 +182,6 @@ test_consume_memory(PG_FUNCTION_ARGS)
|
||||
Datum
|
||||
test_release_memory(PG_FUNCTION_ARGS)
|
||||
{
|
||||
TimestampTz start;
|
||||
|
||||
if (PG_ARGISNULL(0))
|
||||
{
|
||||
if (consume_cxt)
|
||||
|
||||
@@ -220,6 +220,9 @@ enter_seccomp_mode(void)
|
||||
}
|
||||
#endif /* HAVE_LIBSECCOMP */
|
||||
|
||||
PGDLLEXPORT void
|
||||
WalRedoMain(int argc, char *argv[]);
|
||||
|
||||
/*
|
||||
* Entry point for the WAL redo process.
|
||||
*
|
||||
|
||||
@@ -73,7 +73,7 @@ pub mod errors {
|
||||
// Status 406: endpoint is disabled (we don't allow connections).
|
||||
format!("{REQUEST_FAILED}: endpoint is disabled")
|
||||
}
|
||||
http::StatusCode::LOCKED => {
|
||||
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
|
||||
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
|
||||
format!("{REQUEST_FAILED}: endpoint is temporary unavailable. check your quotas and/or contact our support")
|
||||
}
|
||||
@@ -91,6 +91,12 @@ pub mod errors {
|
||||
status: http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
|
||||
..
|
||||
} => crate::error::ErrorKind::User,
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
text,
|
||||
} if text.contains("compute time quota of non-primary branches is exceeded") => {
|
||||
crate::error::ErrorKind::User
|
||||
}
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::LOCKED,
|
||||
text,
|
||||
@@ -120,6 +126,11 @@ pub mod errors {
|
||||
status: http::StatusCode::BAD_REQUEST,
|
||||
..
|
||||
} => true,
|
||||
// don't retry when quotas are exceeded
|
||||
Self::Console {
|
||||
status: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref text,
|
||||
} => !text.contains("compute time quota of non-primary branches is exceeded"),
|
||||
// locked can be returned when the endpoint was in transition
|
||||
// or when quotas are exceeded. don't retry when quotas are exceeded
|
||||
Self::Console {
|
||||
|
||||
@@ -69,6 +69,12 @@ fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
{
|
||||
"quota_exceeded"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref text,
|
||||
}) if text.contains("compute time quota of non-primary branches is exceeded") => {
|
||||
"quota_exceeded"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
..
|
||||
|
||||
@@ -196,6 +196,7 @@ pub struct SimulationApi {
|
||||
safekeepers: RefCell<Vec<SafekeeperConn>>,
|
||||
disk: Arc<DiskWalProposer>,
|
||||
redo_start_lsn: Option<Lsn>,
|
||||
last_logged_commit_lsn: u64,
|
||||
shmem: UnsafeCell<walproposer::bindings::WalproposerShmemState>,
|
||||
config: Config,
|
||||
event_set: RefCell<Option<EventSet>>,
|
||||
@@ -228,6 +229,7 @@ impl SimulationApi {
|
||||
safekeepers: RefCell::new(sk_conns),
|
||||
disk: args.disk,
|
||||
redo_start_lsn: args.redo_start_lsn,
|
||||
last_logged_commit_lsn: 0,
|
||||
shmem: UnsafeCell::new(walproposer::bindings::WalproposerShmemState {
|
||||
mutex: 0,
|
||||
feedback: PageserverFeedback {
|
||||
@@ -596,14 +598,11 @@ impl ApiImpl for SimulationApi {
|
||||
}
|
||||
}
|
||||
|
||||
fn process_safekeeper_feedback(
|
||||
&self,
|
||||
wp: &mut walproposer::bindings::WalProposer,
|
||||
commit_lsn: u64,
|
||||
) {
|
||||
debug!("process_safekeeper_feedback, commit_lsn={}", commit_lsn);
|
||||
if commit_lsn > wp.lastSentCommitLsn {
|
||||
self.os.log_event(format!("commit_lsn;{}", commit_lsn));
|
||||
fn process_safekeeper_feedback(&mut self, wp: &mut walproposer::bindings::WalProposer) {
|
||||
debug!("process_safekeeper_feedback, commit_lsn={}", wp.commitLsn);
|
||||
if wp.commitLsn > self.last_logged_commit_lsn {
|
||||
self.os.log_event(format!("commit_lsn;{}", wp.commitLsn));
|
||||
self.last_logged_commit_lsn = wp.commitLsn;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,8 @@ FLAKY_TESTS_QUERY = """
|
||||
DISTINCT parent_suite, suite, name
|
||||
FROM results
|
||||
WHERE
|
||||
started_at > CURRENT_DATE - INTERVAL '%s' day
|
||||
started_at > CURRENT_DATE - INTERVAL '10' day
|
||||
AND started_at > '2024-03-11 14:50:11.845+00' -- we switched the default PAGESERVER_VIRTUAL_FILE_IO_ENGINE to `tokio-epoll-uring` from `std-fs` on this date, we want to ignore the flaky tests for `std-fs`
|
||||
AND (
|
||||
(status IN ('failed', 'broken') AND reference = 'refs/heads/main')
|
||||
OR flaky
|
||||
@@ -46,11 +47,14 @@ def main(args: argparse.Namespace):
|
||||
logging.error("cannot fetch flaky tests from the DB due to an error", exc)
|
||||
rows = []
|
||||
|
||||
# If a test run has non-default PAGESERVER_VIRTUAL_FILE_IO_ENGINE (i.e. not empty, not std-fs),
|
||||
# If a test run has non-default PAGESERVER_VIRTUAL_FILE_IO_ENGINE (i.e. not empty, not tokio-epoll-uring),
|
||||
# use it to parametrize test name along with build_type and pg_version
|
||||
#
|
||||
# See test_runner/fixtures/parametrize.py for details
|
||||
if (io_engine := os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in ("", "std-fs"):
|
||||
if (io_engine := os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in (
|
||||
"",
|
||||
"tokio-epoll-uring",
|
||||
):
|
||||
pageserver_virtual_file_io_engine_parameter = f"-{io_engine}"
|
||||
else:
|
||||
pageserver_virtual_file_io_engine_parameter = ""
|
||||
|
||||
@@ -15,11 +15,11 @@ import threading
|
||||
import time
|
||||
import uuid
|
||||
from contextlib import closing, contextmanager
|
||||
from dataclasses import dataclass, field
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from fcntl import LOCK_EX, LOCK_UN, flock
|
||||
from functools import cached_property
|
||||
from functools import cached_property, partial
|
||||
from itertools import chain, product
|
||||
from pathlib import Path
|
||||
from types import TracebackType
|
||||
@@ -70,6 +70,8 @@ from fixtures.remote_storage import (
|
||||
default_remote_storage,
|
||||
remote_storage_to_toml_inline_table,
|
||||
)
|
||||
from fixtures.safekeeper.http import SafekeeperHttpClient
|
||||
from fixtures.safekeeper.utils import are_walreceivers_absent
|
||||
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.utils import (
|
||||
ATTACHMENT_NAME_REGEX,
|
||||
@@ -1518,7 +1520,6 @@ class NeonCli(AbstractNeonCli):
|
||||
conf: Optional[Dict[str, Any]] = None,
|
||||
shard_count: Optional[int] = None,
|
||||
shard_stripe_size: Optional[int] = None,
|
||||
placement_policy: Optional[str] = None,
|
||||
set_default: bool = False,
|
||||
) -> Tuple[TenantId, TimelineId]:
|
||||
"""
|
||||
@@ -1552,9 +1553,6 @@ class NeonCli(AbstractNeonCli):
|
||||
if shard_stripe_size is not None:
|
||||
args.extend(["--shard-stripe-size", str(shard_stripe_size)])
|
||||
|
||||
if placement_policy is not None:
|
||||
args.extend(["--placement-policy", str(placement_policy)])
|
||||
|
||||
res = self.raw_cli(args)
|
||||
res.check_returncode()
|
||||
return tenant_id, timeline_id
|
||||
@@ -2172,37 +2170,6 @@ class NeonAttachmentService(MetricsGetter):
|
||||
)
|
||||
log.info("Attachment service passed consistency check")
|
||||
|
||||
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
|
||||
if isinstance(config_strings, tuple):
|
||||
pairs = [config_strings]
|
||||
else:
|
||||
pairs = config_strings
|
||||
|
||||
log.info(f"Requesting config failpoints: {repr(pairs)}")
|
||||
|
||||
res = self.request(
|
||||
"PUT",
|
||||
f"{self.env.attachment_service_api}/debug/v1/failpoints",
|
||||
json=[{"name": name, "actions": actions} for name, actions in pairs],
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
log.info(f"Got failpoints request response code {res.status_code}")
|
||||
res.raise_for_status()
|
||||
|
||||
def balance_all(self):
|
||||
self.request(
|
||||
"POST",
|
||||
f"{self.env.attachment_service_api}/control/v1/balance/all",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def balance_attached(self):
|
||||
self.request(
|
||||
"POST",
|
||||
f"{self.env.attachment_service_api}/control/v1/balance/attached",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def __enter__(self) -> "NeonAttachmentService":
|
||||
return self
|
||||
|
||||
@@ -2357,16 +2324,16 @@ class NeonPageserver(PgProtocol):
|
||||
def assert_no_errors(self):
|
||||
logfile = self.workdir / "pageserver.log"
|
||||
if not logfile.exists():
|
||||
log.warning(f"Skipping log check on pageserver {self.id}: {logfile} does not exist")
|
||||
log.warning(f"Skipping log check: {logfile} does not exist")
|
||||
return
|
||||
|
||||
with logfile.open("r") as f:
|
||||
errors = scan_pageserver_log_for_errors(f, self.allowed_errors)
|
||||
|
||||
for _lineno, error in errors:
|
||||
log.info(f"not allowed error (pageserver {self.id}): {error.strip()}")
|
||||
log.info(f"not allowed error: {error.strip()}")
|
||||
|
||||
assert not errors, f"Pageserver {self.id}: {errors}"
|
||||
assert not errors
|
||||
|
||||
def assert_no_metric_errors(self):
|
||||
"""
|
||||
@@ -2582,6 +2549,20 @@ class PgBin:
|
||||
)
|
||||
return base_path
|
||||
|
||||
def get_pg_controldata_checkpoint_lsn(self, pgdata: str) -> Lsn:
|
||||
"""
|
||||
Run pg_controldata on given datadir and extract checkpoint lsn.
|
||||
"""
|
||||
|
||||
pg_controldata_path = os.path.join(self.pg_bin_path, "pg_controldata")
|
||||
cmd = f"{pg_controldata_path} -D {pgdata}"
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
|
||||
checkpoint_lsn = re.findall(
|
||||
"Latest checkpoint location:\\s+([0-9A-F]+/[0-9A-F]+)", result.stdout
|
||||
)[0]
|
||||
log.info(f"last checkpoint at {checkpoint_lsn}")
|
||||
return Lsn(checkpoint_lsn)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_bin(test_output_dir: Path, pg_distrib_dir: Path, pg_version: PgVersion) -> PgBin:
|
||||
@@ -3600,220 +3581,6 @@ class Safekeeper:
|
||||
return segments
|
||||
|
||||
|
||||
# Walreceiver as returned by sk's timeline status endpoint.
|
||||
@dataclass
|
||||
class Walreceiver:
|
||||
conn_id: int
|
||||
state: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class SafekeeperTimelineStatus:
|
||||
acceptor_epoch: int
|
||||
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
|
||||
flush_lsn: Lsn
|
||||
commit_lsn: Lsn
|
||||
timeline_start_lsn: Lsn
|
||||
backup_lsn: Lsn
|
||||
peer_horizon_lsn: Lsn
|
||||
remote_consistent_lsn: Lsn
|
||||
walreceivers: List[Walreceiver]
|
||||
|
||||
|
||||
@dataclass
|
||||
class SafekeeperMetrics:
|
||||
# These are metrics from Prometheus which uses float64 internally.
|
||||
# As a consequence, values may differ from real original int64s.
|
||||
flush_lsn_inexact: Dict[Tuple[TenantId, TimelineId], int] = field(default_factory=dict)
|
||||
commit_lsn_inexact: Dict[Tuple[TenantId, TimelineId], int] = field(default_factory=dict)
|
||||
|
||||
|
||||
class SafekeeperHttpClient(requests.Session):
|
||||
HTTPError = requests.HTTPError
|
||||
|
||||
def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled=False):
|
||||
super().__init__()
|
||||
self.port = port
|
||||
self.auth_token = auth_token
|
||||
self.is_testing_enabled = is_testing_enabled
|
||||
|
||||
if auth_token is not None:
|
||||
self.headers["Authorization"] = f"Bearer {auth_token}"
|
||||
|
||||
def check_status(self):
|
||||
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
|
||||
|
||||
def is_testing_enabled_or_skip(self):
|
||||
if not self.is_testing_enabled:
|
||||
pytest.skip("safekeeper was built without 'testing' feature")
|
||||
|
||||
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
|
||||
self.is_testing_enabled_or_skip()
|
||||
|
||||
if isinstance(config_strings, tuple):
|
||||
pairs = [config_strings]
|
||||
else:
|
||||
pairs = config_strings
|
||||
|
||||
log.info(f"Requesting config failpoints: {repr(pairs)}")
|
||||
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/failpoints",
|
||||
json=[{"name": name, "actions": actions} for name, actions in pairs],
|
||||
)
|
||||
log.info(f"Got failpoints request response code {res.status_code}")
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert res_json is None
|
||||
return res_json
|
||||
|
||||
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
|
||||
params = params or {}
|
||||
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
|
||||
res.raise_for_status()
|
||||
res_json = json.loads(res.text)
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def patch_control_file(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
patch: Dict[str, Any],
|
||||
) -> Dict[str, Any]:
|
||||
res = self.patch(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/control_file",
|
||||
json={
|
||||
"updates": patch,
|
||||
"apply_fields": list(patch.keys()),
|
||||
},
|
||||
)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
|
||||
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: Dict[str, Any]):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",
|
||||
json=body,
|
||||
)
|
||||
res.raise_for_status()
|
||||
|
||||
def timeline_digest(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, from_lsn: Lsn, until_lsn: Lsn
|
||||
) -> Dict[str, Any]:
|
||||
res = self.get(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/digest",
|
||||
params={
|
||||
"from_lsn": str(from_lsn),
|
||||
"until_lsn": str(until_lsn),
|
||||
},
|
||||
)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def timeline_create(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
pg_version: int, # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
|
||||
commit_lsn: Lsn,
|
||||
):
|
||||
body = {
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
"pg_version": pg_version,
|
||||
"commit_lsn": str(commit_lsn),
|
||||
}
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", json=body)
|
||||
res.raise_for_status()
|
||||
|
||||
def timeline_status(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId
|
||||
) -> SafekeeperTimelineStatus:
|
||||
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}")
|
||||
res.raise_for_status()
|
||||
resj = res.json()
|
||||
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
|
||||
return SafekeeperTimelineStatus(
|
||||
acceptor_epoch=resj["acceptor_state"]["epoch"],
|
||||
pg_version=resj["pg_info"]["pg_version"],
|
||||
flush_lsn=Lsn(resj["flush_lsn"]),
|
||||
commit_lsn=Lsn(resj["commit_lsn"]),
|
||||
timeline_start_lsn=Lsn(resj["timeline_start_lsn"]),
|
||||
backup_lsn=Lsn(resj["backup_lsn"]),
|
||||
peer_horizon_lsn=Lsn(resj["peer_horizon_lsn"]),
|
||||
remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]),
|
||||
walreceivers=walreceivers,
|
||||
)
|
||||
|
||||
def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
|
||||
json=body,
|
||||
)
|
||||
res.raise_for_status()
|
||||
|
||||
# only_local doesn't remove segments in the remote storage.
|
||||
def timeline_delete(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
|
||||
) -> Dict[Any, Any]:
|
||||
res = self.delete(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
|
||||
params={
|
||||
"only_local": str(only_local).lower(),
|
||||
},
|
||||
)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def tenant_delete_force(self, tenant_id: TenantId) -> Dict[Any, Any]:
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def get_metrics_str(self) -> str:
|
||||
request_result = self.get(f"http://localhost:{self.port}/metrics")
|
||||
request_result.raise_for_status()
|
||||
return request_result.text
|
||||
|
||||
def get_metrics(self) -> SafekeeperMetrics:
|
||||
all_metrics_text = self.get_metrics_str()
|
||||
|
||||
metrics = SafekeeperMetrics()
|
||||
for match in re.finditer(
|
||||
r'^safekeeper_flush_lsn{tenant_id="([0-9a-f]+)",timeline_id="([0-9a-f]+)"} (\S+)$',
|
||||
all_metrics_text,
|
||||
re.MULTILINE,
|
||||
):
|
||||
metrics.flush_lsn_inexact[(TenantId(match.group(1)), TimelineId(match.group(2)))] = int(
|
||||
match.group(3)
|
||||
)
|
||||
for match in re.finditer(
|
||||
r'^safekeeper_commit_lsn{tenant_id="([0-9a-f]+)",timeline_id="([0-9a-f]+)"} (\S+)$',
|
||||
all_metrics_text,
|
||||
re.MULTILINE,
|
||||
):
|
||||
metrics.commit_lsn_inexact[
|
||||
(TenantId(match.group(1)), TimelineId(match.group(2)))
|
||||
] = int(match.group(3))
|
||||
return metrics
|
||||
|
||||
|
||||
class S3Scrubber:
|
||||
def __init__(self, env: NeonEnvBuilder, log_dir: Optional[Path] = None):
|
||||
self.env = env
|
||||
@@ -4123,24 +3890,21 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:
|
||||
|
||||
# pg is the existing and running compute node, that we want to compare with a basebackup
|
||||
def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint: Endpoint):
|
||||
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
|
||||
|
||||
# Get the timeline ID. We need it for the 'basebackup' command
|
||||
timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
|
||||
|
||||
# many tests already checkpoint, but do it just in case
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CHECKPOINT")
|
||||
|
||||
# wait for pageserver to catch up
|
||||
wait_for_last_flush_lsn(env, endpoint, endpoint.tenant_id, timeline_id)
|
||||
# stop postgres to ensure that files won't change
|
||||
endpoint.stop()
|
||||
|
||||
# Read the shutdown checkpoint's LSN
|
||||
checkpoint_lsn = pg_bin.get_pg_controldata_checkpoint_lsn(endpoint.pg_data_dir_path())
|
||||
|
||||
# Take a basebackup from pageserver
|
||||
restored_dir_path = env.repo_dir / f"{endpoint.endpoint_id}_restored_datadir"
|
||||
restored_dir_path.mkdir(exist_ok=True)
|
||||
|
||||
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
|
||||
psql_path = os.path.join(pg_bin.pg_bin_path, "psql")
|
||||
|
||||
pageserver_id = env.attachment_service.locate(endpoint.tenant_id)[0]["node_id"]
|
||||
@@ -4148,7 +3912,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
|
||||
{psql_path} \
|
||||
--no-psqlrc \
|
||||
postgres://localhost:{env.get_pageserver(pageserver_id).service_port.pg} \
|
||||
-c 'basebackup {endpoint.tenant_id} {timeline_id}' \
|
||||
-c 'basebackup {endpoint.tenant_id} {timeline_id} {checkpoint_lsn}' \
|
||||
| tar -x -C {restored_dir_path}
|
||||
"""
|
||||
|
||||
@@ -4297,6 +4061,49 @@ def wait_for_last_flush_lsn(
|
||||
return min(results)
|
||||
|
||||
|
||||
def flush_ep_to_pageserver(
|
||||
env: NeonEnv,
|
||||
ep: Endpoint,
|
||||
tenant: TenantId,
|
||||
timeline: TimelineId,
|
||||
pageserver_id: Optional[int] = None,
|
||||
) -> Lsn:
|
||||
"""
|
||||
Stop endpoint and wait until all committed WAL reaches the pageserver
|
||||
(last_record_lsn). This is for use by tests which want everything written so
|
||||
far to reach pageserver *and* expecting that no more data will arrive until
|
||||
endpoint starts again, so unlike wait_for_last_flush_lsn it polls
|
||||
safekeepers instead of compute to learn LSN.
|
||||
|
||||
Returns the catch up LSN.
|
||||
"""
|
||||
ep.stop()
|
||||
|
||||
commit_lsn: Lsn = Lsn(0)
|
||||
# In principle in the absense of failures polling single sk would be enough.
|
||||
for sk in env.safekeepers:
|
||||
cli = sk.http_client()
|
||||
# wait until compute connections are gone
|
||||
wait_until(30, 0.5, partial(are_walreceivers_absent, cli, tenant, timeline))
|
||||
commit_lsn = max(cli.get_commit_lsn(tenant, timeline), commit_lsn)
|
||||
|
||||
# Note: depending on WAL filtering implementation, probably most shards
|
||||
# won't be able to reach commit_lsn (unless gaps are also ack'ed), so this
|
||||
# is broken in sharded case.
|
||||
shards = tenant_get_shards(env, tenant, pageserver_id)
|
||||
for tenant_shard_id, pageserver in shards:
|
||||
log.info(
|
||||
f"flush_ep_to_pageserver: waiting for {commit_lsn} on shard {tenant_shard_id} on pageserver {pageserver.id})"
|
||||
)
|
||||
waited = wait_for_last_record_lsn(
|
||||
pageserver.http_client(), tenant_shard_id, timeline, commit_lsn
|
||||
)
|
||||
|
||||
assert waited >= commit_lsn
|
||||
|
||||
return commit_lsn
|
||||
|
||||
|
||||
def wait_for_wal_insert_lsn(
|
||||
env: NeonEnv,
|
||||
endpoint: Endpoint,
|
||||
|
||||
@@ -46,9 +46,12 @@ def pytest_generate_tests(metafunc: Metafunc):
|
||||
|
||||
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
|
||||
|
||||
# A hacky way to parametrize tests only for `pageserver_virtual_file_io_engine=tokio-epoll-uring`
|
||||
# And do not change test name for default `pageserver_virtual_file_io_engine=std-fs` to keep tests statistics
|
||||
if (io_engine := os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in ("", "std-fs"):
|
||||
# A hacky way to parametrize tests only for `pageserver_virtual_file_io_engine=std-fs`
|
||||
# And do not change test name for default `pageserver_virtual_file_io_engine=tokio-epoll-uring` to keep tests statistics
|
||||
if (io_engine := os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in (
|
||||
"",
|
||||
"tokio-epoll-uring",
|
||||
):
|
||||
metafunc.parametrize("pageserver_virtual_file_io_engine", [io_engine])
|
||||
|
||||
# For performance tests, parametrize also by platform
|
||||
|
||||
0
test_runner/fixtures/safekeeper/__init__.py
Normal file
0
test_runner/fixtures/safekeeper/__init__.py
Normal file
227
test_runner/fixtures/safekeeper/http.py
Normal file
227
test_runner/fixtures/safekeeper/http.py
Normal file
@@ -0,0 +1,227 @@
|
||||
import json
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
|
||||
# Walreceiver as returned by sk's timeline status endpoint.
|
||||
@dataclass
|
||||
class Walreceiver:
|
||||
conn_id: int
|
||||
state: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class SafekeeperTimelineStatus:
|
||||
acceptor_epoch: int
|
||||
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
|
||||
flush_lsn: Lsn
|
||||
commit_lsn: Lsn
|
||||
timeline_start_lsn: Lsn
|
||||
backup_lsn: Lsn
|
||||
peer_horizon_lsn: Lsn
|
||||
remote_consistent_lsn: Lsn
|
||||
walreceivers: List[Walreceiver]
|
||||
|
||||
|
||||
@dataclass
|
||||
class SafekeeperMetrics:
|
||||
# These are metrics from Prometheus which uses float64 internally.
|
||||
# As a consequence, values may differ from real original int64s.
|
||||
flush_lsn_inexact: Dict[Tuple[TenantId, TimelineId], int] = field(default_factory=dict)
|
||||
commit_lsn_inexact: Dict[Tuple[TenantId, TimelineId], int] = field(default_factory=dict)
|
||||
|
||||
|
||||
class SafekeeperHttpClient(requests.Session):
|
||||
HTTPError = requests.HTTPError
|
||||
|
||||
def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled=False):
|
||||
super().__init__()
|
||||
self.port = port
|
||||
self.auth_token = auth_token
|
||||
self.is_testing_enabled = is_testing_enabled
|
||||
|
||||
if auth_token is not None:
|
||||
self.headers["Authorization"] = f"Bearer {auth_token}"
|
||||
|
||||
def check_status(self):
|
||||
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
|
||||
|
||||
def is_testing_enabled_or_skip(self):
|
||||
if not self.is_testing_enabled:
|
||||
pytest.skip("safekeeper was built without 'testing' feature")
|
||||
|
||||
def configure_failpoints(self, config_strings: Union[Tuple[str, str], List[Tuple[str, str]]]):
|
||||
self.is_testing_enabled_or_skip()
|
||||
|
||||
if isinstance(config_strings, tuple):
|
||||
pairs = [config_strings]
|
||||
else:
|
||||
pairs = config_strings
|
||||
|
||||
log.info(f"Requesting config failpoints: {repr(pairs)}")
|
||||
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/failpoints",
|
||||
json=[{"name": name, "actions": actions} for name, actions in pairs],
|
||||
)
|
||||
log.info(f"Got failpoints request response code {res.status_code}")
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert res_json is None
|
||||
return res_json
|
||||
|
||||
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
|
||||
params = params or {}
|
||||
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
|
||||
res.raise_for_status()
|
||||
res_json = json.loads(res.text)
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def patch_control_file(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
patch: Dict[str, Any],
|
||||
) -> Dict[str, Any]:
|
||||
res = self.patch(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/control_file",
|
||||
json={
|
||||
"updates": patch,
|
||||
"apply_fields": list(patch.keys()),
|
||||
},
|
||||
)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
|
||||
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: Dict[str, Any]):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",
|
||||
json=body,
|
||||
)
|
||||
res.raise_for_status()
|
||||
|
||||
def timeline_digest(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, from_lsn: Lsn, until_lsn: Lsn
|
||||
) -> Dict[str, Any]:
|
||||
res = self.get(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/digest",
|
||||
params={
|
||||
"from_lsn": str(from_lsn),
|
||||
"until_lsn": str(until_lsn),
|
||||
},
|
||||
)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def timeline_create(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
pg_version: int, # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
|
||||
commit_lsn: Lsn,
|
||||
):
|
||||
body = {
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
"pg_version": pg_version,
|
||||
"commit_lsn": str(commit_lsn),
|
||||
}
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", json=body)
|
||||
res.raise_for_status()
|
||||
|
||||
def timeline_status(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId
|
||||
) -> SafekeeperTimelineStatus:
|
||||
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}")
|
||||
res.raise_for_status()
|
||||
resj = res.json()
|
||||
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
|
||||
return SafekeeperTimelineStatus(
|
||||
acceptor_epoch=resj["acceptor_state"]["epoch"],
|
||||
pg_version=resj["pg_info"]["pg_version"],
|
||||
flush_lsn=Lsn(resj["flush_lsn"]),
|
||||
commit_lsn=Lsn(resj["commit_lsn"]),
|
||||
timeline_start_lsn=Lsn(resj["timeline_start_lsn"]),
|
||||
backup_lsn=Lsn(resj["backup_lsn"]),
|
||||
peer_horizon_lsn=Lsn(resj["peer_horizon_lsn"]),
|
||||
remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]),
|
||||
walreceivers=walreceivers,
|
||||
)
|
||||
|
||||
def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
|
||||
return self.timeline_status(tenant_id, timeline_id).commit_lsn
|
||||
|
||||
def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
|
||||
json=body,
|
||||
)
|
||||
res.raise_for_status()
|
||||
|
||||
# only_local doesn't remove segments in the remote storage.
|
||||
def timeline_delete(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
|
||||
) -> Dict[Any, Any]:
|
||||
res = self.delete(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
|
||||
params={
|
||||
"only_local": str(only_local).lower(),
|
||||
},
|
||||
)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def tenant_delete_force(self, tenant_id: TenantId) -> Dict[Any, Any]:
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def get_metrics_str(self) -> str:
|
||||
request_result = self.get(f"http://localhost:{self.port}/metrics")
|
||||
request_result.raise_for_status()
|
||||
return request_result.text
|
||||
|
||||
def get_metrics(self) -> SafekeeperMetrics:
|
||||
all_metrics_text = self.get_metrics_str()
|
||||
|
||||
metrics = SafekeeperMetrics()
|
||||
for match in re.finditer(
|
||||
r'^safekeeper_flush_lsn{tenant_id="([0-9a-f]+)",timeline_id="([0-9a-f]+)"} (\S+)$',
|
||||
all_metrics_text,
|
||||
re.MULTILINE,
|
||||
):
|
||||
metrics.flush_lsn_inexact[(TenantId(match.group(1)), TimelineId(match.group(2)))] = int(
|
||||
match.group(3)
|
||||
)
|
||||
for match in re.finditer(
|
||||
r'^safekeeper_commit_lsn{tenant_id="([0-9a-f]+)",timeline_id="([0-9a-f]+)"} (\S+)$',
|
||||
all_metrics_text,
|
||||
re.MULTILINE,
|
||||
):
|
||||
metrics.commit_lsn_inexact[
|
||||
(TenantId(match.group(1)), TimelineId(match.group(2)))
|
||||
] = int(match.group(3))
|
||||
return metrics
|
||||
11
test_runner/fixtures/safekeeper/utils.py
Normal file
11
test_runner/fixtures/safekeeper/utils.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.safekeeper.http import SafekeeperHttpClient
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
|
||||
def are_walreceivers_absent(
|
||||
sk_http_cli: SafekeeperHttpClient, tenant_id: TenantId, timeline_id: TimelineId
|
||||
):
|
||||
status = sk_http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}")
|
||||
return len(status.walreceivers) == 0
|
||||
@@ -1,4 +1,3 @@
|
||||
import threading
|
||||
from typing import Optional
|
||||
|
||||
from fixtures.log_helper import log
|
||||
@@ -12,10 +11,6 @@ from fixtures.neon_fixtures import (
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
|
||||
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
|
||||
ENDPOINT_LOCK = threading.Lock()
|
||||
|
||||
|
||||
class Workload:
|
||||
"""
|
||||
@@ -46,30 +41,17 @@ class Workload:
|
||||
|
||||
self._endpoint: Optional[Endpoint] = None
|
||||
|
||||
def reconfigure(self):
|
||||
"""
|
||||
Request the endpoint to reconfigure based on location reported by storage controller
|
||||
"""
|
||||
if self._endpoint is not None:
|
||||
with ENDPOINT_LOCK:
|
||||
self._endpoint.reconfigure()
|
||||
|
||||
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
|
||||
# We may be running alongside other Workloads for different tenants. Full TTID is
|
||||
# obnoxiously long for use here, but a cut-down version is still unique enough for tests.
|
||||
endpoint_id = f"ep-workload-{str(self.tenant_id)[0:4]}-{str(self.timeline_id)[0:4]}"
|
||||
|
||||
with ENDPOINT_LOCK:
|
||||
if self._endpoint is None:
|
||||
self._endpoint = self.env.endpoints.create(
|
||||
self.branch_name,
|
||||
tenant_id=self.tenant_id,
|
||||
pageserver_id=pageserver_id,
|
||||
endpoint_id=endpoint_id,
|
||||
)
|
||||
self._endpoint.start(pageserver_id=pageserver_id)
|
||||
else:
|
||||
self._endpoint.reconfigure(pageserver_id=pageserver_id)
|
||||
if self._endpoint is None:
|
||||
self._endpoint = self.env.endpoints.create(
|
||||
self.branch_name,
|
||||
tenant_id=self.tenant_id,
|
||||
pageserver_id=pageserver_id,
|
||||
endpoint_id="ep-workload",
|
||||
)
|
||||
self._endpoint.start(pageserver_id=pageserver_id)
|
||||
else:
|
||||
self._endpoint.reconfigure(pageserver_id=pageserver_id)
|
||||
|
||||
connstring = self._endpoint.safe_psql(
|
||||
"SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
|
||||
@@ -112,7 +94,7 @@ class Workload:
|
||||
else:
|
||||
return False
|
||||
|
||||
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True, ingest=True):
|
||||
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
|
||||
assert self.expect_rows >= n
|
||||
|
||||
max_iters = 10
|
||||
@@ -150,28 +132,22 @@ class Workload:
|
||||
]
|
||||
)
|
||||
|
||||
if ingest:
|
||||
# Wait for written data to be ingested by the pageserver
|
||||
for tenant_shard_id, pageserver in tenant_get_shards(
|
||||
self.env, self.tenant_id, pageserver_id
|
||||
):
|
||||
last_flush_lsn = wait_for_last_flush_lsn(
|
||||
self.env,
|
||||
endpoint,
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
pageserver_id=pageserver_id,
|
||||
)
|
||||
ps_http = pageserver.http_client()
|
||||
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
|
||||
for tenant_shard_id, pageserver in tenant_get_shards(
|
||||
self.env, self.tenant_id, pageserver_id
|
||||
):
|
||||
last_flush_lsn = wait_for_last_flush_lsn(
|
||||
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
|
||||
)
|
||||
ps_http = pageserver.http_client()
|
||||
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
|
||||
|
||||
if upload:
|
||||
# Wait for written data to be uploaded to S3 (force a checkpoint to trigger upload)
|
||||
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
|
||||
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
|
||||
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
|
||||
else:
|
||||
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
|
||||
if upload:
|
||||
# force a checkpoint to trigger upload
|
||||
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
|
||||
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
|
||||
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
|
||||
else:
|
||||
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
|
||||
|
||||
def validate(self, pageserver_id: Optional[int] = None):
|
||||
endpoint = self.endpoint(pageserver_id)
|
||||
|
||||
@@ -4,12 +4,11 @@ import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
flush_ep_to_pageserver,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
||||
from fixtures.pageserver.utils import wait_for_upload
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
# Crates a few layers, ensures that we can evict them (removing locally but keeping track of them anyway)
|
||||
@@ -46,14 +45,15 @@ def test_basic_eviction(
|
||||
FROM generate_series(1, 5000000) g
|
||||
"""
|
||||
)
|
||||
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
|
||||
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
|
||||
# stops the endpoint
|
||||
current_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
|
||||
|
||||
# disable compute & sks to avoid on-demand downloads by walreceiver / getpage
|
||||
endpoint.stop()
|
||||
# stop sks to avoid on-demand downloads by walreceiver / getpage; endpoint
|
||||
# has already been stopped by flush_ep_to_pageserver
|
||||
for sk in env.safekeepers:
|
||||
sk.stop()
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver
|
||||
from fixtures.pageserver.types import (
|
||||
DeltaLayerFileName,
|
||||
ImageLayerFileName,
|
||||
@@ -115,8 +115,7 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
== 0
|
||||
)
|
||||
|
||||
endpoint.stop()
|
||||
last_record_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ def test_migrations(neon_simple_env: NeonEnv):
|
||||
|
||||
endpoint.wait_for_migrations()
|
||||
|
||||
num_migrations = 8
|
||||
num_migrations = 9
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("SELECT id FROM neon_migration.migration_id")
|
||||
|
||||
@@ -29,3 +29,34 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):
|
||||
log.info(res)
|
||||
assert len(res) == 1
|
||||
assert len(res[0]) == 5
|
||||
|
||||
|
||||
# Verify that the neon extension can be upgraded/downgraded.
|
||||
def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
env.neon_cli.create_branch("test_neon_extension_compatibility")
|
||||
|
||||
endpoint_main = env.endpoints.create("test_neon_extension_compatibility")
|
||||
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
|
||||
endpoint_main.respec(skip_pg_catalog_updates=False)
|
||||
endpoint_main.start()
|
||||
|
||||
with closing(endpoint_main.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
all_versions = ["1.3", "1.2", "1.1", "1.0"]
|
||||
current_version = "1.3"
|
||||
for idx, begin_version in enumerate(all_versions):
|
||||
for target_version in all_versions[idx + 1 :]:
|
||||
if current_version != begin_version:
|
||||
cur.execute(
|
||||
f"ALTER EXTENSION neon UPDATE TO '{begin_version}'; -- {current_version}->{begin_version}"
|
||||
)
|
||||
current_version = begin_version
|
||||
# downgrade
|
||||
cur.execute(
|
||||
f"ALTER EXTENSION neon UPDATE TO '{target_version}'; -- {begin_version}->{target_version}"
|
||||
)
|
||||
# upgrade
|
||||
cur.execute(
|
||||
f"ALTER EXTENSION neon UPDATE TO '{begin_version}'; -- {target_version}->{begin_version}"
|
||||
)
|
||||
|
||||
@@ -8,6 +8,7 @@ from typing import Any, DefaultDict, Dict, Tuple
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
flush_ep_to_pageserver,
|
||||
last_flush_lsn_upload,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
@@ -517,7 +518,7 @@ def test_compaction_downloads_on_demand_without_image_creation(neon_env_builder:
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("update a set id = -id")
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id)
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
|
||||
|
||||
@@ -1,17 +1,13 @@
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
AttachmentServiceApiException,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
tenant_get_shards,
|
||||
)
|
||||
from fixtures.remote_storage import s3_storage
|
||||
from fixtures.types import Lsn, TenantShardId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.workload import Workload
|
||||
|
||||
|
||||
@@ -404,245 +400,3 @@ def test_sharding_ingest(
|
||||
|
||||
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
|
||||
assert huge_layer_count <= shard_count
|
||||
|
||||
|
||||
class Failure:
|
||||
pageserver_id: Optional[int]
|
||||
|
||||
def apply(self, env: NeonEnv):
|
||||
raise NotImplementedError()
|
||||
|
||||
def clear(self, env: NeonEnv):
|
||||
"""
|
||||
Clear the failure, in a way that should enable the system to proceed
|
||||
to a totally clean state (all nodes online and reconciled)
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def expect_available(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def can_mitigate(self):
|
||||
"""Whether Self.mitigate is available for use"""
|
||||
return False
|
||||
|
||||
def mitigate(self, env: NeonEnv):
|
||||
"""
|
||||
Mitigate the failure in a way that should allow shard split to
|
||||
complete and service to resume, but does not guarantee to leave
|
||||
the whole world in a clean state (e.g. an Offline node might have
|
||||
junk LocationConfigs on it)
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def fails_forward(self):
|
||||
"""
|
||||
If true, this failure results in a state that eventualy completes the split.
|
||||
"""
|
||||
return False
|
||||
|
||||
|
||||
class PageserverFailpoint(Failure):
|
||||
def __init__(self, failpoint, pageserver_id, mitigate):
|
||||
self.failpoint = failpoint
|
||||
self.pageserver_id = pageserver_id
|
||||
self._mitigate = mitigate
|
||||
|
||||
def apply(self, env: NeonEnv):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.allowed_errors.extend(
|
||||
[".*failpoint.*", ".*Resetting.*after shard split failure.*"]
|
||||
)
|
||||
pageserver.http_client().configure_failpoints((self.failpoint, "return(1)"))
|
||||
|
||||
def clear(self, env: NeonEnv):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
|
||||
if self._mitigate:
|
||||
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Active"})
|
||||
|
||||
def expect_available(self):
|
||||
return True
|
||||
|
||||
def can_mitigate(self):
|
||||
return self._mitigate
|
||||
|
||||
def mitigate(self, env):
|
||||
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Offline"})
|
||||
|
||||
|
||||
class StorageControllerFailpoint(Failure):
|
||||
def __init__(self, failpoint):
|
||||
self.failpoint = failpoint
|
||||
self.pageserver_id = None
|
||||
|
||||
def apply(self, env: NeonEnv):
|
||||
env.attachment_service.configure_failpoints((self.failpoint, "return(1)"))
|
||||
|
||||
def clear(self, env: NeonEnv):
|
||||
env.attachment_service.configure_failpoints((self.failpoint, "off"))
|
||||
|
||||
def expect_available(self):
|
||||
return True
|
||||
|
||||
def can_mitigate(self):
|
||||
return False
|
||||
|
||||
def fails_forward(self):
|
||||
# Edge case: the very last failpoint that simulates a DB connection error, where
|
||||
# the abort path will fail-forward and result in a complete split.
|
||||
return self.failpoint == "shard-split-post-complete"
|
||||
|
||||
|
||||
class NodeKill(Failure):
|
||||
def __init__(self, pageserver_id, mitigate):
|
||||
self.pageserver_id = pageserver_id
|
||||
self._mitigate = mitigate
|
||||
|
||||
def apply(self, env: NeonEnv):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.stop(immediate=True)
|
||||
|
||||
def clear(self, env: NeonEnv):
|
||||
pageserver = env.get_pageserver(self.pageserver_id)
|
||||
pageserver.start()
|
||||
|
||||
def expect_available(self):
|
||||
return False
|
||||
|
||||
def mitigate(self, env):
|
||||
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Offline"})
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"failure",
|
||||
[
|
||||
PageserverFailpoint("api-500", 1, False),
|
||||
NodeKill(1, False),
|
||||
PageserverFailpoint("api-500", 1, True),
|
||||
NodeKill(1, True),
|
||||
PageserverFailpoint("shard-split-pre-prepare", 1, False),
|
||||
PageserverFailpoint("shard-split-post-prepare", 1, False),
|
||||
PageserverFailpoint("shard-split-pre-hardlink", 1, False),
|
||||
PageserverFailpoint("shard-split-post-hardlink", 1, False),
|
||||
PageserverFailpoint("shard-split-post-child-conf", 1, False),
|
||||
PageserverFailpoint("shard-split-lsn-wait", 1, False),
|
||||
PageserverFailpoint("shard-split-pre-finish", 1, False),
|
||||
StorageControllerFailpoint("shard-split-validation"),
|
||||
StorageControllerFailpoint("shard-split-post-begin"),
|
||||
StorageControllerFailpoint("shard-split-post-remote"),
|
||||
StorageControllerFailpoint("shard-split-post-complete"),
|
||||
],
|
||||
)
|
||||
def test_sharding_split_failures(neon_env_builder: NeonEnvBuilder, failure: Failure):
|
||||
neon_env_builder.num_pageservers = 4
|
||||
initial_shard_count = 2
|
||||
split_shard_count = 4
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_shard_count=initial_shard_count)
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
# Make sure the node we're failing has a shard on it, otherwise the test isn't testing anything
|
||||
assert (
|
||||
failure.pageserver_id is None
|
||||
or len(
|
||||
env.get_pageserver(failure.pageserver_id)
|
||||
.http_client()
|
||||
.tenant_list_locations()["tenant_shards"]
|
||||
)
|
||||
> 0
|
||||
)
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(100)
|
||||
|
||||
# Set one pageserver to 500 all requests, then do a split
|
||||
# TODO: also test with a long-blocking failure: controller should time out its request and then
|
||||
# clean up in a well defined way.
|
||||
failure.apply(env)
|
||||
|
||||
with pytest.raises(AttachmentServiceApiException):
|
||||
env.attachment_service.tenant_shard_split(tenant_id, shard_count=4)
|
||||
|
||||
# We expect that the overall operation will fail, but some split requests
|
||||
# will have succeeded: the net result should be to return to a clean state, including
|
||||
# detaching any child shards.
|
||||
def assert_rolled_back(exclude_ps_id=None) -> None:
|
||||
count = 0
|
||||
for ps in env.pageservers:
|
||||
if exclude_ps_id is not None and ps.id == exclude_ps_id:
|
||||
continue
|
||||
|
||||
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
|
||||
for loc in locations:
|
||||
tenant_shard_id = TenantShardId.parse(loc[0])
|
||||
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
|
||||
assert tenant_shard_id.shard_count == initial_shard_count
|
||||
count += 1
|
||||
assert count == initial_shard_count
|
||||
|
||||
def assert_split_done(exclude_ps_id=None) -> None:
|
||||
count = 0
|
||||
for ps in env.pageservers:
|
||||
if exclude_ps_id is not None and ps.id == exclude_ps_id:
|
||||
continue
|
||||
|
||||
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
|
||||
for loc in locations:
|
||||
tenant_shard_id = TenantShardId.parse(loc[0])
|
||||
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
|
||||
assert tenant_shard_id.shard_count == split_shard_count
|
||||
count += 1
|
||||
assert count == split_shard_count
|
||||
|
||||
def finish_split():
|
||||
# Having failed+rolled back, we should be able to split again
|
||||
# No failures this time; it will succeed
|
||||
env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count)
|
||||
|
||||
workload.churn_rows(10)
|
||||
workload.validate()
|
||||
|
||||
if failure.expect_available():
|
||||
# Even though the split failed partway through, this should not have interrupted
|
||||
# clients. Disable waiting for pageservers in the workload helper, because our
|
||||
# failpoints may prevent API access.
|
||||
# This only applies for failure modes that leave pageserver page_service API available.
|
||||
workload.churn_rows(10, upload=False, ingest=False)
|
||||
workload.validate()
|
||||
|
||||
if failure.fails_forward():
|
||||
# A failure type which results in eventual completion of the split
|
||||
wait_until(30, 1, assert_split_done)
|
||||
elif failure.can_mitigate():
|
||||
# Mitigation phase: we expect to be able to proceed with a successful shard split
|
||||
failure.mitigate(env)
|
||||
|
||||
# The split should appear to be rolled back from the point of view of all pageservers
|
||||
# apart from the one that is offline
|
||||
wait_until(30, 1, lambda: assert_rolled_back(exclude_ps_id=failure.pageserver_id))
|
||||
|
||||
finish_split()
|
||||
wait_until(30, 1, lambda: assert_split_done(exclude_ps_id=failure.pageserver_id))
|
||||
|
||||
# Having cleared the failure, everything should converge to a pristine state
|
||||
failure.clear(env)
|
||||
wait_until(30, 1, assert_split_done)
|
||||
else:
|
||||
# Once we restore the faulty pageserver's API to good health, rollback should
|
||||
# eventually complete.
|
||||
failure.clear(env)
|
||||
|
||||
wait_until(30, 1, assert_rolled_back)
|
||||
|
||||
# Having rolled back, the tenant should be working
|
||||
workload.churn_rows(10)
|
||||
workload.validate()
|
||||
|
||||
# Splitting again should work, since we cleared the failure
|
||||
finish_split()
|
||||
assert_split_done()
|
||||
|
||||
env.attachment_service.consistency_check()
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import concurrent.futures
|
||||
import random
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
@@ -25,9 +23,8 @@ from fixtures.pageserver.utils import (
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.remote_storage import RemoteStorageKind, s3_storage
|
||||
from fixtures.types import TenantId, TenantShardId, TimelineId
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from fixtures.utils import run_pg_bench_small, wait_until
|
||||
from fixtures.workload import Workload
|
||||
from mypy_boto3_s3.type_defs import (
|
||||
ObjectTypeDef,
|
||||
)
|
||||
@@ -773,186 +770,3 @@ def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder):
|
||||
assert "pitr_interval" not in readback_ps.tenant_specific_overrides
|
||||
|
||||
env.attachment_service.consistency_check()
|
||||
|
||||
|
||||
def test_storcon_rolling_failures(
|
||||
neon_env_builder: NeonEnvBuilder, httpserver: HTTPServer, httpserver_listen_address
|
||||
):
|
||||
neon_env_builder.num_pageservers = 8
|
||||
|
||||
(host, port) = httpserver_listen_address
|
||||
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify-attach"
|
||||
|
||||
workloads: dict[TenantId, Workload] = {}
|
||||
|
||||
# Do neon_local endpoint reconfiguration in the background so that we can
|
||||
# accept a healthy rate of calls into notify-attach.
|
||||
reconfigure_threads = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
def handler(request: Request):
|
||||
"""
|
||||
Although storage controller can use neon_local directly, this causes problems when
|
||||
the test is also concurrently modifying endpoints. Instead, configure storage controller
|
||||
to send notifications up to this test code, which will route all endpoint updates
|
||||
through Workload, which has a mutex to make it all safe.
|
||||
"""
|
||||
assert request.json is not None
|
||||
body: dict[str, Any] = request.json
|
||||
log.info(f"notify-attach request: {body}")
|
||||
|
||||
try:
|
||||
workload = workloads[TenantId(body["tenant_id"])]
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
# This causes the endpoint to query storage controller for its location, which
|
||||
# is redundant since we already have it here, but this avoids extending the
|
||||
# neon_local CLI to take full lists of locations
|
||||
reconfigure_threads.submit(lambda workload=workload: workload.reconfigure()) # type: ignore[no-any-return]
|
||||
|
||||
return Response(status=200)
|
||||
|
||||
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
for ps in env.pageservers:
|
||||
# We will do unclean detaches
|
||||
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
|
||||
|
||||
n_tenants = 32
|
||||
tenants = [(env.initial_tenant, env.initial_timeline)]
|
||||
for i in range(0, n_tenants - 1):
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
shard_count = [1, 2, 4][i % 3]
|
||||
env.neon_cli.create_tenant(
|
||||
tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Double":1}'
|
||||
)
|
||||
tenants.append((tenant_id, timeline_id))
|
||||
|
||||
# Background pain:
|
||||
# - TODO: some fraction of pageserver API requests hang
|
||||
# (this requires implementing wrap of location_conf calls with proper timeline/cancel)
|
||||
# - TODO: continuous tenant/timeline creation/destruction over a different ID range than
|
||||
# the ones we're using for availability checks.
|
||||
|
||||
rng = random.Random(0xDEADBEEF)
|
||||
|
||||
for tenant_id, timeline_id in tenants:
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workloads[tenant_id] = workload
|
||||
|
||||
def node_evacuated(node_id: int):
|
||||
counts = get_node_shard_counts(env, [t[0] for t in tenants])
|
||||
assert counts[node_id] == 0
|
||||
|
||||
def attachments_active():
|
||||
for tid, _tlid in tenants:
|
||||
for shard in env.attachment_service.locate(tid):
|
||||
psid = shard["node_id"]
|
||||
tsid = TenantShardId.parse(shard["shard_id"])
|
||||
status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid)
|
||||
assert status["state"]["slug"] == "Active"
|
||||
log.info(f"Shard {tsid} active on node {psid}")
|
||||
|
||||
failpoints = ("api-503", "5%1000*return(1)")
|
||||
failpoints_str = f"{failpoints[0]}={failpoints[1]}"
|
||||
for ps in env.pageservers:
|
||||
ps.http_client().configure_failpoints(failpoints)
|
||||
|
||||
def for_all_workloads(callback, timeout=60):
|
||||
futs = []
|
||||
with concurrent.futures.ThreadPoolExecutor() as pool:
|
||||
for _tenant_id, workload in workloads.items():
|
||||
futs.append(pool.submit(callback, workload))
|
||||
|
||||
for f in futs:
|
||||
f.result(timeout=timeout)
|
||||
|
||||
def clean_fail_restore():
|
||||
"""
|
||||
Clean shutdown of a node: mark it offline in storage controller, wait for new attachment
|
||||
locations to activate, then SIGTERM it.
|
||||
- Endpoints should not fail any queries
|
||||
- New attach locations should activate within bounded time.
|
||||
"""
|
||||
victim = rng.choice(env.pageservers)
|
||||
env.attachment_service.node_configure(victim.id, {"availability": "Offline"})
|
||||
|
||||
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
|
||||
wait_until(10, 1, attachments_active)
|
||||
|
||||
victim.stop(immediate=False)
|
||||
|
||||
traffic()
|
||||
|
||||
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
|
||||
|
||||
# Revert shards to attach at their original locations
|
||||
env.attachment_service.balance_attached()
|
||||
wait_until(10, 1, attachments_active)
|
||||
|
||||
def hard_fail_restore():
|
||||
"""
|
||||
Simulate an unexpected death of a pageserver node
|
||||
"""
|
||||
victim = rng.choice(env.pageservers)
|
||||
victim.stop(immediate=True)
|
||||
# TODO: once we implement heartbeats detecting node failures, remove this
|
||||
# explicit marking offline and rely on storage controller to detect it itself.
|
||||
env.attachment_service.node_configure(victim.id, {"availability": "Offline"})
|
||||
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
|
||||
wait_until(10, 1, attachments_active)
|
||||
traffic()
|
||||
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
|
||||
env.attachment_service.balance_attached()
|
||||
wait_until(10, 1, attachments_active)
|
||||
|
||||
def traffic():
|
||||
"""
|
||||
Check that all tenants are working for postgres clients
|
||||
"""
|
||||
|
||||
def exercise_one(workload):
|
||||
workload.churn_rows(100)
|
||||
workload.validate()
|
||||
|
||||
for_all_workloads(exercise_one)
|
||||
|
||||
def init_one(workload):
|
||||
workload.init()
|
||||
workload.write_rows(100)
|
||||
|
||||
for_all_workloads(init_one, timeout=60)
|
||||
|
||||
for i in range(0, 20):
|
||||
mode = rng.choice([0, 1, 2])
|
||||
log.info(f"Iteration {i}, mode {mode}")
|
||||
if mode == 0:
|
||||
# Traffic interval: sometimes, instead of a failure, just let the clients
|
||||
# write a load of data. This avoids chaos tests ending up with unrealistically
|
||||
# small quantities of data in flight.
|
||||
traffic()
|
||||
elif mode == 1:
|
||||
clean_fail_restore()
|
||||
elif mode == 2:
|
||||
hard_fail_restore()
|
||||
|
||||
# Fail and restart: hard-kill one node. Notify the storage controller that it is offline.
|
||||
# Success criteria:
|
||||
# - New attach locations should activate within bounded time
|
||||
# - TODO: once we do heartbeating, we should not have to explicitly mark the node offline
|
||||
|
||||
# TODO: fail and remove: fail a node, and remove it from the cluster.
|
||||
# Success criteria:
|
||||
# - Endpoints should not fail any queries
|
||||
# - New attach locations should activate within bounded time
|
||||
# - New secondary locations should fill up with data within bounded time
|
||||
|
||||
# TODO: somehow need to wait for reconciles to complete before doing consistency check
|
||||
# (or make the check wait).
|
||||
|
||||
# Do consistency check on every iteration, not just at the end: this makes it more obvious
|
||||
# which change caused an issue.
|
||||
env.attachment_service.consistency_check()
|
||||
|
||||
@@ -190,6 +190,8 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
|
||||
# So by ignoring these instead of waiting for empty upload queue
|
||||
# we execute more distinct code paths.
|
||||
'.*stopping left-over name="remote upload".*',
|
||||
# an on-demand is cancelled by shutdown
|
||||
".*initial size calculation failed: downloading failed, possibly for shutdown",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -213,7 +213,9 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
|
||||
# This happens when timeline remains are cleaned up during loading
|
||||
".*Timeline dir entry become invalid.*",
|
||||
# In one of the branches we poll for tenant to become active. Polls can generate this log message:
|
||||
f".*Tenant {env.initial_tenant} is not active*",
|
||||
f".*Tenant {env.initial_tenant} is not active.*",
|
||||
# an on-demand is cancelled by shutdown
|
||||
".*initial size calculation failed: downloading failed, possibly for shutdown",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -28,7 +28,6 @@ from fixtures.neon_fixtures import (
|
||||
PgBin,
|
||||
PgProtocol,
|
||||
Safekeeper,
|
||||
SafekeeperHttpClient,
|
||||
SafekeeperPort,
|
||||
last_flush_lsn_upload,
|
||||
)
|
||||
@@ -46,6 +45,8 @@ from fixtures.remote_storage import (
|
||||
default_remote_storage,
|
||||
s3_storage,
|
||||
)
|
||||
from fixtures.safekeeper.http import SafekeeperHttpClient
|
||||
from fixtures.safekeeper.utils import are_walreceivers_absent
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import get_dir_size, query_scalar, start_in_background
|
||||
|
||||
@@ -1097,12 +1098,6 @@ def is_flush_lsn_aligned(sk_http_clis, tenant_id, timeline_id):
|
||||
return all([flush_lsns[0] == flsn for flsn in flush_lsns])
|
||||
|
||||
|
||||
def are_walreceivers_absent(sk_http_cli, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
status = sk_http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}")
|
||||
return len(status.walreceivers) == 0
|
||||
|
||||
|
||||
# Assert by xxd that WAL on given safekeepers is identical. No compute must be
|
||||
# running for this to be reliable.
|
||||
def cmp_sk_wal(sks: List[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId):
|
||||
@@ -1347,6 +1342,36 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
|
||||
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
|
||||
|
||||
|
||||
# Test that when compute is terminated in fast (or smart) mode, walproposer is
|
||||
# allowed to run and self terminate after shutdown checkpoint is written, so it
|
||||
# commits it to safekeepers before exiting. This not required for correctness,
|
||||
# but needed for tests using check_restored_datadir_content.
|
||||
def test_wp_graceful_shutdown(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_wp_graceful_shutdown")
|
||||
ep = env.endpoints.create_start("test_wp_graceful_shutdown")
|
||||
ep.safe_psql("create table t(key int, value text)")
|
||||
ep.stop()
|
||||
|
||||
# figure out checkpoint lsn
|
||||
ckpt_lsn = pg_bin.get_pg_controldata_checkpoint_lsn(ep.pg_data_dir_path())
|
||||
|
||||
sk_http_cli = env.safekeepers[0].http_client()
|
||||
commit_lsn = sk_http_cli.timeline_status(tenant_id, timeline_id).commit_lsn
|
||||
# Note: this is in memory value. Graceful shutdown of walproposer currently
|
||||
# doesn't guarantee persisted value, which is ok as we need it only for
|
||||
# tests. Persisting it without risking too many cf flushes needs a wp -> sk
|
||||
# protocol change. (though in reality shutdown sync-safekeepers does flush
|
||||
# of cf, so most of the time persisted value wouldn't lag)
|
||||
log.info(f"sk commit_lsn {commit_lsn}")
|
||||
# note that ckpt_lsn is the *beginning* of checkpoint record, so commit_lsn
|
||||
# must be actually higher
|
||||
assert commit_lsn > ckpt_lsn, "safekeeper must have checkpoint record"
|
||||
|
||||
|
||||
class SafekeeperEnv:
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: f49a962b9b...b980d6f090
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: e8b9a28006...56f32c0e73
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 072697b225...9007894722
7
vendor/revisions.json
vendored
7
vendor/revisions.json
vendored
@@ -1,6 +1,5 @@
|
||||
{
|
||||
"postgres-v16": "072697b2250da3251af75887b577104554b9cd44",
|
||||
"postgres-v15": "e8b9a28006a550d7ca7cbb9bd0238eb9cd57bbd8",
|
||||
"postgres-v14": "f49a962b9b3715d6f47017d1dcf905c36f93ae5e"
|
||||
"postgres-v16": "90078947229aa7f9ac5f7ed4527b2c7386d5332b",
|
||||
"postgres-v15": "56f32c0e7330d17aaeee8bf211a73995180bd133",
|
||||
"postgres-v14": "b980d6f090c676e55fb2c830fb2434f532f635c0"
|
||||
}
|
||||
|
||||
|
||||
@@ -142,6 +142,51 @@ files:
|
||||
query: |
|
||||
select datname, state, count(*) as count from pg_stat_activity where state <> '' group by datname, state;
|
||||
|
||||
- metric_name: pg_stats_userdb
|
||||
type: gauge
|
||||
help: 'Stats for the oldest non-system db'
|
||||
key_labels:
|
||||
- datname
|
||||
value_label: kind
|
||||
values:
|
||||
- db_size
|
||||
- deadlocks
|
||||
# Rows
|
||||
- inserted
|
||||
- updated
|
||||
- deleted
|
||||
# We export stats for only one non-system database. Without this limit
|
||||
# it is too easy to abuse the system by creating lots of databases.
|
||||
# We can try lifting this limit in the future after we understand the needs better.
|
||||
query: |
|
||||
select pg_database_size(datname) as db_size, deadlocks,
|
||||
tup_inserted as inserted, tup_updated as updated, tup_deleted as deleted,
|
||||
datname
|
||||
from pg_stat_database
|
||||
where datname IN (
|
||||
select datname
|
||||
from pg_database
|
||||
where datname <> 'postgres' and not datistemplate
|
||||
order by oid
|
||||
limit 1
|
||||
);
|
||||
|
||||
- metric_name: max_cluster_size
|
||||
type: gauge
|
||||
help: 'neon.max_cluster_size setting'
|
||||
key_labels:
|
||||
values: [max_cluster_size]
|
||||
query: |
|
||||
select setting::int as max_cluster_size from pg_settings where name = 'neon.max_cluster_size';
|
||||
|
||||
- metric_name: db_total_size
|
||||
type: gauge
|
||||
help: 'Size of all databases'
|
||||
key_labels:
|
||||
values: [total]
|
||||
query: |
|
||||
select sum(pg_database_size(datname)) as total from pg_database;
|
||||
|
||||
build: |
|
||||
# Build cgroup-tools
|
||||
#
|
||||
|
||||
Reference in New Issue
Block a user