Compare commits

..

13 Commits

Author SHA1 Message Date
Konstantin Knizhnik
42d0b040f8 Fix merge conflicts 2024-03-11 17:34:05 +02:00
Konstantin Knizhnik
9832638c09 Add compression tag to BLOBs stored in image layer 2024-03-10 22:10:12 +02:00
Konstantin Knizhnik
62e7638c69 Store compression algorithm in image layer metadata 2024-03-10 21:54:47 +02:00
Konstantin Knizhnik
0dad8e427d Update pageserver/src/walingest.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2024-03-10 21:52:03 +02:00
Konstantin Knizhnik
4cfa2fdca5 Support compression of get_page responses 2024-03-10 21:52:01 +02:00
Konstantin Knizhnik
56ddf8e37f Build Postgres with lz4 support 2024-03-10 21:49:34 +02:00
Konstantin Knizhnik
ed4bb3073f Resolve merge conflict 2024-03-10 21:48:47 +02:00
Konstantin Knizhnik
b7e7aeed4d Perform compression of page images in storage 2024-03-10 21:48:00 +02:00
Konstantin Knizhnik
a880178cca Support lz4 WAL compression 2024-03-10 21:38:00 +02:00
Joonas Koivunen
b09d686335 fix: on-demand downloads can outlive timeline shutdown (#7051)
## Problem

Before this PR, it was possible that on-demand downloads were started
after `Timeline::shutdown()`.

For example, we have observed a walreceiver-connection-handler-initiated
on-demand download that was started after `Timeline::shutdown()`s final
`task_mgr::shutdown_tasks()` call.

The underlying issue is that `task_mgr::shutdown_tasks()` isn't sticky,
i.e., new tasks can be spawned during or after
`task_mgr::shutdown_tasks()`.

Cc: https://github.com/neondatabase/neon/issues/4175 in lieu of a more
specific issue for task_mgr. We already decided we want to get rid of it
anyways.

Original investigation:
https://neondb.slack.com/archives/C033RQ5SPDH/p1709824952465949

## Changes

- enter gate while downloading
- use timeline cancellation token for cancelling download

thereby, fixes #7054

Entering the gate might also eliminate the recent "kept the gate from closing"
messages in staging.
2024-03-09 13:09:08 +00:00
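
The "sticky" property that the commit message finds missing in `task_mgr::shutdown_tasks()` can be illustrated with a minimal, std-only sketch. The `Gate` and `GateGuard` types here are hypothetical stand-ins written for illustration, not the pageserver's actual gate implementation: once `close()` has been called, every later `enter()` fails, so no new download can start after shutdown begins.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a timeline gate: once closed, it stays closed.
#[derive(Default)]
pub struct Gate {
    closed: AtomicBool,
    entered: AtomicUsize,
}

/// Held for the duration of one download; the gate is left on drop.
pub struct GateGuard {
    gate: Arc<Gate>,
}

impl Gate {
    /// Try to enter the gate. Returns `None` if shutdown has already begun.
    pub fn enter(self: &Arc<Self>) -> Option<GateGuard> {
        // Register first, then re-check `closed`, so that a concurrent
        // `close()` either observes our entry or we observe its flag.
        self.entered.fetch_add(1, Ordering::SeqCst);
        if self.closed.load(Ordering::SeqCst) {
            self.entered.fetch_sub(1, Ordering::SeqCst);
            return None;
        }
        Some(GateGuard {
            gate: Arc::clone(self),
        })
    }

    /// Mark the gate as closed. This is never undone.
    pub fn close(&self) {
        self.closed.store(true, Ordering::SeqCst);
    }

    /// Number of guards currently alive.
    pub fn in_flight(&self) -> usize {
        self.entered.load(Ordering::SeqCst)
    }
}

impl Drop for GateGuard {
    fn drop(&mut self) {
        self.gate.entered.fetch_sub(1, Ordering::SeqCst);
    }
}
```

A real shutdown path would additionally wait for `in_flight()` to reach zero and would cancel running downloads through the timeline's cancellation token; both are omitted here.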
Christian Schwarz
74d24582cf throttling: exclude throttled time from basebackup (fixup of #6953) (#7072)
PR #6953 only excluded throttled time from the handle_pagerequests
(aka smgr metrics).

This PR implements the deduction for `basebackup` queries.

The other page_service methods either don't use Timeline::get
or they aren't used in production.

Found by manually inspecting the [staging
logs](https://neonprod.grafana.net/explore?schemaVersion=1&panes=%7B%22wx8%22:%7B%22datasource%22:%22xHHYY0dVz%22,%22queries%22:%5B%7B%22refId%22:%22A%22,%22expr%22:%22%7Bhostname%3D%5C%22pageserver-0.eu-west-1.aws.neon.build%5C%22%7D%20%7C~%20%60git-env%7CERR%7CWARN%60%22,%22queryType%22:%22range%22,%22datasource%22:%7B%22type%22:%22loki%22,%22uid%22:%22xHHYY0dVz%22%7D,%22editorMode%22:%22code%22%7D%5D,%22range%22:%7B%22to%22:%221709919114642%22,%22from%22:%221709904430898%22%7D%7D%7D).
2024-03-09 13:37:02 +01:00
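
The deduction described in this commit amounts to subtracting the throttled time from the wall-clock time, with a fallback if the subtraction would underflow. A minimal sketch follows; the function name `exclude_throttled` is an assumption made for illustration, and the fallback mirrors the `elapsed` fallback visible in the metrics hunk of this diff.

```rust
use std::time::Duration;

/// Wall-clock time minus the time spent throttled.
/// If the throttled time exceeds the elapsed time (which would indicate a
/// bookkeeping error), fall back to the raw elapsed time.
pub fn exclude_throttled(elapsed: Duration, throttled: Duration) -> Duration {
    elapsed.checked_sub(throttled).unwrap_or(elapsed)
}
```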
Sasha Krassovsky
4834d22d2d Revoke REPLICATION (#7052)
## Problem
Currently users can cause problems with replication
## Summary of changes
Don't let them replicate
2024-03-08 22:24:30 +00:00
Anastasia Lubennikova
86e8c43ddf Add downgrade scripts for neon extension. (#7065)
## Problem

When we start compute with a newer version of the extension (e.g. 1.2) and
then roll back the release, downgrading the compute version, the next compute
start will try to update the extension to the latest version available in
neon.control (e.g. 1.1).

Thus we need to provide downgrade scripts like neon--1.2--1.1.sql

These scripts must revert the changes made by the upgrade scripts in the
reverse order. This is necessary to ensure that the next upgrade will
work correctly.

In general, we need to write upgrade and downgrade scripts to be more
robust and add IF EXISTS / CREATE OR REPLACE clauses to all statements
(where applicable).

## Summary of changes
Adds downgrade scripts.
Adds test cases for extension downgrade/upgrade. 

fixes #7066

This is a follow-up for
https://app.incident.io/neondb/incidents/167?tab=follow-ups

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Alex Chi Z <iskyzh@gmail.com>
Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
2024-03-08 20:42:35 +00:00
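
As an illustration of the naming scheme mentioned above (`neon--1.2--1.1.sql`), the following sketch lists the scripts needed to step down one version at a time. The helper names and the idea of passing an ordered version list are assumptions for this example; they are not taken from the PR.

```rust
/// Name of the script that moves `extension` from version `from` to version `to`,
/// following the `extension--from--to.sql` pattern.
pub fn update_script_name(extension: &str, from: &str, to: &str) -> String {
    format!("{}--{}--{}.sql", extension, from, to)
}

/// Scripts needed to go from `from` down to `to`, newest step first.
/// `versions` must be ordered oldest to newest. Returns `None` if either
/// version is unknown or if `to` is newer than `from`.
pub fn downgrade_scripts(
    extension: &str,
    versions: &[&str],
    from: &str,
    to: &str,
) -> Option<Vec<String>> {
    let from_idx = versions.iter().position(|v| *v == from)?;
    let to_idx = versions.iter().position(|v| *v == to)?;
    if to_idx > from_idx {
        return None;
    }
    Some(
        (to_idx..from_idx)
            .rev()
            .map(|i| update_script_name(extension, versions[i + 1], versions[i]))
            .collect(),
    )
}
```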
54 changed files with 634 additions and 1780 deletions

12
Cargo.lock generated
View File

@@ -282,10 +282,8 @@ dependencies = [
"control_plane",
"diesel",
"diesel_migrations",
"fail",
"futures",
"git-version",
"hex",
"humantime",
"hyper",
"metrics",
@@ -2843,6 +2841,15 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "lz4_flex"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
dependencies = [
"twox-hash",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
@@ -3513,6 +3520,7 @@ dependencies = [
"hyper",
"itertools",
"leaky-bucket",
"lz4_flex",
"md5",
"metrics",
"nix 0.27.1",

View File

@@ -100,6 +100,7 @@ jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
lz4_flex = "0.11.1"
md5 = "0.7.0"
memoffset = "0.8"
native-tls = "0.2"

View File

@@ -23,12 +23,12 @@ endif
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Linux)
# Seccomp BPF is only available for Linux
PG_CONFIGURE_OPTS += --with-libseccomp
PG_CONFIGURE_OPTS += --with-lz4 --with-libseccomp
else ifeq ($(UNAME_S),Darwin)
# macOS with brew-installed openssl requires explicit paths
# It can be configured with OPENSSL_PREFIX variable
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
PG_CONFIGURE_OPTS += --with-lz4 --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure
# brew formulae are keg-only and not symlinked into HOMEBREW_PREFIX, force their usage

View File

@@ -302,9 +302,9 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
RoleAction::Create => {
// This branch only runs when roles are created through the console, so it is
// safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
// from neon_superuser.
// from neon_superuser. (NOTE: REPLICATION has been removed from here for now).
let mut query: String = format!(
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS IN ROLE neon_superuser",
name.pg_quote()
);
info!("running role create query: '{}'", &query);
@@ -805,6 +805,18 @@ $$;"#,
"",
"",
// Add new migrations below.
r#"
DO $$
DECLARE
role_name TEXT;
BEGIN
FOR role_name IN SELECT rolname FROM pg_roles WHERE rolreplication IS TRUE
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOREPLICATION';
END LOOP;
END
$$;"#,
];
let mut query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
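
The migration above builds each `ALTER ROLE` statement by concatenating a quoted role name. The same statement can be sketched on the Rust side as follows. Both helpers are hypothetical; note that this `quote_ident` always adds quotes, whereas the PostgreSQL function of the same name quotes only when necessary.

```rust
/// Quote an SQL identifier: wrap it in double quotes and double any
/// embedded double quote.
pub fn quote_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// Build the statement that the migration executes for one role.
pub fn revoke_replication_sql(role: &str) -> String {
    format!("ALTER ROLE {} NOREPLICATION", quote_ident(role))
}
```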

View File

@@ -19,10 +19,8 @@ aws-config.workspace = true
aws-sdk-secretsmanager.workspace = true
camino.workspace = true
clap.workspace = true
fail.workspace = true
futures.workspace = true
git-version.workspace = true
hex.workspace = true
hyper.workspace = true
humantime.workspace = true
once_cell.workspace = true

View File

@@ -1,4 +1,3 @@
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
@@ -24,13 +23,10 @@ struct ShardedComputeHookTenant {
stripe_size: ShardStripeSize,
shard_count: ShardCount,
shards: Vec<(ShardNumber, NodeId)>,
// Async lock used for ensuring that remote compute hook calls are ordered identically to updates to this structure
lock: Arc<tokio::sync::Mutex<()>>,
}
enum ComputeHookTenant {
Unsharded((NodeId, Arc<tokio::sync::Mutex<()>>)),
Unsharded(NodeId),
Sharded(ShardedComputeHookTenant),
}
@@ -42,17 +38,9 @@ impl ComputeHookTenant {
shards: vec![(tenant_shard_id.shard_number, node_id)],
stripe_size,
shard_count: tenant_shard_id.shard_count,
lock: Arc::default(),
})
} else {
Self::Unsharded((node_id, Arc::default()))
}
}
fn get_lock(&self) -> &Arc<tokio::sync::Mutex<()>> {
match self {
Self::Unsharded((_node_id, lock)) => lock,
Self::Sharded(sharded_tenant) => &sharded_tenant.lock,
Self::Unsharded(node_id)
}
}
@@ -65,9 +53,7 @@ impl ComputeHookTenant {
node_id: NodeId,
) {
match self {
Self::Unsharded((existing_node_id, _lock))
if tenant_shard_id.shard_count.count() == 1 =>
{
Self::Unsharded(existing_node_id) if tenant_shard_id.shard_count.count() == 1 => {
*existing_node_id = node_id
}
Self::Sharded(sharded_tenant)
@@ -136,15 +122,9 @@ pub(crate) enum NotifyError {
}
impl ComputeHookTenant {
fn maybe_reconfigure(
&self,
tenant_id: TenantId,
) -> Option<(
ComputeHookNotifyRequest,
impl std::future::Future<Output = tokio::sync::OwnedMutexGuard<()>>,
)> {
let request = match self {
Self::Unsharded((node_id, _lock)) => Some(ComputeHookNotifyRequest {
fn maybe_reconfigure(&self, tenant_id: TenantId) -> Option<ComputeHookNotifyRequest> {
match self {
Self::Unsharded(node_id) => Some(ComputeHookNotifyRequest {
tenant_id,
shards: vec![ComputeHookNotifyRequestShard {
shard_number: ShardNumber(0),
@@ -178,9 +158,7 @@ impl ComputeHookTenant {
);
None
}
};
request.map(|r| (r, self.get_lock().clone().lock_owned()))
}
}
}
@@ -189,11 +167,8 @@ impl ComputeHookTenant {
/// the compute connection string.
pub(super) struct ComputeHook {
config: Config,
state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
state: tokio::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
authorization_header: Option<String>,
// This lock is only used in testing environments, to serialize calls into neon_local
neon_local_lock: tokio::sync::Mutex<()>,
}
impl ComputeHook {
@@ -207,7 +182,6 @@ impl ComputeHook {
state: Default::default(),
config,
authorization_header,
neon_local_lock: Default::default(),
}
}
@@ -216,10 +190,6 @@ impl ComputeHook {
&self,
reconfigure_request: ComputeHookNotifyRequest,
) -> anyhow::Result<()> {
// neon_local updates are not safe to call concurrently, use a lock to serialize
// all calls to this function
let _locked = self.neon_local_lock.lock().await;
let env = match LocalEnv::load_config() {
Ok(e) => e,
Err(e) => {
@@ -370,38 +340,30 @@ impl ComputeHook {
stripe_size: ShardStripeSize,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let reconfigure_request = {
let mut locked = self.state.lock().unwrap();
let mut locked = self.state.lock().await;
use std::collections::hash_map::Entry;
let tenant = match locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
node_id,
)),
Entry::Occupied(e) => {
let tenant = e.into_mut();
tenant.update(tenant_shard_id, stripe_size, node_id);
tenant
}
};
tenant.maybe_reconfigure(tenant_shard_id.tenant_id)
use std::collections::hash_map::Entry;
let tenant = match locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
node_id,
)),
Entry::Occupied(e) => {
let tenant = e.into_mut();
tenant.update(tenant_shard_id, stripe_size, node_id);
tenant
}
};
let Some((reconfigure_request, lock_fut)) = reconfigure_request else {
let reconfigure_request = tenant.maybe_reconfigure(tenant_shard_id.tenant_id);
let Some(reconfigure_request) = reconfigure_request else {
// The tenant doesn't yet have pageservers for all its shards: we won't notify anything
// until it does.
tracing::info!("Tenant isn't yet ready to emit a notification");
return Ok(());
};
// Finish acquiring the tenant's async lock: this future was created inside the self.state
// lock above, so we are guaranteed to get this lock in the same order as callers took
// that lock. This ordering is essential: the cloud control plane must end up with the
// same end state for the tenant that we see.
let _guard = lock_fut.await;
if let Some(notify_url) = &self.config.compute_hook_url {
self.do_notify(notify_url, reconfigure_request, cancel)
.await
@@ -443,7 +405,6 @@ pub(crate) mod tests {
tenant_state
.maybe_reconfigure(tenant_id)
.unwrap()
.0
.shards
.len(),
1
@@ -451,7 +412,6 @@ pub(crate) mod tests {
assert!(tenant_state
.maybe_reconfigure(tenant_id)
.unwrap()
.0
.stripe_size
.is_none());
@@ -485,7 +445,6 @@ pub(crate) mod tests {
tenant_state
.maybe_reconfigure(tenant_id)
.unwrap()
.0
.shards
.len(),
2
@@ -494,7 +453,6 @@ pub(crate) mod tests {
tenant_state
.maybe_reconfigure(tenant_id)
.unwrap()
.0
.stripe_size,
Some(ShardStripeSize(32768))
);

View File

@@ -10,9 +10,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::auth::{Scope, SwappableJwtAuth};
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
use utils::http::request::{must_get_query_param, parse_request_param};
use utils::id::{TenantId, TimelineId};
@@ -440,24 +438,6 @@ async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiEr
state.service.tenants_dump()
}
async fn handle_balance_all(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
service.balance_all()?;
json_response(StatusCode::OK, ())
}
async fn handle_balance_attached(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
service.balance_attached()?;
json_response(StatusCode::OK, ())
}
async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -574,9 +554,6 @@ pub fn make_router(
.post("/debug/v1/consistency_check", |r| {
request_span(r, handle_consistency_check)
})
.put("/debug/v1/failpoints", |r| {
request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
})
.get("/control/v1/tenant/:tenant_id/locate", |r| {
tenant_service_handler(r, handle_tenant_locate)
})
@@ -595,12 +572,6 @@ pub fn make_router(
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
tenant_service_handler(r, handle_tenant_shard_split)
})
.post("/control/v1/balance/all", |r| {
tenant_service_handler(r, handle_balance_all)
})
.post("/control/v1/balance/attached", |r| {
tenant_service_handler(r, handle_balance_attached)
})
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.

View File

@@ -1,54 +0,0 @@
use std::{collections::HashMap, sync::Arc};
/// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
/// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
/// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
/// is needed at a tenant-wide granularity.
pub(crate) struct IdLockMap<T>
where
T: Eq + PartialEq + std::hash::Hash,
{
/// A synchronous lock for getting/setting the async locks that our callers will wait on.
entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<()>>>>,
}
impl<T> IdLockMap<T>
where
T: Eq + PartialEq + std::hash::Hash,
{
pub(crate) fn shared(
&self,
key: T,
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<()>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default();
entry.clone().read_owned()
}
pub(crate) fn exclusive(
&self,
key: T,
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockWriteGuard<()>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default();
entry.clone().write_owned()
}
/// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
/// periodic housekeeping to avoid the map growing indefinitely
pub(crate) fn housekeeping(&self) {
let mut locked = self.entities.lock().unwrap();
locked.retain(|_k, lock| lock.try_write().is_err())
}
}
impl<T> Default for IdLockMap<T>
where
T: Eq + PartialEq + std::hash::Hash,
{
fn default() -> Self {
Self {
entities: std::sync::Mutex::new(HashMap::new()),
}
}
}

View File

@@ -4,7 +4,6 @@ use utils::seqwait::MonotonicCounter;
mod auth;
mod compute_hook;
pub mod http;
mod id_lock_map;
pub mod metrics;
mod node;
pub mod persistence;

View File

@@ -11,9 +11,6 @@ use diesel::prelude::*;
use diesel::Connection;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::ShardConfigError;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
@@ -75,14 +72,6 @@ pub(crate) enum DatabaseError {
Logical(String),
}
#[must_use]
pub(crate) enum AbortShardSplitStatus {
/// We aborted the split in the database by reverting to the parent shards
Aborted,
/// The split had already been persisted.
Complete,
}
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
impl Persistence {
@@ -581,42 +570,6 @@ impl Persistence {
})
.await
}
/// Used when the remote part of a shard split failed: we will revert the database state to have only
/// the parent shards, with SplitState::Idle.
pub(crate) async fn abort_shard_split(
&self,
split_tenant_id: TenantId,
new_shard_count: ShardCount,
) -> DatabaseResult<AbortShardSplitStatus> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<AbortShardSplitStatus> {
let aborted = conn.transaction(|conn| -> QueryResult<AbortShardSplitStatus> {
// Clear the splitting state on parent shards
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.ne(new_shard_count.literal() as i32))
.set((splitting.eq(0),))
.execute(conn)?;
// Parent shards are already gone: we cannot abort.
if updated == 0 {
return Ok(AbortShardSplitStatus::Complete);
}
// Erase child shards
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(new_shard_count.literal() as i32))
.execute(conn)?;
Ok(AbortShardSplitStatus::Aborted)
})?;
Ok(aborted)
})
.await
}
}
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
@@ -651,28 +604,6 @@ pub(crate) struct TenantShardPersistence {
pub(crate) config: String,
}
impl TenantShardPersistence {
pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
if self.shard_count == 0 {
Ok(ShardIdentity::unsharded())
} else {
Ok(ShardIdentity::new(
ShardNumber(self.shard_number as u8),
ShardCount::new(self.shard_count as u8),
ShardStripeSize(self.shard_stripe_size as u32),
)?)
}
}
pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
Ok(TenantShardId {
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
shard_number: ShardNumber(self.shard_number as u8),
shard_count: ShardCount::new(self.shard_count as u8),
})
}
}
/// Parts of [`crate::node::Node`] that are stored durably
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
#[diesel(table_name = crate::schema::nodes)]

View File

@@ -1,6 +1,5 @@
use crate::persistence::Persistence;
use crate::service;
use hyper::StatusCode;
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
@@ -19,8 +18,6 @@ use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
pub(super) struct Reconciler {
@@ -488,29 +485,17 @@ impl Reconciler {
)
.await
{
Some(Ok(observed)) => Some(observed),
Some(Err(mgmt_api::Error::ApiError(status, _msg)))
if status == StatusCode::NOT_FOUND =>
{
None
}
Some(Ok(observed)) => observed,
Some(Err(e)) => return Err(e.into()),
None => return Err(ReconcileError::Cancel),
};
tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
match observed_conf {
Some(conf) => {
// Pageserver returned a state: update it in observed. This may still be an indeterminate (None) state,
// if internally the pageserver's TenantSlot was being mutated (e.g. some long running API call is still running)
self.observed
.locations
.insert(attached_node.get_id(), ObservedStateLocation { conf });
}
None => {
// Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
self.observed.locations.remove(&attached_node.get_id());
}
}
self.observed.locations.insert(
attached_node.get_id(),
ObservedStateLocation {
conf: observed_conf,
},
);
}
Ok(())
@@ -540,12 +525,7 @@ impl Reconciler {
)));
};
let mut wanted_conf = attached_location_conf(
generation,
&self.shard,
&self.config,
!self.intent.secondary.is_empty(),
);
let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
match self.observed.locations.get(&node.get_id()) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
// Nothing to do
@@ -682,26 +662,10 @@ impl Reconciler {
}
}
/// We tweak the externally-set TenantConfig while configuring
/// locations, using our awareness of whether secondary locations
/// are in use to automatically enable/disable heatmap uploads.
fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
let mut config = config.clone();
if has_secondaries {
if config.heatmap_period.is_none() {
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
}
} else {
config.heatmap_period = None;
}
config
}
pub(crate) fn attached_location_conf(
generation: Generation,
shard: &ShardIdentity,
config: &TenantConfig,
has_secondaries: bool,
) -> LocationConfig {
LocationConfig {
mode: LocationConfigMode::AttachedSingle,
@@ -710,7 +674,7 @@ pub(crate) fn attached_location_conf(
shard_number: shard.number.0,
shard_count: shard.count.literal(),
shard_stripe_size: shard.stripe_size.0,
tenant_conf: ha_aware_config(config, has_secondaries),
tenant_conf: config.clone(),
}
}
@@ -725,6 +689,6 @@ pub(crate) fn secondary_location_conf(
shard_number: shard.number.0,
shard_count: shard.count.literal(),
shard_stripe_size: shard.stripe_size.0,
tenant_conf: ha_aware_config(config, true),
tenant_conf: config.clone(),
}
}

File diff suppressed because it is too large

View File

@@ -577,12 +577,7 @@ impl TenantState {
.generation
.expect("Attempted to enter attached state without a generation");
let wanted_conf = attached_location_conf(
generation,
&self.shard,
&self.config,
!self.intent.secondary.is_empty(),
);
let wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {

View File

@@ -774,10 +774,7 @@ impl Endpoint {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap();
let client = reqwest::Client::new();
let response = client
.post(format!(
"http://{}:{}/configure",

View File

@@ -17,7 +17,6 @@ use std::time::Duration;
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use futures::SinkExt;
use hyper::StatusCode;
use pageserver_api::controller_api::NodeRegisterRequest;
use pageserver_api::models::{
self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo,
@@ -263,11 +262,6 @@ impl PageServerNode {
match st {
Ok(()) => Ok(true),
Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false),
Err(mgmt_api::Error::ApiError(status, _msg))
if status == StatusCode::SERVICE_UNAVAILABLE =>
{
Ok(false)
}
Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
}
},

View File

@@ -29,7 +29,6 @@ pub mod launch_timestamp;
mod wrappers;
pub use wrappers::{CountedReader, CountedWriter};
mod hll;
pub mod metric_vec_duration;
pub use hll::{HyperLogLog, HyperLogLogVec};
#[cfg(target_os = "linux")]
pub mod more_process_metrics;

View File

@@ -1,23 +0,0 @@
//! Helpers for observing duration on `HistogramVec` / `CounterVec` / `GaugeVec` / `MetricVec<T>`.
use std::{future::Future, time::Instant};
pub trait DurationResultObserver {
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration);
}
pub async fn observe_async_block_duration_by_result<
T,
E,
F: Future<Output = Result<T, E>>,
O: DurationResultObserver,
>(
observer: &O,
block: F,
) -> Result<T, E> {
let start = Instant::now();
let result = block.await;
let duration = start.elapsed();
observer.observe_result(&result, duration);
result
}

View File

@@ -757,6 +757,7 @@ pub enum PagestreamBeMessage {
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
GetCompressedPage(PagestreamGetPageResponse),
}
// Keep in sync with `pagestore_client.h`
@@ -996,6 +997,12 @@ impl PagestreamBeMessage {
bytes.put(&resp.page[..]);
}
Self::GetCompressedPage(resp) => {
bytes.put_u8(105); /* tag from pagestore_client.h */
bytes.put_u16(resp.page.len() as u16);
bytes.put(&resp.page[..]);
}
Self::Error(resp) => {
bytes.put_u8(Tag::Error as u8);
bytes.put(resp.message.as_bytes());
@@ -1078,6 +1085,7 @@ impl PagestreamBeMessage {
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
Self::GetCompressedPage(_) => "GetCompressedPage",
}
}
}
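
The framing that the `GetCompressedPage` arm writes is a one-byte tag (105), a big-endian `u16` length (`BufMut::put_u16` writes big-endian), and then the compressed bytes. A std-only sketch of that framing and its inverse is shown below. The function names are assumptions for illustration, and unlike the `as u16` cast in the diff, this sketch rejects payloads that do not fit in a `u16` instead of truncating the length.

```rust
/// Tag value as it appears in the diff above.
pub const GET_COMPRESSED_PAGE_TAG: u8 = 105;

/// Frame a compressed page as: tag byte, big-endian u16 length, payload.
/// Returns `None` if the payload length does not fit in a u16.
pub fn encode_compressed_page(payload: &[u8]) -> Option<Vec<u8>> {
    if payload.len() > u16::MAX as usize {
        return None;
    }
    let len = payload.len() as u16;
    let mut out = Vec::with_capacity(3 + payload.len());
    out.push(GET_COMPRESSED_PAGE_TAG);
    out.extend_from_slice(&len.to_be_bytes());
    out.extend_from_slice(payload);
    Some(out)
}

/// Inverse of `encode_compressed_page`: returns the payload slice, or `None`
/// if the tag is wrong or the message is shorter than its declared length.
pub fn decode_compressed_page(msg: &[u8]) -> Option<&[u8]> {
    let (&tag, rest) = msg.split_first()?;
    if tag != GET_COMPRESSED_PAGE_TAG || rest.len() < 2 {
        return None;
    }
    let len = u16::from_be_bytes([rest[0], rest[1]]) as usize;
    rest.get(2..2 + len)
}
```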

View File

@@ -144,6 +144,13 @@ pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result<boo
dispatch_pgversion!(version, Ok(pgv::bindings::bkpimg_is_compressed(bimg_info)))
}
pub fn bkpimage_is_compressed_lz4(bimg_info: u8, version: u32) -> anyhow::Result<bool> {
dispatch_pgversion!(
version,
Ok(pgv::bindings::bkpimg_is_compressed_lz4(bimg_info))
)
}
pub fn generate_wal_segment(
segno: u64,
system_id: u64,

View File

@@ -8,3 +8,7 @@ pub const SIZEOF_RELMAPFILE: usize = 512; /* sizeof(RelMapFile) in relmapper.c *
pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_IS_COMPRESSED) != 0
}
pub fn bkpimg_is_compressed_lz4(_bimg_info: u8) -> bool {
false
}

View File

@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
(bimg_info & ANY_COMPRESS_FLAG) != 0
}
pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
}

View File

@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
(bimg_info & ANY_COMPRESS_FLAG) != 0
}
pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
}
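
The two predicates above are plain bit tests on `bimg_info`. The sketch below reproduces them in a self-contained form. The constant values are assumptions based on the flag layout in PostgreSQL 15's `xlogrecord.h` as best recalled, and `ANY_COMPRESS_FLAG` is defined here only so the example compiles; check both against the real bindings before relying on them.

```rust
// Assumed values, intended to match PostgreSQL 15's xlogrecord.h.
pub const BKPIMAGE_COMPRESS_PGLZ: u8 = 0x04;
pub const BKPIMAGE_COMPRESS_LZ4: u8 = 0x08;
pub const BKPIMAGE_COMPRESS_ZSTD: u8 = 0x10;

/// Assumed definition: any of the per-algorithm compression bits.
pub const ANY_COMPRESS_FLAG: u8 =
    BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | BKPIMAGE_COMPRESS_ZSTD;

/// True if the backup image is compressed with any algorithm.
pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
    (bimg_info & ANY_COMPRESS_FLAG) != 0
}

/// True if the backup image is compressed with LZ4 specifically.
pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
    (bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
}
```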

View File

@@ -17,6 +17,7 @@ use remote_storage::{
};
use test_context::test_context;
use test_context::AsyncTestContext;
use tokio::io::AsyncBufReadExt;
use tokio_util::sync::CancellationToken;
use tracing::info;
@@ -484,32 +485,33 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
))
.unwrap();
let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
{
let mut stream = ctx
let stream = ctx
.client
.download(&path, &cancel)
.await
.expect("download succeeds")
.download_stream;
let first = stream
.next()
.await
.expect("should have the first blob")
.expect("should have succeeded");
let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));
tracing::info!(len = first.len(), "downloaded first chunk");
let first = reader.fill_buf().await.expect("should have the first blob");
let len = first.len();
tracing::info!(len, "downloaded first chunk");
assert!(
first.len() < len,
first.len() < file_len,
"uploaded file is too small, we downloaded all on first chunk"
);
reader.consume(len);
cancel.cancel();
let next = stream.next().await.expect("stream should have more");
let next = reader.fill_buf().await;
let e = next.expect_err("expected an error, but got a chunk?");
@@ -520,6 +522,10 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
.is_some_and(|e| matches!(e, DownloadError::Cancelled)),
"{inner:?}"
);
let e = DownloadError::from(e);
assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
}
let cancel = CancellationToken::new();

View File

@@ -37,6 +37,7 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
leaky-bucket.workspace = true
lz4_flex.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses

View File

@@ -157,6 +157,7 @@ impl PagestreamClient {
PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetCompressedPage(_)
| PagestreamBeMessage::GetSlruSegment(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",

View File

@@ -2103,16 +2103,6 @@ where
R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
{
if request.uri() != &"/v1/failpoints".parse::<Uri>().unwrap() {
fail::fail_point!("api-503", |_| Err(ApiError::ResourceUnavailable(
"failpoint".into()
)));
fail::fail_point!("api-500", |_| Err(ApiError::InternalServerError(
anyhow::anyhow!("failpoint")
)));
}
// Spawn a new task to handle the request, to protect the handler from unexpected
// async cancellations. Most pageserver functions are not async cancellation safe.
// We arm a drop-guard, so that if Hyper drops the Future, we signal the task
@@ -2257,7 +2247,7 @@ pub fn make_router(
.get("/v1/location_config", |r| {
api_handler(r, list_location_config_handler)
})
.get("/v1/location_config/:tenant_shard_id", |r| {
.get("/v1/location_config/:tenant_id", |r| {
api_handler(r, get_location_config_handler)
})
.put(

View File

@@ -40,7 +40,14 @@ use tracing::info;
/// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const STORAGE_FORMAT_VERSION: u16 = 4;
/// Minimal storage format version with compression support
pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
/// Page image compression algorithm
pub const NO_COMPRESSION: u8 = 0;
pub const LZ4_COMPRESSION: u8 = 1;
pub const DEFAULT_PG_VERSION: u32 = 15;

View File

@@ -1,5 +1,4 @@
use enum_map::EnumMap;
use metrics::metric_vec_duration::DurationResultObserver;
use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
@@ -1283,11 +1282,65 @@ pub(crate) static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|
})
});
impl DurationResultObserver for BasebackupQueryTime {
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration) {
pub(crate) struct BasebackupQueryTimeOngoingRecording<'a, 'c> {
parent: &'a BasebackupQueryTime,
ctx: &'c RequestContext,
start: std::time::Instant,
}
impl BasebackupQueryTime {
pub(crate) fn start_recording<'c: 'a, 'a>(
&'a self,
ctx: &'c RequestContext,
) -> BasebackupQueryTimeOngoingRecording<'_, '_> {
let start = Instant::now();
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!(error, "error opening micros_spent_throttled; this message is logged at a global rate limit");
});
}
}
BasebackupQueryTimeOngoingRecording {
parent: self,
ctx,
start,
}
}
}
impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
pub(crate) fn observe<T, E>(self, res: &Result<T, E>) {
let elapsed = self.start.elapsed();
let ex_throttled = self
.ctx
.micros_spent_throttled
.close_and_checked_sub_from(elapsed);
let ex_throttled = match ex_throttled {
Ok(ex_throttled) => ex_throttled,
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!(error, "error deducting time spent throttled; this message is logged at a global rate limit");
});
elapsed
}
};
let label_value = if res.is_ok() { "ok" } else { "error" };
let metric = self.0.get_metric_with_label_values(&[label_value]).unwrap();
metric.observe(duration.as_secs_f64());
let metric = self
.parent
.0
.get_metric_with_label_values(&[label_value])
.unwrap();
metric.observe(ex_throttled.as_secs_f64());
}
}
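
The `observe` path above deducts throttled time from the elapsed wall-clock time and falls back to the raw elapsed time when the deduction fails. A minimal sketch of that arithmetic, assuming the throttled time is available as a plain microsecond count (the function name `exclude_throttled` is hypothetical, and the real `close_and_checked_sub_from` may behave differently):

```rust
use std::time::Duration;

/// Subtract the time spent throttled from the elapsed time.
/// If the throttled time exceeds the elapsed time (underflow),
/// fall back to reporting the unadjusted elapsed time.
fn exclude_throttled(elapsed: Duration, throttled_micros: u64) -> Duration {
    elapsed
        .checked_sub(Duration::from_micros(throttled_micros))
        .unwrap_or(elapsed)
}
```

Falling back to `elapsed` keeps the metric populated even when the bookkeeping is inconsistent, at the cost of over-reporting in that case.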

View File

@@ -1155,9 +1155,18 @@ impl PageServerHandler {
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
.await?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))
let compressed = lz4_flex::block::compress(&page);
if compressed.len() < page.len() {
Ok(PagestreamBeMessage::GetCompressedPage(
PagestreamGetPageResponse {
page: Bytes::from(compressed),
},
))
} else {
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))
}
}
#[instrument(skip_all, fields(shard_id))]
@@ -1199,7 +1208,7 @@ impl PageServerHandler {
prev_lsn: Option<Lsn>,
full_backup: bool,
gzip: bool,
ctx: RequestContext,
ctx: &RequestContext,
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -1214,7 +1223,7 @@ impl PageServerHandler {
if let Some(lsn) = lsn {
// Backup was requested at a particular LSN. Wait for it to arrive.
info!("waiting for {}", lsn);
timeline.wait_lsn(lsn, &ctx).await?;
timeline.wait_lsn(lsn, ctx).await?;
timeline
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
.context("invalid basebackup lsn")?;
@@ -1236,7 +1245,7 @@ impl PageServerHandler {
lsn,
prev_lsn,
full_backup,
&ctx,
ctx,
)
.await?;
} else {
@@ -1257,7 +1266,7 @@ impl PageServerHandler {
lsn,
prev_lsn,
full_backup,
&ctx,
ctx,
)
.await?;
// shutdown the encoder to ensure the gzip footer is written
@@ -1269,7 +1278,7 @@ impl PageServerHandler {
lsn,
prev_lsn,
full_backup,
&ctx,
ctx,
)
.await?;
}
@@ -1449,25 +1458,25 @@ where
false
};
::metrics::metric_vec_duration::observe_async_block_duration_by_result(
&*metrics::BASEBACKUP_QUERY_TIME,
async move {
self.handle_basebackup_request(
pgb,
tenant_id,
timeline_id,
lsn,
None,
false,
gzip,
ctx,
)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
Result::<(), QueryError>::Ok(())
},
)
.await?;
let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
let res = async {
self.handle_basebackup_request(
pgb,
tenant_id,
timeline_id,
lsn,
None,
false,
gzip,
&ctx,
)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
Result::<(), QueryError>::Ok(())
}
.await;
metric_recording.observe(&res);
res?;
}
// return pair of prev_lsn and last_lsn
else if query_string.starts_with("get_last_record_rlsn ") {
@@ -1563,7 +1572,7 @@ where
prev_lsn,
true,
false,
ctx,
&ctx,
)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;

View File

@@ -16,6 +16,7 @@ use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use lz4_flex;
use pageserver_api::key::{
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
@@ -992,7 +993,15 @@ impl<'a> DatadirModification<'a> {
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
let compressed = lz4_flex::block::compress(&img);
if compressed.len() < img.len() {
self.put(
rel_block_to_key(rel, blknum),
Value::CompressedImage(Bytes::from(compressed)),
);
} else {
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
}
Ok(())
}
@@ -1597,6 +1606,10 @@ impl<'a> DatadirModification<'a> {
if let Some((_, value)) = values.last() {
return if let Value::Image(img) = value {
Ok(img.clone())
} else if let Value::CompressedImage(img) = value {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)
.map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
Ok(Bytes::from(decompressed))
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates

View File

@@ -13,6 +13,8 @@ pub use pageserver_api::key::{Key, KEY_SIZE};
pub enum Value {
/// An Image value contains a full copy of the value
Image(Bytes),
/// A compressed page image contains a full, LZ4-compressed copy of the page
CompressedImage(Bytes),
/// A WalRecord value contains a WAL record that needs to be
/// replayed to get the full value. Replaying the WAL record
/// might need a previous version of the value (if will_init()
@@ -22,12 +24,17 @@ pub enum Value {
impl Value {
pub fn is_image(&self) -> bool {
matches!(self, Value::Image(_))
match self {
Value::Image(_) => true,
Value::CompressedImage(_) => true,
Value::WalRecord(_) => false,
}
}
pub fn will_init(&self) -> bool {
match self {
Value::Image(_) => true,
Value::CompressedImage(_) => true,
Value::WalRecord(rec) => rec.will_init(),
}
}

View File

@@ -272,9 +272,6 @@ pub enum TaskKind {
// Task that uploads a file to remote storage
RemoteUploadTask,
// Task that downloads a file from remote storage
RemoteDownloadTask,
// task that handles the initial downloading of all tenants
InitialLoad,

View File

@@ -11,13 +11,16 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use bytes::{BufMut, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
use crate::{LZ4_COMPRESSION, NO_COMPRESSION};
use lz4_flex;
use postgres_ffi::BLCKSZ;
use std::cmp::min;
use std::io::{Error, ErrorKind};
@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
Ok(buf)
}
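/// Read a blob written by `write_compressed_blob`: a one-byte compression
/// tag followed by a length-prefixed blob. Blobs tagged as LZ4 are
/// decompressed to a `BLCKSZ`-byte page; untagged blobs are returned as is.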
pub async fn read_compressed_blob(
&self,
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
let blknum = (offset / PAGE_SZ as u64) as u32;
let off = (offset % PAGE_SZ as u64) as usize;
let buf = self.read_blk(blknum, ctx).await?;
let compression_alg = buf[off];
let res = self.read_blob(offset + 1, ctx).await?;
if compression_alg == LZ4_COMPRESSION {
lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
})
} else {
assert_eq!(compression_alg, NO_COMPRESSION);
Ok(res)
}
}
/// Read blob into the given buffer. Any previous contents in the buffer
/// are overwritten.
pub async fn read_blob_into_buf(
@@ -211,6 +237,58 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
(src_buf, Ok(()))
}
pub async fn write_compressed_blob(&mut self, srcbuf: Bytes) -> Result<u64, Error> {
let offset = self.offset;
let len = srcbuf.len();
let mut io_buf = self.io_buf.take().expect("we always put it back below");
io_buf.clear();
let mut is_compressed = false;
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(NO_COMPRESSION);
io_buf.put_u8(len as u8);
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
return Err(Error::new(
ErrorKind::Other,
format!("blob too large ({} bytes)", len),
));
}
if len == BLCKSZ as usize {
let compressed = lz4_flex::block::compress(&srcbuf);
if compressed.len() < len {
io_buf.put_u8(LZ4_COMPRESSION);
let mut len_buf = (compressed.len() as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
io_buf.extend_from_slice(&compressed[..]);
is_compressed = true;
}
}
if !is_compressed {
// Not compressed (not a full page, or no size gain): write the plain 4-byte length header
io_buf.put_u8(NO_COMPRESSION);
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
}
}
let (io_buf, hdr_res) = self.write_all(io_buf).await;
match hdr_res {
Ok(_) => (),
Err(e) => return Err(e),
}
self.io_buf = Some(io_buf);
if is_compressed {
hdr_res.map(|_| offset)
} else {
let (_buf, res) = self.write_all(srcbuf).await;
res.map(|_| offset)
}
}
/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
@@ -227,7 +305,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
self.write_all(io_buf).await
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
@@ -242,8 +319,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
self.write_all(io_buf).await
}
self.write_all(io_buf).await
}
.await;
self.io_buf = Some(io_buf);
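
The blob layout used by `write_compressed_blob` and `read_compressed_blob` is a one-byte compression tag followed by the existing 1-byte or 4-byte length header. The following is a minimal sketch of that header encoding only, not the pageserver implementation: `encode_header` and `decode_header` are hypothetical helpers, and the sketch assumes the two tag values are distinct.

```rust
// Hypothetical tag values; assumed to be distinct.
const NO_COMPRESSION: u8 = 0;
const LZ4_COMPRESSION: u8 = 1;

/// Encode `[tag][length header]` for a payload of `len` bytes.
/// Lengths below 128 use one byte; larger lengths use four
/// big-endian bytes with the top bit set.
fn encode_header(tag: u8, len: usize) -> Result<Vec<u8>, String> {
    let mut out = vec![tag];
    if len < 128 {
        out.push(len as u8);
    } else {
        if len > 0x7fff_ffff {
            return Err(format!("blob too large ({} bytes)", len));
        }
        let mut len_buf = (len as u32).to_be_bytes();
        len_buf[0] |= 0x80;
        out.extend_from_slice(&len_buf);
    }
    Ok(out)
}

/// Decode a header. Returns (tag, payload length, header size in bytes),
/// or `None` if the buffer is too short.
fn decode_header(buf: &[u8]) -> Option<(u8, usize, usize)> {
    let tag = *buf.first()?;
    let first = *buf.get(1)?;
    if first & 0x80 == 0 {
        // Short blob: the length fits in the single byte after the tag.
        Some((tag, first as usize, 2))
    } else {
        if buf.len() < 5 {
            return None;
        }
        let raw = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]);
        Some((tag, (raw & 0x7fff_ffff) as usize, 5))
    }
}
```

For an 8192-byte page stored uncompressed, `encode_header(NO_COMPRESSION, 8192)` produces the tag byte followed by `0x80 0x00 0x20 0x00`; a reader skips the tag and hands the rest to the ordinary blob reader.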

View File

@@ -1440,31 +1440,6 @@ impl TenantManager {
tenant_shard_id: TenantShardId,
new_shard_count: ShardCount,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let r = self
.do_shard_split(tenant_shard_id, new_shard_count, ctx)
.await;
if r.is_err() {
// Shard splitting might have left the original shard in a partially shut down state (it
// stops the shard's remote timeline client). Reset it to ensure we leave things in
// a working state.
if self.get(tenant_shard_id).is_some() {
tracing::warn!("Resetting {tenant_shard_id} after shard split failure");
if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
// Log this error because our return value will still be the original error, not this one.
tracing::warn!("Failed to reset {tenant_shard_id}: {e}");
}
}
}
r
}
pub(crate) async fn do_shard_split(
&self,
tenant_shard_id: TenantShardId,
new_shard_count: ShardCount,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let tenant = get_tenant(tenant_shard_id, true)?;
@@ -1491,10 +1466,6 @@ impl TenantManager {
.join(",")
);
fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
if let Err(e) = tenant.split_prepare(&child_shards).await {
// If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
@@ -1504,10 +1475,6 @@ impl TenantManager {
return Err(e);
}
fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
"failpoint"
)));
self.resources.deletion_queue_client.flush_advisory();
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
@@ -1529,16 +1496,11 @@ impl TenantManager {
anyhow::bail!("Detached parent shard in the middle of split!")
}
};
fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Optimization: hardlink layers from the parent into the children, so that they don't have to
// re-download & duplicate the data referenced in their initial IndexPart
self.shard_split_hardlink(parent, child_shards.clone())
.await?;
fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Take a snapshot of where the parent's WAL ingest had got to: we will wait for
// child shards to reach this point.
@@ -1575,10 +1537,6 @@ impl TenantManager {
.await?;
}
fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Phase 4: wait for child shards' WAL ingest to catch up to target LSN
for child_shard_id in &child_shards {
let child_shard_id = *child_shard_id;
@@ -1611,10 +1569,6 @@ impl TenantManager {
timeline.timeline_id,
target_lsn
);
fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
"failpoint"
)));
if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
// Failure here might mean shutdown, in any case this part is an optimization
// and we shouldn't hold up the split operation.
@@ -1660,10 +1614,6 @@ impl TenantManager {
},
);
fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
"failpoint"
)));
parent_slot_guard.drop_old_value()?;
// Phase 6: Release the InProgress on the parent shard

View File

@@ -20,6 +20,7 @@ use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::models::{
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use postgres_ffi::BLCKSZ;
use std::cmp::{Ordering, Reverse};
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
@@ -147,12 +148,13 @@ impl ValuesReconstructState {
lsn: Lsn,
value: Value,
) -> ValueReconstructSituation {
let state = self
let mut error: Option<PageReconstructError> = None;
let key_state = self
.keys
.entry(*key)
.or_insert(Ok(VectoredValueReconstructState::default()));
if let Ok(state) = state {
let situation = if let Ok(state) = key_state {
let key_done = match state.situation {
ValueReconstructSituation::Complete => unreachable!(),
ValueReconstructSituation::Continue => match value {
@@ -160,6 +162,21 @@ impl ValuesReconstructState {
state.img = Some((lsn, img));
true
}
Value::CompressedImage(img) => {
match lz4_flex::block::decompress(&img, BLCKSZ as usize) {
Ok(decompressed) => {
state.img = Some((lsn, Bytes::from(decompressed)));
true
}
Err(e) => {
error = Some(PageReconstructError::from(anyhow::anyhow!(
"Failed to decompress blob from virtual file: {}",
e
)));
true
}
}
}
Value::WalRecord(rec) => {
let reached_cache =
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
@@ -178,7 +195,11 @@ impl ValuesReconstructState {
state.situation
} else {
ValueReconstructSituation::Complete
};
if let Some(err) = error {
*key_state = Err(err);
}
situation
}
/// Returns the Lsn at which this key is cached if one exists.

View File

@@ -44,12 +44,13 @@ use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::BLCKSZ;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::File;
@@ -813,6 +814,12 @@ impl DeltaLayerInner {
need_image = false;
break;
}
Value::CompressedImage(img) => {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
reconstruct_state.img = Some((entry_lsn, Bytes::from(decompressed)));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
@@ -1102,6 +1109,9 @@ impl DeltaLayerInner {
Value::Image(img) => {
format!(" img {} bytes", img.len())
}
Value::CompressedImage(img) => {
format!(" compressed img {} bytes", img.len())
}
Value::WalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
@@ -1138,6 +1148,11 @@ impl DeltaLayerInner {
let checkpoint = CheckPoint::decode(&img)?;
println!(" CHECKPOINT: {:?}", checkpoint);
}
Value::CompressedImage(img) => {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
let checkpoint = CheckPoint::decode(&decompressed)?;
println!(" CHECKPOINT: {:?}", checkpoint);
}
Value::WalRecord(_rec) => {
println!(" unexpected walrecord value for checkpoint key");
}

View File

@@ -39,7 +39,9 @@ use crate::tenant::vectored_blob_io::{
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use crate::{
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
@@ -153,6 +155,7 @@ pub struct ImageLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
format_version: u16,
lsn: Lsn,
@@ -167,6 +170,7 @@ impl std::fmt::Debug for ImageLayerInner {
f.debug_struct("ImageLayerInner")
.field("index_start_blk", &self.index_start_blk)
.field("index_root_blk", &self.index_root_blk)
.field("format_version", &self.format_version)
.finish()
}
}
@@ -408,6 +412,7 @@ impl ImageLayerInner {
Ok(Ok(ImageLayerInner {
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
format_version: actual_summary.format_version,
lsn,
file,
file_id,
@@ -436,18 +441,20 @@ impl ImageLayerInner {
)
.await?
{
let blob = block_reader
.block_cursor()
.read_blob(
offset,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build(),
)
.await
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build();
let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
block_reader
.block_cursor()
.read_compressed_blob(offset, &ctx)
.await
} else {
block_reader.block_cursor().read_blob(offset, &ctx).await
})
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
} else {
@@ -658,10 +665,7 @@ impl ImageLayerWriterInner {
///
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let (_img, res) = self.blob_writer.write_blob(img).await;
// TODO: re-use the buffer for `img` further upstack
let off = res?;
let off = self.blob_writer.write_compressed_blob(img).await?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;

View File

@@ -14,9 +14,12 @@ use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::walrecord;
use anyhow::{anyhow, ensure, Result};
use bytes::Bytes;
use lz4_flex;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::BLCKSZ;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::{Arc, OnceLock};
use tracing::*;
@@ -133,6 +136,9 @@ impl InMemoryLayer {
Ok(Value::Image(img)) => {
write!(&mut desc, " img {} bytes", img.len())?;
}
Ok(Value::CompressedImage(img)) => {
write!(&mut desc, " compressed img {} bytes", img.len())?;
}
Ok(Value::WalRecord(rec)) => {
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
write!(
@@ -184,6 +190,11 @@ impl InMemoryLayer {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok(ValueReconstructResult::Complete);
}
Value::CompressedImage(img) => {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
reconstruct_state.img = Some((*entry_lsn, Bytes::from(decompressed)));
return Ok(ValueReconstructResult::Complete);
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));

View File

@@ -880,23 +880,18 @@ impl LayerInner {
) -> Result<heavier_once_cell::InitPermit, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let task_name = format!("download layer {}", self);
let (tx, rx) = tokio::sync::oneshot::channel();
// this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
// block tenant::mgr::remove_tenant_from_memory.
let this: Arc<Self> = self.clone();
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
crate::task_mgr::TaskKind::RemoteDownloadTask,
Some(self.desc.tenant_shard_id),
Some(self.desc.timeline_id),
&task_name,
false,
async move {
let guard = timeline
.gate
.enter()
.map_err(|_| DownloadError::DownloadCancelled)?;
tokio::task::spawn(async move {
let _guard = guard;
let client = timeline
.remote_client
@@ -906,7 +901,7 @@ impl LayerInner {
let result = client.download_layer_file(
&this.desc.filename(),
&this.metadata(),
&crate::task_mgr::shutdown_token()
&timeline.cancel
)
.await;
@@ -929,7 +924,6 @@ impl LayerInner {
tokio::select! {
_ = tokio::time::sleep(backoff) => {},
_ = crate::task_mgr::shutdown_token().cancelled_owned() => {},
_ = timeline.cancel.cancelled() => {},
};
@@ -959,11 +953,10 @@ impl LayerInner {
}
}
}
Ok(())
}
.in_current_span(),
);
match rx.await {
Ok((Ok(()), permit)) => {
if let Some(reason) = self

View File

@@ -471,8 +471,9 @@ impl WalIngest {
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
// only lz4 compression of WAL is now supported, for other compression algorithms fall back to storing the original WAL record
&& (!postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)? ||
postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, modification.tline.pg_version)?)
// do not materialize null pages because they will most likely soon be replaced with real data
&& blk.bimg_len != 0
{
@@ -480,7 +481,21 @@ impl WalIngest {
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
if postgres_ffi::bkpimage_is_compressed_lz4(
blk.bimg_info,
modification.tline.pg_version,
)? {
let decompressed_img_len = (BLCKSZ - blk.hole_length) as usize;
let decompressed = lz4_flex::block::decompress(
&decoded.record[img_offs..img_offs + img_len],
decompressed_img_len,
)
.map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
assert_eq!(decompressed.len(), decompressed_img_len);
image.extend_from_slice(&decompressed);
} else {
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
}
if blk.hole_length != 0 {
let tail = image.split_off(blk.hole_offset as usize);

View File

@@ -18,10 +18,10 @@ OBJS = \
PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
SHLIB_LINK = -lcurl -llz4
EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"
EXTRA_CLEAN = \

View File

@@ -0,0 +1,6 @@
-- the order of operations is important here
-- because the view depends on the function
DROP VIEW IF EXISTS neon_lfc_stats CASCADE;
DROP FUNCTION IF EXISTS neon_get_lfc_stats CASCADE;

View File

@@ -0,0 +1 @@
DROP VIEW IF EXISTS NEON_STAT_FILE_CACHE CASCADE;

View File

@@ -0,0 +1 @@
DROP FUNCTION IF EXISTS approximate_working_set_size(bool) CASCADE;

View File

@@ -44,6 +44,7 @@ typedef enum
T_NeonErrorResponse,
T_NeonDbSizeResponse,
T_NeonGetSlruSegmentResponse,
T_NeonGetCompressedPageResponse
} NeonMessageTag;
/* base struct for c-style inheritance */
@@ -144,6 +145,15 @@ typedef struct
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
typedef struct
{
NeonMessageTag tag;
uint16 compressed_size;
char page[FLEXIBLE_ARRAY_MEMBER];
} NeonGetCompressedPageResponse;
#define PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size) (MAXALIGN(offsetof(NeonGetCompressedPageResponse, page) + (compressed_size)))
typedef struct
{
NeonMessageTag tag;

View File

@@ -45,6 +45,10 @@
*/
#include "postgres.h"
#ifdef USE_LZ4
#include <lz4.h>
#endif
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
@@ -1059,6 +1063,7 @@ nm_pack_request(NeonRequest *msg)
case T_NeonExistsResponse:
case T_NeonNblocksResponse:
case T_NeonGetPageResponse:
case T_NeonGetCompressedPageResponse:
case T_NeonErrorResponse:
case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
@@ -1114,6 +1119,21 @@ nm_unpack_response(StringInfo s)
Assert(msg_resp->tag == T_NeonGetPageResponse);
resp = (NeonResponse *) msg_resp;
break;
}
case T_NeonGetCompressedPageResponse:
{
NeonGetCompressedPageResponse *msg_resp;
uint16 compressed_size = pq_getmsgint(s, 2);
msg_resp = palloc0(PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size));
msg_resp->tag = tag;
msg_resp->compressed_size = compressed_size;
memcpy(msg_resp->page, pq_getmsgbytes(s, compressed_size), compressed_size);
pq_getmsgend(s);
Assert(msg_resp->tag == T_NeonGetCompressedPageResponse);
resp = (NeonResponse *) msg_resp;
break;
}
@@ -1287,6 +1307,14 @@ nm_to_string(NeonMessage *msg)
appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetCompressedPageResponse:
{
NeonGetCompressedPageResponse *msg_resp = (NeonGetCompressedPageResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetCompressedPageResponse\"");
appendStringInfo(&s, ", \"compressed_page_size\": \"%d\"", msg_resp->compressed_size);
appendStringInfoChar(&s, '}');
break;
}
case T_NeonErrorResponse:
{
NeonErrorResponse *msg_resp = (NeonErrorResponse *) msg;
@@ -2205,6 +2233,29 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
lfc_write(rinfo, forkNum, blkno, buffer);
break;
case T_NeonGetCompressedPageResponse:
{
#ifndef USE_LZ4
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("compression method lz4 not supported"),
errdetail("This functionality requires the server to be built with lz4 support."),
errhint("You need to rebuild PostgreSQL using %s.", "--with-lz4")));
#else
NeonGetCompressedPageResponse* cp = (NeonGetCompressedPageResponse *) resp;
int rc = LZ4_decompress_safe(cp->page,
buffer,
cp->compressed_size,
BLCKSZ);
if (rc != BLCKSZ) {
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg_internal("compressed lz4 data is corrupt")));
}
lfc_write(rinfo, forkNum, blkno, buffer);
#endif
break;
}
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),

View File

@@ -1518,7 +1518,6 @@ class NeonCli(AbstractNeonCli):
conf: Optional[Dict[str, Any]] = None,
shard_count: Optional[int] = None,
shard_stripe_size: Optional[int] = None,
placement_policy: Optional[str] = None,
set_default: bool = False,
) -> Tuple[TenantId, TimelineId]:
"""
@@ -1552,9 +1551,6 @@ class NeonCli(AbstractNeonCli):
if shard_stripe_size is not None:
args.extend(["--shard-stripe-size", str(shard_stripe_size)])
if placement_policy is not None:
args.extend(["--placement-policy", str(placement_policy)])
res = self.raw_cli(args)
res.check_returncode()
return tenant_id, timeline_id
@@ -2172,37 +2168,6 @@ class NeonAttachmentService(MetricsGetter):
)
log.info("Attachment service passed consistency check")
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.request(
"PUT",
f"{self.env.attachment_service_api}/debug/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
headers=self.headers(TokenScope.ADMIN),
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
def balance_all(self):
self.request(
"POST",
f"{self.env.attachment_service_api}/control/v1/balance/all",
headers=self.headers(TokenScope.ADMIN),
)
def balance_attached(self):
self.request(
"POST",
f"{self.env.attachment_service_api}/control/v1/balance/attached",
headers=self.headers(TokenScope.ADMIN),
)
def __enter__(self) -> "NeonAttachmentService":
return self
@@ -2357,16 +2322,16 @@ class NeonPageserver(PgProtocol):
def assert_no_errors(self):
logfile = self.workdir / "pageserver.log"
if not logfile.exists():
log.warning(f"Skipping log check on pageserver {self.id}: {logfile} does not exist")
log.warning(f"Skipping log check: {logfile} does not exist")
return
with logfile.open("r") as f:
errors = scan_pageserver_log_for_errors(f, self.allowed_errors)
for _lineno, error in errors:
log.info(f"not allowed error (pageserver {self.id}): {error.strip()}")
log.info(f"not allowed error: {error.strip()}")
assert not errors, f"Pageserver {self.id}: {errors}"
assert not errors
def assert_no_metric_errors(self):
"""

View File

@@ -1,4 +1,3 @@
import threading
from typing import Optional
from fixtures.log_helper import log
@@ -12,10 +11,6 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import TenantId, TimelineId
# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
ENDPOINT_LOCK = threading.Lock()
class Workload:
"""
@@ -46,30 +41,17 @@ class Workload:
self._endpoint: Optional[Endpoint] = None
def reconfigure(self):
"""
Request the endpoint to reconfigure based on location reported by storage controller
"""
if self._endpoint is not None:
with ENDPOINT_LOCK:
self._endpoint.reconfigure()
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
# We may be running alongside other Workloads for different tenants. Full TTID is
# obnoxiously long for use here, but a cut-down version is still unique enough for tests.
endpoint_id = f"ep-workload-{str(self.tenant_id)[0:4]}-{str(self.timeline_id)[0:4]}"
with ENDPOINT_LOCK:
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
self.branch_name,
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id=endpoint_id,
)
self._endpoint.start(pageserver_id=pageserver_id)
else:
self._endpoint.reconfigure(pageserver_id=pageserver_id)
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
self.branch_name,
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id="ep-workload",
)
self._endpoint.start(pageserver_id=pageserver_id)
else:
self._endpoint.reconfigure(pageserver_id=pageserver_id)
connstring = self._endpoint.safe_psql(
"SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
@@ -112,7 +94,7 @@ class Workload:
else:
return False
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True, ingest=True):
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
assert self.expect_rows >= n
max_iters = 10
@@ -150,28 +132,22 @@ class Workload:
]
)
if ingest:
# Wait for written data to be ingested by the pageserver
for tenant_shard_id, pageserver in tenant_get_shards(
self.env, self.tenant_id, pageserver_id
):
last_flush_lsn = wait_for_last_flush_lsn(
self.env,
endpoint,
self.tenant_id,
self.timeline_id,
pageserver_id=pageserver_id,
)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
for tenant_shard_id, pageserver in tenant_get_shards(
self.env, self.tenant_id, pageserver_id
):
last_flush_lsn = wait_for_last_flush_lsn(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
if upload:
# Wait for written data to be uploaded to S3 (force a checkpoint to trigger upload)
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
else:
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
if upload:
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
else:
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
def validate(self, pageserver_id: Optional[int] = None):
endpoint = self.endpoint(pageserver_id)


@@ -15,7 +15,7 @@ def test_migrations(neon_simple_env: NeonEnv):
endpoint.wait_for_migrations()
num_migrations = 8
num_migrations = 9
with endpoint.cursor() as cur:
cur.execute("SELECT id FROM neon_migration.migration_id")


@@ -29,3 +29,34 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):
log.info(res)
assert len(res) == 1
assert len(res[0]) == 5
# Verify that the neon extension can be upgraded/downgraded.
def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_neon_extension_compatibility")
endpoint_main = env.endpoints.create("test_neon_extension_compatibility")
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
endpoint_main.respec(skip_pg_catalog_updates=False)
endpoint_main.start()
with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
all_versions = ["1.3", "1.2", "1.1", "1.0"]
current_version = "1.3"
for idx, begin_version in enumerate(all_versions):
for target_version in all_versions[idx + 1 :]:
if current_version != begin_version:
cur.execute(
f"ALTER EXTENSION neon UPDATE TO '{begin_version}'; -- {current_version}->{begin_version}"
)
current_version = begin_version
# downgrade
cur.execute(
f"ALTER EXTENSION neon UPDATE TO '{target_version}'; -- {begin_version}->{target_version}"
)
# upgrade
cur.execute(
f"ALTER EXTENSION neon UPDATE TO '{begin_version}'; -- {target_version}->{begin_version}"
)
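
The nested loops in test_neon_extension_compatibility visit every (begin, target) version pair, downgrade to the target, then upgrade back. The sketch below replays the same loop structure without a database so the resulting statement order can be inspected. The helper name plan_updates is hypothetical and is not part of the test suite.

```python
def plan_updates(all_versions, current_version):
    """Return the ALTER EXTENSION statements in the order the test's loops would issue them.

    Hypothetical helper for illustration only; it mirrors the loop structure
    of the test but does not talk to Postgres.
    """
    statements = []
    for idx, begin_version in enumerate(all_versions):
        for target_version in all_versions[idx + 1 :]:
            if current_version != begin_version:
                # Move to the starting version for this round of pairs.
                statements.append(f"ALTER EXTENSION neon UPDATE TO '{begin_version}'")
                current_version = begin_version
            # Downgrade to the older version ...
            statements.append(f"ALTER EXTENSION neon UPDATE TO '{target_version}'")
            # ... then upgrade back to where this pair started.
            statements.append(f"ALTER EXTENSION neon UPDATE TO '{begin_version}'")
    return statements


if __name__ == "__main__":
    for statement in plan_updates(["1.3", "1.2", "1.1", "1.0"], "1.3"):
        print(statement)
```

Because the version list is ordered newest first, every pair is exercised once in each direction, and the extension ends each pair back at begin_version.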


@@ -1,17 +1,13 @@
import os
from typing import Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
AttachmentServiceApiException,
NeonEnv,
NeonEnvBuilder,
tenant_get_shards,
)
from fixtures.remote_storage import s3_storage
from fixtures.types import Lsn, TenantShardId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload
@@ -404,245 +400,3 @@ def test_sharding_ingest(
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
assert huge_layer_count <= shard_count
class Failure:
pageserver_id: Optional[int]
def apply(self, env: NeonEnv):
raise NotImplementedError()
def clear(self, env: NeonEnv):
"""
Clear the failure, in a way that should enable the system to proceed
to a totally clean state (all nodes online and reconciled)
"""
raise NotImplementedError()
def expect_available(self):
raise NotImplementedError()
def can_mitigate(self):
"""Whether Self.mitigate is available for use"""
return False
def mitigate(self, env: NeonEnv):
"""
Mitigate the failure in a way that should allow shard split to
complete and service to resume, but does not guarantee to leave
the whole world in a clean state (e.g. an Offline node might have
junk LocationConfigs on it)
"""
raise NotImplementedError()
def fails_forward(self):
"""
If true, this failure results in a state that eventually completes the split.
"""
return False
class PageserverFailpoint(Failure):
def __init__(self, failpoint, pageserver_id, mitigate):
self.failpoint = failpoint
self.pageserver_id = pageserver_id
self._mitigate = mitigate
def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.allowed_errors.extend(
[".*failpoint.*", ".*Resetting.*after shard split failure.*"]
)
pageserver.http_client().configure_failpoints((self.failpoint, "return(1)"))
def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
if self._mitigate:
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Active"})
def expect_available(self):
return True
def can_mitigate(self):
return self._mitigate
def mitigate(self, env):
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Offline"})
class StorageControllerFailpoint(Failure):
def __init__(self, failpoint):
self.failpoint = failpoint
self.pageserver_id = None
def apply(self, env: NeonEnv):
env.attachment_service.configure_failpoints((self.failpoint, "return(1)"))
def clear(self, env: NeonEnv):
env.attachment_service.configure_failpoints((self.failpoint, "off"))
def expect_available(self):
return True
def can_mitigate(self):
return False
def fails_forward(self):
# Edge case: the very last failpoint that simulates a DB connection error, where
# the abort path will fail-forward and result in a complete split.
return self.failpoint == "shard-split-post-complete"
class NodeKill(Failure):
def __init__(self, pageserver_id, mitigate):
self.pageserver_id = pageserver_id
self._mitigate = mitigate
def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.stop(immediate=True)
def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.start()
def expect_available(self):
return False
def mitigate(self, env):
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Offline"})
@pytest.mark.parametrize(
"failure",
[
PageserverFailpoint("api-500", 1, False),
NodeKill(1, False),
PageserverFailpoint("api-500", 1, True),
NodeKill(1, True),
PageserverFailpoint("shard-split-pre-prepare", 1, False),
PageserverFailpoint("shard-split-post-prepare", 1, False),
PageserverFailpoint("shard-split-pre-hardlink", 1, False),
PageserverFailpoint("shard-split-post-hardlink", 1, False),
PageserverFailpoint("shard-split-post-child-conf", 1, False),
PageserverFailpoint("shard-split-lsn-wait", 1, False),
PageserverFailpoint("shard-split-pre-finish", 1, False),
StorageControllerFailpoint("shard-split-validation"),
StorageControllerFailpoint("shard-split-post-begin"),
StorageControllerFailpoint("shard-split-post-remote"),
StorageControllerFailpoint("shard-split-post-complete"),
],
)
def test_sharding_split_failures(neon_env_builder: NeonEnvBuilder, failure: Failure):
neon_env_builder.num_pageservers = 4
initial_shard_count = 2
split_shard_count = 4
env = neon_env_builder.init_start(initial_tenant_shard_count=initial_shard_count)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Make sure the node we're failing has a shard on it, otherwise the test isn't testing anything
assert (
failure.pageserver_id is None
or len(
env.get_pageserver(failure.pageserver_id)
.http_client()
.tenant_list_locations()["tenant_shards"]
)
> 0
)
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(100)
# Set one pageserver to 500 all requests, then do a split
# TODO: also test with a long-blocking failure: controller should time out its request and then
# clean up in a well defined way.
failure.apply(env)
with pytest.raises(AttachmentServiceApiException):
env.attachment_service.tenant_shard_split(tenant_id, shard_count=4)
# We expect that the overall operation will fail, but some split requests
# will have succeeded: the net result should be to return to a clean state, including
# detaching any child shards.
def assert_rolled_back(exclude_ps_id=None) -> None:
count = 0
for ps in env.pageservers:
if exclude_ps_id is not None and ps.id == exclude_ps_id:
continue
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
assert tenant_shard_id.shard_count == initial_shard_count
count += 1
assert count == initial_shard_count
def assert_split_done(exclude_ps_id=None) -> None:
count = 0
for ps in env.pageservers:
if exclude_ps_id is not None and ps.id == exclude_ps_id:
continue
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
assert tenant_shard_id.shard_count == split_shard_count
count += 1
assert count == split_shard_count
def finish_split():
# Having failed+rolled back, we should be able to split again
# No failures this time; it will succeed
env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count)
workload.churn_rows(10)
workload.validate()
if failure.expect_available():
# Even though the split failed partway through, this should not have interrupted
# clients. Disable waiting for pageservers in the workload helper, because our
# failpoints may prevent API access.
# This only applies for failure modes that leave pageserver page_service API available.
workload.churn_rows(10, upload=False, ingest=False)
workload.validate()
if failure.fails_forward():
# A failure type which results in eventual completion of the split
wait_until(30, 1, assert_split_done)
elif failure.can_mitigate():
# Mitigation phase: we expect to be able to proceed with a successful shard split
failure.mitigate(env)
# The split should appear to be rolled back from the point of view of all pageservers
# apart from the one that is offline
wait_until(30, 1, lambda: assert_rolled_back(exclude_ps_id=failure.pageserver_id))
finish_split()
wait_until(30, 1, lambda: assert_split_done(exclude_ps_id=failure.pageserver_id))
# Having cleared the failure, everything should converge to a pristine state
failure.clear(env)
wait_until(30, 1, assert_split_done)
else:
# Once we restore the faulty pageserver's API to good health, rollback should
# eventually complete.
failure.clear(env)
wait_until(30, 1, assert_rolled_back)
# Having rolled back, the tenant should be working
workload.churn_rows(10)
workload.validate()
# Splitting again should work, since we cleared the failure
finish_split()
assert_split_done()
env.attachment_service.consistency_check()
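
The removed test relies on calls such as wait_until(30, 1, assert_rolled_back), which retry an assertion until it stops raising. The real helper lives in fixtures.utils and is not shown in this diff; the following is only a minimal sketch of that polling pattern, assuming the three positional arguments are an iteration count, a sleep interval in seconds, and a callable.

```python
import time


def wait_until(number_of_iterations, interval, func):
    """Call func until it returns without raising, or give up.

    Illustrative stand-in only: the argument meaning is assumed from the
    call sites in the diff, not taken from the real fixtures.utils helper.
    """
    last_exception = None
    for _ in range(number_of_iterations):
        try:
            return func()
        except Exception as e:  # assertions in the callback land here
            last_exception = e
            time.sleep(interval)
    raise TimeoutError(
        f"condition not met after {number_of_iterations} attempts"
    ) from last_exception
```

With this shape, a callback such as assert_split_done can use plain assert statements, and the caller sees a failure only if the condition never holds within the allotted attempts.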


@@ -1,5 +1,3 @@
import concurrent.futures
import random
import time
from collections import defaultdict
from datetime import datetime, timezone
@@ -25,9 +23,8 @@ from fixtures.pageserver.utils import (
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind, s3_storage
from fixtures.types import TenantId, TenantShardId, TimelineId
from fixtures.types import TenantId, TimelineId
from fixtures.utils import run_pg_bench_small, wait_until
from fixtures.workload import Workload
from mypy_boto3_s3.type_defs import (
ObjectTypeDef,
)
@@ -773,186 +770,3 @@ def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder):
assert "pitr_interval" not in readback_ps.tenant_specific_overrides
env.attachment_service.consistency_check()
def test_storcon_rolling_failures(
neon_env_builder: NeonEnvBuilder, httpserver: HTTPServer, httpserver_listen_address
):
neon_env_builder.num_pageservers = 8
(host, port) = httpserver_listen_address
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify-attach"
workloads: dict[TenantId, Workload] = {}
# Do neon_local endpoint reconfiguration in the background so that we can
# accept a healthy rate of calls into notify-attach.
reconfigure_threads = concurrent.futures.ThreadPoolExecutor(max_workers=1)
def handler(request: Request):
"""
Although storage controller can use neon_local directly, this causes problems when
the test is also concurrently modifying endpoints. Instead, configure storage controller
to send notifications up to this test code, which will route all endpoint updates
through Workload, which has a mutex to make it all safe.
"""
assert request.json is not None
body: dict[str, Any] = request.json
log.info(f"notify-attach request: {body}")
try:
workload = workloads[TenantId(body["tenant_id"])]
except KeyError:
pass
else:
# This causes the endpoint to query storage controller for its location, which
# is redundant since we already have it here, but this avoids extending the
# neon_local CLI to take full lists of locations
reconfigure_threads.submit(lambda workload=workload: workload.reconfigure()) # type: ignore[no-any-return]
return Response(status=200)
httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
env = neon_env_builder.init_start()
for ps in env.pageservers:
# We will do unclean detaches
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
n_tenants = 32
tenants = [(env.initial_tenant, env.initial_timeline)]
for i in range(0, n_tenants - 1):
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
shard_count = [1, 2, 4][i % 3]
env.neon_cli.create_tenant(
tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Double":1}'
)
tenants.append((tenant_id, timeline_id))
# Background pain:
# - TODO: some fraction of pageserver API requests hang
# (this requires implementing wrap of location_conf calls with proper timeline/cancel)
# - TODO: continuous tenant/timeline creation/destruction over a different ID range than
# the ones we're using for availability checks.
rng = random.Random(0xDEADBEEF)
for tenant_id, timeline_id in tenants:
workload = Workload(env, tenant_id, timeline_id)
workloads[tenant_id] = workload
def node_evacuated(node_id: int):
counts = get_node_shard_counts(env, [t[0] for t in tenants])
assert counts[node_id] == 0
def attachments_active():
for tid, _tlid in tenants:
for shard in env.attachment_service.locate(tid):
psid = shard["node_id"]
tsid = TenantShardId.parse(shard["shard_id"])
status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid)
assert status["state"]["slug"] == "Active"
log.info(f"Shard {tsid} active on node {psid}")
failpoints = ("api-503", "5%1000*return(1)")
failpoints_str = f"{failpoints[0]}={failpoints[1]}"
for ps in env.pageservers:
ps.http_client().configure_failpoints(failpoints)
def for_all_workloads(callback, timeout=60):
futs = []
with concurrent.futures.ThreadPoolExecutor() as pool:
for _tenant_id, workload in workloads.items():
futs.append(pool.submit(callback, workload))
for f in futs:
f.result(timeout=timeout)
def clean_fail_restore():
"""
Clean shutdown of a node: mark it offline in storage controller, wait for new attachment
locations to activate, then SIGTERM it.
- Endpoints should not fail any queries
- New attach locations should activate within bounded time.
"""
victim = rng.choice(env.pageservers)
env.attachment_service.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
wait_until(10, 1, attachments_active)
victim.stop(immediate=False)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
# Revert shards to attach at their original locations
env.attachment_service.balance_attached()
wait_until(10, 1, attachments_active)
def hard_fail_restore():
"""
Simulate an unexpected death of a pageserver node
"""
victim = rng.choice(env.pageservers)
victim.stop(immediate=True)
# TODO: once we implement heartbeats detecting node failures, remove this
# explicit marking offline and rely on storage controller to detect it itself.
env.attachment_service.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
wait_until(10, 1, attachments_active)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
env.attachment_service.balance_attached()
wait_until(10, 1, attachments_active)
def traffic():
"""
Check that all tenants are working for postgres clients
"""
def exercise_one(workload):
workload.churn_rows(100)
workload.validate()
for_all_workloads(exercise_one)
def init_one(workload):
workload.init()
workload.write_rows(100)
for_all_workloads(init_one, timeout=60)
for i in range(0, 20):
mode = rng.choice([0, 1, 2])
log.info(f"Iteration {i}, mode {mode}")
if mode == 0:
# Traffic interval: sometimes, instead of a failure, just let the clients
# write a load of data. This avoids chaos tests ending up with unrealistically
# small quantities of data in flight.
traffic()
elif mode == 1:
clean_fail_restore()
elif mode == 2:
hard_fail_restore()
# Fail and restart: hard-kill one node. Notify the storage controller that it is offline.
# Success criteria:
# - New attach locations should activate within bounded time
# - TODO: once we do heartbeating, we should not have to explicitly mark the node offline
# TODO: fail and remove: fail a node, and remove it from the cluster.
# Success criteria:
# - Endpoints should not fail any queries
# - New attach locations should activate within bounded time
# - New secondary locations should fill up with data within bounded time
# TODO: somehow need to wait for reconciles to complete before doing consistency check
# (or make the check wait).
# Do consistency check on every iteration, not just at the end: this makes it more obvious
# which change caused an issue.
env.attachment_service.consistency_check()


@@ -190,6 +190,8 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
# So by ignoring these instead of waiting for empty upload queue
# we execute more distinct code paths.
'.*stopping left-over name="remote upload".*',
# an on-demand download is cancelled by shutdown
".*initial size calculation failed: downloading failed, possibly for shutdown",
]
)


@@ -213,7 +213,9 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
# This happens when timeline remains are cleaned up during loading
".*Timeline dir entry become invalid.*",
# In one of the branches we poll for tenant to become active. Polls can generate this log message:
f".*Tenant {env.initial_tenant} is not active*",
f".*Tenant {env.initial_tenant} is not active.*",
# an on-demand download is cancelled by shutdown
".*initial size calculation failed: downloading failed, possibly for shutdown",
]
)
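
The hunk above changes the allowed-error pattern from "is not active*" to "is not active.*". In a regular expression the trailing "e*" means "zero or more letter e", not "anything after active". The sketch below shows the difference using Python's re module. The tenant id and log line are made up, and how the test fixtures actually apply these patterns (prefix match or whole-line match) is not shown in this diff, so both cases are printed.

```python
import re

tenant = "3f2a"  # hypothetical tenant id, for illustration only
old_pattern = f".*Tenant {tenant} is not active*"
new_pattern = f".*Tenant {tenant} is not active.*"
line = f"WARN Tenant {tenant} is not active yet"  # made-up log line


def whole_line_match(pattern, text):
    """True if the pattern accounts for the entire line."""
    return re.fullmatch(pattern, text) is not None


def prefix_match(pattern, text):
    """True if the pattern matches starting at the beginning of the line."""
    return re.match(pattern, text) is not None


if __name__ == "__main__":
    print("old, whole line:", whole_line_match(old_pattern, line))
    print("new, whole line:", whole_line_match(new_pattern, line))
    print("old, prefix:    ", prefix_match(old_pattern, line))
    print("new, prefix:    ", prefix_match(new_pattern, line))
```

Under a prefix match both patterns accept the line, so the old form may have worked by accident; the new form states the intent explicitly and also holds under a whole-line match.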