Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-02 18:20:37 +00:00

Compare commits: jcsp/ha-te ... wal_lz4_co (13 commits)

| SHA1 |
|---|
| 42d0b040f8 |
| 9832638c09 |
| 62e7638c69 |
| 0dad8e427d |
| 4cfa2fdca5 |
| 56ddf8e37f |
| ed4bb3073f |
| b7e7aeed4d |
| a880178cca |
| b09d686335 |
| 74d24582cf |
| 4834d22d2d |
| 86e8c43ddf |

Cargo.lock (generated, 12 lines changed)
@@ -282,10 +282,8 @@ dependencies = [
"control_plane",
"diesel",
"diesel_migrations",
"fail",
"futures",
"git-version",
"hex",
"humantime",
"hyper",
"metrics",
@@ -2843,6 +2841,15 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"

[[package]]
name = "lz4_flex"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8"
dependencies = [
"twox-hash",
]

[[package]]
name = "match_cfg"
version = "0.1.0"
@@ -3513,6 +3520,7 @@ dependencies = [
"hyper",
"itertools",
"leaky-bucket",
"lz4_flex",
"md5",
"metrics",
"nix 0.27.1",

@@ -100,6 +100,7 @@ jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
lz4_flex = "0.11.1"
md5 = "0.7.0"
memoffset = "0.8"
native-tls = "0.2"
Makefile (4 lines changed)

@@ -23,12 +23,12 @@ endif
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Linux)
# Seccomp BPF is only available for Linux
PG_CONFIGURE_OPTS += --with-libseccomp
PG_CONFIGURE_OPTS += --with-lz4 --with-libseccomp
else ifeq ($(UNAME_S),Darwin)
# macOS with brew-installed openssl requires explicit paths
# It can be configured with OPENSSL_PREFIX variable
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
PG_CONFIGURE_OPTS += --with-lz4 --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
PG_CONFIGURE_OPTS += PKG_CONFIG_PATH=$(shell brew --prefix icu4c)/lib/pkgconfig
# macOS already has bison and flex in the system, but they are old and result in postgres-v14 target failure
# brew formulae are keg-only and not symlinked into HOMEBREW_PREFIX, force their usage
@@ -302,9 +302,9 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
RoleAction::Create => {
// This branch only runs when roles are created through the console, so it is
// safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
// from neon_superuser.
// from neon_superuser. (NOTE: REPLICATION has been removed from here for now).
let mut query: String = format!(
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS IN ROLE neon_superuser",
name.pg_quote()
);
info!("running role create query: '{}'", &query);
@@ -805,6 +805,18 @@ $$;"#,
"",
"",
// Add new migrations below.
r#"
DO $$
DECLARE
role_name TEXT;
BEGIN
FOR role_name IN SELECT rolname FROM pg_roles WHERE rolreplication IS TRUE
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOREPLICATION';
END LOOP;
END
$$;"#,
];

let mut query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
@@ -19,10 +19,8 @@ aws-config.workspace = true
aws-sdk-secretsmanager.workspace = true
camino.workspace = true
clap.workspace = true
fail.workspace = true
futures.workspace = true
git-version.workspace = true
hex.workspace = true
hyper.workspace = true
humantime.workspace = true
once_cell.workspace = true
@@ -1,4 +1,3 @@
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};

use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
@@ -24,13 +23,10 @@ struct ShardedComputeHookTenant {
stripe_size: ShardStripeSize,
shard_count: ShardCount,
shards: Vec<(ShardNumber, NodeId)>,

// Async lock used for ensuring that remote compute hook calls are ordered identically to updates to this structure
lock: Arc<tokio::sync::Mutex<()>>,
}

enum ComputeHookTenant {
Unsharded((NodeId, Arc<tokio::sync::Mutex<()>>)),
Unsharded(NodeId),
Sharded(ShardedComputeHookTenant),
}

@@ -42,17 +38,9 @@ impl ComputeHookTenant {
shards: vec![(tenant_shard_id.shard_number, node_id)],
stripe_size,
shard_count: tenant_shard_id.shard_count,
lock: Arc::default(),
})
} else {
Self::Unsharded((node_id, Arc::default()))
}
}

fn get_lock(&self) -> &Arc<tokio::sync::Mutex<()>> {
match self {
Self::Unsharded((_node_id, lock)) => lock,
Self::Sharded(sharded_tenant) => &sharded_tenant.lock,
Self::Unsharded(node_id)
}
}

@@ -65,9 +53,7 @@ impl ComputeHookTenant {
node_id: NodeId,
) {
match self {
Self::Unsharded((existing_node_id, _lock))
if tenant_shard_id.shard_count.count() == 1 =>
{
Self::Unsharded(existing_node_id) if tenant_shard_id.shard_count.count() == 1 => {
*existing_node_id = node_id
}
Self::Sharded(sharded_tenant)
@@ -136,15 +122,9 @@ pub(crate) enum NotifyError {
}

impl ComputeHookTenant {
fn maybe_reconfigure(
&self,
tenant_id: TenantId,
) -> Option<(
ComputeHookNotifyRequest,
impl std::future::Future<Output = tokio::sync::OwnedMutexGuard<()>>,
)> {
let request = match self {
Self::Unsharded((node_id, _lock)) => Some(ComputeHookNotifyRequest {
fn maybe_reconfigure(&self, tenant_id: TenantId) -> Option<ComputeHookNotifyRequest> {
match self {
Self::Unsharded(node_id) => Some(ComputeHookNotifyRequest {
tenant_id,
shards: vec![ComputeHookNotifyRequestShard {
shard_number: ShardNumber(0),
@@ -178,9 +158,7 @@ impl ComputeHookTenant {
);
None
}
};

request.map(|r| (r, self.get_lock().clone().lock_owned()))
}
}
}

@@ -189,11 +167,8 @@ impl ComputeHookTenant {
/// the compute connection string.
pub(super) struct ComputeHook {
config: Config,
state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
state: tokio::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
authorization_header: Option<String>,

// This lock is only used in testing environments, to serialize calls into neon_local
neon_local_lock: tokio::sync::Mutex<()>,
}

impl ComputeHook {
@@ -207,7 +182,6 @@ impl ComputeHook {
state: Default::default(),
config,
authorization_header,
neon_local_lock: Default::default(),
}
}

@@ -216,10 +190,6 @@ impl ComputeHook {
&self,
reconfigure_request: ComputeHookNotifyRequest,
) -> anyhow::Result<()> {
// neon_local updates are not safe to call concurrently, use a lock to serialize
// all calls to this function
let _locked = self.neon_local_lock.lock().await;

let env = match LocalEnv::load_config() {
Ok(e) => e,
Err(e) => {
@@ -370,38 +340,30 @@ impl ComputeHook {
stripe_size: ShardStripeSize,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let reconfigure_request = {
let mut locked = self.state.lock().unwrap();
let mut locked = self.state.lock().await;

use std::collections::hash_map::Entry;
let tenant = match locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
node_id,
)),
Entry::Occupied(e) => {
let tenant = e.into_mut();
tenant.update(tenant_shard_id, stripe_size, node_id);
tenant
}
};

tenant.maybe_reconfigure(tenant_shard_id.tenant_id)
use std::collections::hash_map::Entry;
let tenant = match locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
node_id,
)),
Entry::Occupied(e) => {
let tenant = e.into_mut();
tenant.update(tenant_shard_id, stripe_size, node_id);
tenant
}
};
let Some((reconfigure_request, lock_fut)) = reconfigure_request else {

let reconfigure_request = tenant.maybe_reconfigure(tenant_shard_id.tenant_id);
let Some(reconfigure_request) = reconfigure_request else {
// The tenant doesn't yet have pageservers for all its shards: we won't notify anything
// until it does.
tracing::info!("Tenant isn't yet ready to emit a notification");
return Ok(());
};

// Finish acquiring the tenant's async lock: this future was created inside the self.state
// lock above, so we are guaranteed to get this lock in the same order as callers took
// that lock. This ordering is essential: the cloud control plane must end up with the
// same end state for the tenant that we see.
let _guard = lock_fut.await;

if let Some(notify_url) = &self.config.compute_hook_url {
self.do_notify(notify_url, reconfigure_request, cancel)
.await
@@ -443,7 +405,6 @@ pub(crate) mod tests {
tenant_state
.maybe_reconfigure(tenant_id)
.unwrap()
.0
.shards
.len(),
1
@@ -451,7 +412,6 @@ pub(crate) mod tests {
assert!(tenant_state
.maybe_reconfigure(tenant_id)
.unwrap()
.0
.stripe_size
.is_none());

@@ -485,7 +445,6 @@ pub(crate) mod tests {
tenant_state
.maybe_reconfigure(tenant_id)
.unwrap()
.0
.shards
.len(),
2
@@ -494,7 +453,6 @@ pub(crate) mod tests {
tenant_state
.maybe_reconfigure(tenant_id)
.unwrap()
.0
.stripe_size,
Some(ShardStripeSize(32768))
);
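The compute hook hunks above trade a per-tenant ordering lock (an `Arc<tokio::sync::Mutex<()>>` whose acquisition future is created under a `std::sync::Mutex`) for a single `tokio::sync::Mutex` around the whole state map. The following is a minimal standalone sketch of the simpler shape, an async mutex held across the remote call so notifications stay ordered with the state updates that produced them; it assumes only the tokio crate, and the type and function names are illustrative rather than this crate's API.

```rust
use std::collections::HashMap;
use tokio::sync::Mutex;

/// Illustrative stand-in for ComputeHookNotifyRequest.
#[derive(Debug)]
struct NotifyRequest {
    tenant: u64,
    node: u64,
}

/// Hypothetical hook state. An async mutex is used instead of std::sync::Mutex
/// because the critical section in `notify` spans an `.await` (the remote call),
/// which keeps notifications for the map ordered with the updates.
struct Hook {
    state: Mutex<HashMap<u64, u64>>, // tenant id -> attached node id
}

impl Hook {
    async fn notify(&self, tenant: u64, node: u64) {
        // Hold the async lock across both the state update and the remote call,
        // so concurrent callers cannot reorder their notifications.
        let mut state = self.state.lock().await;
        state.insert(tenant, node);
        let request = NotifyRequest { tenant, node };
        send_to_control_plane(&request).await;
    }
}

async fn send_to_control_plane(req: &NotifyRequest) {
    // Placeholder for the HTTP call the real hook would make.
    println!("notify {:?}", req);
}

#[tokio::main]
async fn main() {
    let hook = Hook { state: Mutex::new(HashMap::new()) };
    hook.notify(1, 42).await;
}
```

The trade-off this sketch illustrates is that serializing on one lock is simpler but blocks unrelated tenants; the per-tenant lock variant being removed above avoided that at the cost of extra bookkeeping.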
@@ -10,9 +10,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::auth::{Scope, SwappableJwtAuth};
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
use utils::http::request::{must_get_query_param, parse_request_param};
use utils::id::{TenantId, TimelineId};
@@ -440,24 +438,6 @@ async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiEr
state.service.tenants_dump()
}

async fn handle_balance_all(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
service.balance_all()?;
json_response(StatusCode::OK, ())
}

async fn handle_balance_attached(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
service.balance_attached()?;
json_response(StatusCode::OK, ())
}

async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;

@@ -574,9 +554,6 @@ pub fn make_router(
.post("/debug/v1/consistency_check", |r| {
request_span(r, handle_consistency_check)
})
.put("/debug/v1/failpoints", |r| {
request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
})
.get("/control/v1/tenant/:tenant_id/locate", |r| {
tenant_service_handler(r, handle_tenant_locate)
})
@@ -595,12 +572,6 @@ pub fn make_router(
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
tenant_service_handler(r, handle_tenant_shard_split)
})
.post("/control/v1/balance/all", |r| {
tenant_service_handler(r, handle_balance_all)
})
.post("/control/v1/balance/attached", |r| {
tenant_service_handler(r, handle_balance_attached)
})
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
@@ -1,54 +0,0 @@
use std::{collections::HashMap, sync::Arc};

/// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
/// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
/// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
/// is needed at a tenant-wide granularity.
pub(crate) struct IdLockMap<T>
where
T: Eq + PartialEq + std::hash::Hash,
{
/// A synchronous lock for getting/setting the async locks that our callers will wait on.
entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<()>>>>,
}

impl<T> IdLockMap<T>
where
T: Eq + PartialEq + std::hash::Hash,
{
pub(crate) fn shared(
&self,
key: T,
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<()>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default();
entry.clone().read_owned()
}

pub(crate) fn exclusive(
&self,
key: T,
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockWriteGuard<()>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default();
entry.clone().write_owned()
}

/// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
/// periodic housekeeping to avoid the map growing indefinitely
pub(crate) fn housekeeping(&self) {
let mut locked = self.entities.lock().unwrap();
locked.retain(|_k, lock| lock.try_write().is_err())
}
}

impl<T> Default for IdLockMap<T>
where
T: Eq + PartialEq + std::hash::Hash,
{
fn default() -> Self {
Self {
entities: std::sync::Mutex::new(HashMap::new()),
}
}
}
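For reference, here is a self-contained sketch of the pattern the `id_lock_map.rs` file above implements: a synchronous map that hands out owned async `RwLock` guards per key, so the std mutex is only held long enough to look the lock up. It assumes only the tokio crate; `LockMap` and the `u64` key are stand-ins for illustration, not the crate-private `IdLockMap` API.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::{OwnedRwLockWriteGuard, RwLock};

/// Standalone sketch of an id -> lock map: the outer std Mutex is only held
/// long enough to look up (or create) the per-key async RwLock.
#[derive(Default)]
struct LockMap {
    entities: Mutex<HashMap<u64, Arc<RwLock<()>>>>,
}

impl LockMap {
    /// Returns a future for the exclusive lock; awaiting happens outside the std Mutex.
    fn exclusive(&self, key: u64) -> impl std::future::Future<Output = OwnedRwLockWriteGuard<()>> {
        let mut locked = self.entities.lock().unwrap();
        let entry = locked.entry(key).or_default();
        entry.clone().write_owned()
    }

    /// Drop per-key locks that nobody is holding, so the map does not grow forever.
    fn housekeeping(&self) {
        let mut locked = self.entities.lock().unwrap();
        locked.retain(|_k, lock| lock.try_write().is_err());
    }
}

#[tokio::main]
async fn main() {
    let locks = LockMap::default();
    {
        let _guard = locks.exclusive(7).await;
        // ... operate on object 7 while holding its tenant-wide lock ...
    }
    locks.housekeeping();
}
```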
@@ -4,7 +4,6 @@ use utils::seqwait::MonotonicCounter;
mod auth;
mod compute_hook;
pub mod http;
mod id_lock_map;
pub mod metrics;
mod node;
pub mod persistence;
@@ -11,9 +11,6 @@ use diesel::prelude::*;
use diesel::Connection;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::ShardConfigError;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
@@ -75,14 +72,6 @@ pub(crate) enum DatabaseError {
Logical(String),
}

#[must_use]
pub(crate) enum AbortShardSplitStatus {
/// We aborted the split in the database by reverting to the parent shards
Aborted,
/// The split had already been persisted.
Complete,
}

pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;

impl Persistence {
@@ -581,42 +570,6 @@ impl Persistence {
})
.await
}

/// Used when the remote part of a shard split failed: we will revert the database state to have only
/// the parent shards, with SplitState::Idle.
pub(crate) async fn abort_shard_split(
&self,
split_tenant_id: TenantId,
new_shard_count: ShardCount,
) -> DatabaseResult<AbortShardSplitStatus> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<AbortShardSplitStatus> {
let aborted = conn.transaction(|conn| -> QueryResult<AbortShardSplitStatus> {
// Clear the splitting state on parent shards
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.ne(new_shard_count.literal() as i32))
.set((splitting.eq(0),))
.execute(conn)?;

// Parent shards are already gone: we cannot abort.
if updated == 0 {
return Ok(AbortShardSplitStatus::Complete);
}

// Erase child shards
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(new_shard_count.literal() as i32))
.execute(conn)?;

Ok(AbortShardSplitStatus::Aborted)
})?;

Ok(aborted)
})
.await
}
}

/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
@@ -651,28 +604,6 @@ pub(crate) struct TenantShardPersistence {
pub(crate) config: String,
}

impl TenantShardPersistence {
pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
if self.shard_count == 0 {
Ok(ShardIdentity::unsharded())
} else {
Ok(ShardIdentity::new(
ShardNumber(self.shard_number as u8),
ShardCount::new(self.shard_count as u8),
ShardStripeSize(self.shard_stripe_size as u32),
)?)
}
}

pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
Ok(TenantShardId {
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
shard_number: ShardNumber(self.shard_number as u8),
shard_count: ShardCount::new(self.shard_count as u8),
})
}
}

/// Parts of [`crate::node::Node`] that are stored durably
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
#[diesel(table_name = crate::schema::nodes)]
@@ -1,6 +1,5 @@
use crate::persistence::Persistence;
use crate::service;
use hyper::StatusCode;
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
@@ -19,8 +18,6 @@ use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};

const DEFAULT_HEATMAP_PERIOD: &str = "60s";

/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
pub(super) struct Reconciler {
@@ -488,29 +485,17 @@ impl Reconciler {
)
.await
{
Some(Ok(observed)) => Some(observed),
Some(Err(mgmt_api::Error::ApiError(status, _msg)))
if status == StatusCode::NOT_FOUND =>
{
None
}
Some(Ok(observed)) => observed,
Some(Err(e)) => return Err(e.into()),
None => return Err(ReconcileError::Cancel),
};
tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
match observed_conf {
Some(conf) => {
// Pageserver returned a state: update it in observed. This may still be an indeterminate (None) state,
// if internally the pageserver's TenantSlot was being mutated (e.g. some long running API call is still running)
self.observed
.locations
.insert(attached_node.get_id(), ObservedStateLocation { conf });
}
None => {
// Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
self.observed.locations.remove(&attached_node.get_id());
}
}
self.observed.locations.insert(
attached_node.get_id(),
ObservedStateLocation {
conf: observed_conf,
},
);
}

Ok(())
@@ -540,12 +525,7 @@ impl Reconciler {
)));
};

let mut wanted_conf = attached_location_conf(
generation,
&self.shard,
&self.config,
!self.intent.secondary.is_empty(),
);
let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
match self.observed.locations.get(&node.get_id()) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
// Nothing to do
@@ -682,26 +662,10 @@ impl Reconciler {
}
}

/// We tweak the externally-set TenantConfig while configuring
/// locations, using our awareness of whether secondary locations
/// are in use to automatically enable/disable heatmap uploads.
fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
let mut config = config.clone();
if has_secondaries {
if config.heatmap_period.is_none() {
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
}
} else {
config.heatmap_period = None;
}
config
}

pub(crate) fn attached_location_conf(
generation: Generation,
shard: &ShardIdentity,
config: &TenantConfig,
has_secondaries: bool,
) -> LocationConfig {
LocationConfig {
mode: LocationConfigMode::AttachedSingle,
@@ -710,7 +674,7 @@ pub(crate) fn attached_location_conf(
shard_number: shard.number.0,
shard_count: shard.count.literal(),
shard_stripe_size: shard.stripe_size.0,
tenant_conf: ha_aware_config(config, has_secondaries),
tenant_conf: config.clone(),
}
}

@@ -725,6 +689,6 @@ pub(crate) fn secondary_location_conf(
shard_number: shard.number.0,
shard_count: shard.count.literal(),
shard_stripe_size: shard.stripe_size.0,
tenant_conf: ha_aware_config(config, true),
tenant_conf: config.clone(),
}
}
(File diff suppressed because it is too large.)
@@ -577,12 +577,7 @@ impl TenantState {
.generation
.expect("Attempted to enter attached state without a generation");

let wanted_conf = attached_location_conf(
generation,
&self.shard,
&self.config,
!self.intent.secondary.is_empty(),
);
let wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {
@@ -774,10 +774,7 @@ impl Endpoint {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}

let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap();
let client = reqwest::Client::new();
let response = client
.post(format!(
"http://{}:{}/configure",
@@ -17,7 +17,6 @@ use std::time::Duration;
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use futures::SinkExt;
use hyper::StatusCode;
use pageserver_api::controller_api::NodeRegisterRequest;
use pageserver_api::models::{
self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo,
@@ -263,11 +262,6 @@ impl PageServerNode {
match st {
Ok(()) => Ok(true),
Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false),
Err(mgmt_api::Error::ApiError(status, _msg))
if status == StatusCode::SERVICE_UNAVAILABLE =>
{
Ok(false)
}
Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
}
},
@@ -29,7 +29,6 @@ pub mod launch_timestamp;
mod wrappers;
pub use wrappers::{CountedReader, CountedWriter};
mod hll;
pub mod metric_vec_duration;
pub use hll::{HyperLogLog, HyperLogLogVec};
#[cfg(target_os = "linux")]
pub mod more_process_metrics;

@@ -1,23 +0,0 @@
//! Helpers for observing duration on `HistogramVec` / `CounterVec` / `GaugeVec` / `MetricVec<T>`.

use std::{future::Future, time::Instant};

pub trait DurationResultObserver {
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration);
}

pub async fn observe_async_block_duration_by_result<
T,
E,
F: Future<Output = Result<T, E>>,
O: DurationResultObserver,
>(
observer: &O,
block: F,
) -> Result<T, E> {
let start = Instant::now();
let result = block.await;
let duration = start.elapsed();
observer.observe_result(&result, duration);
result
}
@@ -757,6 +757,7 @@ pub enum PagestreamBeMessage {
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
GetCompressedPage(PagestreamGetPageResponse),
}

// Keep in sync with `pagestore_client.h`
@@ -996,6 +997,12 @@ impl PagestreamBeMessage {
bytes.put(&resp.page[..]);
}

Self::GetCompressedPage(resp) => {
bytes.put_u8(105); /* tag from pagestore_client.h */
bytes.put_u16(resp.page.len() as u16);
bytes.put(&resp.page[..]);
}

Self::Error(resp) => {
bytes.put_u8(Tag::Error as u8);
bytes.put(resp.message.as_bytes());
@@ -1078,6 +1085,7 @@ impl PagestreamBeMessage {
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
Self::GetCompressedPage(_) => "GetCompressedPage",
}
}
}
@@ -144,6 +144,13 @@ pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result<boo
dispatch_pgversion!(version, Ok(pgv::bindings::bkpimg_is_compressed(bimg_info)))
}

pub fn bkpimage_is_compressed_lz4(bimg_info: u8, version: u32) -> anyhow::Result<bool> {
dispatch_pgversion!(
version,
Ok(pgv::bindings::bkpimg_is_compressed_lz4(bimg_info))
)
}

pub fn generate_wal_segment(
segno: u64,
system_id: u64,

@@ -8,3 +8,7 @@ pub const SIZEOF_RELMAPFILE: usize = 512; /* sizeof(RelMapFile) in relmapper.c *
pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_IS_COMPRESSED) != 0
}

pub fn bkpimg_is_compressed_lz4(_bimg_info: u8) -> bool {
false
}

@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {

(bimg_info & ANY_COMPRESS_FLAG) != 0
}

pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
}

@@ -16,3 +16,7 @@ pub fn bkpimg_is_compressed(bimg_info: u8) -> bool {

(bimg_info & ANY_COMPRESS_FLAG) != 0
}

pub fn bkpimg_is_compressed_lz4(bimg_info: u8) -> bool {
(bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0
}
@@ -17,6 +17,7 @@ use remote_storage::{
};
use test_context::test_context;
use test_context::AsyncTestContext;
use tokio::io::AsyncBufReadExt;
use tokio_util::sync::CancellationToken;
use tracing::info;

@@ -484,32 +485,33 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
))
.unwrap();

let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;

{
let mut stream = ctx
let stream = ctx
.client
.download(&path, &cancel)
.await
.expect("download succeeds")
.download_stream;

let first = stream
.next()
.await
.expect("should have the first blob")
.expect("should have succeeded");
let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));

tracing::info!(len = first.len(), "downloaded first chunk");
let first = reader.fill_buf().await.expect("should have the first blob");

let len = first.len();
tracing::info!(len, "downloaded first chunk");

assert!(
first.len() < len,
first.len() < file_len,
"uploaded file is too small, we downloaded all on first chunk"
);

reader.consume(len);

cancel.cancel();

let next = stream.next().await.expect("stream should have more");
let next = reader.fill_buf().await;

let e = next.expect_err("expected an error, but got a chunk?");

@@ -520,6 +522,10 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
.is_some_and(|e| matches!(e, DownloadError::Cancelled)),
"{inner:?}"
);

let e = DownloadError::from(e);

assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
}

let cancel = CancellationToken::new();
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
leaky-bucket.workspace = true
lz4_flex.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses

@@ -157,6 +157,7 @@ impl PagestreamClient {
PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetCompressedPage(_)
| PagestreamBeMessage::GetSlruSegment(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
@@ -2103,16 +2103,6 @@ where
R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
{
if request.uri() != &"/v1/failpoints".parse::<Uri>().unwrap() {
fail::fail_point!("api-503", |_| Err(ApiError::ResourceUnavailable(
"failpoint".into()
)));

fail::fail_point!("api-500", |_| Err(ApiError::InternalServerError(
anyhow::anyhow!("failpoint")
)));
}

// Spawn a new task to handle the request, to protect the handler from unexpected
// async cancellations. Most pageserver functions are not async cancellation safe.
// We arm a drop-guard, so that if Hyper drops the Future, we signal the task
@@ -2257,7 +2247,7 @@ pub fn make_router(
.get("/v1/location_config", |r| {
api_handler(r, list_location_config_handler)
})
.get("/v1/location_config/:tenant_shard_id", |r| {
.get("/v1/location_config/:tenant_id", |r| {
api_handler(r, get_location_config_handler)
})
.put(
@@ -40,7 +40,14 @@ use tracing::info;
/// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const STORAGE_FORMAT_VERSION: u16 = 4;

/// Minimal storage format version with compression support
pub const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4;
/// Page image compression algorithm
pub const NO_COMPRESSION: u8 = 0;
pub const LZ4_COMPRESSION: u8 = 1;
pub const DEFAULT_PG_VERSION: u32 = 15;
@@ -1,5 +1,4 @@
use enum_map::EnumMap;
use metrics::metric_vec_duration::DurationResultObserver;
use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
@@ -1283,11 +1282,65 @@ pub(crate) static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|
})
});

impl DurationResultObserver for BasebackupQueryTime {
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration) {
pub(crate) struct BasebackupQueryTimeOngoingRecording<'a, 'c> {
parent: &'a BasebackupQueryTime,
ctx: &'c RequestContext,
start: std::time::Instant,
}

impl BasebackupQueryTime {
pub(crate) fn start_recording<'c: 'a, 'a>(
&'a self,
ctx: &'c RequestContext,
) -> BasebackupQueryTimeOngoingRecording<'_, '_> {
let start = Instant::now();
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!(error, "error opening micros_spent_throttled; this message is logged at a global rate limit");
});
}
}
BasebackupQueryTimeOngoingRecording {
parent: self,
ctx,
start,
}
}
}

impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
pub(crate) fn observe<T, E>(self, res: &Result<T, E>) {
let elapsed = self.start.elapsed();
let ex_throttled = self
.ctx
.micros_spent_throttled
.close_and_checked_sub_from(elapsed);
let ex_throttled = match ex_throttled {
Ok(ex_throttled) => ex_throttled,
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!(error, "error deducting time spent throttled; this message is logged at a global rate limit");
});
elapsed
}
};
let label_value = if res.is_ok() { "ok" } else { "error" };
let metric = self.0.get_metric_with_label_values(&[label_value]).unwrap();
metric.observe(duration.as_secs_f64());
let metric = self
.parent
.0
.get_metric_with_label_values(&[label_value])
.unwrap();
metric.observe(ex_throttled.as_secs_f64());
}
}
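The metrics hunk above replaces the generic `observe_async_block_duration_by_result` helper (deleted from `libs/metrics` earlier in this compare) with an explicit start/observe pair, so throttled time can be subtracted before the histogram records. Below is a stripped-down sketch of that start/observe shape using only the standard library; the real type also consults the request context and a Prometheus histogram, both of which are omitted here.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the histogram: it just remembers the last observation.
#[derive(Default)]
struct QueryTime {
    last: std::cell::Cell<Option<(bool, Duration)>>,
}

struct OngoingRecording<'a> {
    parent: &'a QueryTime,
    start: Instant,
}

impl QueryTime {
    /// Begin timing; the caller later passes the result of the measured block to `observe`.
    fn start_recording(&self) -> OngoingRecording<'_> {
        OngoingRecording { parent: self, start: Instant::now() }
    }
}

impl<'a> OngoingRecording<'a> {
    /// Record elapsed time, labelled by whether the measured block succeeded.
    fn observe<T, E>(self, res: &Result<T, E>) {
        let elapsed = self.start.elapsed();
        self.parent.last.set(Some((res.is_ok(), elapsed)));
    }
}

fn main() {
    let metric = QueryTime::default();
    let recording = metric.start_recording();
    let res: Result<(), &str> = Ok(()); // the measured work would run here
    recording.observe(&res);
    println!("{:?}", metric.last.get());
}
```

Splitting the recording into two calls lets the caller run arbitrary async work in between without wrapping it in a closure, which is what the page_service hunk further down takes advantage of.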
@@ -1155,9 +1155,18 @@ impl PageServerHandler {
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
.await?;

Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))
let compressed = lz4_flex::block::compress(&page);
if compressed.len() < page.len() {
Ok(PagestreamBeMessage::GetCompressedPage(
PagestreamGetPageResponse {
page: Bytes::from(compressed),
},
))
} else {
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))
}
}
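The new getpage path compresses each page with `lz4_flex` and falls back to the uncompressed response whenever compression does not actually shrink it. A minimal sketch of that compress-if-smaller decision, assuming the `lz4_flex` crate; the pagestream message types themselves are not reproduced here.

```rust
use lz4_flex::block::{compress, decompress};

const BLCKSZ: usize = 8192; // PostgreSQL page size, matching postgres_ffi::BLCKSZ

/// Return either (true, compressed bytes) or (false, original bytes),
/// picking compression only when it saves space.
fn maybe_compress(page: &[u8]) -> (bool, Vec<u8>) {
    let compressed = compress(page);
    if compressed.len() < page.len() {
        (true, compressed)
    } else {
        (false, page.to_vec())
    }
}

fn main() {
    let page = vec![0u8; BLCKSZ]; // an all-zero page compresses very well
    let (is_compressed, payload) = maybe_compress(&page);
    assert!(is_compressed);

    // The receiver needs the uncompressed size (one page) to decompress.
    let roundtrip = decompress(&payload, BLCKSZ).expect("valid lz4 block");
    assert_eq!(roundtrip, page);
    println!("compressed {} -> {} bytes", page.len(), payload.len());
}
```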
#[instrument(skip_all, fields(shard_id))]
@@ -1199,7 +1208,7 @@ impl PageServerHandler {
prev_lsn: Option<Lsn>,
full_backup: bool,
gzip: bool,
ctx: RequestContext,
ctx: &RequestContext,
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -1214,7 +1223,7 @@ impl PageServerHandler {
if let Some(lsn) = lsn {
// Backup was requested at a particular LSN. Wait for it to arrive.
info!("waiting for {}", lsn);
timeline.wait_lsn(lsn, &ctx).await?;
timeline.wait_lsn(lsn, ctx).await?;
timeline
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
.context("invalid basebackup lsn")?;
@@ -1236,7 +1245,7 @@ impl PageServerHandler {
lsn,
prev_lsn,
full_backup,
&ctx,
ctx,
)
.await?;
} else {
@@ -1257,7 +1266,7 @@ impl PageServerHandler {
lsn,
prev_lsn,
full_backup,
&ctx,
ctx,
)
.await?;
// shutdown the encoder to ensure the gzip footer is written
@@ -1269,7 +1278,7 @@ impl PageServerHandler {
lsn,
prev_lsn,
full_backup,
&ctx,
ctx,
)
.await?;
}
@@ -1449,25 +1458,25 @@ where
false
};

::metrics::metric_vec_duration::observe_async_block_duration_by_result(
&*metrics::BASEBACKUP_QUERY_TIME,
async move {
self.handle_basebackup_request(
pgb,
tenant_id,
timeline_id,
lsn,
None,
false,
gzip,
ctx,
)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
Result::<(), QueryError>::Ok(())
},
)
.await?;
let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
let res = async {
self.handle_basebackup_request(
pgb,
tenant_id,
timeline_id,
lsn,
None,
false,
gzip,
&ctx,
)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
Result::<(), QueryError>::Ok(())
}
.await;
metric_recording.observe(&res);
res?;
}
// return pair of prev_lsn and last_lsn
else if query_string.starts_with("get_last_record_rlsn ") {
@@ -1563,7 +1572,7 @@ where
prev_lsn,
true,
false,
ctx,
&ctx,
)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
@@ -16,6 +16,7 @@ use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use lz4_flex;
use pageserver_api::key::{
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
@@ -992,7 +993,15 @@ impl<'a> DatadirModification<'a> {
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
let compressed = lz4_flex::block::compress(&img);
if compressed.len() < img.len() {
self.put(
rel_block_to_key(rel, blknum),
Value::CompressedImage(Bytes::from(compressed)),
);
} else {
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
}
Ok(())
}

@@ -1597,6 +1606,10 @@ impl<'a> DatadirModification<'a> {
if let Some((_, value)) = values.last() {
return if let Value::Image(img) = value {
Ok(img.clone())
} else if let Value::CompressedImage(img) = value {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)
.map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
Ok(Bytes::from(decompressed))
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates
@@ -13,6 +13,8 @@ pub use pageserver_api::key::{Key, KEY_SIZE};
pub enum Value {
/// An Image value contains a full copy of the value
Image(Bytes),
/// A compressed page image contains a full copy of the page
CompressedImage(Bytes),
/// A WalRecord value contains a WAL record that needs to be
/// replayed to get the full value. Replaying the WAL record
/// might need a previous version of the value (if will_init()
@@ -22,12 +24,17 @@ pub enum Value {

impl Value {
pub fn is_image(&self) -> bool {
matches!(self, Value::Image(_))
match self {
Value::Image(_) => true,
Value::CompressedImage(_) => true,
Value::WalRecord(_) => false,
}
}

pub fn will_init(&self) -> bool {
match self {
Value::Image(_) => true,
Value::CompressedImage(_) => true,
Value::WalRecord(rec) => rec.will_init(),
}
}
@@ -272,9 +272,6 @@ pub enum TaskKind {
// Task that uploads a file to remote storage
RemoteUploadTask,

// Task that downloads a file from remote storage
RemoteDownloadTask,

// task that handles the initial downloading of all tenants
InitialLoad,
@@ -11,13 +11,16 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use bytes::{BufMut, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
use crate::{LZ4_COMPRESSION, NO_COMPRESSION};
use lz4_flex;
use postgres_ffi::BLCKSZ;
use std::cmp::min;
use std::io::{Error, ErrorKind};

@@ -32,6 +35,29 @@ impl<'a> BlockCursor<'a> {
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
Ok(buf)
}
/// Read blob into the given buffer. Any previous contents in the buffer
/// are overwritten.
pub async fn read_compressed_blob(
&self,
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
let blknum = (offset / PAGE_SZ as u64) as u32;
let off = (offset % PAGE_SZ as u64) as usize;

let buf = self.read_blk(blknum, ctx).await?;
let compression_alg = buf[off];
let res = self.read_blob(offset + 1, ctx).await?;
if compression_alg == LZ4_COMPRESSION {
lz4_flex::block::decompress(&res, BLCKSZ as usize).map_err(|_| {
std::io::Error::new(std::io::ErrorKind::InvalidData, "decompress error")
})
} else {
assert_eq!(compression_alg, NO_COMPRESSION);
Ok(res)
}
}

/// Read blob into the given buffer. Any previous contents in the buffer
/// are overwritten.
pub async fn read_blob_into_buf(
@@ -211,6 +237,58 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
(src_buf, Ok(()))
}

pub async fn write_compressed_blob(&mut self, srcbuf: Bytes) -> Result<u64, Error> {
let offset = self.offset;

let len = srcbuf.len();

let mut io_buf = self.io_buf.take().expect("we always put it back below");
io_buf.clear();
let mut is_compressed = false;
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(NO_COMPRESSION);
io_buf.put_u8(len as u8);
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
return Err(Error::new(
ErrorKind::Other,
format!("blob too large ({} bytes)", len),
));
}
if len == BLCKSZ as usize {
let compressed = lz4_flex::block::compress(&srcbuf);
if compressed.len() < len {
io_buf.put_u8(LZ4_COMPRESSION);
let mut len_buf = (compressed.len() as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
io_buf.extend_from_slice(&compressed[..]);
is_compressed = true;
}
if !is_compressed {
io_buf.put_u8(NO_COMPRESSION);
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
}
}
}
let (io_buf, hdr_res) = self.write_all(io_buf).await;
match hdr_res {
Ok(_) => (),
Err(e) => return Err(e),
}
self.io_buf = Some(io_buf);
if is_compressed {
hdr_res.map(|_| offset)
} else {
let (_buf, res) = self.write_all(srcbuf).await;
res.map(|_| offset)
}
}

/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
@@ -227,7 +305,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
self.write_all(io_buf).await
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
@@ -242,8 +319,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
self.write_all(io_buf).await
}
self.write_all(io_buf).await
}
.await;
self.io_buf = Some(io_buf);
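Read together with the constants added in the pageserver's lib.rs, the writer above prefixes each compressed-capable blob with a one-byte compression tag followed by the pre-existing length header (one byte for lengths below 128, otherwise four bytes with the top bit set). Below is a self-contained sketch of that header layout as the diff suggests it, assuming the two tag values are distinct; it is an illustration, not the pageserver's on-disk code.

```rust
const NO_COMPRESSION: u8 = 0;
const LZ4_COMPRESSION: u8 = 1; // assumed distinct from NO_COMPRESSION

/// Encode: tag byte, then a 1-byte length (< 128) or a 4-byte big-endian
/// length with the high bit set, then the payload itself.
fn encode_blob(tag: u8, payload: &[u8]) -> Vec<u8> {
    let mut out = vec![tag];
    if payload.len() < 128 {
        out.push(payload.len() as u8);
    } else {
        assert!(payload.len() <= 0x7fff_ffff, "blob too large");
        let mut len_buf = (payload.len() as u32).to_be_bytes();
        len_buf[0] |= 0x80;
        out.extend_from_slice(&len_buf);
    }
    out.extend_from_slice(payload);
    out
}

/// Decode the header written by `encode_blob`, returning (tag, payload).
fn decode_blob(buf: &[u8]) -> (u8, &[u8]) {
    let tag = buf[0];
    if buf[1] & 0x80 == 0 {
        let len = buf[1] as usize;
        (tag, &buf[2..2 + len])
    } else {
        let mut len_buf = [buf[1], buf[2], buf[3], buf[4]];
        len_buf[0] &= 0x7f;
        let len = u32::from_be_bytes(len_buf) as usize;
        (tag, &buf[5..5 + len])
    }
}

fn main() {
    let payload = vec![7u8; 300];
    let encoded = encode_blob(LZ4_COMPRESSION, &payload);
    let (tag, decoded) = decode_blob(&encoded);
    assert_eq!(tag, LZ4_COMPRESSION);
    assert_eq!(decoded, &payload[..]);
}
```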
@@ -1440,31 +1440,6 @@ impl TenantManager {
tenant_shard_id: TenantShardId,
new_shard_count: ShardCount,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let r = self
.do_shard_split(tenant_shard_id, new_shard_count, ctx)
.await;
if r.is_err() {
// Shard splitting might have left the original shard in a partially shut down state (it
// stops the shard's remote timeline client). Reset it to ensure we leave things in
// a working state.
if self.get(tenant_shard_id).is_some() {
tracing::warn!("Resetting {tenant_shard_id} after shard split failure");
if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
// Log this error because our return value will still be the original error, not this one.
tracing::warn!("Failed to reset {tenant_shard_id}: {e}");
}
}
}

r
}

pub(crate) async fn do_shard_split(
&self,
tenant_shard_id: TenantShardId,
new_shard_count: ShardCount,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let tenant = get_tenant(tenant_shard_id, true)?;

@@ -1491,10 +1466,6 @@ impl TenantManager {
.join(",")
);

fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
"failpoint"
)));

// Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
if let Err(e) = tenant.split_prepare(&child_shards).await {
// If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
@@ -1504,10 +1475,6 @@ impl TenantManager {
return Err(e);
}

fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
"failpoint"
)));

self.resources.deletion_queue_client.flush_advisory();

// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
@@ -1529,16 +1496,11 @@ impl TenantManager {
anyhow::bail!("Detached parent shard in the middle of split!")
}
};
fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
"failpoint"
)));

// Optimization: hardlink layers from the parent into the children, so that they don't have to
// re-download & duplicate the data referenced in their initial IndexPart
self.shard_split_hardlink(parent, child_shards.clone())
.await?;
fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
"failpoint"
)));

// Take a snapshot of where the parent's WAL ingest had got to: we will wait for
// child shards to reach this point.
@@ -1575,10 +1537,6 @@ impl TenantManager {
.await?;
}

fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
"failpoint"
)));

// Phase 4: wait for child shards WAL ingest to catch up to target LSN
for child_shard_id in &child_shards {
let child_shard_id = *child_shard_id;
@@ -1611,10 +1569,6 @@ impl TenantManager {
timeline.timeline_id,
target_lsn
);

fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
"failpoint"
)));
if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
// Failure here might mean shutdown, in any case this part is an optimization
// and we shouldn't hold up the split operation.
@@ -1660,10 +1614,6 @@ impl TenantManager {
},
);

fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
"failpoint"
)));

parent_slot_guard.drop_old_value()?;

// Phase 6: Release the InProgress on the parent shard
@@ -20,6 +20,7 @@ use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::models::{
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use postgres_ffi::BLCKSZ;
use std::cmp::{Ordering, Reverse};
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
@@ -147,12 +148,13 @@ impl ValuesReconstructState {
lsn: Lsn,
value: Value,
) -> ValueReconstructSituation {
let state = self
let mut error: Option<PageReconstructError> = None;
let key_state = self
.keys
.entry(*key)
.or_insert(Ok(VectoredValueReconstructState::default()));

if let Ok(state) = state {
let situation = if let Ok(state) = key_state {
let key_done = match state.situation {
ValueReconstructSituation::Complete => unreachable!(),
ValueReconstructSituation::Continue => match value {
@@ -160,6 +162,21 @@ impl ValuesReconstructState {
state.img = Some((lsn, img));
true
}
Value::CompressedImage(img) => {
match lz4_flex::block::decompress(&img, BLCKSZ as usize) {
Ok(decompressed) => {
state.img = Some((lsn, Bytes::from(decompressed)));
true
}
Err(e) => {
error = Some(PageReconstructError::from(anyhow::anyhow!(
"Failed to decompress blobrom virtual file: {}",
|
||||
e
)));
true
}
}
}
Value::WalRecord(rec) => {
let reached_cache =
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
@@ -178,7 +195,11 @@ impl ValuesReconstructState {
state.situation
} else {
ValueReconstructSituation::Complete
};
if let Some(err) = error {
*key_state = Err(err);
}
situation
}

/// Returns the Lsn at which this key is cached if one exists.
@@ -44,12 +44,13 @@ use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::BLCKSZ;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::File;
@@ -813,6 +814,12 @@ impl DeltaLayerInner {
need_image = false;
break;
}
Value::CompressedImage(img) => {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
reconstruct_state.img = Some((entry_lsn, Bytes::from(decompressed)));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
@@ -1102,6 +1109,9 @@ impl DeltaLayerInner {
Value::Image(img) => {
format!(" img {} bytes", img.len())
}
Value::CompressedImage(img) => {
format!(" compressed img {} bytes", img.len())
}
Value::WalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
@@ -1138,6 +1148,11 @@ impl DeltaLayerInner {
let checkpoint = CheckPoint::decode(&img)?;
println!(" CHECKPOINT: {:?}", checkpoint);
}
Value::CompressedImage(img) => {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
let checkpoint = CheckPoint::decode(&decompressed)?;
println!(" CHECKPOINT: {:?}", checkpoint);
}
Value::WalRecord(_rec) => {
println!(" unexpected walrecord value for checkpoint key");
}
@@ -39,7 +39,9 @@ use crate::tenant::vectored_blob_io::{
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use crate::{
COMPRESSED_STORAGE_FORMAT_VERSION, IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX,
};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
@@ -153,6 +155,7 @@ pub struct ImageLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
format_version: u16,

lsn: Lsn,

@@ -167,6 +170,7 @@ impl std::fmt::Debug for ImageLayerInner {
f.debug_struct("ImageLayerInner")
.field("index_start_blk", &self.index_start_blk)
.field("index_root_blk", &self.index_root_blk)
.field("format_version", &self.format_version)
.finish()
}
}
@@ -408,6 +412,7 @@ impl ImageLayerInner {
Ok(Ok(ImageLayerInner {
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
format_version: actual_summary.format_version,
lsn,
file,
file_id,
@@ -436,18 +441,20 @@ impl ImageLayerInner {
)
.await?
{
let blob = block_reader
.block_cursor()
.read_blob(
offset,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build(),
)
.await
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build();
let blob = (if self.format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
block_reader
.block_cursor()
.read_compressed_blob(offset, &ctx)
.await
} else {
block_reader.block_cursor().read_blob(offset, &ctx).await
})
.with_context(|| format!("failed to read value from offset {}", offset))?;

let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
} else {
@@ -658,10 +665,7 @@ impl ImageLayerWriterInner {
///
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let (_img, res) = self.blob_writer.write_blob(img).await;
// TODO: re-use the buffer for `img` further upstack
let off = res?;

let off = self.blob_writer.write_compressed_blob(img).await?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;

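The reader path above dispatches on the summary's format_version: image layers written at or above COMPRESSED_STORAGE_FORMAT_VERSION hold lz4-compressed page images, while older layers store the raw page. A self-contained sketch of that dispatch, with a hypothetical constant standing in for the crate's own (the real value is defined elsewhere in this branch):

    // Sketch under assumptions: the version constant and 8 KiB page size are
    // placeholders; only the shape of the format_version branch mirrors the diff.
    const COMPRESSED_STORAGE_FORMAT_VERSION: u16 = 4; // hypothetical value
    const BLCKSZ: usize = 8192;

    fn materialize_page(format_version: u16, blob: &[u8]) -> anyhow::Result<Vec<u8>> {
        if format_version >= COMPRESSED_STORAGE_FORMAT_VERSION {
            // Newer image layers store an lz4 block; expand it back to a full page.
            Ok(lz4_flex::block::decompress(blob, BLCKSZ)?)
        } else {
            // Older layers store the raw 8 KiB page verbatim.
            Ok(blob.to_vec())
        }
    }
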
@@ -14,9 +14,12 @@ use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::walrecord;
use anyhow::{anyhow, ensure, Result};
use bytes::Bytes;
use lz4_flex;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::BLCKSZ;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::{Arc, OnceLock};
use tracing::*;
@@ -133,6 +136,9 @@ impl InMemoryLayer {
Ok(Value::Image(img)) => {
write!(&mut desc, " img {} bytes", img.len())?;
}
Ok(Value::CompressedImage(img)) => {
write!(&mut desc, " compressed img {} bytes", img.len())?;
}
Ok(Value::WalRecord(rec)) => {
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
write!(
@@ -184,6 +190,11 @@ impl InMemoryLayer {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok(ValueReconstructResult::Complete);
}
Value::CompressedImage(img) => {
let decompressed = lz4_flex::block::decompress(&img, BLCKSZ as usize)?;
reconstruct_state.img = Some((*entry_lsn, Bytes::from(decompressed)));
return Ok(ValueReconstructResult::Complete);
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));

@@ -880,23 +880,18 @@ impl LayerInner {
) -> Result<heavier_once_cell::InitPermit, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();

let task_name = format!("download layer {}", self);

let (tx, rx) = tokio::sync::oneshot::channel();

// this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
// block tenant::mgr::remove_tenant_from_memory.

let this: Arc<Self> = self.clone();

crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
crate::task_mgr::TaskKind::RemoteDownloadTask,
Some(self.desc.tenant_shard_id),
Some(self.desc.timeline_id),
&task_name,
false,
async move {
let guard = timeline
.gate
.enter()
.map_err(|_| DownloadError::DownloadCancelled)?;

tokio::task::spawn(async move {

let _guard = guard;

let client = timeline
.remote_client
@@ -906,7 +901,7 @@ impl LayerInner {
let result = client.download_layer_file(
&this.desc.filename(),
&this.metadata(),
&crate::task_mgr::shutdown_token()
&timeline.cancel
)
.await;

@@ -929,7 +924,6 @@ impl LayerInner {

tokio::select! {
_ = tokio::time::sleep(backoff) => {},
_ = crate::task_mgr::shutdown_token().cancelled_owned() => {},
_ = timeline.cancel.cancelled() => {},
};

@@ -959,11 +953,10 @@ impl LayerInner {
}
}
}

Ok(())
}
.in_current_span(),
);

match rx.await {
Ok((Ok(()), permit)) => {
if let Some(reason) = self

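The hunks above move the download task off task_mgr's global shutdown token and onto the timeline's own lifecycle: the task enters the timeline gate and its backoff loop listens on timeline.cancel instead. A hedged sketch of that per-object cancellation pattern with tokio_util's CancellationToken (names and retry policy are illustrative; the real code also goes through task_mgr and a gate guard):

    // Sketch only: retry/backoff loop that aborts when a per-timeline cancellation
    // token fires, rather than on a process-wide shutdown token.
    // Assumes the tokio and tokio-util crates.
    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    async fn download_with_retries(cancel: CancellationToken) -> Result<(), &'static str> {
        let mut backoff = Duration::from_millis(100);
        for _attempt in 0..5 {
            // ... attempt the download here; on success, return Ok(()) ...
            // On failure, wait out the backoff, but give up immediately if this
            // timeline is shutting down, as the diff's select! does.
            tokio::select! {
                _ = tokio::time::sleep(backoff) => {}
                _ = cancel.cancelled() => return Err("download cancelled"),
            }
            backoff = (backoff * 2).min(Duration::from_secs(10));
        }
        Err("giving up after repeated failures")
    }
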
@@ -471,8 +471,9 @@ impl WalIngest {
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
// only lz4 compression of WAL is supported for now; for other compression algorithms fall back to storing the original WAL record
&& (!postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)? ||
postgres_ffi::bkpimage_is_compressed_lz4(blk.bimg_info, modification.tline.pg_version)?)
// do not materialize null pages because they will most likely soon be replaced with real data
&& blk.bimg_len != 0
{
@@ -480,7 +481,21 @@ impl WalIngest {
let img_len = blk.bimg_len as usize;
let img_offs = blk.bimg_offset as usize;
let mut image = BytesMut::with_capacity(BLCKSZ as usize);
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
if postgres_ffi::bkpimage_is_compressed_lz4(
blk.bimg_info,
modification.tline.pg_version,
)? {
let decompressed_img_len = (BLCKSZ - blk.hole_length) as usize;
let decompressed = lz4_flex::block::decompress(
&decoded.record[img_offs..img_offs + img_len],
decompressed_img_len,
)
.map_err(|msg| PageReconstructError::Other(anyhow::anyhow!(msg)))?;
assert_eq!(decompressed.len(), decompressed_img_len);
image.extend_from_slice(&decompressed);
} else {
image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
}

if blk.hole_length != 0 {
let tail = image.split_off(blk.hole_offset as usize);

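Context for the FPI handling above: Postgres elides the page "hole" (the unused gap between the page header and the line pointers' data) from a full-page image before it is optionally compressed, so after lz4 decompression the ingest code gets BLCKSZ - hole_length bytes and must re-insert hole_length zero bytes at hole_offset to recover a full page. A standalone, hedged sketch of that reconstruction (the field names are illustrative, not the decoded WAL block struct itself):

    // Sketch only: rebuilds a full 8192-byte page from a compressed FPI with a hole.
    // `hole_offset` and `hole_length` mirror the WAL block-image fields used above.
    fn restore_fpi(compressed: &[u8], hole_offset: usize, hole_length: usize) -> anyhow::Result<Vec<u8>> {
        const BLCKSZ: usize = 8192;
        // The stored image omits the hole, so its uncompressed size is BLCKSZ - hole_length.
        let mut page = lz4_flex::block::decompress(compressed, BLCKSZ - hole_length)?;
        assert_eq!(page.len(), BLCKSZ - hole_length);
        // Re-insert the hole as zeroes, analogous to the `split_off` logic in the diff.
        let tail = page.split_off(hole_offset);
        page.extend(std::iter::repeat(0u8).take(hole_length));
        page.extend_from_slice(&tail);
        assert_eq!(page.len(), BLCKSZ);
        Ok(page)
    }
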
@@ -18,10 +18,10 @@ OBJS = \

PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
SHLIB_LINK = -lcurl -llz4

EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"

EXTRA_CLEAN = \

6 pgxn/neon/neon--1.1--1.0.sql Normal file
@@ -0,0 +1,6 @@
-- the order of operations is important here
-- because the view depends on the function

DROP VIEW IF EXISTS neon_lfc_stats CASCADE;

DROP FUNCTION IF EXISTS neon_get_lfc_stats CASCADE;
1 pgxn/neon/neon--1.2--1.1.sql Normal file
@@ -0,0 +1 @@
DROP VIEW IF EXISTS NEON_STAT_FILE_CACHE CASCADE;
1 pgxn/neon/neon--1.3--1.2.sql Normal file
@@ -0,0 +1 @@
DROP FUNCTION IF EXISTS approximate_working_set_size(bool) CASCADE;
@@ -44,6 +44,7 @@ typedef enum
T_NeonErrorResponse,
T_NeonDbSizeResponse,
T_NeonGetSlruSegmentResponse,
T_NeonGetCompressedPageResponse
} NeonMessageTag;

/* base struct for c-style inheritance */
@@ -144,6 +145,15 @@ typedef struct

#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))

typedef struct
{
NeonMessageTag tag;
uint16 compressed_size;
char page[FLEXIBLE_ARRAY_MEMBER];
} NeonGetCompressedPageResponse;

#define PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size) (MAXALIGN(offsetof(NeonGetCompressedPageResponse, page) + compressed_size))

typedef struct
{
NeonMessageTag tag;

@@ -45,6 +45,10 @@
*/
#include "postgres.h"

#ifdef USE_LZ4
#include <lz4.h>
#endif

#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
@@ -1059,6 +1063,7 @@ nm_pack_request(NeonRequest *msg)
case T_NeonExistsResponse:
case T_NeonNblocksResponse:
case T_NeonGetPageResponse:
case T_NeonGetCompressedPageResponse:
case T_NeonErrorResponse:
case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
@@ -1114,6 +1119,21 @@ nm_unpack_response(StringInfo s)

Assert(msg_resp->tag == T_NeonGetPageResponse);

resp = (NeonResponse *) msg_resp;
break;
}
case T_NeonGetCompressedPageResponse:
{
NeonGetCompressedPageResponse *msg_resp;
uint16 compressed_size = pq_getmsgint(s, 2);
msg_resp = palloc0(PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size));
msg_resp->tag = tag;
msg_resp->compressed_size = compressed_size;
memcpy(msg_resp->page, pq_getmsgbytes(s, compressed_size), compressed_size);
pq_getmsgend(s);

Assert(msg_resp->tag == T_NeonGetCompressedPageResponse);

resp = (NeonResponse *) msg_resp;
break;
}
@@ -1287,6 +1307,14 @@ nm_to_string(NeonMessage *msg)
appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetCompressedPageResponse:
{
NeonGetCompressedPageResponse *msg_resp = (NeonGetCompressedPageResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetCompressedPageResponse\"");
appendStringInfo(&s, ", \"compressed_page_size\": \"%d\"", msg_resp->compressed_size);
appendStringInfoChar(&s, '}');
break;
}
case T_NeonErrorResponse:
{
NeonErrorResponse *msg_resp = (NeonErrorResponse *) msg;
@@ -2205,6 +2233,29 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
lfc_write(rinfo, forkNum, blkno, buffer);
break;

case T_NeonGetCompressedPageResponse:
{
#ifndef USE_LZ4
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("compression method lz4 not supported"),
errdetail("This functionality requires the server to be built with lz4 support."),
errhint("You need to rebuild PostgreSQL using %s.", "--with-lz4")));
#else
NeonGetCompressedPageResponse* cp = (NeonGetCompressedPageResponse *) resp;
int rc = LZ4_decompress_safe(cp->page,
buffer,
cp->compressed_size,
BLCKSZ);
if (rc != BLCKSZ) {
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg_internal("compressed lz4 data is corrupt")));
}
lfc_write(rinfo, forkNum, blkno, buffer);
#endif
break;
}
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),

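On the compute side, the header above defines NeonGetCompressedPageResponse as a message tag, a 2-byte compressed_size in network byte order, and compressed_size bytes of lz4 payload; neon_read_at_lsn expands it with LZ4_decompress_safe and treats anything other than exactly BLCKSZ output bytes as corruption. A rough Rust equivalent of that unpack-and-verify step, written as a sketch (the framing is reduced to just the size-prefixed payload; this is not the actual wire parser):

    // Sketch under assumptions: `buf` holds a 2-byte big-endian compressed_size
    // followed by the lz4 block, mirroring the struct layout shown in the header.
    fn unpack_compressed_page(buf: &[u8]) -> anyhow::Result<Vec<u8>> {
        const BLCKSZ: usize = 8192;
        anyhow::ensure!(buf.len() >= 2, "short message");
        let compressed_size = u16::from_be_bytes([buf[0], buf[1]]) as usize;
        anyhow::ensure!(buf.len() >= 2 + compressed_size, "truncated payload");
        let page = lz4_flex::block::decompress(&buf[2..2 + compressed_size], BLCKSZ)?;
        // Like the C code's `rc != BLCKSZ` check: anything but a full page is corruption.
        anyhow::ensure!(page.len() == BLCKSZ, "compressed lz4 data is corrupt");
        Ok(page)
    }
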
@@ -1518,7 +1518,6 @@ class NeonCli(AbstractNeonCli):
conf: Optional[Dict[str, Any]] = None,
shard_count: Optional[int] = None,
shard_stripe_size: Optional[int] = None,
placement_policy: Optional[str] = None,
set_default: bool = False,
) -> Tuple[TenantId, TimelineId]:
"""
@@ -1552,9 +1551,6 @@ class NeonCli(AbstractNeonCli):
if shard_stripe_size is not None:
args.extend(["--shard-stripe-size", str(shard_stripe_size)])

if placement_policy is not None:
args.extend(["--placement-policy", str(placement_policy)])

res = self.raw_cli(args)
res.check_returncode()
return tenant_id, timeline_id
@@ -2172,37 +2168,6 @@ class NeonAttachmentService(MetricsGetter):
)
log.info("Attachment service passed consistency check")

def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings

log.info(f"Requesting config failpoints: {repr(pairs)}")

res = self.request(
"PUT",
f"{self.env.attachment_service_api}/debug/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
headers=self.headers(TokenScope.ADMIN),
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()

def balance_all(self):
self.request(
"POST",
f"{self.env.attachment_service_api}/control/v1/balance/all",
headers=self.headers(TokenScope.ADMIN),
)

def balance_attached(self):
self.request(
"POST",
f"{self.env.attachment_service_api}/control/v1/balance/attached",
headers=self.headers(TokenScope.ADMIN),
)

def __enter__(self) -> "NeonAttachmentService":
return self

@@ -2357,16 +2322,16 @@ class NeonPageserver(PgProtocol):
def assert_no_errors(self):
logfile = self.workdir / "pageserver.log"
if not logfile.exists():
log.warning(f"Skipping log check on pageserver {self.id}: {logfile} does not exist")
log.warning(f"Skipping log check: {logfile} does not exist")
return

with logfile.open("r") as f:
errors = scan_pageserver_log_for_errors(f, self.allowed_errors)

for _lineno, error in errors:
log.info(f"not allowed error (pageserver {self.id}): {error.strip()}")
log.info(f"not allowed error: {error.strip()}")

assert not errors, f"Pageserver {self.id}: {errors}"
assert not errors

def assert_no_metric_errors(self):
"""

@@ -1,4 +1,3 @@
import threading
from typing import Optional

from fixtures.log_helper import log
@@ -12,10 +11,6 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import TenantId, TimelineId

# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
ENDPOINT_LOCK = threading.Lock()


class Workload:
"""
@@ -46,30 +41,17 @@ class Workload:

self._endpoint: Optional[Endpoint] = None

def reconfigure(self):
"""
Request the endpoint to reconfigure based on location reported by storage controller
"""
if self._endpoint is not None:
with ENDPOINT_LOCK:
self._endpoint.reconfigure()

def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
# We may be running alongside other Workloads for different tenants. Full TTID is
# obnoxiously long for use here, but a cut-down version is still unique enough for tests.
endpoint_id = f"ep-workload-{str(self.tenant_id)[0:4]}-{str(self.timeline_id)[0:4]}"

with ENDPOINT_LOCK:
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
self.branch_name,
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id=endpoint_id,
)
self._endpoint.start(pageserver_id=pageserver_id)
else:
self._endpoint.reconfigure(pageserver_id=pageserver_id)
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
self.branch_name,
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id="ep-workload",
)
self._endpoint.start(pageserver_id=pageserver_id)
else:
self._endpoint.reconfigure(pageserver_id=pageserver_id)

connstring = self._endpoint.safe_psql(
"SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
@@ -112,7 +94,7 @@ class Workload:
else:
return False

def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True, ingest=True):
def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
assert self.expect_rows >= n

max_iters = 10
@@ -150,28 +132,22 @@ class Workload:
]
)

if ingest:
# Wait for written data to be ingested by the pageserver
for tenant_shard_id, pageserver in tenant_get_shards(
self.env, self.tenant_id, pageserver_id
):
last_flush_lsn = wait_for_last_flush_lsn(
self.env,
endpoint,
self.tenant_id,
self.timeline_id,
pageserver_id=pageserver_id,
)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
for tenant_shard_id, pageserver in tenant_get_shards(
self.env, self.tenant_id, pageserver_id
):
last_flush_lsn = wait_for_last_flush_lsn(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
ps_http = pageserver.http_client()
wait_for_last_record_lsn(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)

if upload:
# Wait for written data to be uploaded to S3 (force a checkpoint to trigger upload)
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
else:
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
if upload:
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
else:
log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")

def validate(self, pageserver_id: Optional[int] = None):
endpoint = self.endpoint(pageserver_id)

@@ -15,7 +15,7 @@ def test_migrations(neon_simple_env: NeonEnv):

endpoint.wait_for_migrations()

num_migrations = 8
num_migrations = 9

with endpoint.cursor() as cur:
cur.execute("SELECT id FROM neon_migration.migration_id")

@@ -29,3 +29,34 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):
log.info(res)
assert len(res) == 1
assert len(res[0]) == 5


# Verify that the neon extension can be upgraded/downgraded.
def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_neon_extension_compatibility")

endpoint_main = env.endpoints.create("test_neon_extension_compatibility")
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
endpoint_main.respec(skip_pg_catalog_updates=False)
endpoint_main.start()

with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
all_versions = ["1.3", "1.2", "1.1", "1.0"]
current_version = "1.3"
for idx, begin_version in enumerate(all_versions):
for target_version in all_versions[idx + 1 :]:
if current_version != begin_version:
cur.execute(
f"ALTER EXTENSION neon UPDATE TO '{begin_version}'; -- {current_version}->{begin_version}"
)
current_version = begin_version
# downgrade
cur.execute(
f"ALTER EXTENSION neon UPDATE TO '{target_version}'; -- {begin_version}->{target_version}"
)
# upgrade
cur.execute(
f"ALTER EXTENSION neon UPDATE TO '{begin_version}'; -- {target_version}->{begin_version}"
)

@@ -1,17 +1,13 @@
import os
from typing import Optional

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
AttachmentServiceApiException,
NeonEnv,
NeonEnvBuilder,
tenant_get_shards,
)
from fixtures.remote_storage import s3_storage
from fixtures.types import Lsn, TenantShardId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload


@@ -404,245 +400,3 @@ def test_sharding_ingest(

# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
assert huge_layer_count <= shard_count


class Failure:
pageserver_id: Optional[int]

def apply(self, env: NeonEnv):
raise NotImplementedError()

def clear(self, env: NeonEnv):
"""
Clear the failure, in a way that should enable the system to proceed
to a totally clean state (all nodes online and reconciled)
"""
raise NotImplementedError()

def expect_available(self):
raise NotImplementedError()

def can_mitigate(self):
"""Whether Self.mitigate is available for use"""
return False

def mitigate(self, env: NeonEnv):
"""
Mitigate the failure in a way that should allow shard split to
complete and service to resume, but does not guarantee to leave
the whole world in a clean state (e.g. an Offline node might have
junk LocationConfigs on it)
"""
raise NotImplementedError()

def fails_forward(self):
"""
If true, this failure results in a state that eventually completes the split.
"""
return False


class PageserverFailpoint(Failure):
def __init__(self, failpoint, pageserver_id, mitigate):
self.failpoint = failpoint
self.pageserver_id = pageserver_id
self._mitigate = mitigate

def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.allowed_errors.extend(
[".*failpoint.*", ".*Resetting.*after shard split failure.*"]
)
pageserver.http_client().configure_failpoints((self.failpoint, "return(1)"))

def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
if self._mitigate:
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Active"})

def expect_available(self):
return True

def can_mitigate(self):
return self._mitigate

def mitigate(self, env):
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Offline"})


class StorageControllerFailpoint(Failure):
def __init__(self, failpoint):
self.failpoint = failpoint
self.pageserver_id = None

def apply(self, env: NeonEnv):
env.attachment_service.configure_failpoints((self.failpoint, "return(1)"))

def clear(self, env: NeonEnv):
env.attachment_service.configure_failpoints((self.failpoint, "off"))

def expect_available(self):
return True

def can_mitigate(self):
return False

def fails_forward(self):
# Edge case: the very last failpoint that simulates a DB connection error, where
# the abort path will fail-forward and result in a complete split.
return self.failpoint == "shard-split-post-complete"


class NodeKill(Failure):
def __init__(self, pageserver_id, mitigate):
self.pageserver_id = pageserver_id
self._mitigate = mitigate

def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.stop(immediate=True)

def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.start()

def expect_available(self):
return False

def mitigate(self, env):
env.attachment_service.node_configure(self.pageserver_id, {"availability": "Offline"})


@pytest.mark.parametrize(
"failure",
[
PageserverFailpoint("api-500", 1, False),
NodeKill(1, False),
PageserverFailpoint("api-500", 1, True),
NodeKill(1, True),
PageserverFailpoint("shard-split-pre-prepare", 1, False),
PageserverFailpoint("shard-split-post-prepare", 1, False),
PageserverFailpoint("shard-split-pre-hardlink", 1, False),
PageserverFailpoint("shard-split-post-hardlink", 1, False),
PageserverFailpoint("shard-split-post-child-conf", 1, False),
PageserverFailpoint("shard-split-lsn-wait", 1, False),
PageserverFailpoint("shard-split-pre-finish", 1, False),
StorageControllerFailpoint("shard-split-validation"),
StorageControllerFailpoint("shard-split-post-begin"),
StorageControllerFailpoint("shard-split-post-remote"),
StorageControllerFailpoint("shard-split-post-complete"),
],
)
def test_sharding_split_failures(neon_env_builder: NeonEnvBuilder, failure: Failure):
neon_env_builder.num_pageservers = 4
initial_shard_count = 2
split_shard_count = 4

env = neon_env_builder.init_start(initial_tenant_shard_count=initial_shard_count)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

# Make sure the node we're failing has a shard on it, otherwise the test isn't testing anything
assert (
failure.pageserver_id is None
or len(
env.get_pageserver(failure.pageserver_id)
.http_client()
.tenant_list_locations()["tenant_shards"]
)
> 0
)

workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(100)

# Set one pageserver to 500 all requests, then do a split
# TODO: also test with a long-blocking failure: controller should time out its request and then
# clean up in a well defined way.
failure.apply(env)

with pytest.raises(AttachmentServiceApiException):
env.attachment_service.tenant_shard_split(tenant_id, shard_count=4)

# We expect that the overall operation will fail, but some split requests
# will have succeeded: the net result should be to return to a clean state, including
# detaching any child shards.
def assert_rolled_back(exclude_ps_id=None) -> None:
count = 0
for ps in env.pageservers:
if exclude_ps_id is not None and ps.id == exclude_ps_id:
continue

locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
assert tenant_shard_id.shard_count == initial_shard_count
count += 1
assert count == initial_shard_count

def assert_split_done(exclude_ps_id=None) -> None:
count = 0
for ps in env.pageservers:
if exclude_ps_id is not None and ps.id == exclude_ps_id:
continue

locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
tenant_shard_id = TenantShardId.parse(loc[0])
log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
assert tenant_shard_id.shard_count == split_shard_count
count += 1
assert count == split_shard_count

def finish_split():
# Having failed+rolled back, we should be able to split again
# No failures this time; it will succeed
env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count)

workload.churn_rows(10)
workload.validate()

if failure.expect_available():
# Even though the split failed partway through, this should not have interrupted
# clients. Disable waiting for pageservers in the workload helper, because our
# failpoints may prevent API access.
# This only applies for failure modes that leave pageserver page_service API available.
workload.churn_rows(10, upload=False, ingest=False)
workload.validate()

if failure.fails_forward():
# A failure type which results in eventual completion of the split
wait_until(30, 1, assert_split_done)
elif failure.can_mitigate():
# Mitigation phase: we expect to be able to proceed with a successful shard split
failure.mitigate(env)

# The split should appear to be rolled back from the point of view of all pageservers
# apart from the one that is offline
wait_until(30, 1, lambda: assert_rolled_back(exclude_ps_id=failure.pageserver_id))

finish_split()
wait_until(30, 1, lambda: assert_split_done(exclude_ps_id=failure.pageserver_id))

# Having cleared the failure, everything should converge to a pristine state
failure.clear(env)
wait_until(30, 1, assert_split_done)
else:
# Once we restore the faulty pageserver's API to good health, rollback should
# eventually complete.
failure.clear(env)

wait_until(30, 1, assert_rolled_back)

# Having rolled back, the tenant should be working
workload.churn_rows(10)
workload.validate()

# Splitting again should work, since we cleared the failure
finish_split()
assert_split_done()

env.attachment_service.consistency_check()

@@ -1,5 +1,3 @@
import concurrent.futures
import random
import time
from collections import defaultdict
from datetime import datetime, timezone
@@ -25,9 +23,8 @@ from fixtures.pageserver.utils import (
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind, s3_storage
from fixtures.types import TenantId, TenantShardId, TimelineId
from fixtures.types import TenantId, TimelineId
from fixtures.utils import run_pg_bench_small, wait_until
from fixtures.workload import Workload
from mypy_boto3_s3.type_defs import (
ObjectTypeDef,
)
@@ -773,186 +770,3 @@ def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder):
assert "pitr_interval" not in readback_ps.tenant_specific_overrides

env.attachment_service.consistency_check()


def test_storcon_rolling_failures(
neon_env_builder: NeonEnvBuilder, httpserver: HTTPServer, httpserver_listen_address
):
neon_env_builder.num_pageservers = 8

(host, port) = httpserver_listen_address
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify-attach"

workloads: dict[TenantId, Workload] = {}

# Do neon_local endpoint reconfiguration in the background so that we can
# accept a healthy rate of calls into notify-attach.
reconfigure_threads = concurrent.futures.ThreadPoolExecutor(max_workers=1)

def handler(request: Request):
"""
Although storage controller can use neon_local directly, this causes problems when
the test is also concurrently modifying endpoints. Instead, configure storage controller
to send notifications up to this test code, which will route all endpoint updates
through Workload, which has a mutex to make it all safe.
"""
assert request.json is not None
body: dict[str, Any] = request.json
log.info(f"notify-attach request: {body}")

try:
workload = workloads[TenantId(body["tenant_id"])]
except KeyError:
pass
else:
# This causes the endpoint to query storage controller for its location, which
# is redundant since we already have it here, but this avoids extending the
# neon_local CLI to take full lists of locations
reconfigure_threads.submit(lambda workload=workload: workload.reconfigure()) # type: ignore[no-any-return]

return Response(status=200)

httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)

env = neon_env_builder.init_start()

for ps in env.pageservers:
# We will do unclean detaches
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")

n_tenants = 32
tenants = [(env.initial_tenant, env.initial_timeline)]
for i in range(0, n_tenants - 1):
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
shard_count = [1, 2, 4][i % 3]
env.neon_cli.create_tenant(
tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Double":1}'
)
tenants.append((tenant_id, timeline_id))

# Background pain:
# - TODO: some fraction of pageserver API requests hang
# (this requires implementing wrap of location_conf calls with proper timeline/cancel)
# - TODO: continuous tenant/timeline creation/destruction over a different ID range than
# the ones we're using for availability checks.

rng = random.Random(0xDEADBEEF)

for tenant_id, timeline_id in tenants:
workload = Workload(env, tenant_id, timeline_id)
workloads[tenant_id] = workload

def node_evacuated(node_id: int):
counts = get_node_shard_counts(env, [t[0] for t in tenants])
assert counts[node_id] == 0

def attachments_active():
for tid, _tlid in tenants:
for shard in env.attachment_service.locate(tid):
psid = shard["node_id"]
tsid = TenantShardId.parse(shard["shard_id"])
status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid)
assert status["state"]["slug"] == "Active"
log.info(f"Shard {tsid} active on node {psid}")

failpoints = ("api-503", "5%1000*return(1)")
failpoints_str = f"{failpoints[0]}={failpoints[1]}"
for ps in env.pageservers:
ps.http_client().configure_failpoints(failpoints)

def for_all_workloads(callback, timeout=60):
futs = []
with concurrent.futures.ThreadPoolExecutor() as pool:
for _tenant_id, workload in workloads.items():
futs.append(pool.submit(callback, workload))

for f in futs:
f.result(timeout=timeout)

def clean_fail_restore():
"""
Clean shutdown of a node: mark it offline in storage controller, wait for new attachment
locations to activate, then SIGTERM it.
- Endpoints should not fail any queries
- New attach locations should activate within bounded time.
"""
victim = rng.choice(env.pageservers)
env.attachment_service.node_configure(victim.id, {"availability": "Offline"})

wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
wait_until(10, 1, attachments_active)

victim.stop(immediate=False)

traffic()

victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})

# Revert shards to attach at their original locations
env.attachment_service.balance_attached()
wait_until(10, 1, attachments_active)

def hard_fail_restore():
"""
Simulate an unexpected death of a pageserver node
"""
victim = rng.choice(env.pageservers)
victim.stop(immediate=True)
# TODO: once we implement heartbeats detecting node failures, remove this
# explicit marking offline and rely on storage controller to detect it itself.
env.attachment_service.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
wait_until(10, 1, attachments_active)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
env.attachment_service.balance_attached()
wait_until(10, 1, attachments_active)

def traffic():
"""
Check that all tenants are working for postgres clients
"""

def exercise_one(workload):
workload.churn_rows(100)
workload.validate()

for_all_workloads(exercise_one)

def init_one(workload):
workload.init()
workload.write_rows(100)

for_all_workloads(init_one, timeout=60)

for i in range(0, 20):
mode = rng.choice([0, 1, 2])
log.info(f"Iteration {i}, mode {mode}")
if mode == 0:
# Traffic interval: sometimes, instead of a failure, just let the clients
# write a load of data. This avoids chaos tests ending up with unrealistically
# small quantities of data in flight.
traffic()
elif mode == 1:
clean_fail_restore()
elif mode == 2:
hard_fail_restore()

# Fail and restart: hard-kill one node. Notify the storage controller that it is offline.
# Success criteria:
# - New attach locations should activate within bounded time
# - TODO: once we do heartbeating, we should not have to explicitly mark the node offline

# TODO: fail and remove: fail a node, and remove it from the cluster.
# Success criteria:
# - Endpoints should not fail any queries
# - New attach locations should activate within bounded time
# - New secondary locations should fill up with data within bounded time

# TODO: somehow need to wait for reconciles to complete before doing consistency check
# (or make the check wait).

# Do consistency check on every iteration, not just at the end: this makes it more obvious
# which change caused an issue.
env.attachment_service.consistency_check()

@@ -190,6 +190,8 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
# So by ignoring these instead of waiting for empty upload queue
# we execute more distinct code paths.
'.*stopping left-over name="remote upload".*',
# an on-demand is cancelled by shutdown
".*initial size calculation failed: downloading failed, possibly for shutdown",
]
)


@@ -213,7 +213,9 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
# This happens when timeline remains are cleaned up during loading
".*Timeline dir entry become invalid.*",
# In one of the branches we poll for tenant to become active. Polls can generate this log message:
f".*Tenant {env.initial_tenant} is not active*",
f".*Tenant {env.initial_tenant} is not active.*",
# an on-demand is cancelled by shutdown
".*initial size calculation failed: downloading failed, possibly for shutdown",
]
)