Compare commits

..

15 Commits

Author SHA1 Message Date
Konstantin Knizhnik
70d1086e0f Prepare for first stage of deployment: do not bump format version and do not write data in new format but recognize new format 2024-03-15 10:02:51 +02:00
Konstantin Knizhnik
5a8e8baf9f Make ruff happy 2024-03-14 18:05:30 +02:00
Konstantin Knizhnik
57a4119a7b Add test for compression 2024-03-14 16:45:45 +02:00
Konstantin Knizhnik
aaef3789b0 Ignore format version when comparing summary for delta_layer 2024-03-14 14:21:35 +02:00
Konstantin Knizhnik
0b57e0b8f2 Fix image layer format version matching 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
485ecbaf8f Fix test_attach_tenant_config.py test 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
0bcbce197a Fix test_attach_tenant_config.py test 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
19d59e58d2 Use CompressionAlgorithm enum 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
ce65d13dbd Add compress_image_layer to openapi spec 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
18fefff026 Fix compressed blob writer 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
2a69861896 Fix parse_tenant_config test 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
98375b3896 Support vectored compressed blob reads 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
8c60359ae5 Enable image layer compression by default 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
8c7136b057 Add compress_image_layer property to TenantConfig 2024-03-14 08:33:37 +02:00
Konstantin Knizhnik
0df6c41eaa Compress image layer 2024-03-14 08:33:37 +02:00
40 changed files with 536 additions and 1639 deletions

12
Cargo.lock generated
View File

@@ -282,10 +282,8 @@ dependencies = [
"control_plane",
"diesel",
"diesel_migrations",
"fail",
"futures",
"git-version",
"hex",
"humantime",
"hyper",
"metrics",
@@ -2843,6 +2841,15 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "lz4_flex"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
dependencies = [
"twox-hash",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
@@ -3513,6 +3520,7 @@ dependencies = [
"hyper",
"itertools",
"leaky-bucket",
"lz4_flex",
"md5",
"metrics",
"nix 0.27.1",

View File

@@ -100,6 +100,7 @@ jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
lz4_flex = "0.11.1"
md5 = "0.7.0"
memoffset = "0.8"
native-tls = "0.2"

View File

@@ -19,10 +19,8 @@ aws-config.workspace = true
aws-sdk-secretsmanager.workspace = true
camino.workspace = true
clap.workspace = true
fail.workspace = true
futures.workspace = true
git-version.workspace = true
hex.workspace = true
hyper.workspace = true
humantime.workspace = true
once_cell.workspace = true

View File

@@ -10,9 +10,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::auth::{Scope, SwappableJwtAuth};
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
use utils::http::request::{must_get_query_param, parse_request_param};
use utils::id::{TenantId, TimelineId};
@@ -556,9 +554,6 @@ pub fn make_router(
.post("/debug/v1/consistency_check", |r| {
request_span(r, handle_consistency_check)
})
.put("/debug/v1/failpoints", |r| {
request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
})
.get("/control/v1/tenant/:tenant_id/locate", |r| {
tenant_service_handler(r, handle_tenant_locate)
})

View File

@@ -1,54 +0,0 @@
use std::{collections::HashMap, sync::Arc};
/// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
/// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
/// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
/// is needed at a tenant-wide granularity.
pub(crate) struct IdLockMap<T>
where
T: Eq + PartialEq + std::hash::Hash,
{
/// A synchronous lock for getting/setting the async locks that our callers will wait on.
entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<()>>>>,
}
impl<T> IdLockMap<T>
where
T: Eq + PartialEq + std::hash::Hash,
{
/// Return a future resolving to a shared (read) guard for `key`'s lock.
/// The per-key `RwLock` is created lazily on first use. Note that the inner
/// `entities` mutex is released before the returned future is awaited: we only
/// hold it long enough to clone the per-key `Arc`, so waiting for the read
/// guard does not block access to other keys.
pub(crate) fn shared(
&self,
key: T,
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<()>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default();
entry.clone().read_owned()
}
/// Return a future resolving to an exclusive (write) guard for `key`'s lock.
/// Same locking discipline as [`Self::shared`]: the map mutex is dropped before
/// the caller awaits the write guard.
pub(crate) fn exclusive(
&self,
key: T,
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockWriteGuard<()>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default();
entry.clone().write_owned()
}
/// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
/// periodic housekeeping to avoid the map growing indefinitely
pub(crate) fn housekeeping(&self) {
let mut locked = self.entities.lock().unwrap();
// try_write() failing means someone currently holds (or waits on) the lock,
// so the entry is live and must be retained; otherwise it is unused and dropped.
locked.retain(|_k, lock| lock.try_write().is_err())
}
}
// Manual impl rather than #[derive(Default)] because T itself need not be Default.
impl<T> Default for IdLockMap<T>
where
T: Eq + PartialEq + std::hash::Hash,
{
fn default() -> Self {
Self {
entities: std::sync::Mutex::new(HashMap::new()),
}
}
}

View File

@@ -4,7 +4,6 @@ use utils::seqwait::MonotonicCounter;
mod auth;
mod compute_hook;
pub mod http;
mod id_lock_map;
pub mod metrics;
mod node;
pub mod persistence;

View File

@@ -206,12 +206,6 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> {
}
fn main() -> anyhow::Result<()> {
let default_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
default_panic(info);
std::process::exit(1);
}));
tokio::runtime::Builder::new_current_thread()
// We use spawn_blocking for database operations, so require approximately
// as many blocking threads as we will open database connections.

View File

@@ -83,38 +83,29 @@ impl Node {
}
}
pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
match self.get_availability_transition(availability) {
AvailabilityTransition::ToActive => {
pub(crate) fn set_availability(
&mut self,
availability: NodeAvailability,
) -> AvailabilityTransition {
use NodeAvailability::*;
let transition = match (self.availability, availability) {
(Offline, Active) => {
// Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
// users of previously-cloned copies of the node will still see the old cancellation
// state. For example, Reconcilers in flight will have to complete and be spawned
// again to realize that the node has become available.
self.cancel = CancellationToken::new();
AvailabilityTransition::ToActive
}
AvailabilityTransition::ToOffline => {
(Active, Offline) => {
// Fire the node's cancellation token to cancel any in-flight API requests to it
self.cancel.cancel();
AvailabilityTransition::ToOffline
}
AvailabilityTransition::Unchanged => {}
}
_ => AvailabilityTransition::Unchanged,
};
self.availability = availability;
}
/// Without modifying the availability of the node, convert the intended availability
/// into a description of the transition.
pub(crate) fn get_availability_transition(
&self,
availability: NodeAvailability,
) -> AvailabilityTransition {
use AvailabilityTransition::*;
use NodeAvailability::*;
match (self.availability, availability) {
(Offline, Active) => ToActive,
(Active, Offline) => ToOffline,
_ => Unchanged,
}
transition
}
/// Whether we may send API requests to this node.

View File

@@ -11,9 +11,6 @@ use diesel::prelude::*;
use diesel::Connection;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::ShardConfigError;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
@@ -75,14 +72,6 @@ pub(crate) enum DatabaseError {
Logical(String),
}
#[must_use]
pub(crate) enum AbortShardSplitStatus {
/// We aborted the split in the database by reverting to the parent shards
Aborted,
/// The split had already been persisted.
Complete,
}
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
impl Persistence {
@@ -581,51 +570,6 @@ impl Persistence {
})
.await
}
/// Used when the remote part of a shard split failed: we will revert the database state to have only
/// the parent shards, with SplitState::Idle.
///
/// Runs the whole revert in a single database transaction so the parent shards are
/// un-marked and the child shards erased atomically. Returns
/// `AbortShardSplitStatus::Complete` if the parents are already gone (the split had
/// been fully persisted, so there is nothing to abort).
pub(crate) async fn abort_shard_split(
&self,
split_tenant_id: TenantId,
new_shard_count: ShardCount,
) -> DatabaseResult<AbortShardSplitStatus> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<AbortShardSplitStatus> {
let aborted = conn.transaction(|conn| -> DatabaseResult<AbortShardSplitStatus> {
// Clear the splitting state on parent shards
// (parents are the rows whose shard_count differs from the new child count).
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.ne(new_shard_count.literal() as i32))
.set((splitting.eq(0),))
.execute(conn)?;
// Parent shards are already gone: we cannot abort.
if updated == 0 {
return Ok(AbortShardSplitStatus::Complete);
}
// Sanity check: if parent shards were present, their cardinality should
// be less than the number of child shards.
if updated >= new_shard_count.count() as usize {
return Err(DatabaseError::Logical(format!(
"Unexpected parent shard count {updated} while aborting split to \
count {new_shard_count:?} on tenant {split_tenant_id}"
)));
}
// Erase child shards
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(new_shard_count.literal() as i32))
.execute(conn)?;
Ok(AbortShardSplitStatus::Aborted)
})?;
Ok(aborted)
})
.await
}
}
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
@@ -660,28 +604,6 @@ pub(crate) struct TenantShardPersistence {
pub(crate) config: String,
}
impl TenantShardPersistence {
/// Build a [`ShardIdentity`] from the persisted (i32) shard fields.
/// A stored shard_count of 0 is treated as the unsharded representation
/// — presumably the legacy single-shard encoding; TODO confirm against writers.
pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
if self.shard_count == 0 {
Ok(ShardIdentity::unsharded())
} else {
Ok(ShardIdentity::new(
ShardNumber(self.shard_number as u8),
ShardCount::new(self.shard_count as u8),
ShardStripeSize(self.shard_stripe_size as u32),
)?)
}
}
/// Reconstruct the [`TenantShardId`] from persisted fields; fails only if the
/// stored tenant_id string is not valid hex.
pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
Ok(TenantShardId {
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
shard_number: ShardNumber(self.shard_number as u8),
shard_count: ShardCount::new(self.shard_count as u8),
})
}
}
/// Parts of [`crate::node::Node`] that are stored durably
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
#[diesel(table_name = crate::schema::nodes)]

View File

@@ -1,6 +1,5 @@
use crate::persistence::Persistence;
use crate::service;
use hyper::StatusCode;
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
@@ -19,8 +18,6 @@ use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
pub(super) struct Reconciler {
@@ -488,29 +485,17 @@ impl Reconciler {
)
.await
{
Some(Ok(observed)) => Some(observed),
Some(Err(mgmt_api::Error::ApiError(status, _msg)))
if status == StatusCode::NOT_FOUND =>
{
None
}
Some(Ok(observed)) => observed,
Some(Err(e)) => return Err(e.into()),
None => return Err(ReconcileError::Cancel),
};
tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
match observed_conf {
Some(conf) => {
// Pageserver returned a state: update it in observed. This may still be an indeterminate (None) state,
// if internally the pageserver's TenantSlot was being mutated (e.g. some long running API call is still running)
self.observed
.locations
.insert(attached_node.get_id(), ObservedStateLocation { conf });
}
None => {
// Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
self.observed.locations.remove(&attached_node.get_id());
}
}
self.observed.locations.insert(
attached_node.get_id(),
ObservedStateLocation {
conf: observed_conf,
},
);
}
Ok(())
@@ -540,12 +525,7 @@ impl Reconciler {
)));
};
let mut wanted_conf = attached_location_conf(
generation,
&self.shard,
&self.config,
!self.intent.secondary.is_empty(),
);
let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
match self.observed.locations.get(&node.get_id()) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
// Nothing to do
@@ -682,26 +662,10 @@ impl Reconciler {
}
}
/// We tweak the externally-set TenantConfig while configuring
/// locations, using our awareness of whether secondary locations
/// are in use to automatically enable/disable heatmap uploads.
///
/// With secondaries: a default heatmap period is filled in only when the caller
/// has not set one explicitly (an explicit value is preserved).
/// Without secondaries: heatmap uploads are force-disabled by clearing the period.
fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
let mut config = config.clone();
if has_secondaries {
if config.heatmap_period.is_none() {
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
}
} else {
config.heatmap_period = None;
}
config
}
pub(crate) fn attached_location_conf(
generation: Generation,
shard: &ShardIdentity,
config: &TenantConfig,
has_secondaries: bool,
) -> LocationConfig {
LocationConfig {
mode: LocationConfigMode::AttachedSingle,
@@ -710,7 +674,7 @@ pub(crate) fn attached_location_conf(
shard_number: shard.number.0,
shard_count: shard.count.literal(),
shard_stripe_size: shard.stripe_size.0,
tenant_conf: ha_aware_config(config, has_secondaries),
tenant_conf: config.clone(),
}
}
@@ -725,6 +689,6 @@ pub(crate) fn secondary_location_conf(
shard_number: shard.number.0,
shard_count: shard.count.literal(),
shard_stripe_size: shard.stripe_size.0,
tenant_conf: ha_aware_config(config, true),
tenant_conf: config.clone(),
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -577,12 +577,7 @@ impl TenantState {
.generation
.expect("Attempted to enter attached state without a generation");
let wanted_conf = attached_location_conf(
generation,
&self.shard,
&self.config,
!self.intent.secondary.is_empty(),
);
let wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {

View File

@@ -384,6 +384,11 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
image_layer_compression: settings
.remove("image_layer_compression")
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'image_layer_compression' json")?,
eviction_policy: settings
.remove("eviction_policy")
.map(serde_json::from_str)
@@ -496,6 +501,11 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
image_layer_compression: settings
.remove("image_layer_compression")
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'image_layer_compression' json")?,
eviction_policy: settings
.remove("eviction_policy")
.map(serde_json::from_str)

View File

@@ -294,6 +294,7 @@ pub struct TenantConfig {
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
pub image_layer_compression: Option<CompressionAlgorithm>,
pub eviction_policy: Option<EvictionPolicy>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
@@ -327,6 +328,23 @@ pub enum CompactionAlgorithm {
Tiered,
}
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Serialize,
Deserialize,
strum_macros::FromRepr,
enum_map::Enum,
)]
#[repr(u8)]
pub enum CompressionAlgorithm {
NoCompression,
LZ4,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct EvictionPolicyLayerAccessThreshold {
#[serde(with = "humantime_serde")]

View File

@@ -37,6 +37,7 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
leaky-bucket.workspace = true
lz4_flex.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses

View File

@@ -1538,6 +1538,7 @@ broker_endpoint = '{broker_endpoint}'
let broker_endpoint = "http://127.0.0.1:7777";
let trace_read_requests = true;
let image_layer_compression = pageserver_api::models::CompressionAlgorithm::LZ4;
let config_string = format!(
r#"{ALL_BASE_VALUES_TOML}
@@ -1545,7 +1546,8 @@ pg_distrib_dir='{pg_distrib_dir}'
broker_endpoint = '{broker_endpoint}'
[tenant_config]
trace_read_requests = {trace_read_requests}"#,
trace_read_requests = {trace_read_requests}
image_layer_compression = 'LZ4'"#,
);
let toml = config_string.parse()?;
@@ -1555,6 +1557,10 @@ trace_read_requests = {trace_read_requests}"#,
conf.default_tenant_conf.trace_read_requests, trace_read_requests,
"Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
);
assert_eq!(
conf.default_tenant_conf.image_layer_compression, image_layer_compression,
"Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
);
Ok(())
}

View File

@@ -1444,6 +1444,8 @@ components:
type: integer
trace_read_requests:
type: boolean
image_layer_compression:
type: string
heatmap_period:
type: string
TenantConfigResponse:

View File

@@ -2108,16 +2108,6 @@ where
R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
{
if request.uri() != &"/v1/failpoints".parse::<Uri>().unwrap() {
fail::fail_point!("api-503", |_| Err(ApiError::ResourceUnavailable(
"failpoint".into()
)));
fail::fail_point!("api-500", |_| Err(ApiError::InternalServerError(
anyhow::anyhow!("failpoint")
)));
}
// Spawn a new task to handle the request, to protect the handler from unexpected
// async cancellations. Most pageserver functions are not async cancellation safe.
// We arm a drop-guard, so that if Hyper drops the Future, we signal the task

View File

@@ -42,6 +42,9 @@ use tracing::info;
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;
/// Minimal storage format version with compression support
pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
pub const DEFAULT_PG_VERSION: u32 = 15;
// Magic constants used to identify different kinds of files

View File

@@ -19,6 +19,7 @@ use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use pageserver_api::models;
use pageserver_api::models::CompressionAlgorithm;
use pageserver_api::models::TimelineState;
use pageserver_api::models::WalRedoManagerStatus;
use pageserver_api::shard::ShardIdentity;
@@ -2288,6 +2289,13 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
}
pub fn get_image_layer_compression(&self) -> CompressionAlgorithm {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.image_layer_compression
.unwrap_or(self.conf.default_tenant_conf.image_layer_compression)
}
pub fn get_min_resident_size_override(&self) -> Option<u64> {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
@@ -3637,6 +3645,7 @@ pub(crate) mod harness {
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
trace_read_requests: Some(tenant_conf.trace_read_requests),
image_layer_compression: Some(tenant_conf.image_layer_compression),
eviction_policy: Some(tenant_conf.eviction_policy),
min_resident_size_override: tenant_conf.min_resident_size_override,
evictions_low_residence_duration_metric_threshold: Some(

View File

@@ -11,13 +11,16 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use bytes::{BufMut, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
use lz4_flex;
use pageserver_api::models::CompressionAlgorithm;
use postgres_ffi::BLCKSZ;
use std::cmp::min;
use std::io::{Error, ErrorKind};
@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
Ok(buf)
}
/// Read a blob written in the compressed format: the byte at `offset` is a
/// [`CompressionAlgorithm`] tag, and the blob payload (with its normal length
/// header) starts at `offset + 1`. Returns the decompressed bytes.
///
/// NOTE(review): `from_repr(..).unwrap()` panics on an unrecognized tag byte
/// (i.e. corrupt data) rather than returning `InvalidData` — confirm this is intended.
/// LZ4 payloads are assumed to decompress to exactly BLCKSZ; this matches the
/// writer only compressing block-sized blobs — TODO confirm no other writers exist.
pub async fn read_compressed_blob(
&self,
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
// Locate the page containing the compression tag byte.
let blknum = (offset / PAGE_SZ as u64) as u32;
let off = (offset % PAGE_SZ as u64) as usize;
let buf = self.read_blk(blknum, ctx).await?;
let compression_alg = CompressionAlgorithm::from_repr(buf[off]).unwrap();
// The ordinary length-prefixed blob follows immediately after the tag.
let res = self.read_blob(offset + 1, ctx).await?;
if compression_alg == CompressionAlgorithm::LZ4 {
lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
})
} else {
assert_eq!(compression_alg, CompressionAlgorithm::NoCompression);
Ok(res)
}
}
/// Read blob into the given buffer. Any previous contents in the buffer
/// are overwritten.
pub async fn read_blob_into_buf(
@@ -211,6 +237,61 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
(src_buf, Ok(()))
}
/// Write a blob prefixed with a 1-byte [`CompressionAlgorithm`] tag, followed by
/// the usual 1- or 4-byte length header and the (possibly LZ4-compressed) payload.
/// Returns the file offset of the tag byte, i.e. the offset callers pass to
/// `read_compressed_blob` later.
///
/// Compression is only attempted when `compression == LZ4` AND the blob is exactly
/// BLCKSZ bytes, and is kept only if it actually shrinks the data; otherwise the
/// blob is stored uncompressed with a `NoCompression` tag.
pub async fn write_compressed_blob(
&mut self,
srcbuf: Bytes,
compression: CompressionAlgorithm,
) -> Result<u64, Error> {
let offset = self.offset;
let len = srcbuf.len();
let mut io_buf = self.io_buf.take().expect("we always put it back below");
io_buf.clear();
let mut is_compressed = false;
if len < 128 {
// Short blob. Write a 1-byte length header
// (short blobs are never compressed: tag + 1-byte length, payload written below).
io_buf.put_u8(CompressionAlgorithm::NoCompression as u8);
io_buf.put_u8(len as u8);
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
return Err(Error::new(
ErrorKind::Other,
format!("blob too large ({} bytes)", len),
));
}
if compression == CompressionAlgorithm::LZ4 && len == BLCKSZ as usize {
let compressed = lz4_flex::block::compress(&srcbuf);
// Only keep the compressed form if it is strictly smaller.
if compressed.len() < len {
io_buf.put_u8(compression as u8);
// High bit marks the 4-byte (long) length encoding.
let mut len_buf = (compressed.len() as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
// Compressed payload goes into the header buffer: one write below.
io_buf.extend_from_slice(&compressed[..]);
is_compressed = true;
}
}
if !is_compressed {
io_buf.put_u8(CompressionAlgorithm::NoCompression as u8);
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
}
}
// Flush tag + length (and, if compressed, the payload) in one write.
let (io_buf, hdr_res) = self.write_all(io_buf).await;
match hdr_res {
Ok(_) => (),
Err(e) => return Err(e),
}
self.io_buf = Some(io_buf);
if is_compressed {
// Payload already written above; on this path hdr_res is Ok, so this is Ok(offset).
hdr_res.map(|_| offset)
} else {
// Uncompressed payload is written separately, after the header.
let (_buf, res) = self.write_all(srcbuf).await;
res.map(|_| offset)
}
}
/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
@@ -227,7 +308,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
self.write_all(io_buf).await
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
@@ -242,8 +322,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
self.write_all(io_buf).await
}
self.write_all(io_buf).await
}
.await;
self.io_buf = Some(io_buf);

View File

@@ -9,9 +9,9 @@
//! may lead to a data loss.
//!
use anyhow::bail;
use pageserver_api::models::CompactionAlgorithm;
use pageserver_api::models::EvictionPolicy;
use pageserver_api::models::{self, ThrottleConfig};
use pageserver_api::models::{
self, CompactionAlgorithm, CompressionAlgorithm, EvictionPolicy, ThrottleConfig,
};
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
use serde::de::IntoDeserializer;
use serde::{Deserialize, Serialize};
@@ -41,6 +41,9 @@ pub mod defaults {
pub const DEFAULT_COMPACTION_ALGORITHM: super::CompactionAlgorithm =
super::CompactionAlgorithm::Legacy;
pub const DEFAULT_COMPRESSION_ALGORITHM: super::CompressionAlgorithm =
super::CompressionAlgorithm::LZ4;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
@@ -345,6 +348,7 @@ pub struct TenantConf {
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
pub trace_read_requests: bool,
pub image_layer_compression: CompressionAlgorithm,
pub eviction_policy: EvictionPolicy,
pub min_resident_size_override: Option<u64>,
// See the corresponding metric's help string.
@@ -429,6 +433,10 @@ pub struct TenantConfOpt {
#[serde(default)]
pub trace_read_requests: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub image_layer_compression: Option<CompressionAlgorithm>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub eviction_policy: Option<EvictionPolicy>,
@@ -492,6 +500,9 @@ impl TenantConfOpt {
trace_read_requests: self
.trace_read_requests
.unwrap_or(global_conf.trace_read_requests),
image_layer_compression: self
.image_layer_compression
.unwrap_or(global_conf.image_layer_compression),
eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
min_resident_size_override: self
.min_resident_size_override
@@ -538,6 +549,7 @@ impl Default for TenantConf {
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
trace_read_requests: false,
image_layer_compression: DEFAULT_COMPRESSION_ALGORITHM,
eviction_policy: EvictionPolicy::NoEviction,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
@@ -612,6 +624,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
max_lsn_wal_lag: value.max_lsn_wal_lag,
trace_read_requests: value.trace_read_requests,
image_layer_compression: value.image_layer_compression,
eviction_policy: value.eviction_policy,
min_resident_size_override: value.min_resident_size_override,
evictions_low_residence_duration_metric_threshold: value

View File

@@ -1443,35 +1443,6 @@ impl TenantManager {
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let r = self
.do_shard_split(tenant_shard_id, new_shard_count, new_stripe_size, ctx)
.await;
if r.is_err() {
// Shard splitting might have left the original shard in a partially shut down state (it
// stops the shard's remote timeline client). Reset it to ensure we leave things in
// a working state.
if self.get(tenant_shard_id).is_some() {
tracing::warn!("Resetting {tenant_shard_id} after shard split failure");
if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
// Log this error because our return value will still be the original error, not this one. This is
// a severe error: if this happens, we might be leaving behind a tenant that is not fully functional
// (e.g. has uploads disabled). We can't do anything else: if reset fails then shutting the tenant down or
// setting it broken probably won't help either.
tracing::error!("Failed to reset {tenant_shard_id}: {e}");
}
}
}
r
}
pub(crate) async fn do_shard_split(
&self,
tenant_shard_id: TenantShardId,
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let tenant = get_tenant(tenant_shard_id, true)?;
@@ -1506,10 +1477,6 @@ impl TenantManager {
.join(",")
);
fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
"failpoint"
)));
let parent_shard_identity = tenant.shard_identity;
let parent_tenant_conf = tenant.get_tenant_conf();
let parent_generation = tenant.generation;
@@ -1523,10 +1490,6 @@ impl TenantManager {
return Err(e);
}
fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
"failpoint"
)));
self.resources.deletion_queue_client.flush_advisory();
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
@@ -1548,16 +1511,11 @@ impl TenantManager {
anyhow::bail!("Detached parent shard in the middle of split!")
}
};
fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Optimization: hardlink layers from the parent into the children, so that they don't have to
// re-download & duplicate the data referenced in their initial IndexPart
self.shard_split_hardlink(parent, child_shards.clone())
.await?;
fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Take a snapshot of where the parent's WAL ingest had got to: we will wait for
// child shards to reach this point.
@@ -1597,10 +1555,6 @@ impl TenantManager {
.await?;
}
fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Phase 4: wait for child chards WAL ingest to catch up to target LSN
for child_shard_id in &child_shards {
let child_shard_id = *child_shard_id;
@@ -1633,10 +1587,6 @@ impl TenantManager {
timeline.timeline_id,
target_lsn
);
fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
"failpoint"
)));
if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
// Failure here might mean shutdown, in any case this part is an optimization
// and we shouldn't hold up the split operation.
@@ -1682,10 +1632,6 @@ impl TenantManager {
},
);
fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
"failpoint"
)));
parent_slot_guard.drop_old_value()?;
// Phase 6: Release the InProgress on the parent shard

View File

@@ -724,6 +724,8 @@ impl DeltaLayerInner {
Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
if let Some(mut expected_summary) = summary {
// assume backward compatibility
expected_summary.format_version = actual_summary.format_version;
// production code path
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
@@ -966,7 +968,7 @@ impl DeltaLayerInner {
// track when a key is done.
for read in reads.into_iter().rev() {
let res = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"))
.read_blobs(&read, buf.take().expect("Should have a buffer"), false)
.await;
let blobs_buf = match res {

View File

@@ -39,14 +39,17 @@ use crate::tenant::vectored_blob_io::{
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use crate::{
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{CompressionAlgorithm, LayerAccessKind};
use pageserver_api::shard::TenantShardId;
use postgres_ffi::BLCKSZ;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::File;
@@ -153,6 +156,7 @@ pub struct ImageLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
format_version: u16,
lsn: Lsn,
@@ -167,6 +171,7 @@ impl std::fmt::Debug for ImageLayerInner {
f.debug_struct("ImageLayerInner")
.field("index_start_blk", &self.index_start_blk)
.field("index_root_blk", &self.index_root_blk)
.field("format_version", &self.format_version)
.finish()
}
}
@@ -391,7 +396,12 @@ impl ImageLayerInner {
let actual_summary =
Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
if actual_summary.format_version > COMPRESSED_STORAGE_FORMAT_VERSION {
bail!("Forward compatibility of storage is not supported: current format version is {}, format version of layer {} is {}", COMPRESSED_STORAGE_FORMAT_VERSION, path, actual_summary.format_version);
}
if let Some(mut expected_summary) = summary {
// assume backward compatibility
expected_summary.format_version = actual_summary.format_version;
// production code path
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
@@ -408,6 +418,7 @@ impl ImageLayerInner {
Ok(Ok(ImageLayerInner {
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
format_version: actual_summary.format_version,
lsn,
file,
file_id,
@@ -436,18 +447,20 @@ impl ImageLayerInner {
)
.await?
{
let blob = block_reader
.block_cursor()
.read_blob(
offset,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build(),
)
.await
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build();
let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
block_reader
.block_cursor()
.read_compressed_blob(offset, &ctx)
.await
} else {
block_reader.block_cursor().read_blob(offset, &ctx).await
})
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
} else {
@@ -539,9 +552,12 @@ impl ImageLayerInner {
.into();
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let compressed_storage_format = self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION;
for read in reads.into_iter() {
let buf = BytesMut::with_capacity(max_vectored_read_bytes);
let res = vectored_blob_reader.read_blobs(&read, buf).await;
let res = vectored_blob_reader
.read_blobs(&read, buf, compressed_storage_format)
.await;
match res {
Ok(blobs_buf) => {
@@ -549,11 +565,31 @@ impl ImageLayerInner {
for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf),
);
if meta.compression_alg == CompressionAlgorithm::LZ4 {
match lz4_flex::block::decompress(&img_buf, BLCKSZ as usize) {
Ok(decompressed) => {
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(Bytes::from(decompressed)),
);
}
Err(err) => reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::from(anyhow!(
"Failed to decompress blob from file {}: {}",
self.file.path,
err
)),
),
}
} else {
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf),
);
}
}
}
Err(err) => {
@@ -591,6 +627,7 @@ struct ImageLayerWriterInner {
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
key_range: Range<Key>,
compression: CompressionAlgorithm,
lsn: Lsn,
blob_writer: BlobWriter<false>,
@@ -602,16 +639,17 @@ impl ImageLayerWriterInner {
/// Start building a new image layer.
///
async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
timeline: &Arc<Timeline>,
key_range: &Range<Key>,
lsn: Lsn,
) -> anyhow::Result<Self> {
let timeline_id = timeline.timeline_id;
let tenant_shard_id = timeline.tenant_shard_id;
let compression = timeline.get_image_layer_compression();
// Create the file initially with a temporary filename.
// We'll atomically rename it to the final name when we're done.
let path = ImageLayer::temp_path_for(
conf,
timeline.conf,
timeline_id,
tenant_shard_id,
&ImageFileName {
@@ -638,11 +676,12 @@ impl ImageLayerWriterInner {
let tree_builder = DiskBtreeBuilder::new(block_buf);
let writer = Self {
conf,
conf: timeline.conf,
path,
timeline_id,
tenant_shard_id,
key_range: key_range.clone(),
compression,
lsn,
tree: tree_builder,
blob_writer,
@@ -658,10 +697,15 @@ impl ImageLayerWriterInner {
///
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let (_img, res) = self.blob_writer.write_blob(img).await;
// TODO: re-use the buffer for `img` further upstack
let off = res?;
let off = if STORAGE_FORMAT_VERSION >= COMPRESSED_STORAGE_FORMAT_VERSION {
self.blob_writer
.write_compressed_blob(img, self.compression)
.await?
} else {
let (_img, res) = self.blob_writer.write_blob(img).await;
// TODO: re-use the buffer for `img` further upstack
res?
};
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;
@@ -766,17 +810,12 @@ impl ImageLayerWriter {
/// Start building a new image layer.
///
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
timeline: &Arc<Timeline>,
key_range: &Range<Key>,
lsn: Lsn,
) -> anyhow::Result<ImageLayerWriter> {
Ok(Self {
inner: Some(
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
.await?,
),
inner: Some(ImageLayerWriterInner::new(timeline, key_range, lsn).await?),
})
}

View File

@@ -19,8 +19,8 @@ use pageserver_api::{
key::AUX_FILES_KEY,
keyspace::KeySpaceAccum,
models::{
CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
EvictionPolicy, LayerMapInfo, TimelineState,
CompactionAlgorithm, CompressionAlgorithm, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, LayerMapInfo, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, TenantShardId},
@@ -182,7 +182,7 @@ pub(crate) struct AuxFilesState {
}
pub struct Timeline {
conf: &'static PageServerConf,
pub(crate) conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
myself: Weak<Self>,
@@ -1515,6 +1515,13 @@ impl Timeline {
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
pub fn get_image_layer_compression(&self) -> CompressionAlgorithm {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.image_layer_compression
.unwrap_or(self.conf.default_tenant_conf.image_layer_compression)
}
pub(super) fn tenant_conf_updated(&self) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
@@ -3458,14 +3465,7 @@ impl Timeline {
continue;
}
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&img_range,
lsn,
)
.await?;
let mut image_layer_writer = ImageLayerWriter::new(self, &img_range, lsn).await?;
fail_point!("image-layer-writer-fail-before-finish", |_| {
Err(CreateImageLayersError::Other(anyhow::anyhow!(

View File

@@ -997,14 +997,7 @@ impl TimelineAdaptor {
) -> Result<(), PageReconstructError> {
let timer = self.timeline.metrics.create_images_time_histo.start_timer();
let mut image_layer_writer = ImageLayerWriter::new(
self.timeline.conf,
self.timeline.timeline_id,
self.timeline.tenant_shard_id,
key_range,
lsn,
)
.await?;
let mut image_layer_writer = ImageLayerWriter::new(&self.timeline, key_range, lsn).await?;
fail_point!("image-layer-writer-fail-before-finish", |_| {
Err(PageReconstructError::Other(anyhow::anyhow!(

View File

@@ -15,11 +15,11 @@
//!
//! Note that the vectored blob api does *not* go through the page cache.
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use bytes::BytesMut;
use pageserver_api::key::Key;
use pageserver_api::models::CompressionAlgorithm;
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;
@@ -40,6 +40,7 @@ pub struct VectoredBlob {
pub start: usize,
pub end: usize,
pub meta: BlobMeta,
pub compression_alg: CompressionAlgorithm,
}
/// Return type of [`VectoredBlobReader::read_blobs`]
@@ -274,6 +275,7 @@ impl<'a> VectoredBlobReader<'a> {
&self,
read: &VectoredRead,
buf: BytesMut,
compressed_storage_format: bool,
) -> Result<VectoredBlobsBuf, std::io::Error> {
assert!(read.size() > 0);
assert!(
@@ -304,35 +306,42 @@ impl<'a> VectoredBlobReader<'a> {
);
for ((offset, meta), next) in pairs {
let offset_in_buf = offset - start_offset;
let first_len_byte = buf[offset_in_buf as usize];
let mut offset_in_buf = (offset - start_offset) as usize;
let compression_alg = if compressed_storage_format {
offset_in_buf += 1;
CompressionAlgorithm::from_repr(buf[offset_in_buf - 1]).unwrap()
} else {
CompressionAlgorithm::NoCompression
};
let first_len_byte = buf[offset_in_buf];
// Each blob is prefixed by a header containing it's size.
// Extract the size and skip that header to find the start of the data.
// The size can be 1 or 4 bytes. The most significant bit is 0 in the
// 1 byte case and 1 in the 4 byte case.
let (size_length, blob_size) = if first_len_byte < 0x80 {
(1, first_len_byte as u64)
(1usize, first_len_byte as usize)
} else {
let mut blob_size_buf = [0u8; 4];
let offset_in_buf = offset_in_buf as usize;
blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
blob_size_buf[0] &= 0x7f;
(4, u32::from_be_bytes(blob_size_buf) as u64)
(4usize, u32::from_be_bytes(blob_size_buf) as usize)
};
let start = offset_in_buf + size_length;
let end = match next {
Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
Some((next_blob_start_offset, _)) => {
(next_blob_start_offset - start_offset) as usize
}
None => start + blob_size,
};
assert_eq!(end - start, blob_size);
metas.push(VectoredBlob {
start: start as usize,
end: end as usize,
start,
end,
compression_alg,
meta: *meta,
})
}

View File

@@ -341,14 +341,7 @@ impl Accept for ProxyProtocolAccept {
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let conn = ready!(Pin::new(&mut self.incoming).poll_accept(cx)?);
let conn_id = uuid::Uuid::new_v4();
let span = tracing::info_span!("http_conn", ?conn_id);
{
let _enter = span.enter();
tracing::info!("accepted new TCP connection");
}
tracing::info!(protocol = self.protocol, "accepted new TCP connection");
let Some(conn) = conn else {
return Poll::Ready(None);
};
@@ -361,7 +354,6 @@ impl Accept for ProxyProtocolAccept {
.with_label_values(&[self.protocol])
.guard(),
)),
span,
})))
}
}
@@ -372,14 +364,6 @@ pin_project! {
pub inner: T,
pub connection_id: Uuid,
pub gauge: Mutex<Option<IntCounterPairGuard>>,
pub span: tracing::Span,
}
impl<S> PinnedDrop for WithConnectionGuard<S> {
fn drop(this: Pin<&mut Self>) {
let _enter = this.span.enter();
tracing::info!("HTTP connection closed")
}
}
}

View File

@@ -19,7 +19,6 @@ use rand::SeedableRng;
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio_util::task::TaskTracker;
use tracing::instrument::Instrumented;
use crate::context::RequestMonitoring;
use crate::protocol2::{ProxyProtocolAccept, WithClientIp, WithConnectionGuard};
@@ -31,12 +30,13 @@ use hyper::{
Body, Method, Request, Response,
};
use std::convert::Infallible;
use std::net::IpAddr;
use std::sync::Arc;
use std::task::Poll;
use tls_listener::TlsListener;
use tokio::net::TcpListener;
use tokio_util::sync::{CancellationToken, DropGuard};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn, Instrument};
use utils::http::{error::ApiError, json::json_response};
@@ -100,7 +100,12 @@ pub async fn task_main(
let ws_connections = tokio_util::task::task_tracker::TaskTracker::new();
ws_connections.close(); // allows `ws_connections.wait to complete`
let tls_listener = TlsListener::new(tls_acceptor, addr_incoming, config.handshake_timeout);
let tls_listener = TlsListener::new(
tls_acceptor,
addr_incoming,
"http",
config.handshake_timeout,
);
let make_svc = hyper::service::make_service_fn(
|stream: &tokio_rustls::server::TlsStream<
@@ -116,11 +121,6 @@ pub async fn task_main(
.take()
.expect("gauge should be set on connection start");
// Cancel all current inflight HTTP requests if the HTTP connection is closed.
let http_cancellation_token = CancellationToken::new();
let cancel_connection = http_cancellation_token.clone().drop_guard();
let span = conn.span.clone();
let client_addr = conn.inner.client_addr();
let remote_addr = conn.inner.inner.remote_addr();
let backend = backend.clone();
@@ -136,43 +136,27 @@ pub async fn task_main(
Ok(MetricService::new(
hyper::service::service_fn(move |req: Request<Body>| {
let backend = backend.clone();
let ws_connections2 = ws_connections.clone();
let ws_connections = ws_connections.clone();
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
let cancellation_handler = cancellation_handler.clone();
let http_cancellation_token = http_cancellation_token.child_token();
// `request_handler` is not cancel safe. It expects to be cancelled only at specific times.
// By spawning the future, we ensure it never gets cancelled until it decides to.
ws_connections.spawn(
async move {
// Cancel the current inflight HTTP request if the requets stream is closed.
// This is slightly different to `_cancel_connection` in that
// h2 can cancel individual requests with a `RST_STREAM`.
let _cancel_session = http_cancellation_token.clone().drop_guard();
let res = request_handler(
async move {
Ok::<_, Infallible>(
request_handler(
req,
config,
backend,
ws_connections2,
ws_connections,
cancellation_handler,
peer_addr.ip(),
endpoint_rate_limiter,
http_cancellation_token,
)
.await
.map_or_else(|e| e.into_response(), |r| r);
_cancel_session.disarm();
res
}
.in_current_span(),
)
.map_or_else(|e| e.into_response(), |r| r),
)
}
}),
gauge,
cancel_connection,
span,
))
}
},
@@ -192,23 +176,11 @@ pub async fn task_main(
struct MetricService<S> {
inner: S,
_gauge: IntCounterPairGuard,
_cancel: DropGuard,
span: tracing::Span,
}
impl<S> MetricService<S> {
fn new(
inner: S,
_gauge: IntCounterPairGuard,
_cancel: DropGuard,
span: tracing::Span,
) -> MetricService<S> {
MetricService {
inner,
_gauge,
_cancel,
span,
}
fn new(inner: S, _gauge: IntCounterPairGuard) -> MetricService<S> {
MetricService { inner, _gauge }
}
}
@@ -218,16 +190,14 @@ where
{
type Response = S::Response;
type Error = S::Error;
type Future = Instrumented<S::Future>;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
self.span
.in_scope(|| self.inner.call(req))
.instrument(self.span.clone())
self.inner.call(req)
}
}
@@ -240,8 +210,6 @@ async fn request_handler(
cancellation_handler: Arc<CancellationHandler>,
peer_addr: IpAddr,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
// used to cancel in-flight HTTP requests. not used to cancel websockets
http_cancellation_token: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let session_id = uuid::Uuid::new_v4();
@@ -285,7 +253,7 @@ async fn request_handler(
let ctx = RequestMonitoring::new(session_id, peer_addr, "http", &config.region);
let span = ctx.span.clone();
sql_over_http::handle(config, ctx, request, backend, http_cancellation_token)
sql_over_http::handle(config, ctx, request, backend)
.instrument(span)
.await
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {

View File

@@ -217,8 +217,8 @@ pub async fn handle(
mut ctx: RequestMonitoring,
request: Request<Body>,
backend: Arc<PoolingBackend>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let cancel = CancellationToken::new();
let cancel2 = cancel.clone();
let handle = tokio::spawn(async move {
time::sleep(config.http_config.request_timeout).await;

View File

@@ -13,7 +13,7 @@ use tokio::{
time::timeout,
};
use tokio_rustls::{server::TlsStream, TlsAcceptor};
use tracing::{info, warn, Instrument};
use tracing::{info, warn};
use crate::{
metrics::TLS_HANDSHAKE_FAILURES,
@@ -29,17 +29,24 @@ pin_project! {
tls: TlsAcceptor,
waiting: JoinSet<Option<TlsStream<A::Conn>>>,
timeout: Duration,
protocol: &'static str,
}
}
impl<A: Accept> TlsListener<A> {
/// Create a `TlsListener` with default options.
pub(crate) fn new(tls: TlsAcceptor, listener: A, timeout: Duration) -> Self {
pub(crate) fn new(
tls: TlsAcceptor,
listener: A,
protocol: &'static str,
timeout: Duration,
) -> Self {
TlsListener {
listener,
tls,
waiting: JoinSet::new(),
timeout,
protocol,
}
}
}
@@ -66,7 +73,7 @@ where
Poll::Ready(Some(Ok(mut conn))) => {
let t = *this.timeout;
let tls = this.tls.clone();
let span = conn.span.clone();
let protocol = *this.protocol;
this.waiting.spawn(async move {
let peer_addr = match conn.inner.wait_for_addr().await {
Ok(Some(addr)) => addr,
@@ -79,24 +86,21 @@ where
let accept = tls.accept(conn);
match timeout(t, accept).await {
Ok(Ok(conn)) => {
info!(%peer_addr, "accepted new TLS connection");
Some(conn)
},
Ok(Ok(conn)) => Some(conn),
// The handshake failed, try getting another connection from the queue
Ok(Err(e)) => {
TLS_HANDSHAKE_FAILURES.inc();
warn!(%peer_addr, "failed to accept TLS connection: {e:?}");
warn!(%peer_addr, protocol, "failed to accept TLS connection: {e:?}");
None
}
// The handshake timed out, try getting another connection from the queue
Err(_) => {
TLS_HANDSHAKE_FAILURES.inc();
warn!(%peer_addr, "failed to accept TLS connection: timeout");
warn!(%peer_addr, protocol, "failed to accept TLS connection: timeout");
None
}
}
}.instrument(span));
});
}
Poll::Ready(Some(Err(e))) => {
tracing::error!("error accepting TCP connection: {e}");
@@ -108,7 +112,10 @@ where
loop {
return match this.waiting.poll_join_next(cx) {
Poll::Ready(Some(Ok(Some(conn)))) => Poll::Ready(Some(Ok(conn))),
Poll::Ready(Some(Ok(Some(conn)))) => {
info!(protocol = this.protocol, "accepted new TLS connection");
Poll::Ready(Some(Ok(conn)))
}
// The handshake failed to complete, try getting another connection from the queue
Poll::Ready(Some(Ok(None))) => continue,
// The handshake panicked or was cancelled. ignore and get another connection

View File

@@ -2,7 +2,6 @@ pytest_plugins = (
"fixtures.pg_version",
"fixtures.parametrize",
"fixtures.httpserver",
"fixtures.compute_reconfigure",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",

View File

@@ -1,62 +0,0 @@
import concurrent.futures
from typing import Any
import pytest
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
from fixtures.log_helper import log
from fixtures.types import TenantId
class ComputeReconfigure:
def __init__(self, server):
self.server = server
self.control_plane_compute_hook_api = f"http://{server.host}:{server.port}/notify-attach"
self.workloads = {}
def register_workload(self, workload):
self.workloads[workload.tenant_id] = workload
@pytest.fixture(scope="function")
def compute_reconfigure_listener(make_httpserver):
"""
This fixture exposes an HTTP listener for the storage controller to submit
compute notifications to us, instead of updating neon_local endpoints itself.
Although storage controller can use neon_local directly, this causes problems when
the test is also concurrently modifying endpoints. Instead, configure storage controller
to send notifications up to this test code, which will route all endpoint updates
through Workload, which has a mutex to make concurrent updates safe.
"""
server = make_httpserver
self = ComputeReconfigure(server)
# Do neon_local endpoint reconfiguration in the background so that we can
# accept a healthy rate of calls into notify-attach.
reconfigure_threads = concurrent.futures.ThreadPoolExecutor(max_workers=1)
def handler(request: Request):
assert request.json is not None
body: dict[str, Any] = request.json
log.info(f"notify-attach request: {body}")
try:
workload = self.workloads[TenantId(body["tenant_id"])]
except KeyError:
pass
else:
# This causes the endpoint to query storage controller for its location, which
# is redundant since we already have it here, but this avoids extending the
# neon_local CLI to take full lists of locations
reconfigure_threads.submit(lambda workload=workload: workload.reconfigure()) # type: ignore[no-any-return]
return Response(status=200)
self.server.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
yield self
reconfigure_threads.shutdown()
server.clear()

View File

@@ -2177,23 +2177,6 @@ class NeonStorageController(MetricsGetter):
)
log.info("storage controller passed consistency check")
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.request(
"PUT",
f"{self.env.storage_controller_api}/debug/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
headers=self.headers(TokenScope.ADMIN),
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
def __enter__(self) -> "NeonStorageController":
return self
@@ -2961,7 +2944,6 @@ class NeonProxy(PgProtocol):
user = quote(kwargs["user"])
password = quote(kwargs["password"])
expected_code = kwargs.get("expected_code")
timeout = kwargs.get("timeout")
log.info(f"Executing http query: {query}")
@@ -2975,7 +2957,6 @@ class NeonProxy(PgProtocol):
"Neon-Pool-Opt-In": "true",
},
verify=str(self.test_output_dir / "proxy.crt"),
timeout=timeout,
)
if expected_code is not None:

View File

@@ -1,4 +1,3 @@
import threading
from typing import Optional
from fixtures.log_helper import log
@@ -12,10 +11,6 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import TenantId, TimelineId
# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
ENDPOINT_LOCK = threading.Lock()
class Workload:
"""
@@ -46,30 +41,17 @@ class Workload:
self._endpoint: Optional[Endpoint] = None
def reconfigure(self):
"""
Request the endpoint to reconfigure based on location reported by storage controller
"""
if self._endpoint is not None:
with ENDPOINT_LOCK:
self._endpoint.reconfigure()
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
# We may be running alongside other Workloads for different tenants. Full TTID is
# obnoxiously long for use here, but a cut-down version is still unique enough for tests.
endpoint_id = f"ep-workload-{str(self.tenant_id)[0:4]}-{str(self.timeline_id)[0:4]}"
with ENDPOINT_LOCK:
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
self.branch_name,
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id=endpoint_id,
)
self._endpoint.start(pageserver_id=pageserver_id)
else:
self._endpoint.reconfigure(pageserver_id=pageserver_id)
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
self.branch_name,
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id="ep-workload",
)
self._endpoint.start(pageserver_id=pageserver_id)
else:
self._endpoint.reconfigure(pageserver_id=pageserver_id)
connstring = self._endpoint.safe_psql(
"SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
@@ -112,7 +94,7 @@ class Workload:
else:
return False
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True, ingest=True):
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
assert self.expect_rows >= n
max_iters = 10
@@ -150,28 +132,22 @@ class Workload:
]
)
if ingest:
# Wait for written data to be ingested by the pageserver
for tenant_shard_id, pageserver in tenant_get_shards(
self.env, self.tenant_id, pageserver_id
):
last_flush_lsn = wait_for_last_flush_lsn(
self.env,
endpoint,
self.tenant_id,
self.timeline_id,
pageserver_id=pageserver_id,
)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
for tenant_shard_id, pageserver in tenant_get_shards(
self.env, self.tenant_id, pageserver_id
):
last_flush_lsn = wait_for_last_flush_lsn(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
if upload:
# Wait for written data to be uploaded to S3 (force a checkpoint to trigger upload)
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
else:
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
if upload:
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
else:
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
def validate(self, pageserver_id: Optional[int] = None):
endpoint = self.endpoint(pageserver_id)

View File

@@ -188,6 +188,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"max": 1000,
},
"trace_read_requests": True,
"image_layer_compression": "NoCompression",
"walreceiver_connect_timeout": "13m",
}

View File

@@ -0,0 +1,74 @@
import os
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
#
# Test image layer compression and log compression ratio
#
@pytest.mark.timeout(1000)
def test_compression(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
def calculate_layers_size(tenant, timeline):
timeline_path = "{}/tenants/{}/timelines/{}/".format(
env.pageserver.workdir, tenant, timeline
)
delta_total_size = 0
image_total_size = 0
for filename in os.listdir(timeline_path):
if filename.startswith("00000") and not filename.endswith(".___temp"):
size = os.path.getsize(timeline_path + filename)
pos = filename.find("__")
if pos >= 0:
pos = filename.find("-", pos)
if pos >= 0:
delta_total_size += size
else:
image_total_size += size
log.info(f"Image layers size: {image_total_size}, delta layers size: {delta_total_size}")
return image_total_size
tenant, timeline = env.neon_cli.create_tenant(
conf={
# Use aggressive compaction and checkpoint settings
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
"compaction_period": "1 s",
"compaction_threshold": "1",
"image_layer_compression": '"LZ4"',
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
connstr = endpoint.connstr()
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", "-s50", connstr])
pg_bin.run_capture(["pgbench", "-c10", "-T25", "-Mprepared", connstr])
time.sleep(5) # wait sometime to let background tasks completed at PS
compressed_image_size = calculate_layers_size(tenant, timeline)
tenant, timeline = env.neon_cli.create_tenant(
conf={
# Use aggressive compaction and checkpoint settings
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
"compaction_period": "1 s",
"compaction_threshold": "1",
"image_layer_compression": '"NoCompression"',
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
connstr = endpoint.connstr()
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", "-s50", connstr])
pg_bin.run_capture(["pgbench", "-c10", "-T25", "-Mprepared", connstr])
time.sleep(5) # wait sometime to let background tasks completed at PS
raw_image_size = calculate_layers_size(tenant, timeline)
log.info(f"Compression ratio: {raw_image_size/compressed_image_size}")

View File

@@ -596,39 +596,3 @@ def test_sql_over_http_timeout_cancel(static_proxy: NeonProxy):
assert (
"duplicate key value violates unique constraint" in res["message"]
), "HTTP query should conflict"
def test_sql_over_http_connection_cancel(static_proxy: NeonProxy):
static_proxy.safe_psql("create role http with login password 'http' superuser")
static_proxy.safe_psql("create table test_table ( id int primary key )")
# insert into a table, with a unique constraint, after sleeping for n seconds
query = "WITH temp AS ( \
SELECT pg_sleep($1) as sleep, $2::int as id \
) INSERT INTO test_table (id) SELECT id FROM temp"
try:
# The request should complete before the proxy HTTP timeout triggers.
# Timeout and cancel the request on the client side before the query completes.
static_proxy.http_query(
query,
[static_proxy.http_timeout_seconds - 1, 1],
user="http",
password="http",
timeout=2,
)
except requests.exceptions.ReadTimeout:
pass
# wait until the query _would_ have been complete
time.sleep(static_proxy.http_timeout_seconds)
res = static_proxy.http_query(query, [1, 1], user="http", password="http", expected_code=200)
assert res["command"] == "INSERT", "HTTP query should insert"
assert res["rowCount"] == 1, "HTTP query should insert"
res = static_proxy.http_query(query, [0, 1], user="http", password="http", expected_code=400)
assert (
"duplicate key value violates unique constraint" in res["message"]
), "HTTP query should conflict"

View File

@@ -1,14 +1,10 @@
import os
from typing import Dict, List, Optional, Union
from typing import Dict, List, Union
import pytest
import requests
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
StorageControllerApiException,
tenant_get_shards,
)
from fixtures.remote_storage import s3_storage
@@ -499,337 +495,3 @@ def test_sharding_ingest(
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
assert huge_layer_count <= shard_count
class Failure:
pageserver_id: Optional[int]
def apply(self, env: NeonEnv):
raise NotImplementedError()
def clear(self, env: NeonEnv):
"""
Clear the failure, in a way that should enable the system to proceed
to a totally clean state (all nodes online and reconciled)
"""
raise NotImplementedError()
def expect_available(self):
raise NotImplementedError()
def can_mitigate(self):
"""Whether Self.mitigate is available for use"""
return False
def mitigate(self, env: NeonEnv):
"""
Mitigate the failure in a way that should allow shard split to
complete and service to resume, but does not guarantee to leave
the whole world in a clean state (e.g. an Offline node might have
junk LocationConfigs on it)
"""
raise NotImplementedError()
def fails_forward(self, env: NeonEnv):
"""
If true, this failure results in a state that eventualy completes the split.
"""
return False
def expect_exception(self):
"""
How do we expect a call to the split API to fail?
"""
return StorageControllerApiException
class PageserverFailpoint(Failure):
def __init__(self, failpoint, pageserver_id, mitigate):
self.failpoint = failpoint
self.pageserver_id = pageserver_id
self._mitigate = mitigate
def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.allowed_errors.extend(
[".*failpoint.*", ".*Resetting.*after shard split failure.*"]
)
pageserver.http_client().configure_failpoints((self.failpoint, "return(1)"))
def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
if self._mitigate:
env.storage_controller.node_configure(self.pageserver_id, {"availability": "Active"})
def expect_available(self):
return True
def can_mitigate(self):
return self._mitigate
def mitigate(self, env):
env.storage_controller.node_configure(self.pageserver_id, {"availability": "Offline"})
class StorageControllerFailpoint(Failure):
    """Failure mode: arm a failpoint inside the storage controller itself.

    `action` is a failpoint action string: a "return(...)" (error path) or a
    "panic(...)" (process crash).
    """

    def __init__(self, failpoint, action):
        self.failpoint = failpoint
        # No pageserver is targeted by this failure mode.
        self.pageserver_id = None
        self.action = action

    def apply(self, env: NeonEnv):
        env.storage_controller.configure_failpoints((self.failpoint, self.action))

    def clear(self, env: NeonEnv):
        if "panic" not in self.action:
            env.storage_controller.configure_failpoints((self.failpoint, "off"))
        else:
            # A panicked controller can't serve the failpoint API: restart it instead.
            log.info("Restarting storage controller after panic")
            env.storage_controller.stop()
            env.storage_controller.start()

    def expect_available(self):
        # Controller panics _do_ leave pageservers available, but our test code relies
        # on using the locate API to update configurations in Workload, so we must skip
        # these actions when the controller has been panicked.
        return "panic" not in self.action

    def can_mitigate(self):
        return False

    def fails_forward(self, env):
        # Edge case: the very last failpoint that simulates a DB connection error, where
        # the abort path will fail-forward and result in a complete split.
        fail_forward = self.failpoint == "shard-split-post-complete"

        # If the failure was a panic, then if we expect split to eventually (after restart)
        # complete, we must restart before checking that.
        if fail_forward and "panic" in self.action:
            log.info("Restarting storage controller after panic")
            env.storage_controller.stop()
            env.storage_controller.start()

        return fail_forward

    def expect_exception(self):
        # A panic severs the HTTP connection mid-request; a return() surfaces as an API error.
        if "panic" in self.action:
            return requests.exceptions.ConnectionError
        return StorageControllerApiException
class NodeKill(Failure):
    """Failure mode: hard-stop a pageserver process (immediate kill, no graceful shutdown)."""

    def __init__(self, pageserver_id, mitigate):
        self.pageserver_id = pageserver_id
        self._mitigate = mitigate

    def apply(self, env: NeonEnv):
        pageserver = env.get_pageserver(self.pageserver_id)
        pageserver.stop(immediate=True)

    def clear(self, env: NeonEnv):
        pageserver = env.get_pageserver(self.pageserver_id)
        pageserver.start()

    def expect_available(self):
        # The killed node's APIs are down, so clients cannot be expected to work.
        return False

    def can_mitigate(self):
        # Fix: `_mitigate` was stored but can_mitigate() was never overridden, so the
        # inherited base method always returned False and the mitigation path was
        # unreachable -- NodeKill(x, True) behaved identically to NodeKill(x, False).
        # Mirror PageserverFailpoint by honoring the constructor flag.
        return self._mitigate

    def mitigate(self, env):
        # Tell the storage controller the node is gone so it can proceed without it.
        env.storage_controller.node_configure(self.pageserver_id, {"availability": "Offline"})
class CompositeFailure(Failure):
    """
    Wrapper for failures in multiple components (e.g. a failpoint in the storage controller, *and*
    stop a pageserver to interfere with rollback)
    """

    def __init__(self, failures: list[Failure]):
        self.failures = failures
        # Remember the first member that targets a pageserver, so the test can
        # exclude that node from its checks; None when no member targets one.
        self.pageserver_id = next(
            (f.pageserver_id for f in failures if f.pageserver_id is not None), None
        )

    def apply(self, env: NeonEnv):
        for member in self.failures:
            member.apply(env)

    def clear(self, env):
        for member in self.failures:
            member.clear(env)

    def expect_available(self):
        # Only available if every member failure leaves the system available.
        return all(member.expect_available() for member in self.failures)

    def mitigate(self, env):
        for member in self.failures:
            member.mitigate(env)

    def expect_exception(self):
        expected = {member.expect_exception() for member in self.failures}

        # We can't give a sensible response if our failures have different expectations
        assert len(expected) == 1
        return next(iter(expected))
@pytest.mark.parametrize(
    "failure",
    [
        PageserverFailpoint("api-500", 1, False),
        NodeKill(1, False),
        PageserverFailpoint("api-500", 1, True),
        NodeKill(1, True),
        PageserverFailpoint("shard-split-pre-prepare", 1, False),
        PageserverFailpoint("shard-split-post-prepare", 1, False),
        PageserverFailpoint("shard-split-pre-hardlink", 1, False),
        PageserverFailpoint("shard-split-post-hardlink", 1, False),
        PageserverFailpoint("shard-split-post-child-conf", 1, False),
        PageserverFailpoint("shard-split-lsn-wait", 1, False),
        PageserverFailpoint("shard-split-pre-finish", 1, False),
        StorageControllerFailpoint("shard-split-validation", "return(1)"),
        StorageControllerFailpoint("shard-split-post-begin", "return(1)"),
        StorageControllerFailpoint("shard-split-post-remote", "return(1)"),
        StorageControllerFailpoint("shard-split-post-complete", "return(1)"),
        StorageControllerFailpoint("shard-split-validation", "panic(failpoint)"),
        StorageControllerFailpoint("shard-split-post-begin", "panic(failpoint)"),
        StorageControllerFailpoint("shard-split-post-remote", "panic(failpoint)"),
        StorageControllerFailpoint("shard-split-post-complete", "panic(failpoint)"),
        CompositeFailure(
            [NodeKill(1, True), StorageControllerFailpoint("shard-split-post-begin", "return(1)")]
        ),
        CompositeFailure(
            [NodeKill(1, False), StorageControllerFailpoint("shard-split-post-begin", "return(1)")]
        ),
    ],
)
def test_sharding_split_failures(
    neon_env_builder: NeonEnvBuilder,
    compute_reconfigure_listener: ComputeReconfigure,
    failure: Failure,
):
    """
    Inject a failure partway through a shard split, then verify that the system either
    rolls back to the pre-split shard count or (for fail-forward failures) eventually
    completes the split, and that a later split succeeds once the failure is resolved.
    """
    neon_env_builder.num_pageservers = 4
    neon_env_builder.control_plane_compute_hook_api = (
        compute_reconfigure_listener.control_plane_compute_hook_api
    )
    initial_shard_count = 2
    split_shard_count = 4
    env = neon_env_builder.init_start(initial_tenant_shard_count=initial_shard_count)

    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    for ps in env.pageservers:
        # When we do node failures and abandon a shard, it will de-facto have old generation and
        # thereby be unable to publish remote consistent LSN updates
        ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")

    # Make sure the node we're failing has a shard on it, otherwise the test isn't testing anything
    assert (
        failure.pageserver_id is None
        or len(
            env.get_pageserver(failure.pageserver_id)
            .http_client()
            .tenant_list_locations()["tenant_shards"]
        )
        > 0
    )

    workload = Workload(env, tenant_id, timeline_id)
    workload.init()
    workload.write_rows(100)

    # Put the environment into a failing state (exact meaning depends on `failure`)
    failure.apply(env)

    with pytest.raises(failure.expect_exception()):
        # Fixed: use split_shard_count instead of a hard-coded 4 so the request stays
        # in sync with the assertions below if the constant is ever changed.
        env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)

    # We expect that the overall operation will fail, but some split requests
    # will have succeeded: the net result should be to return to a clean state, including
    # detaching any child shards.
    def assert_rolled_back(exclude_ps_id=None) -> None:
        # Every (non-excluded) pageserver should only hold parent-generation shards,
        # and in total exactly the original shard count.
        count = 0
        for ps in env.pageservers:
            if exclude_ps_id is not None and ps.id == exclude_ps_id:
                continue

            locations = ps.http_client().tenant_list_locations()["tenant_shards"]
            for loc in locations:
                tenant_shard_id = TenantShardId.parse(loc[0])
                log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
                assert tenant_shard_id.shard_count == initial_shard_count
                count += 1
        assert count == initial_shard_count

    def assert_split_done(exclude_ps_id=None) -> None:
        # Every (non-excluded) pageserver should only hold child-generation shards,
        # and in total exactly the post-split shard count.
        count = 0
        for ps in env.pageservers:
            if exclude_ps_id is not None and ps.id == exclude_ps_id:
                continue

            locations = ps.http_client().tenant_list_locations()["tenant_shards"]
            for loc in locations:
                tenant_shard_id = TenantShardId.parse(loc[0])
                log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
                assert tenant_shard_id.shard_count == split_shard_count
                count += 1
        assert count == split_shard_count

    def finish_split():
        # Having failed+rolled back, we should be able to split again
        # No failures this time; it will succeed
        env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)
        workload.churn_rows(10)
        workload.validate()

    if failure.expect_available():
        # Even though the split failed partway through, this should not have interrupted
        # clients. Disable waiting for pageservers in the workload helper, because our
        # failpoints may prevent API access.
        # This only applies for failure modes that leave pageserver page_service API available.
        workload.churn_rows(10, upload=False, ingest=False)
        workload.validate()

    if failure.fails_forward(env):
        log.info("Fail-forward failure, checking split eventually completes...")
        # A failure type which results in eventual completion of the split
        wait_until(30, 1, assert_split_done)
    elif failure.can_mitigate():
        log.info("Mitigating failure...")
        # Mitigation phase: we expect to be able to proceed with a successful shard split
        failure.mitigate(env)

        # The split should appear to be rolled back from the point of view of all pageservers
        # apart from the one that is offline
        wait_until(30, 1, lambda: assert_rolled_back(exclude_ps_id=failure.pageserver_id))

        finish_split()
        wait_until(30, 1, lambda: assert_split_done(exclude_ps_id=failure.pageserver_id))

        # Having cleared the failure, everything should converge to a pristine state
        failure.clear(env)
        wait_until(30, 1, assert_split_done)
    else:
        # Once we restore the faulty pageserver's API to good health, rollback should
        # eventually complete.
        log.info("Clearing failure...")
        failure.clear(env)
        wait_until(30, 1, assert_rolled_back)

        # Having rolled back, the tenant should be working
        workload.churn_rows(10)
        workload.validate()

        # Splitting again should work, since we cleared the failure
        finish_split()
        assert_split_done()

    env.storage_controller.consistency_check()