Mirror of https://github.com/neondatabase/neon.git (synced 2026-03-06 18:00:37 +00:00)

Compare commits: release-pr ... image_laye
15 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 70d1086e0f | |
| | 5a8e8baf9f | |
| | 57a4119a7b | |
| | aaef3789b0 | |
| | 0b57e0b8f2 | |
| | 485ecbaf8f | |
| | 0bcbce197a | |
| | 19d59e58d2 | |
| | ce65d13dbd | |
| | 18fefff026 | |
| | 2a69861896 | |
| | 98375b3896 | |
| | 8c60359ae5 | |
| | 8c7136b057 | |
| | 0df6c41eaa | |
12
Cargo.lock
generated
@@ -282,10 +282,8 @@ dependencies = [
 "control_plane",
 "diesel",
 "diesel_migrations",
 "fail",
 "futures",
 "git-version",
 "hex",
 "humantime",
 "hyper",
 "metrics",

@@ -2843,6 +2841,15 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"

[[package]]
name = "lz4_flex"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
dependencies = [
 "twox-hash",
]

[[package]]
name = "match_cfg"
version = "0.1.0"

@@ -3513,6 +3520,7 @@ dependencies = [
 "hyper",
 "itertools",
 "leaky-bucket",
 "lz4_flex",
 "md5",
 "metrics",
 "nix 0.27.1",

@@ -100,6 +100,7 @@ jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
lz4_flex = "0.11.1"
md5 = "0.7.0"
memoffset = "0.8"
native-tls = "0.2"

@@ -19,10 +19,8 @@ aws-config.workspace = true
aws-sdk-secretsmanager.workspace = true
camino.workspace = true
clap.workspace = true
fail.workspace = true
futures.workspace = true
git-version.workspace = true
hex.workspace = true
hyper.workspace = true
humantime.workspace = true
once_cell.workspace = true
@@ -10,9 +10,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::auth::{Scope, SwappableJwtAuth};
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
use utils::http::request::{must_get_query_param, parse_request_param};
use utils::id::{TenantId, TimelineId};

@@ -556,9 +554,6 @@ pub fn make_router(
        .post("/debug/v1/consistency_check", |r| {
            request_span(r, handle_consistency_check)
        })
        .put("/debug/v1/failpoints", |r| {
            request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
        })
        .get("/control/v1/tenant/:tenant_id/locate", |r| {
            tenant_service_handler(r, handle_tenant_locate)
        })
@@ -1,54 +0,0 @@
use std::{collections::HashMap, sync::Arc};

/// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
/// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
/// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
/// is needed at a tenant-wide granularity.
pub(crate) struct IdLockMap<T>
where
    T: Eq + PartialEq + std::hash::Hash,
{
    /// A synchronous lock for getting/setting the async locks that our callers will wait on.
    entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<()>>>>,
}

impl<T> IdLockMap<T>
where
    T: Eq + PartialEq + std::hash::Hash,
{
    pub(crate) fn shared(
        &self,
        key: T,
    ) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<()>> {
        let mut locked = self.entities.lock().unwrap();
        let entry = locked.entry(key).or_default();
        entry.clone().read_owned()
    }

    pub(crate) fn exclusive(
        &self,
        key: T,
    ) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockWriteGuard<()>> {
        let mut locked = self.entities.lock().unwrap();
        let entry = locked.entry(key).or_default();
        entry.clone().write_owned()
    }

    /// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
    /// periodic housekeeping to avoid the map growing indefinitely
    pub(crate) fn housekeeping(&self) {
        let mut locked = self.entities.lock().unwrap();
        locked.retain(|_k, lock| lock.try_write().is_err())
    }
}

impl<T> Default for IdLockMap<T>
where
    T: Eq + PartialEq + std::hash::Hash,
{
    fn default() -> Self {
        Self {
            entities: std::sync::Mutex::new(HashMap::new()),
        }
    }
}
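For reference, a minimal usage sketch of the IdLockMap API removed above (the async wrapper function and the TenantId key type are assumptions of this example, not part of the diff):

async fn example(locks: &IdLockMap<utils::id::TenantId>, tenant_id: utils::id::TenantId) {
    // Any number of holders may take the shared lock for one key concurrently.
    let read_guard = locks.shared(tenant_id).await;
    drop(read_guard);

    // The exclusive lock serializes tenant-wide operations such as a shard split.
    let write_guard = locks.exclusive(tenant_id).await;
    drop(write_guard);

    // Periodically drop entries whose locks are not currently held, so the map
    // does not grow without bound.
    locks.housekeeping();
}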
@@ -4,7 +4,6 @@ use utils::seqwait::MonotonicCounter;
mod auth;
mod compute_hook;
pub mod http;
mod id_lock_map;
pub mod metrics;
mod node;
pub mod persistence;
@@ -206,12 +206,6 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let default_panic = std::panic::take_hook();
|
||||
std::panic::set_hook(Box::new(move |info| {
|
||||
default_panic(info);
|
||||
std::process::exit(1);
|
||||
}));
|
||||
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
// We use spawn_blocking for database operations, so require approximately
|
||||
// as many blocking threads as we will open database connections.
|
||||
|
||||
@@ -83,38 +83,29 @@ impl Node {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
|
||||
match self.get_availability_transition(availability) {
|
||||
AvailabilityTransition::ToActive => {
|
||||
pub(crate) fn set_availability(
|
||||
&mut self,
|
||||
availability: NodeAvailability,
|
||||
) -> AvailabilityTransition {
|
||||
use NodeAvailability::*;
|
||||
let transition = match (self.availability, availability) {
|
||||
(Offline, Active) => {
|
||||
// Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
|
||||
// users of previously-cloned copies of the node will still see the old cancellation
|
||||
// state. For example, Reconcilers in flight will have to complete and be spawned
|
||||
// again to realize that the node has become available.
|
||||
self.cancel = CancellationToken::new();
|
||||
AvailabilityTransition::ToActive
|
||||
}
|
||||
AvailabilityTransition::ToOffline => {
|
||||
(Active, Offline) => {
|
||||
// Fire the node's cancellation token to cancel any in-flight API requests to it
|
||||
self.cancel.cancel();
|
||||
AvailabilityTransition::ToOffline
|
||||
}
|
||||
AvailabilityTransition::Unchanged => {}
|
||||
}
|
||||
_ => AvailabilityTransition::Unchanged,
|
||||
};
|
||||
self.availability = availability;
|
||||
}
|
||||
|
||||
/// Without modifying the availability of the node, convert the intended availability
|
||||
/// into a description of the transition.
|
||||
pub(crate) fn get_availability_transition(
|
||||
&self,
|
||||
availability: NodeAvailability,
|
||||
) -> AvailabilityTransition {
|
||||
use AvailabilityTransition::*;
|
||||
use NodeAvailability::*;
|
||||
|
||||
match (self.availability, availability) {
|
||||
(Offline, Active) => ToActive,
|
||||
(Active, Offline) => ToOffline,
|
||||
_ => Unchanged,
|
||||
}
|
||||
transition
|
||||
}
|
||||
|
||||
/// Whether we may send API requests to this node.
|
||||
|
||||
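Since set_availability now returns the AvailabilityTransition it applied, a call site can branch on the result. A hypothetical caller sketch follows (the surrounding storage controller service code is not part of this diff and is assumed here):

// Hypothetical call site; `node` is a storage controller Node as above.
match node.set_availability(NodeAvailability::Active) {
    AvailabilityTransition::ToActive => {
        // The node became reachable again: reconcilers spawned from here on
        // will observe the freshly reset cancellation token.
    }
    AvailabilityTransition::ToOffline => {
        // The node went away: its cancellation token has been fired, so
        // in-flight API requests to it get cancelled.
    }
    AvailabilityTransition::Unchanged => {}
}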
@@ -11,9 +11,6 @@ use diesel::prelude::*;
|
||||
use diesel::Connection;
|
||||
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
|
||||
use pageserver_api::models::TenantConfig;
|
||||
use pageserver_api::shard::ShardConfigError;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_api::shard::ShardStripeSize;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::generation::Generation;
|
||||
@@ -75,14 +72,6 @@ pub(crate) enum DatabaseError {
|
||||
Logical(String),
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub(crate) enum AbortShardSplitStatus {
|
||||
/// We aborted the split in the database by reverting to the parent shards
|
||||
Aborted,
|
||||
/// The split had already been persisted.
|
||||
Complete,
|
||||
}
|
||||
|
||||
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
|
||||
|
||||
impl Persistence {
|
||||
@@ -581,51 +570,6 @@ impl Persistence {
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Used when the remote part of a shard split failed: we will revert the database state to have only
|
||||
/// the parent shards, with SplitState::Idle.
|
||||
pub(crate) async fn abort_shard_split(
|
||||
&self,
|
||||
split_tenant_id: TenantId,
|
||||
new_shard_count: ShardCount,
|
||||
) -> DatabaseResult<AbortShardSplitStatus> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
self.with_conn(move |conn| -> DatabaseResult<AbortShardSplitStatus> {
|
||||
let aborted = conn.transaction(|conn| -> DatabaseResult<AbortShardSplitStatus> {
|
||||
// Clear the splitting state on parent shards
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.ne(new_shard_count.literal() as i32))
|
||||
.set((splitting.eq(0),))
|
||||
.execute(conn)?;
|
||||
|
||||
// Parent shards are already gone: we cannot abort.
|
||||
if updated == 0 {
|
||||
return Ok(AbortShardSplitStatus::Complete);
|
||||
}
|
||||
|
||||
// Sanity check: if parent shards were present, their cardinality should
|
||||
// be less than the number of child shards.
|
||||
if updated >= new_shard_count.count() as usize {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"Unexpected parent shard count {updated} while aborting split to \
|
||||
count {new_shard_count:?} on tenant {split_tenant_id}"
|
||||
)));
|
||||
}
|
||||
|
||||
// Erase child shards
|
||||
diesel::delete(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.eq(new_shard_count.literal() as i32))
|
||||
.execute(conn)?;
|
||||
|
||||
Ok(AbortShardSplitStatus::Aborted)
|
||||
})?;
|
||||
|
||||
Ok(aborted)
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
|
||||
@@ -660,28 +604,6 @@ pub(crate) struct TenantShardPersistence {
|
||||
pub(crate) config: String,
|
||||
}
|
||||
|
||||
impl TenantShardPersistence {
|
||||
pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
|
||||
if self.shard_count == 0 {
|
||||
Ok(ShardIdentity::unsharded())
|
||||
} else {
|
||||
Ok(ShardIdentity::new(
|
||||
ShardNumber(self.shard_number as u8),
|
||||
ShardCount::new(self.shard_count as u8),
|
||||
ShardStripeSize(self.shard_stripe_size as u32),
|
||||
)?)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
|
||||
Ok(TenantShardId {
|
||||
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
|
||||
shard_number: ShardNumber(self.shard_number as u8),
|
||||
shard_count: ShardCount::new(self.shard_count as u8),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Parts of [`crate::node::Node`] that are stored durably
|
||||
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
|
||||
#[diesel(table_name = crate::schema::nodes)]
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use crate::persistence::Persistence;
|
||||
use crate::service;
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
|
||||
};
|
||||
@@ -19,8 +18,6 @@ use crate::compute_hook::{ComputeHook, NotifyError};
|
||||
use crate::node::Node;
|
||||
use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
|
||||
|
||||
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
|
||||
|
||||
/// Object with the lifetime of the background reconcile task that is created
|
||||
/// for tenants which have a difference between their intent and observed states.
|
||||
pub(super) struct Reconciler {
|
||||
@@ -488,29 +485,17 @@ impl Reconciler {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Some(Ok(observed)) => Some(observed),
|
||||
Some(Err(mgmt_api::Error::ApiError(status, _msg)))
|
||||
if status == StatusCode::NOT_FOUND =>
|
||||
{
|
||||
None
|
||||
}
|
||||
Some(Ok(observed)) => observed,
|
||||
Some(Err(e)) => return Err(e.into()),
|
||||
None => return Err(ReconcileError::Cancel),
|
||||
};
|
||||
tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
|
||||
match observed_conf {
|
||||
Some(conf) => {
|
||||
// Pageserver returned a state: update it in observed. This may still be an indeterminate (None) state,
|
||||
// if internally the pageserver's TenantSlot was being mutated (e.g. some long running API call is still running)
|
||||
self.observed
|
||||
.locations
|
||||
.insert(attached_node.get_id(), ObservedStateLocation { conf });
|
||||
}
|
||||
None => {
|
||||
// Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
|
||||
self.observed.locations.remove(&attached_node.get_id());
|
||||
}
|
||||
}
|
||||
self.observed.locations.insert(
|
||||
attached_node.get_id(),
|
||||
ObservedStateLocation {
|
||||
conf: observed_conf,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -540,12 +525,7 @@ impl Reconciler {
|
||||
)));
|
||||
};
|
||||
|
||||
let mut wanted_conf = attached_location_conf(
|
||||
generation,
|
||||
&self.shard,
|
||||
&self.config,
|
||||
!self.intent.secondary.is_empty(),
|
||||
);
|
||||
let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
|
||||
match self.observed.locations.get(&node.get_id()) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
||||
// Nothing to do
|
||||
@@ -682,26 +662,10 @@ impl Reconciler {
|
||||
}
|
||||
}
|
||||
|
||||
/// We tweak the externally-set TenantConfig while configuring
|
||||
/// locations, using our awareness of whether secondary locations
|
||||
/// are in use to automatically enable/disable heatmap uploads.
|
||||
fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
|
||||
let mut config = config.clone();
|
||||
if has_secondaries {
|
||||
if config.heatmap_period.is_none() {
|
||||
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
|
||||
}
|
||||
} else {
|
||||
config.heatmap_period = None;
|
||||
}
|
||||
config
|
||||
}
|
||||
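// Behavioural sketch of ha_aware_config above (illustrative only; it assumes
// TenantConfig implements Default, which this note takes on faith, and uses
// DEFAULT_HEATMAP_PERIOD = "60s" as defined earlier in this file):
//
//     let base = TenantConfig { heatmap_period: None, ..Default::default() };
//     // With a secondary location present, an unset heatmap_period gets the default:
//     assert_eq!(ha_aware_config(&base, true).heatmap_period.as_deref(), Some("60s"));
//     // Without secondaries, heatmap uploads are switched off entirely:
//     assert_eq!(ha_aware_config(&base, false).heatmap_period, None);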
|
||||
pub(crate) fn attached_location_conf(
|
||||
generation: Generation,
|
||||
shard: &ShardIdentity,
|
||||
config: &TenantConfig,
|
||||
has_secondaries: bool,
|
||||
) -> LocationConfig {
|
||||
LocationConfig {
|
||||
mode: LocationConfigMode::AttachedSingle,
|
||||
@@ -710,7 +674,7 @@ pub(crate) fn attached_location_conf(
|
||||
shard_number: shard.number.0,
|
||||
shard_count: shard.count.literal(),
|
||||
shard_stripe_size: shard.stripe_size.0,
|
||||
tenant_conf: ha_aware_config(config, has_secondaries),
|
||||
tenant_conf: config.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -725,6 +689,6 @@ pub(crate) fn secondary_location_conf(
|
||||
shard_number: shard.number.0,
|
||||
shard_count: shard.count.literal(),
|
||||
shard_stripe_size: shard.stripe_size.0,
|
||||
tenant_conf: ha_aware_config(config, true),
|
||||
tenant_conf: config.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
@@ -577,12 +577,7 @@ impl TenantState {
|
||||
.generation
|
||||
.expect("Attempted to enter attached state without a generation");
|
||||
|
||||
let wanted_conf = attached_location_conf(
|
||||
generation,
|
||||
&self.shard,
|
||||
&self.config,
|
||||
!self.intent.secondary.is_empty(),
|
||||
);
|
||||
let wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
|
||||
match self.observed.locations.get(&node_id) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
|
||||
Some(_) | None => {
|
||||
|
||||
@@ -384,6 +384,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
image_layer_compression: settings
|
||||
.remove("image_layer_compression")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse 'image_layer_compression' json")?,
|
||||
eviction_policy: settings
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
@@ -496,6 +501,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
image_layer_compression: settings
|
||||
.remove("image_layer_compression")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse 'image_layer_compression' json")?,
|
||||
eviction_policy: settings
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
|
||||
@@ -294,6 +294,7 @@ pub struct TenantConfig {
|
||||
pub lagging_wal_timeout: Option<String>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub trace_read_requests: Option<bool>,
|
||||
pub image_layer_compression: Option<CompressionAlgorithm>,
|
||||
pub eviction_policy: Option<EvictionPolicy>,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
@@ -327,6 +328,23 @@ pub enum CompactionAlgorithm {
|
||||
Tiered,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Debug,
|
||||
Clone,
|
||||
Copy,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
strum_macros::FromRepr,
|
||||
enum_map::Enum,
|
||||
)]
|
||||
#[repr(u8)]
|
||||
pub enum CompressionAlgorithm {
|
||||
NoCompression,
|
||||
LZ4,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct EvictionPolicyLayerAccessThreshold {
|
||||
#[serde(with = "humantime_serde")]
|
||||
|
||||
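The #[repr(u8)] discriminant of the CompressionAlgorithm enum above doubles as the compression tag byte written to disk, and strum's FromRepr derive recovers the variant from it. A small sketch relying only on the enum and derives shown in this hunk (illustrative, not part of the diff):

let tag = CompressionAlgorithm::LZ4 as u8;
assert_eq!(CompressionAlgorithm::from_repr(tag), Some(CompressionAlgorithm::LZ4));
// An unknown tag byte yields None instead of panicking.
assert_eq!(CompressionAlgorithm::from_repr(0xff), None);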
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
leaky-bucket.workspace = true
|
||||
lz4_flex.workspace = true
|
||||
md5.workspace = true
|
||||
nix.workspace = true
|
||||
# hack to get the number of worker threads tokio uses
|
||||
|
||||
@@ -1538,6 +1538,7 @@ broker_endpoint = '{broker_endpoint}'
|
||||
|
||||
let broker_endpoint = "http://127.0.0.1:7777";
|
||||
let trace_read_requests = true;
|
||||
let image_layer_compression = pageserver_api::models::CompressionAlgorithm::LZ4;
|
||||
|
||||
let config_string = format!(
|
||||
r#"{ALL_BASE_VALUES_TOML}
|
||||
@@ -1545,7 +1546,8 @@ pg_distrib_dir='{pg_distrib_dir}'
|
||||
broker_endpoint = '{broker_endpoint}'
|
||||
|
||||
[tenant_config]
|
||||
trace_read_requests = {trace_read_requests}"#,
|
||||
trace_read_requests = {trace_read_requests}
|
||||
image_layer_compression = 'LZ4'"#,
|
||||
);
|
||||
|
||||
let toml = config_string.parse()?;
|
||||
@@ -1555,6 +1557,10 @@ trace_read_requests = {trace_read_requests}"#,
|
||||
conf.default_tenant_conf.trace_read_requests, trace_read_requests,
|
||||
"Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
|
||||
);
|
||||
assert_eq!(
|
||||
conf.default_tenant_conf.image_layer_compression, image_layer_compression,
|
||||
"Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1444,6 +1444,8 @@ components:
|
||||
type: integer
|
||||
trace_read_requests:
|
||||
type: boolean
|
||||
image_layer_compression:
|
||||
type: string
|
||||
heatmap_period:
|
||||
type: string
|
||||
TenantConfigResponse:
|
||||
|
||||
@@ -2108,16 +2108,6 @@ where
|
||||
R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
|
||||
H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
|
||||
{
|
||||
if request.uri() != &"/v1/failpoints".parse::<Uri>().unwrap() {
|
||||
fail::fail_point!("api-503", |_| Err(ApiError::ResourceUnavailable(
|
||||
"failpoint".into()
|
||||
)));
|
||||
|
||||
fail::fail_point!("api-500", |_| Err(ApiError::InternalServerError(
|
||||
anyhow::anyhow!("failpoint")
|
||||
)));
|
||||
}
|
||||
|
||||
// Spawn a new task to handle the request, to protect the handler from unexpected
|
||||
// async cancellations. Most pageserver functions are not async cancellation safe.
|
||||
// We arm a drop-guard, so that if Hyper drops the Future, we signal the task
|
||||
|
||||
@@ -42,6 +42,9 @@ use tracing::info;
|
||||
/// backwards-compatible changes to the metadata format.
|
||||
pub const STORAGE_FORMAT_VERSION: u16 = 3;
|
||||
|
||||
/// Minimal sorage format version with compression support
|
||||
pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
|
||||
|
||||
pub const DEFAULT_PG_VERSION: u32 = 15;
|
||||
|
||||
// Magic constants used to identify different kinds of files
|
||||
|
||||
@@ -19,6 +19,7 @@ use futures::stream::FuturesUnordered;
|
||||
use futures::FutureExt;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::models;
|
||||
use pageserver_api::models::CompressionAlgorithm;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use pageserver_api::models::WalRedoManagerStatus;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
@@ -2288,6 +2289,13 @@ impl Tenant {
|
||||
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
|
||||
}
|
||||
|
||||
pub fn get_image_layer_compression(&self) -> CompressionAlgorithm {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.image_layer_compression
|
||||
.unwrap_or(self.conf.default_tenant_conf.image_layer_compression)
|
||||
}
|
||||
|
||||
pub fn get_min_resident_size_override(&self) -> Option<u64> {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
@@ -3637,6 +3645,7 @@ pub(crate) mod harness {
|
||||
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
|
||||
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
|
||||
trace_read_requests: Some(tenant_conf.trace_read_requests),
|
||||
image_layer_compression: Some(tenant_conf.image_layer_compression),
|
||||
eviction_policy: Some(tenant_conf.eviction_policy),
|
||||
min_resident_size_override: tenant_conf.min_resident_size_override,
|
||||
evictions_low_residence_duration_metric_threshold: Some(
|
||||
|
||||
@@ -11,13 +11,16 @@
|
||||
//! len < 128: 0XXXXXXX
|
||||
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
|
||||
//!
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use lz4_flex;
|
||||
use pageserver_api::models::CompressionAlgorithm;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use std::cmp::min;
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
|
||||
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
|
||||
Ok(buf)
|
||||
}
|
||||
/// Read a blob that starts with a compression tag byte, decompressing its contents if needed.
|
||||
pub async fn read_compressed_blob(
|
||||
&self,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<u8>, std::io::Error> {
|
||||
let blknum = (offset / PAGE_SZ as u64) as u32;
|
||||
let off = (offset % PAGE_SZ as u64) as usize;
|
||||
|
||||
let buf = self.read_blk(blknum, ctx).await?;
|
||||
let compression_alg = CompressionAlgorithm::from_repr(buf[off]).unwrap();
|
||||
let res = self.read_blob(offset + 1, ctx).await?;
|
||||
if compression_alg == CompressionAlgorithm::LZ4 {
|
||||
lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
|
||||
})
|
||||
} else {
|
||||
assert_eq!(compression_alg, CompressionAlgorithm::NoCompression);
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||
/// are overwritten.
|
||||
pub async fn read_blob_into_buf(
|
||||
@@ -211,6 +237,61 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
(src_buf, Ok(()))
|
||||
}
|
||||
|
||||
pub async fn write_compressed_blob(
|
||||
&mut self,
|
||||
srcbuf: Bytes,
|
||||
compression: CompressionAlgorithm,
|
||||
) -> Result<u64, Error> {
|
||||
let offset = self.offset;
|
||||
let len = srcbuf.len();
|
||||
|
||||
let mut io_buf = self.io_buf.take().expect("we always put it back below");
|
||||
io_buf.clear();
|
||||
let mut is_compressed = false;
|
||||
if len < 128 {
|
||||
// Short blob. Write a 1-byte length header
|
||||
io_buf.put_u8(CompressionAlgorithm::NoCompression as u8);
|
||||
io_buf.put_u8(len as u8);
|
||||
} else {
|
||||
// Write a 4-byte length header
|
||||
if len > 0x7fff_ffff {
|
||||
return Err(Error::new(
|
||||
ErrorKind::Other,
|
||||
format!("blob too large ({} bytes)", len),
|
||||
));
|
||||
}
|
||||
if compression == CompressionAlgorithm::LZ4 && len == BLCKSZ as usize {
|
||||
let compressed = lz4_flex::block::compress(&srcbuf);
|
||||
if compressed.len() < len {
|
||||
io_buf.put_u8(compression as u8);
|
||||
let mut len_buf = (compressed.len() as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
io_buf.extend_from_slice(&compressed[..]);
|
||||
is_compressed = true;
|
||||
}
|
||||
}
|
||||
if !is_compressed {
|
||||
io_buf.put_u8(CompressionAlgorithm::NoCompression as u8);
|
||||
let mut len_buf = (len as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
}
|
||||
}
|
||||
let (io_buf, hdr_res) = self.write_all(io_buf).await;
|
||||
match hdr_res {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
self.io_buf = Some(io_buf);
|
||||
if is_compressed {
|
||||
hdr_res.map(|_| offset)
|
||||
} else {
|
||||
let (_buf, res) = self.write_all(srcbuf).await;
|
||||
res.map(|_| offset)
|
||||
}
|
||||
}
|
||||
|
||||
/// Write a blob of data. Returns the offset that it was written to,
|
||||
/// which can be used to retrieve the data later.
|
||||
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||
@@ -227,7 +308,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
if len < 128 {
|
||||
// Short blob. Write a 1-byte length header
|
||||
io_buf.put_u8(len as u8);
|
||||
self.write_all(io_buf).await
|
||||
} else {
|
||||
// Write a 4-byte length header
|
||||
if len > 0x7fff_ffff {
|
||||
@@ -242,8 +322,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
let mut len_buf = (len as u32).to_be_bytes();
|
||||
len_buf[0] |= 0x80;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
self.write_all(io_buf).await
|
||||
}
|
||||
self.write_all(io_buf).await
|
||||
}
|
||||
.await;
|
||||
self.io_buf = Some(io_buf);
|
||||
|
||||
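The two paths above (read_compressed_blob and write_compressed_blob) define the on-disk layout: a one-byte compression tag followed by the pre-existing 1-byte or 4-byte length header described at the top of this file. Below is a self-contained sketch of that header layout, offered purely as an illustration of the format rather than as the pageserver implementation:

fn encode_header(tag: u8, len: usize, out: &mut Vec<u8>) {
    // Compression tag byte first (e.g. NoCompression or LZ4 as a u8 discriminant).
    out.push(tag);
    if len < 128 {
        // Short blob: 1-byte length, high bit clear.
        out.push(len as u8);
    } else {
        // Long blob: 4-byte big-endian length with the high bit of the first byte set.
        assert!(len <= 0x7fff_ffff, "blob too large");
        let mut len_buf = (len as u32).to_be_bytes();
        len_buf[0] |= 0x80;
        out.extend_from_slice(&len_buf);
    }
}

fn decode_header(buf: &[u8]) -> (u8, usize, usize) {
    let tag = buf[0];
    if buf[1] < 0x80 {
        // 1-byte length header follows the tag; total header is 2 bytes.
        (tag, buf[1] as usize, 2)
    } else {
        // 4-byte length header: mask off the marker bit before decoding.
        let mut len_buf = [buf[1], buf[2], buf[3], buf[4]];
        len_buf[0] &= 0x7f;
        (tag, u32::from_be_bytes(len_buf) as usize, 5)
    }
}

fn main() {
    let mut hdr = Vec::new();
    encode_header(1, 8192, &mut hdr); // hypothetical LZ4-tagged 8 KiB blob
    assert_eq!(decode_header(&hdr), (1, 8192, 5));
}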
@@ -9,9 +9,9 @@
|
||||
//! may lead to a data loss.
|
||||
//!
|
||||
use anyhow::bail;
|
||||
use pageserver_api::models::CompactionAlgorithm;
|
||||
use pageserver_api::models::EvictionPolicy;
|
||||
use pageserver_api::models::{self, ThrottleConfig};
|
||||
use pageserver_api::models::{
|
||||
self, CompactionAlgorithm, CompressionAlgorithm, EvictionPolicy, ThrottleConfig,
|
||||
};
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
|
||||
use serde::de::IntoDeserializer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -41,6 +41,9 @@ pub mod defaults {
|
||||
pub const DEFAULT_COMPACTION_ALGORITHM: super::CompactionAlgorithm =
|
||||
super::CompactionAlgorithm::Legacy;
|
||||
|
||||
pub const DEFAULT_COMPRESSION_ALGORITHM: super::CompressionAlgorithm =
|
||||
super::CompressionAlgorithm::LZ4;
|
||||
|
||||
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||
|
||||
// Large DEFAULT_GC_PERIOD is fine as long as PITR_INTERVAL is larger.
|
||||
@@ -345,6 +348,7 @@ pub struct TenantConf {
|
||||
/// to avoid eager reconnects.
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
pub trace_read_requests: bool,
|
||||
pub image_layer_compression: CompressionAlgorithm,
|
||||
pub eviction_policy: EvictionPolicy,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
// See the corresponding metric's help string.
|
||||
@@ -429,6 +433,10 @@ pub struct TenantConfOpt {
|
||||
#[serde(default)]
|
||||
pub trace_read_requests: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub image_layer_compression: Option<CompressionAlgorithm>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub eviction_policy: Option<EvictionPolicy>,
|
||||
@@ -492,6 +500,9 @@ impl TenantConfOpt {
|
||||
trace_read_requests: self
|
||||
.trace_read_requests
|
||||
.unwrap_or(global_conf.trace_read_requests),
|
||||
image_layer_compression: self
|
||||
.image_layer_compression
|
||||
.unwrap_or(global_conf.image_layer_compression),
|
||||
eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
|
||||
min_resident_size_override: self
|
||||
.min_resident_size_override
|
||||
@@ -538,6 +549,7 @@ impl Default for TenantConf {
|
||||
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||
trace_read_requests: false,
|
||||
image_layer_compression: DEFAULT_COMPRESSION_ALGORITHM,
|
||||
eviction_policy: EvictionPolicy::NoEviction,
|
||||
min_resident_size_override: None,
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
@@ -612,6 +624,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
|
||||
max_lsn_wal_lag: value.max_lsn_wal_lag,
|
||||
trace_read_requests: value.trace_read_requests,
|
||||
image_layer_compression: value.image_layer_compression,
|
||||
eviction_policy: value.eviction_policy,
|
||||
min_resident_size_override: value.min_resident_size_override,
|
||||
evictions_low_residence_duration_metric_threshold: value
|
||||
|
||||
@@ -1443,35 +1443,6 @@ impl TenantManager {
|
||||
new_shard_count: ShardCount,
|
||||
new_stripe_size: Option<ShardStripeSize>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<TenantShardId>> {
|
||||
let r = self
|
||||
.do_shard_split(tenant_shard_id, new_shard_count, new_stripe_size, ctx)
|
||||
.await;
|
||||
if r.is_err() {
|
||||
// Shard splitting might have left the original shard in a partially shut down state (it
|
||||
// stops the shard's remote timeline client). Reset it to ensure we leave things in
|
||||
// a working state.
|
||||
if self.get(tenant_shard_id).is_some() {
|
||||
tracing::warn!("Resetting {tenant_shard_id} after shard split failure");
|
||||
if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
|
||||
// Log this error because our return value will still be the original error, not this one. This is
|
||||
// a severe error: if this happens, we might be leaving behind a tenant that is not fully functional
|
||||
// (e.g. has uploads disabled). We can't do anything else: if reset fails then shutting the tenant down or
|
||||
// setting it broken probably won't help either.
|
||||
tracing::error!("Failed to reset {tenant_shard_id}: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
r
|
||||
}
|
||||
|
||||
pub(crate) async fn do_shard_split(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
new_shard_count: ShardCount,
|
||||
new_stripe_size: Option<ShardStripeSize>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<TenantShardId>> {
|
||||
let tenant = get_tenant(tenant_shard_id, true)?;
|
||||
|
||||
@@ -1506,10 +1477,6 @@ impl TenantManager {
|
||||
.join(",")
|
||||
);
|
||||
|
||||
fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
|
||||
let parent_shard_identity = tenant.shard_identity;
|
||||
let parent_tenant_conf = tenant.get_tenant_conf();
|
||||
let parent_generation = tenant.generation;
|
||||
@@ -1523,10 +1490,6 @@ impl TenantManager {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
|
||||
self.resources.deletion_queue_client.flush_advisory();
|
||||
|
||||
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
|
||||
@@ -1548,16 +1511,11 @@ impl TenantManager {
|
||||
anyhow::bail!("Detached parent shard in the middle of split!")
|
||||
}
|
||||
};
|
||||
fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
|
||||
// Optimization: hardlink layers from the parent into the children, so that they don't have to
|
||||
// re-download & duplicate the data referenced in their initial IndexPart
|
||||
self.shard_split_hardlink(parent, child_shards.clone())
|
||||
.await?;
|
||||
fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
|
||||
// Take a snapshot of where the parent's WAL ingest had got to: we will wait for
|
||||
// child shards to reach this point.
|
||||
@@ -1597,10 +1555,6 @@ impl TenantManager {
|
||||
.await?;
|
||||
}
|
||||
|
||||
fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
|
||||
// Phase 4: wait for child shards' WAL ingest to catch up to target LSN
|
||||
for child_shard_id in &child_shards {
|
||||
let child_shard_id = *child_shard_id;
|
||||
@@ -1633,10 +1587,6 @@ impl TenantManager {
|
||||
timeline.timeline_id,
|
||||
target_lsn
|
||||
);
|
||||
|
||||
fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
|
||||
// Failure here might mean shutdown, in any case this part is an optimization
|
||||
// and we shouldn't hold up the split operation.
|
||||
@@ -1682,10 +1632,6 @@ impl TenantManager {
|
||||
},
|
||||
);
|
||||
|
||||
fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
|
||||
"failpoint"
|
||||
)));
|
||||
|
||||
parent_slot_guard.drop_old_value()?;
|
||||
|
||||
// Phase 6: Release the InProgress on the parent shard
|
||||
|
||||
@@ -724,6 +724,8 @@ impl DeltaLayerInner {
|
||||
Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
|
||||
|
||||
if let Some(mut expected_summary) = summary {
|
||||
// assume backward compatibility
|
||||
expected_summary.format_version = actual_summary.format_version;
|
||||
// production code path
|
||||
expected_summary.index_start_blk = actual_summary.index_start_blk;
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
@@ -966,7 +968,7 @@ impl DeltaLayerInner {
|
||||
// track when a key is done.
|
||||
for read in reads.into_iter().rev() {
|
||||
let res = vectored_blob_reader
|
||||
.read_blobs(&read, buf.take().expect("Should have a buffer"))
|
||||
.read_blobs(&read, buf.take().expect("Should have a buffer"), false)
|
||||
.await;
|
||||
|
||||
let blobs_buf = match res {
|
||||
|
||||
@@ -39,14 +39,17 @@ use crate::tenant::vectored_blob_io::{
|
||||
};
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use crate::{
|
||||
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
|
||||
};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::models::{CompressionAlgorithm, LayerAccessKind};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::File;
|
||||
@@ -153,6 +156,7 @@ pub struct ImageLayerInner {
|
||||
// values copied from summary
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
format_version: u16,
|
||||
|
||||
lsn: Lsn,
|
||||
|
||||
@@ -167,6 +171,7 @@ impl std::fmt::Debug for ImageLayerInner {
|
||||
f.debug_struct("ImageLayerInner")
|
||||
.field("index_start_blk", &self.index_start_blk)
|
||||
.field("index_root_blk", &self.index_root_blk)
|
||||
.field("format_version", &self.format_version)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -391,7 +396,12 @@ impl ImageLayerInner {
|
||||
let actual_summary =
|
||||
Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
|
||||
|
||||
if actual_summary.format_version > COMPRESSED_STORAGE_FORMAT_VERSION {
|
||||
bail!("Forward compatibility of storage is not supported: current format version is {}, format version of layer {} is {}", COMPRESSED_STORAGE_FORMAT_VERSION, path, actual_summary.format_version);
|
||||
}
|
||||
if let Some(mut expected_summary) = summary {
|
||||
// assume backward compatibility
|
||||
expected_summary.format_version = actual_summary.format_version;
|
||||
// production code path
|
||||
expected_summary.index_start_blk = actual_summary.index_start_blk;
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
@@ -408,6 +418,7 @@ impl ImageLayerInner {
|
||||
Ok(Ok(ImageLayerInner {
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
format_version: actual_summary.format_version,
|
||||
lsn,
|
||||
file,
|
||||
file_id,
|
||||
@@ -436,18 +447,20 @@ impl ImageLayerInner {
|
||||
)
|
||||
.await?
|
||||
{
|
||||
let blob = block_reader
|
||||
.block_cursor()
|
||||
.read_blob(
|
||||
offset,
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerValue)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
||||
let value = Bytes::from(blob);
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerValue)
|
||||
.build();
|
||||
let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
|
||||
block_reader
|
||||
.block_cursor()
|
||||
.read_compressed_blob(offset, &ctx)
|
||||
.await
|
||||
} else {
|
||||
block_reader.block_cursor().read_blob(offset, &ctx).await
|
||||
})
|
||||
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
||||
|
||||
let value = Bytes::from(blob);
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
@@ -539,9 +552,12 @@ impl ImageLayerInner {
|
||||
.into();
|
||||
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
|
||||
let compressed_storage_format = self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION;
|
||||
for read in reads.into_iter() {
|
||||
let buf = BytesMut::with_capacity(max_vectored_read_bytes);
|
||||
let res = vectored_blob_reader.read_blobs(&read, buf).await;
|
||||
let res = vectored_blob_reader
|
||||
.read_blobs(&read, buf, compressed_storage_format)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
@@ -549,11 +565,31 @@ impl ImageLayerInner {
|
||||
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let img_buf = frozen_buf.slice(meta.start..meta.end);
|
||||
reconstruct_state.update_key(
|
||||
&meta.meta.key,
|
||||
self.lsn,
|
||||
Value::Image(img_buf),
|
||||
);
|
||||
if meta.compression_alg == CompressionAlgorithm::LZ4 {
|
||||
match lz4_flex::block::decompress(&img_buf, BLCKSZ as usize) {
|
||||
Ok(decompressed) => {
|
||||
reconstruct_state.update_key(
|
||||
&meta.meta.key,
|
||||
self.lsn,
|
||||
Value::Image(Bytes::from(decompressed)),
|
||||
);
|
||||
}
|
||||
Err(err) => reconstruct_state.on_key_error(
|
||||
meta.meta.key,
|
||||
PageReconstructError::from(anyhow!(
|
||||
"Failed to decompress blob from file {}: {}",
|
||||
self.file.path,
|
||||
err
|
||||
)),
|
||||
),
|
||||
}
|
||||
} else {
|
||||
reconstruct_state.update_key(
|
||||
&meta.meta.key,
|
||||
self.lsn,
|
||||
Value::Image(img_buf),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -591,6 +627,7 @@ struct ImageLayerWriterInner {
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_range: Range<Key>,
|
||||
compression: CompressionAlgorithm,
|
||||
lsn: Lsn,
|
||||
|
||||
blob_writer: BlobWriter<false>,
|
||||
@@ -602,16 +639,17 @@ impl ImageLayerWriterInner {
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline: &Arc<Timeline>,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<Self> {
|
||||
let timeline_id = timeline.timeline_id;
|
||||
let tenant_shard_id = timeline.tenant_shard_id;
|
||||
let compression = timeline.get_image_layer_compression();
|
||||
// Create the file initially with a temporary filename.
|
||||
// We'll atomically rename it to the final name when we're done.
|
||||
let path = ImageLayer::temp_path_for(
|
||||
conf,
|
||||
timeline.conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
&ImageFileName {
|
||||
@@ -638,11 +676,12 @@ impl ImageLayerWriterInner {
|
||||
let tree_builder = DiskBtreeBuilder::new(block_buf);
|
||||
|
||||
let writer = Self {
|
||||
conf,
|
||||
conf: timeline.conf,
|
||||
path,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
key_range: key_range.clone(),
|
||||
compression,
|
||||
lsn,
|
||||
tree: tree_builder,
|
||||
blob_writer,
|
||||
@@ -658,10 +697,15 @@ impl ImageLayerWriterInner {
|
||||
///
|
||||
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
let (_img, res) = self.blob_writer.write_blob(img).await;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let off = res?;
|
||||
|
||||
let off = if STORAGE_FORMAT_VERSION >= COMPRESSED_STORAGE_FORMAT_VERSION {
|
||||
self.blob_writer
|
||||
.write_compressed_blob(img, self.compression)
|
||||
.await?
|
||||
} else {
|
||||
let (_img, res) = self.blob_writer.write_blob(img).await;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
res?
|
||||
};
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree.append(&keybuf, off)?;
|
||||
@@ -766,17 +810,12 @@ impl ImageLayerWriter {
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline: &Arc<Timeline>,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<ImageLayerWriter> {
|
||||
Ok(Self {
|
||||
inner: Some(
|
||||
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
|
||||
.await?,
|
||||
),
|
||||
inner: Some(ImageLayerWriterInner::new(timeline, key_range, lsn).await?),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -19,8 +19,8 @@ use pageserver_api::{
|
||||
key::AUX_FILES_KEY,
|
||||
keyspace::KeySpaceAccum,
|
||||
models::{
|
||||
CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
|
||||
EvictionPolicy, LayerMapInfo, TimelineState,
|
||||
CompactionAlgorithm, CompressionAlgorithm, DownloadRemoteLayersTaskInfo,
|
||||
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, LayerMapInfo, TimelineState,
|
||||
},
|
||||
reltag::BlockNumber,
|
||||
shard::{ShardIdentity, TenantShardId},
|
||||
@@ -182,7 +182,7 @@ pub(crate) struct AuxFilesState {
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
conf: &'static PageServerConf,
|
||||
pub(crate) conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
|
||||
|
||||
myself: Weak<Self>,
|
||||
@@ -1515,6 +1515,13 @@ impl Timeline {
|
||||
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
|
||||
}
|
||||
|
||||
pub fn get_image_layer_compression(&self) -> CompressionAlgorithm {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.image_layer_compression
|
||||
.unwrap_or(self.conf.default_tenant_conf.image_layer_compression)
|
||||
}
|
||||
|
||||
pub(super) fn tenant_conf_updated(&self) {
|
||||
// NB: Most tenant conf options are read by background loops, so,
|
||||
// changes will automatically be picked up.
|
||||
@@ -3458,14 +3465,7 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&img_range,
|
||||
lsn,
|
||||
)
|
||||
.await?;
|
||||
let mut image_layer_writer = ImageLayerWriter::new(self, &img_range, lsn).await?;
|
||||
|
||||
fail_point!("image-layer-writer-fail-before-finish", |_| {
|
||||
Err(CreateImageLayersError::Other(anyhow::anyhow!(
|
||||
|
||||
@@ -997,14 +997,7 @@ impl TimelineAdaptor {
|
||||
) -> Result<(), PageReconstructError> {
|
||||
let timer = self.timeline.metrics.create_images_time_histo.start_timer();
|
||||
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.timeline.conf,
|
||||
self.timeline.timeline_id,
|
||||
self.timeline.tenant_shard_id,
|
||||
key_range,
|
||||
lsn,
|
||||
)
|
||||
.await?;
|
||||
let mut image_layer_writer = ImageLayerWriter::new(&self.timeline, key_range, lsn).await?;
|
||||
|
||||
fail_point!("image-layer-writer-fail-before-finish", |_| {
|
||||
Err(PageReconstructError::Other(anyhow::anyhow!(
|
||||
|
||||
@@ -15,11 +15,11 @@
|
||||
//!
|
||||
//! Note that the vectored blob api does *not* go through the page cache.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::CompressionAlgorithm;
|
||||
use std::collections::BTreeMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::vec_map::VecMap;
|
||||
|
||||
@@ -40,6 +40,7 @@ pub struct VectoredBlob {
|
||||
pub start: usize,
|
||||
pub end: usize,
|
||||
pub meta: BlobMeta,
|
||||
pub compression_alg: CompressionAlgorithm,
|
||||
}
|
||||
|
||||
/// Return type of [`VectoredBlobReader::read_blobs`]
|
||||
@@ -274,6 +275,7 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
&self,
|
||||
read: &VectoredRead,
|
||||
buf: BytesMut,
|
||||
compressed_storage_format: bool,
|
||||
) -> Result<VectoredBlobsBuf, std::io::Error> {
|
||||
assert!(read.size() > 0);
|
||||
assert!(
|
||||
@@ -304,35 +306,42 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
);
|
||||
|
||||
for ((offset, meta), next) in pairs {
|
||||
let offset_in_buf = offset - start_offset;
|
||||
let first_len_byte = buf[offset_in_buf as usize];
|
||||
let mut offset_in_buf = (offset - start_offset) as usize;
|
||||
let compression_alg = if compressed_storage_format {
|
||||
offset_in_buf += 1;
|
||||
CompressionAlgorithm::from_repr(buf[offset_in_buf - 1]).unwrap()
|
||||
} else {
|
||||
CompressionAlgorithm::NoCompression
|
||||
};
|
||||
let first_len_byte = buf[offset_in_buf];
|
||||
|
||||
// Each blob is prefixed by a header containing its size.
|
||||
// Extract the size and skip that header to find the start of the data.
|
||||
// The size can be 1 or 4 bytes. The most significant bit is 0 in the
|
||||
// 1 byte case and 1 in the 4 byte case.
|
||||
let (size_length, blob_size) = if first_len_byte < 0x80 {
|
||||
(1, first_len_byte as u64)
|
||||
(1usize, first_len_byte as usize)
|
||||
} else {
|
||||
let mut blob_size_buf = [0u8; 4];
|
||||
let offset_in_buf = offset_in_buf as usize;
|
||||
|
||||
blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
|
||||
blob_size_buf[0] &= 0x7f;
|
||||
(4, u32::from_be_bytes(blob_size_buf) as u64)
|
||||
(4usize, u32::from_be_bytes(blob_size_buf) as usize)
|
||||
};
|
||||
|
||||
let start = offset_in_buf + size_length;
|
||||
let end = match next {
|
||||
Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
|
||||
Some((next_blob_start_offset, _)) => {
|
||||
(next_blob_start_offset - start_offset) as usize
|
||||
}
|
||||
None => start + blob_size,
|
||||
};
|
||||
|
||||
assert_eq!(end - start, blob_size);
|
||||
|
||||
metas.push(VectoredBlob {
|
||||
start: start as usize,
|
||||
end: end as usize,
|
||||
start,
|
||||
end,
|
||||
compression_alg,
|
||||
meta: *meta,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -341,14 +341,7 @@ impl Accept for ProxyProtocolAccept {
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
|
||||
let conn = ready!(Pin::new(&mut self.incoming).poll_accept(cx)?);
|
||||
|
||||
let conn_id = uuid::Uuid::new_v4();
|
||||
let span = tracing::info_span!("http_conn", ?conn_id);
|
||||
{
|
||||
let _enter = span.enter();
|
||||
tracing::info!("accepted new TCP connection");
|
||||
}
|
||||
|
||||
tracing::info!(protocol = self.protocol, "accepted new TCP connection");
|
||||
let Some(conn) = conn else {
|
||||
return Poll::Ready(None);
|
||||
};
|
||||
@@ -361,7 +354,6 @@ impl Accept for ProxyProtocolAccept {
|
||||
.with_label_values(&[self.protocol])
|
||||
.guard(),
|
||||
)),
|
||||
span,
|
||||
})))
|
||||
}
|
||||
}
|
||||
@@ -372,14 +364,6 @@ pin_project! {
|
||||
pub inner: T,
|
||||
pub connection_id: Uuid,
|
||||
pub gauge: Mutex<Option<IntCounterPairGuard>>,
|
||||
pub span: tracing::Span,
|
||||
}
|
||||
|
||||
impl<S> PinnedDrop for WithConnectionGuard<S> {
|
||||
fn drop(this: Pin<&mut Self>) {
|
||||
let _enter = this.span.enter();
|
||||
tracing::info!("HTTP connection closed")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ use rand::SeedableRng;
|
||||
pub use reqwest_middleware::{ClientWithMiddleware, Error};
|
||||
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
|
||||
use tokio_util::task::TaskTracker;
|
||||
use tracing::instrument::Instrumented;
|
||||
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::protocol2::{ProxyProtocolAccept, WithClientIp, WithConnectionGuard};
|
||||
@@ -31,12 +30,13 @@ use hyper::{
|
||||
Body, Method, Request, Response,
|
||||
};
|
||||
|
||||
use std::convert::Infallible;
|
||||
use std::net::IpAddr;
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
use tls_listener::TlsListener;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_util::sync::{CancellationToken, DropGuard};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn, Instrument};
|
||||
use utils::http::{error::ApiError, json::json_response};
|
||||
|
||||
@@ -100,7 +100,12 @@ pub async fn task_main(
|
||||
let ws_connections = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
ws_connections.close(); // allows `ws_connections.wait` to complete
|
||||
|
||||
let tls_listener = TlsListener::new(tls_acceptor, addr_incoming, config.handshake_timeout);
|
||||
let tls_listener = TlsListener::new(
|
||||
tls_acceptor,
|
||||
addr_incoming,
|
||||
"http",
|
||||
config.handshake_timeout,
|
||||
);
|
||||
|
||||
let make_svc = hyper::service::make_service_fn(
|
||||
|stream: &tokio_rustls::server::TlsStream<
|
||||
@@ -116,11 +121,6 @@ pub async fn task_main(
|
||||
.take()
|
||||
.expect("gauge should be set on connection start");
|
||||
|
||||
// Cancel all current inflight HTTP requests if the HTTP connection is closed.
|
||||
let http_cancellation_token = CancellationToken::new();
|
||||
let cancel_connection = http_cancellation_token.clone().drop_guard();
|
||||
|
||||
let span = conn.span.clone();
|
||||
let client_addr = conn.inner.client_addr();
|
||||
let remote_addr = conn.inner.inner.remote_addr();
|
||||
let backend = backend.clone();
|
||||
@@ -136,43 +136,27 @@ pub async fn task_main(
|
||||
Ok(MetricService::new(
|
||||
hyper::service::service_fn(move |req: Request<Body>| {
|
||||
let backend = backend.clone();
|
||||
let ws_connections2 = ws_connections.clone();
|
||||
let ws_connections = ws_connections.clone();
|
||||
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
|
||||
let cancellation_handler = cancellation_handler.clone();
|
||||
let http_cancellation_token = http_cancellation_token.child_token();
|
||||
|
||||
// `request_handler` is not cancel safe. It expects to be cancelled only at specific times.
|
||||
// By spawning the future, we ensure it never gets cancelled until it decides to.
|
||||
ws_connections.spawn(
|
||||
async move {
|
||||
// Cancel the current inflight HTTP request if the request stream is closed.
|
||||
// This is slightly different to `_cancel_connection` in that
|
||||
// h2 can cancel individual requests with a `RST_STREAM`.
|
||||
let _cancel_session = http_cancellation_token.clone().drop_guard();
|
||||
|
||||
let res = request_handler(
|
||||
async move {
|
||||
Ok::<_, Infallible>(
|
||||
request_handler(
|
||||
req,
|
||||
config,
|
||||
backend,
|
||||
ws_connections2,
|
||||
ws_connections,
|
||||
cancellation_handler,
|
||||
peer_addr.ip(),
|
||||
endpoint_rate_limiter,
|
||||
http_cancellation_token,
|
||||
)
|
||||
.await
|
||||
.map_or_else(|e| e.into_response(), |r| r);
|
||||
|
||||
_cancel_session.disarm();
|
||||
|
||||
res
|
||||
}
|
||||
.in_current_span(),
|
||||
)
|
||||
.map_or_else(|e| e.into_response(), |r| r),
|
||||
)
|
||||
}
|
||||
}),
|
||||
gauge,
|
||||
cancel_connection,
|
||||
span,
|
||||
))
|
||||
}
|
||||
},
|
||||
@@ -192,23 +176,11 @@ pub async fn task_main(
struct MetricService<S> {
inner: S,
_gauge: IntCounterPairGuard,
_cancel: DropGuard,
span: tracing::Span,
}

impl<S> MetricService<S> {
fn new(
inner: S,
_gauge: IntCounterPairGuard,
_cancel: DropGuard,
span: tracing::Span,
) -> MetricService<S> {
MetricService {
inner,
_gauge,
_cancel,
span,
}
fn new(inner: S, _gauge: IntCounterPairGuard) -> MetricService<S> {
MetricService { inner, _gauge }
}
}

@@ -218,16 +190,14 @@ where
{
type Response = S::Response;
type Error = S::Error;
type Future = Instrumented<S::Future>;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
self.span
.in_scope(|| self.inner.call(req))
.instrument(self.span.clone())
self.inner.call(req)
}
}

@@ -240,8 +210,6 @@ async fn request_handler(
cancellation_handler: Arc<CancellationHandler>,
peer_addr: IpAddr,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
// used to cancel in-flight HTTP requests. not used to cancel websockets
http_cancellation_token: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let session_id = uuid::Uuid::new_v4();

@@ -285,7 +253,7 @@ async fn request_handler(
let ctx = RequestMonitoring::new(session_id, peer_addr, "http", &config.region);
let span = ctx.span.clone();

sql_over_http::handle(config, ctx, request, backend, http_cancellation_token)
sql_over_http::handle(config, ctx, request, backend)
.instrument(span)
.await
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {

@@ -217,8 +217,8 @@ pub async fn handle(
mut ctx: RequestMonitoring,
request: Request<Body>,
backend: Arc<PoolingBackend>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let cancel = CancellationToken::new();
let cancel2 = cancel.clone();
let handle = tokio::spawn(async move {
time::sleep(config.http_config.request_timeout).await;
@@ -13,7 +13,7 @@ use tokio::{
time::timeout,
};
use tokio_rustls::{server::TlsStream, TlsAcceptor};
use tracing::{info, warn, Instrument};
use tracing::{info, warn};

use crate::{
metrics::TLS_HANDSHAKE_FAILURES,
@@ -29,17 +29,24 @@ pin_project! {
tls: TlsAcceptor,
waiting: JoinSet<Option<TlsStream<A::Conn>>>,
timeout: Duration,
protocol: &'static str,
}
}

impl<A: Accept> TlsListener<A> {
/// Create a `TlsListener` with default options.
pub(crate) fn new(tls: TlsAcceptor, listener: A, timeout: Duration) -> Self {
pub(crate) fn new(
tls: TlsAcceptor,
listener: A,
protocol: &'static str,
timeout: Duration,
) -> Self {
TlsListener {
listener,
tls,
waiting: JoinSet::new(),
timeout,
protocol,
}
}
}
@@ -66,7 +73,7 @@ where
Poll::Ready(Some(Ok(mut conn))) => {
let t = *this.timeout;
let tls = this.tls.clone();
let span = conn.span.clone();
let protocol = *this.protocol;
this.waiting.spawn(async move {
let peer_addr = match conn.inner.wait_for_addr().await {
Ok(Some(addr)) => addr,
@@ -79,24 +86,21 @@ where

let accept = tls.accept(conn);
match timeout(t, accept).await {
Ok(Ok(conn)) => {
info!(%peer_addr, "accepted new TLS connection");
Some(conn)
},
Ok(Ok(conn)) => Some(conn),
// The handshake failed, try getting another connection from the queue
Ok(Err(e)) => {
TLS_HANDSHAKE_FAILURES.inc();
warn!(%peer_addr, "failed to accept TLS connection: {e:?}");
warn!(%peer_addr, protocol, "failed to accept TLS connection: {e:?}");
None
}
// The handshake timed out, try getting another connection from the queue
Err(_) => {
TLS_HANDSHAKE_FAILURES.inc();
warn!(%peer_addr, "failed to accept TLS connection: timeout");
warn!(%peer_addr, protocol, "failed to accept TLS connection: timeout");
None
}
}
}.instrument(span));
});
}
Poll::Ready(Some(Err(e))) => {
tracing::error!("error accepting TCP connection: {e}");
@@ -108,7 +112,10 @@ where

loop {
return match this.waiting.poll_join_next(cx) {
Poll::Ready(Some(Ok(Some(conn)))) => Poll::Ready(Some(Ok(conn))),
Poll::Ready(Some(Ok(Some(conn)))) => {
info!(protocol = this.protocol, "accepted new TLS connection");
Poll::Ready(Some(Ok(conn)))
}
// The handshake failed to complete, try getting another connection from the queue
Poll::Ready(Some(Ok(None))) => continue,
// The handshake panicked or was cancelled. ignore and get another connection
@@ -2,7 +2,6 @@ pytest_plugins = (
"fixtures.pg_version",
"fixtures.parametrize",
"fixtures.httpserver",
"fixtures.compute_reconfigure",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",

@@ -1,62 +0,0 @@
import concurrent.futures
from typing import Any

import pytest
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

from fixtures.log_helper import log
from fixtures.types import TenantId


class ComputeReconfigure:
def __init__(self, server):
self.server = server
self.control_plane_compute_hook_api = f"http://{server.host}:{server.port}/notify-attach"
self.workloads = {}

def register_workload(self, workload):
self.workloads[workload.tenant_id] = workload


@pytest.fixture(scope="function")
def compute_reconfigure_listener(make_httpserver):
"""
This fixture exposes an HTTP listener for the storage controller to submit
compute notifications to us, instead of updating neon_local endpoints itself.

Although the storage controller can use neon_local directly, this causes problems when
the test is also concurrently modifying endpoints. Instead, configure the storage controller
to send notifications up to this test code, which will route all endpoint updates
through Workload, which has a mutex to make concurrent updates safe.
"""
server = make_httpserver

self = ComputeReconfigure(server)

# Do neon_local endpoint reconfiguration in the background so that we can
# accept a healthy rate of calls into notify-attach.
reconfigure_threads = concurrent.futures.ThreadPoolExecutor(max_workers=1)

def handler(request: Request):
assert request.json is not None
body: dict[str, Any] = request.json
log.info(f"notify-attach request: {body}")

try:
workload = self.workloads[TenantId(body["tenant_id"])]
except KeyError:
pass
else:
# This causes the endpoint to query the storage controller for its location, which
# is redundant since we already have it here, but this avoids extending the
# neon_local CLI to take full lists of locations
reconfigure_threads.submit(lambda workload=workload: workload.reconfigure())  # type: ignore[no-any-return]

return Response(status=200)

self.server.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)

yield self
reconfigure_threads.shutdown()
server.clear()
@@ -2177,23 +2177,6 @@ class NeonStorageController(MetricsGetter):
)
log.info("storage controller passed consistency check")

def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings

log.info(f"Requesting config failpoints: {repr(pairs)}")

res = self.request(
"PUT",
f"{self.env.storage_controller_api}/debug/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
headers=self.headers(TokenScope.ADMIN),
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()

def __enter__(self) -> "NeonStorageController":
return self
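The removed helper above drove the storage controller's /debug/v1/failpoints endpoint with (name, actions) pairs such as ("shard-split-post-begin", "return(1)"). Action strings of that shape come from the fail crate used on the Rust side; the sketch below only illustrates how such a failpoint is typically declared and armed, with a hypothetical function name, and assumes the fail crate with its failpoints feature enabled. It is not the storage controller's actual code.

use fail::fail_point;

// A hypothetical split step guarded by a failpoint, similar in spirit to the
// "shard-split-*" failpoints exercised in the test further below.
fn shard_split_step() -> Result<(), String> {
    // With the "return(1)" action the closure runs and the function errors out;
    // with "off" (or no configuration) the failpoint is a no-op.
    fail_point!("shard-split-post-begin", |_| Err("failpoint hit".to_string()));
    Ok(())
}

fn main() {
    // Assumes the crate's failpoints are compiled in (the `fail/failpoints` feature).
    fail::cfg("shard-split-post-begin", "return(1)").unwrap();
    assert!(shard_split_step().is_err());

    fail::cfg("shard-split-post-begin", "off").unwrap();
    assert!(shard_split_step().is_ok());
}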
@@ -2961,7 +2944,6 @@ class NeonProxy(PgProtocol):
user = quote(kwargs["user"])
password = quote(kwargs["password"])
expected_code = kwargs.get("expected_code")
timeout = kwargs.get("timeout")

log.info(f"Executing http query: {query}")

@@ -2975,7 +2957,6 @@ class NeonProxy(PgProtocol):
"Neon-Pool-Opt-In": "true",
},
verify=str(self.test_output_dir / "proxy.crt"),
timeout=timeout,
)

if expected_code is not None:
@@ -1,4 +1,3 @@
import threading
from typing import Optional

from fixtures.log_helper import log
@@ -12,10 +11,6 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import TenantId, TimelineId

# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
ENDPOINT_LOCK = threading.Lock()


class Workload:
"""
@@ -46,30 +41,17 @@ class Workload:

self._endpoint: Optional[Endpoint] = None

def reconfigure(self):
"""
Request the endpoint to reconfigure based on location reported by storage controller
"""
if self._endpoint is not None:
with ENDPOINT_LOCK:
self._endpoint.reconfigure()

def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
# We may be running alongside other Workloads for different tenants. Full TTID is
# obnoxiously long for use here, but a cut-down version is still unique enough for tests.
endpoint_id = f"ep-workload-{str(self.tenant_id)[0:4]}-{str(self.timeline_id)[0:4]}"

with ENDPOINT_LOCK:
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
self.branch_name,
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id=endpoint_id,
)
self._endpoint.start(pageserver_id=pageserver_id)
else:
self._endpoint.reconfigure(pageserver_id=pageserver_id)
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
self.branch_name,
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id="ep-workload",
)
self._endpoint.start(pageserver_id=pageserver_id)
else:
self._endpoint.reconfigure(pageserver_id=pageserver_id)

connstring = self._endpoint.safe_psql(
"SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
@@ -112,7 +94,7 @@ class Workload:
else:
return False

def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True, ingest=True):
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
assert self.expect_rows >= n

max_iters = 10
@@ -150,28 +132,22 @@ class Workload:
]
)

if ingest:
# Wait for written data to be ingested by the pageserver
for tenant_shard_id, pageserver in tenant_get_shards(
self.env, self.tenant_id, pageserver_id
):
last_flush_lsn = wait_for_last_flush_lsn(
self.env,
endpoint,
self.tenant_id,
self.timeline_id,
pageserver_id=pageserver_id,
)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
for tenant_shard_id, pageserver in tenant_get_shards(
self.env, self.tenant_id, pageserver_id
):
last_flush_lsn = wait_for_last_flush_lsn(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)

if upload:
# Wait for written data to be uploaded to S3 (force a checkpoint to trigger upload)
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
else:
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
if upload:
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
else:
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")

def validate(self, pageserver_id: Optional[int] = None):
endpoint = self.endpoint(pageserver_id)

@@ -188,6 +188,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"max": 1000,
},
"trace_read_requests": True,
"image_layer_compression": "NoCompression",
"walreceiver_connect_timeout": "13m",
}

test_runner/regress/test_compression.py (new file, 74 lines)
@@ -0,0 +1,74 @@
import os
import time

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin


#
# Test image layer compression and log the compression ratio
#
@pytest.mark.timeout(1000)
def test_compression(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env

def calculate_layers_size(tenant, timeline):
timeline_path = "{}/tenants/{}/timelines/{}/".format(
env.pageserver.workdir, tenant, timeline
)
delta_total_size = 0
image_total_size = 0
for filename in os.listdir(timeline_path):
if filename.startswith("00000") and not filename.endswith(".___temp"):
size = os.path.getsize(timeline_path + filename)
pos = filename.find("__")
if pos >= 0:
pos = filename.find("-", pos)
if pos >= 0:
delta_total_size += size
else:
image_total_size += size
log.info(f"Image layers size: {image_total_size}, delta layers size: {delta_total_size}")
return image_total_size

tenant, timeline = env.neon_cli.create_tenant(
conf={
# Use aggressive compaction and checkpoint settings
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
"compaction_period": "1 s",
"compaction_threshold": "1",
"image_layer_compression": '"LZ4"',
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
connstr = endpoint.connstr()

log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", "-s50", connstr])
pg_bin.run_capture(["pgbench", "-c10", "-T25", "-Mprepared", connstr])

time.sleep(5)  # wait some time to let background tasks complete on the pageserver
compressed_image_size = calculate_layers_size(tenant, timeline)

tenant, timeline = env.neon_cli.create_tenant(
conf={
# Use aggressive compaction and checkpoint settings
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
"compaction_period": "1 s",
"compaction_threshold": "1",
"image_layer_compression": '"NoCompression"',
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
connstr = endpoint.connstr()

log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", "-s50", connstr])
pg_bin.run_capture(["pgbench", "-c10", "-T25", "-Mprepared", connstr])

time.sleep(5)  # wait some time to let background tasks complete on the pageserver
raw_image_size = calculate_layers_size(tenant, timeline)
log.info(f"Compression ratio: {raw_image_size/compressed_image_size}")
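For context on what the "LZ4" / "NoCompression" settings in the test above toggle: the branch compresses image layer data with LZ4 via the lz4_flex crate pulled in by this comparison. The sketch below only illustrates that crate's block API on a page-sized buffer (assuming lz4_flex 0.11); it is not the pageserver's actual image layer writer.

use lz4_flex::block::{compress_prepend_size, decompress_size_prepended};

fn main() {
    // A page-sized, highly repetitive buffer, similar to the 8 KiB images an image layer stores.
    let page = vec![0u8; 8192];

    // The compressed block carries its uncompressed length in a 4-byte prefix,
    // so no extra bookkeeping is needed to decompress it later.
    let compressed = compress_prepend_size(&page);
    println!(
        "raw: {} bytes, compressed: {} bytes, ratio: {:.1}x",
        page.len(),
        compressed.len(),
        page.len() as f64 / compressed.len() as f64
    );

    let roundtrip = decompress_size_prepended(&compressed).expect("valid LZ4 block");
    assert_eq!(roundtrip, page);
}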
@@ -596,39 +596,3 @@ def test_sql_over_http_timeout_cancel(static_proxy: NeonProxy):
assert (
"duplicate key value violates unique constraint" in res["message"]
), "HTTP query should conflict"


def test_sql_over_http_connection_cancel(static_proxy: NeonProxy):
static_proxy.safe_psql("create role http with login password 'http' superuser")

static_proxy.safe_psql("create table test_table ( id int primary key )")

# insert into a table, with a unique constraint, after sleeping for n seconds
query = "WITH temp AS ( \
SELECT pg_sleep($1) as sleep, $2::int as id \
) INSERT INTO test_table (id) SELECT id FROM temp"

try:
# The request should complete before the proxy HTTP timeout triggers.
# Timeout and cancel the request on the client side before the query completes.
static_proxy.http_query(
query,
[static_proxy.http_timeout_seconds - 1, 1],
user="http",
password="http",
timeout=2,
)
except requests.exceptions.ReadTimeout:
pass

# wait until the query _would_ have been complete
time.sleep(static_proxy.http_timeout_seconds)

res = static_proxy.http_query(query, [1, 1], user="http", password="http", expected_code=200)
assert res["command"] == "INSERT", "HTTP query should insert"
assert res["rowCount"] == 1, "HTTP query should insert"

res = static_proxy.http_query(query, [0, 1], user="http", password="http", expected_code=400)
assert (
"duplicate key value violates unique constraint" in res["message"]
), "HTTP query should conflict"
@@ -1,14 +1,10 @@
import os
from typing import Dict, List, Optional, Union
from typing import Dict, List, Union

import pytest
import requests
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
StorageControllerApiException,
tenant_get_shards,
)
from fixtures.remote_storage import s3_storage
@@ -499,337 +495,3 @@ def test_sharding_ingest(

# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
assert huge_layer_count <= shard_count


class Failure:
pageserver_id: Optional[int]

def apply(self, env: NeonEnv):
raise NotImplementedError()

def clear(self, env: NeonEnv):
"""
Clear the failure, in a way that should enable the system to proceed
to a totally clean state (all nodes online and reconciled)
"""
raise NotImplementedError()

def expect_available(self):
raise NotImplementedError()

def can_mitigate(self):
"""Whether Self.mitigate is available for use"""
return False

def mitigate(self, env: NeonEnv):
"""
Mitigate the failure in a way that should allow shard split to
complete and service to resume, but does not guarantee to leave
the whole world in a clean state (e.g. an Offline node might have
junk LocationConfigs on it)
"""
raise NotImplementedError()

def fails_forward(self, env: NeonEnv):
"""
If true, this failure results in a state that eventually completes the split.
"""
return False

def expect_exception(self):
"""
How do we expect a call to the split API to fail?
"""
return StorageControllerApiException


class PageserverFailpoint(Failure):
def __init__(self, failpoint, pageserver_id, mitigate):
self.failpoint = failpoint
self.pageserver_id = pageserver_id
self._mitigate = mitigate

def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.allowed_errors.extend(
[".*failpoint.*", ".*Resetting.*after shard split failure.*"]
)
pageserver.http_client().configure_failpoints((self.failpoint, "return(1)"))

def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
if self._mitigate:
env.storage_controller.node_configure(self.pageserver_id, {"availability": "Active"})

def expect_available(self):
return True

def can_mitigate(self):
return self._mitigate

def mitigate(self, env):
env.storage_controller.node_configure(self.pageserver_id, {"availability": "Offline"})


class StorageControllerFailpoint(Failure):
def __init__(self, failpoint, action):
self.failpoint = failpoint
self.pageserver_id = None
self.action = action

def apply(self, env: NeonEnv):
env.storage_controller.configure_failpoints((self.failpoint, self.action))

def clear(self, env: NeonEnv):
if "panic" in self.action:
log.info("Restarting storage controller after panic")
env.storage_controller.stop()
env.storage_controller.start()
else:
env.storage_controller.configure_failpoints((self.failpoint, "off"))

def expect_available(self):
# Controller panics _do_ leave pageservers available, but our test code relies
# on using the locate API to update configurations in Workload, so we must skip
# these actions when the controller has been panicked.
return "panic" not in self.action

def can_mitigate(self):
return False

def fails_forward(self, env):
# Edge case: the very last failpoint that simulates a DB connection error, where
# the abort path will fail-forward and result in a complete split.
fail_forward = self.failpoint == "shard-split-post-complete"

# If the failure was a panic, then if we expect split to eventually (after restart)
# complete, we must restart before checking that.
if fail_forward and "panic" in self.action:
log.info("Restarting storage controller after panic")
env.storage_controller.stop()
env.storage_controller.start()

return fail_forward

def expect_exception(self):
if "panic" in self.action:
return requests.exceptions.ConnectionError
else:
return StorageControllerApiException


class NodeKill(Failure):
def __init__(self, pageserver_id, mitigate):
self.pageserver_id = pageserver_id
self._mitigate = mitigate

def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.stop(immediate=True)

def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.start()

def expect_available(self):
return False

def mitigate(self, env):
env.storage_controller.node_configure(self.pageserver_id, {"availability": "Offline"})


class CompositeFailure(Failure):
"""
Wrapper for failures in multiple components (e.g. a failpoint in the storage controller, *and*
stop a pageserver to interfere with rollback)
"""

def __init__(self, failures: list[Failure]):
self.failures = failures

self.pageserver_id = None
for f in failures:
if f.pageserver_id is not None:
self.pageserver_id = f.pageserver_id
break

def apply(self, env: NeonEnv):
for f in self.failures:
f.apply(env)

def clear(self, env):
for f in self.failures:
f.clear(env)

def expect_available(self):
return all(f.expect_available() for f in self.failures)

def mitigate(self, env):
for f in self.failures:
f.mitigate(env)

def expect_exception(self):
expect = set(f.expect_exception() for f in self.failures)

# We can't give a sensible response if our failures have different expectations
assert len(expect) == 1

return list(expect)[0]

@pytest.mark.parametrize(
"failure",
[
PageserverFailpoint("api-500", 1, False),
NodeKill(1, False),
PageserverFailpoint("api-500", 1, True),
NodeKill(1, True),
PageserverFailpoint("shard-split-pre-prepare", 1, False),
PageserverFailpoint("shard-split-post-prepare", 1, False),
PageserverFailpoint("shard-split-pre-hardlink", 1, False),
PageserverFailpoint("shard-split-post-hardlink", 1, False),
PageserverFailpoint("shard-split-post-child-conf", 1, False),
PageserverFailpoint("shard-split-lsn-wait", 1, False),
PageserverFailpoint("shard-split-pre-finish", 1, False),
StorageControllerFailpoint("shard-split-validation", "return(1)"),
StorageControllerFailpoint("shard-split-post-begin", "return(1)"),
StorageControllerFailpoint("shard-split-post-remote", "return(1)"),
StorageControllerFailpoint("shard-split-post-complete", "return(1)"),
StorageControllerFailpoint("shard-split-validation", "panic(failpoint)"),
StorageControllerFailpoint("shard-split-post-begin", "panic(failpoint)"),
StorageControllerFailpoint("shard-split-post-remote", "panic(failpoint)"),
StorageControllerFailpoint("shard-split-post-complete", "panic(failpoint)"),
CompositeFailure(
[NodeKill(1, True), StorageControllerFailpoint("shard-split-post-begin", "return(1)")]
),
CompositeFailure(
[NodeKill(1, False), StorageControllerFailpoint("shard-split-post-begin", "return(1)")]
),
],
)
def test_sharding_split_failures(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
failure: Failure,
):
neon_env_builder.num_pageservers = 4
neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
)
initial_shard_count = 2
split_shard_count = 4

env = neon_env_builder.init_start(initial_tenant_shard_count=initial_shard_count)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

for ps in env.pageservers:
# When we do node failures and abandon a shard, it will de facto have an old generation and
# thereby be unable to publish remote consistent LSN updates
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")

# Make sure the node we're failing has a shard on it, otherwise the test isn't testing anything
assert (
failure.pageserver_id is None
or len(
env.get_pageserver(failure.pageserver_id)
.http_client()
.tenant_list_locations()["tenant_shards"]
)
> 0
)

workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(100)

# Put the environment into a failing state (exact meaning depends on `failure`)
failure.apply(env)

with pytest.raises(failure.expect_exception()):
env.storage_controller.tenant_shard_split(tenant_id, shard_count=4)

# We expect that the overall operation will fail, but some split requests
# will have succeeded: the net result should be to return to a clean state, including
# detaching any child shards.
def assert_rolled_back(exclude_ps_id=None) -> None:
count = 0
for ps in env.pageservers:
if exclude_ps_id is not None and ps.id == exclude_ps_id:
continue

locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
assert tenant_shard_id.shard_count == initial_shard_count
count += 1
assert count == initial_shard_count

def assert_split_done(exclude_ps_id=None) -> None:
count = 0
for ps in env.pageservers:
if exclude_ps_id is not None and ps.id == exclude_ps_id:
continue

locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
assert tenant_shard_id.shard_count == split_shard_count
count += 1
assert count == split_shard_count

def finish_split():
# Having failed+rolled back, we should be able to split again
# No failures this time; it will succeed
env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)

workload.churn_rows(10)
workload.validate()

if failure.expect_available():
# Even though the split failed partway through, this should not have interrupted
# clients. Disable waiting for pageservers in the workload helper, because our
# failpoints may prevent API access.
# This only applies for failure modes that leave the pageserver page_service API available.
workload.churn_rows(10, upload=False, ingest=False)
workload.validate()

if failure.fails_forward(env):
log.info("Fail-forward failure, checking split eventually completes...")
# A failure type which results in eventual completion of the split
wait_until(30, 1, assert_split_done)
elif failure.can_mitigate():
log.info("Mitigating failure...")
# Mitigation phase: we expect to be able to proceed with a successful shard split
failure.mitigate(env)

# The split should appear to be rolled back from the point of view of all pageservers
# apart from the one that is offline
wait_until(30, 1, lambda: assert_rolled_back(exclude_ps_id=failure.pageserver_id))

finish_split()
wait_until(30, 1, lambda: assert_split_done(exclude_ps_id=failure.pageserver_id))

# Having cleared the failure, everything should converge to a pristine state
failure.clear(env)
wait_until(30, 1, assert_split_done)
else:
# Once we restore the faulty pageserver's API to good health, rollback should
# eventually complete.
log.info("Clearing failure...")
failure.clear(env)

wait_until(30, 1, assert_rolled_back)

# Having rolled back, the tenant should be working
workload.churn_rows(10)
workload.validate()

# Splitting again should work, since we cleared the failure
finish_split()
assert_split_done()

env.storage_controller.consistency_check()