Compare commits

..

1 Commits

Author SHA1 Message Date
Matthias van de Meent
cb8782a51a Fix recovery IO deadlock
Previously, it was possible for backends to request a page
with the LSN of the record currently being replayed. This could
cause a deadlock when the redo process wanted to read that same
page at the same time.

This LSN could only appear when the page was not present in the
LwLSN cache, and the highest evicted LSN also was the LSN of the
currently-replayed WAL record.

The issue is fixed by splitting maxLastWrittenLsn into two: one
for data pages, and one for metadata.  This allows us to keep
track of metadata changes separately, removing the implicit
dependency of page IO on metadata LSNs where appropriate.

Additionally, we stop evicting LwLSNs for pages with an LSN that
is yet to be replayed.  This means the global data page LwLsn
will never return an LSN of a record that has yet to be replayed,
*unless* the startup process has already determined that it won't
access that page again, making page IO and Replay waits by other
backends using that LSN safe for those pages.
2025-06-11 22:03:54 +02:00
57 changed files with 368 additions and 834 deletions

27
Cargo.lock generated
View File

@@ -903,6 +903,12 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
[[package]]
name = "base64"
version = "0.21.7"
@@ -1294,7 +1300,7 @@ dependencies = [
"aws-smithy-types",
"axum",
"axum-extra",
"base64 0.22.1",
"base64 0.13.1",
"bytes",
"camino",
"cfg-if",
@@ -1420,7 +1426,7 @@ name = "control_plane"
version = "0.1.0"
dependencies = [
"anyhow",
"base64 0.22.1",
"base64 0.13.1",
"camino",
"clap",
"comfy-table",
@@ -4812,7 +4818,7 @@ dependencies = [
name = "postgres-protocol2"
version = "0.1.0"
dependencies = [
"base64 0.22.1",
"base64 0.20.0",
"byteorder",
"bytes",
"fallible-iterator",
@@ -5184,7 +5190,7 @@ dependencies = [
"aws-config",
"aws-sdk-iam",
"aws-sigv4",
"base64 0.22.1",
"base64 0.13.1",
"bstr",
"bytes",
"camino",
@@ -6488,17 +6494,15 @@ dependencies = [
[[package]]
name = "serde_with"
version = "3.12.0"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa"
checksum = "07ff71d2c147a7b57362cead5e22f772cd52f6ab31cfcd9edcd7f6aeb2a0afbe"
dependencies = [
"base64 0.22.1",
"base64 0.13.1",
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.9.0",
"serde",
"serde_derive",
"serde_json",
"serde_with_macros",
"time",
@@ -6506,9 +6510,9 @@ dependencies = [
[[package]]
name = "serde_with_macros"
version = "3.12.0"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e"
checksum = "881b6f881b17d13214e5d494c939ebab463d01264ce1811e9d4ac3a882e7695f"
dependencies = [
"darling",
"proc-macro2",
@@ -8579,6 +8583,7 @@ dependencies = [
"anyhow",
"axum",
"axum-core",
"base64 0.13.1",
"base64 0.21.7",
"base64ct",
"bytes",

View File

@@ -72,7 +72,7 @@ aws-sigv4 = { version = "1.2", features = ["sign-http"] }
aws-types = "1.3"
axum = { version = "0.8.1", features = ["ws"] }
axum-extra = { version = "0.10.0", features = ["typed-header", "query"] }
base64 = "0.22"
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.71"
bit_field = "0.10.2"
@@ -171,7 +171,7 @@ sentry = { version = "0.37", default-features = false, features = ["backtrace",
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_path_to_error = "0.1"
serde_with = { version = "3", features = [ "base64" ] }
serde_with = { version = "2.0", features = [ "base64" ] }
serde_assert = "0.5.0"
sha2 = "0.10.2"
signal-hook = "0.3"

View File

@@ -45,8 +45,6 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow, bail};
use base64::Engine;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use compute_api::requests::{
COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
};
@@ -166,7 +164,7 @@ impl ComputeControlPlane {
public_key_use: Some(PublicKeyUse::Signature),
key_operations: Some(vec![KeyOperations::Verify]),
key_algorithm: Some(KeyAlgorithm::EdDSA),
key_id: Some(BASE64_URL_SAFE_NO_PAD.encode(key_hash)),
key_id: Some(base64::encode_config(key_hash, base64::URL_SAFE_NO_PAD)),
x509_url: None::<String>,
x509_chain: None::<Vec<String>>,
x509_sha1_fingerprint: None::<String>,
@@ -175,7 +173,7 @@ impl ComputeControlPlane {
algorithm: AlgorithmParameters::OctetKeyPair(OctetKeyPairParameters {
key_type: OctetKeyPairType::OctetKeyPair,
curve: EllipticCurve::Ed25519,
x: BASE64_URL_SAFE_NO_PAD.encode(public_key),
x: base64::encode_config(public_key, base64::URL_SAFE_NO_PAD),
}),
}],
})

View File

@@ -9,7 +9,7 @@ use utils::id::{NodeId, TimelineId};
use crate::controller_api::NodeRegisterRequest;
use crate::models::{LocationConfigMode, ShardImportStatus};
use crate::shard::{ShardStripeSize, TenantShardId};
use crate::shard::TenantShardId;
/// Upcall message sent by the pageserver to the configured `control_plane_api` on
/// startup.
@@ -36,10 +36,6 @@ pub struct ReAttachResponseTenant {
/// Default value only for backward compat: this field should be set
#[serde(default = "default_mode")]
pub mode: LocationConfigMode,
// Default value only for backward compat: this field should be set
#[serde(default = "ShardStripeSize::default")]
pub stripe_size: ShardStripeSize,
}
#[derive(Serialize, Deserialize)]
pub struct ReAttachResponse {

View File

@@ -55,16 +55,9 @@ impl FeatureResolverBackgroundLoop {
continue;
}
};
let project_id = this.posthog_client.config.project_id.parse::<u64>().ok();
match FeatureStore::new_with_flags(resp.flags, project_id) {
Ok(feature_store) => {
this.feature_store.store(Arc::new(feature_store));
tracing::info!("Feature flag updated");
}
Err(e) => {
tracing::warn!("Cannot process feature flag spec: {}", e);
}
}
let feature_store = FeatureStore::new_with_flags(resp.flags);
this.feature_store.store(Arc::new(feature_store));
tracing::info!("Feature flag updated");
}
tracing::info!("PostHog feature resolver stopped");
}

View File

@@ -39,9 +39,6 @@ pub struct LocalEvaluationResponse {
#[derive(Deserialize)]
pub struct LocalEvaluationFlag {
#[allow(dead_code)]
id: u64,
team_id: u64,
key: String,
filters: LocalEvaluationFlagFilters,
active: bool,
@@ -110,32 +107,17 @@ impl FeatureStore {
}
}
pub fn new_with_flags(
flags: Vec<LocalEvaluationFlag>,
project_id: Option<u64>,
) -> Result<Self, &'static str> {
pub fn new_with_flags(flags: Vec<LocalEvaluationFlag>) -> Self {
let mut store = Self::new();
store.set_flags(flags, project_id)?;
Ok(store)
store.set_flags(flags);
store
}
pub fn set_flags(
&mut self,
flags: Vec<LocalEvaluationFlag>,
project_id: Option<u64>,
) -> Result<(), &'static str> {
pub fn set_flags(&mut self, flags: Vec<LocalEvaluationFlag>) {
self.flags.clear();
for flag in flags {
if let Some(project_id) = project_id {
if flag.team_id != project_id {
return Err(
"Retrieved a spec with different project id, wrong config? Discarding the feature flags.",
);
}
}
self.flags.insert(flag.key.clone(), flag);
}
Ok(())
}
/// Generate a consistent hash for a user ID (e.g., tenant ID).
@@ -552,13 +534,6 @@ impl PostHogClient {
})
}
/// Check if the server API key is a feature flag secure API key. This key can only be
/// used to fetch the feature flag specs and can only be used on a undocumented API
/// endpoint.
fn is_feature_flag_secure_api_key(&self) -> bool {
self.config.server_api_key.starts_with("phs_")
}
/// Fetch the feature flag specs from the server.
///
/// This is unfortunately an undocumented API at:
@@ -572,22 +547,10 @@ impl PostHogClient {
) -> anyhow::Result<LocalEvaluationResponse> {
// BASE_URL/api/projects/:project_id/feature_flags/local_evaluation
// with bearer token of self.server_api_key
// OR
// BASE_URL/api/feature_flag/local_evaluation/
// with bearer token of feature flag specific self.server_api_key
let url = if self.is_feature_flag_secure_api_key() {
// The new feature local evaluation secure API token
format!(
"{}/api/feature_flag/local_evaluation",
self.config.private_api_url
)
} else {
// The old personal API token
format!(
"{}/api/projects/{}/feature_flags/local_evaluation",
self.config.private_api_url, self.config.project_id
)
};
let url = format!(
"{}/api/projects/{}/feature_flags/local_evaluation",
self.config.private_api_url, self.config.project_id
);
let response = self
.client
.get(url)
@@ -840,7 +803,7 @@ mod tests {
fn evaluate_multivariate() {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags, None).unwrap();
store.set_flags(response.flags);
// This lacks the required properties and cannot be evaluated.
let variant =
@@ -910,7 +873,7 @@ mod tests {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags, None).unwrap();
store.set_flags(response.flags);
// This lacks the required properties and cannot be evaluated.
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &HashMap::new());
@@ -966,7 +929,7 @@ mod tests {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags, None).unwrap();
store.set_flags(response.flags);
// This lacks the required properties and cannot be evaluated.
let variant =

View File

@@ -5,7 +5,7 @@ edition = "2024"
license = "MIT/Apache-2.0"
[dependencies]
base64.workspace = true
base64 = "0.20"
byteorder.workspace = true
bytes.workspace = true
fallible-iterator.workspace = true

View File

@@ -3,8 +3,6 @@
use std::fmt::Write;
use std::{io, iter, mem, str};
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use hmac::{Hmac, Mac};
use rand::{self, Rng};
use sha2::digest::FixedOutput;
@@ -228,7 +226,7 @@ impl ScramSha256 {
let (client_key, server_key) = match password {
Credentials::Password(password) => {
let salt = match BASE64_STANDARD.decode(parsed.salt) {
let salt = match base64::decode(parsed.salt) {
Ok(salt) => salt,
Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
};
@@ -257,7 +255,7 @@ impl ScramSha256 {
let mut cbind_input = vec![];
cbind_input.extend(channel_binding.gs2_header().as_bytes());
cbind_input.extend(channel_binding.cbind_data());
let cbind_input = BASE64_STANDARD.encode(&cbind_input);
let cbind_input = base64::encode(&cbind_input);
self.message.clear();
write!(&mut self.message, "c={},r={}", cbind_input, parsed.nonce).unwrap();
@@ -274,12 +272,7 @@ impl ScramSha256 {
*proof ^= signature;
}
write!(
&mut self.message,
",p={}",
BASE64_STANDARD.encode(client_proof)
)
.unwrap();
write!(&mut self.message, ",p={}", base64::encode(client_proof)).unwrap();
self.state = State::Finish {
server_key,
@@ -313,7 +306,7 @@ impl ScramSha256 {
ServerFinalMessage::Verifier(verifier) => verifier,
};
let verifier = match BASE64_STANDARD.decode(verifier) {
let verifier = match base64::decode(verifier) {
Ok(verifier) => verifier,
Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
};

View File

@@ -6,8 +6,6 @@
//! side. This is good because it ensures the cleartext password won't
//! end up in logs pg_stat displays, etc.
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use hmac::{Hmac, Mac};
use rand::RngCore;
use sha2::digest::FixedOutput;
@@ -85,8 +83,8 @@ pub(crate) async fn scram_sha_256_salt(
format!(
"SCRAM-SHA-256${}:{}${}:{}",
SCRAM_DEFAULT_ITERATIONS,
BASE64_STANDARD.encode(salt),
BASE64_STANDARD.encode(stored_key),
BASE64_STANDARD.encode(server_key)
base64::encode(salt),
base64::encode(stored_key),
base64::encode(server_key)
)
}

View File

@@ -824,7 +824,6 @@ impl RemoteStorage for AzureBlobStorage {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
_complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
let msg = "PLEASE NOTE: Azure Blob storage time-travel recovery may not work as expected "
.to_string()

View File

@@ -440,7 +440,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError>;
}
@@ -652,23 +651,22 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
match self {
Self::LocalFs(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
Self::AwsS3(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
Self::AzureBlob(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
Self::Unreliable(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
}

View File

@@ -610,7 +610,6 @@ impl RemoteStorage for LocalFs {
_timestamp: SystemTime,
_done_if_after: SystemTime,
_cancel: &CancellationToken,
_complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
Err(TimeTravelError::Unimplemented)
}

View File

@@ -981,16 +981,22 @@ impl RemoteStorage for S3Bucket {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
let kind = RequestKind::TimeTravel;
let permit = self.permit(kind, cancel).await?;
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
let mode = ListingMode::NoDelimiter;
let version_listing = self
.list_versions_with_permit(&permit, prefix, mode, complexity_limit, cancel)
.list_versions_with_permit(&permit, prefix, mode, COMPLEXITY_LIMIT, cancel)
.await
.map_err(|err| match err {
DownloadError::Other(e) => TimeTravelError::Other(e),

View File

@@ -240,12 +240,11 @@ impl RemoteStorage for UnreliableWrapper {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
.map_err(TimeTravelError::Other)?;
self.inner
.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
}

View File

@@ -157,7 +157,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// No changes after recovery to t2 (no-op)
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t2, t_final, &cancel, None)
.time_travel_recover(None, t2, t_final, &cancel)
.await?;
let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t2: {t2_files_recovered:?}");
@@ -173,7 +173,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// after recovery to t1: path1 is back, path2 has the old content
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t1, t_final, &cancel, None)
.time_travel_recover(None, t1, t_final, &cancel)
.await?;
let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t1: {t1_files_recovered:?}");
@@ -189,7 +189,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// after recovery to t0: everything is gone except for path1
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t0, t_final, &cancel, None)
.time_travel_recover(None, t0, t_final, &cancel)
.await?;
let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t0: {t0_files_recovered:?}");

View File

@@ -176,11 +176,9 @@ async fn main() -> anyhow::Result<()> {
let config = RemoteStorageConfig::from_toml_str(&cmd.config_toml_str)?;
let storage = remote_storage::GenericRemoteStorage::from_config(&config).await;
let cancel = CancellationToken::new();
// Complexity limit: as we are running this command locally, we should have a lot of memory available, and we do not
// need to limit the number of versions we are going to delete.
storage
.unwrap()
.time_travel_recover(Some(&prefix), timestamp, done_if_after, &cancel, None)
.time_travel_recover(Some(&prefix), timestamp, done_if_after, &cancel)
.await?;
}
Commands::Key(dkc) => dkc.execute(),

View File

@@ -1,6 +1,5 @@
use std::{collections::HashMap, sync::Arc};
use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use camino::{Utf8Path, Utf8PathBuf};
use metrics::core::{AtomicU64, GenericCounter};
@@ -168,17 +167,14 @@ impl BasebackupCache {
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
fn tmp_dir(&self) -> Utf8PathBuf {
self.data_dir.join("tmp")
}
fn entry_tmp_path(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Utf8PathBuf {
self.tmp_dir()
self.data_dir
.join("tmp")
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
@@ -198,18 +194,15 @@ impl BasebackupCache {
Some((tenant_id, timeline_id, lsn))
}
// Recreate the tmp directory to clear all files in it.
async fn clean_tmp_dir(&self) -> anyhow::Result<()> {
let tmp_dir = self.tmp_dir();
if tmp_dir.exists() {
tokio::fs::remove_dir_all(&tmp_dir).await?;
}
tokio::fs::create_dir_all(&tmp_dir).await?;
Ok(())
}
async fn cleanup(&self) -> anyhow::Result<()> {
self.clean_tmp_dir().await?;
// Cleanup tmp directory.
let tmp_dir = self.data_dir.join("tmp");
let mut tmp_dir = tokio::fs::read_dir(&tmp_dir).await?;
while let Some(dir_entry) = tmp_dir.next_entry().await? {
if let Err(e) = tokio::fs::remove_file(dir_entry.path()).await {
tracing::warn!("Failed to remove basebackup cache tmp file: {:#}", e);
}
}
// Remove outdated entries.
let entries_old = self.entries.lock().unwrap().clone();
@@ -248,14 +241,16 @@ impl BasebackupCache {
}
async fn on_startup(&self) -> anyhow::Result<()> {
// Create data_dir if it does not exist.
tokio::fs::create_dir_all(&self.data_dir)
// Create data_dir and tmp directory if they do not exist.
tokio::fs::create_dir_all(&self.data_dir.join("tmp"))
.await
.context("Failed to create basebackup cache data directory")?;
self.clean_tmp_dir()
.await
.context("Failed to clean tmp directory")?;
.map_err(|e| {
anyhow::anyhow!(
"Failed to create basebackup cache data_dir {:?}: {:?}",
self.data_dir,
e
)
})?;
// Read existing entries from the data_dir and add them to in-memory state.
let mut entries = HashMap::new();
@@ -456,11 +451,6 @@ impl BasebackupCache {
}
// Move the tmp file to the final location atomically.
// The tmp file is fsynced, so it's guaranteed that we will not have a partial file
// in the main directory.
// It's not necessary to fsync the inode after renaming, because the worst case is that
// the rename operation will be rolled back on the disk failure, the entry will disappear
// from the main directory, and the entry access will cause a cache miss.
let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
@@ -478,17 +468,16 @@ impl BasebackupCache {
}
/// Prepares a basebackup in a temporary file.
/// Guarantees that the tmp file is fsynced before returning.
async fn prepare_basebackup_tmp(
&self,
entry_tmp_path: &Utf8Path,
emptry_tmp_path: &Utf8Path,
timeline: &Arc<Timeline>,
req_lsn: Lsn,
) -> anyhow::Result<()> {
let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
let ctx = ctx.with_scope_timeline(timeline);
let file = tokio::fs::File::create(entry_tmp_path).await?;
let file = tokio::fs::File::create(emptry_tmp_path).await?;
let mut writer = BufWriter::new(file);
let mut encoder = GzipEncoder::with_quality(

View File

@@ -73,7 +73,6 @@ use crate::tenant::remote_timeline_client::{
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerName};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
use crate::tenant::timeline::{
CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline,
@@ -1452,10 +1451,7 @@ async fn timeline_layer_scan_disposable_keys(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download)
.with_scope_timeline(&timeline);
let guard = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = timeline.layers.read().await;
let Some(layer) = guard.try_get_from_key(&layer_name.clone().into()) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Layer {tenant_shard_id}/{timeline_id}/{layer_name} not found").into(),

View File

@@ -51,7 +51,6 @@ use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
use storage_broker::BrokerClientChannel;
use timeline::compaction::{CompactionOutcome, GcCompactionQueue};
use timeline::import_pgdata::ImportingTimeline;
use timeline::layer_manager::LayerManagerLockHolder;
use timeline::offload::{OffloadError, offload_timeline};
use timeline::{
CompactFlags, CompactOptions, CompactionError, PreviousHeatmap, ShutdownMode, import_pgdata,
@@ -1316,7 +1315,7 @@ impl TenantShard {
ancestor.is_some()
|| timeline
.layers
.read(LayerManagerLockHolder::LoadLayerMap)
.read()
.await
.layer_map()
.expect(
@@ -2644,7 +2643,7 @@ impl TenantShard {
}
let layer_names = tline
.layers
.read(LayerManagerLockHolder::Testing)
.read()
.await
.layer_map()
.unwrap()
@@ -3159,12 +3158,7 @@ impl TenantShard {
for timeline in &compact {
// Collect L0 counts. Can't await while holding lock above.
if let Ok(lm) = timeline
.layers
.read(LayerManagerLockHolder::Compaction)
.await
.layer_map()
{
if let Ok(lm) = timeline.layers.read().await.layer_map() {
l0_counts.insert(timeline.timeline_id, lm.level0_deltas().len());
}
}
@@ -4906,7 +4900,7 @@ impl TenantShard {
}
let layer_names = tline
.layers
.read(LayerManagerLockHolder::Testing)
.read()
.await
.layer_map()
.unwrap()
@@ -6976,7 +6970,7 @@ mod tests {
.await?;
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
let layer_map = tline.layers.read(LayerManagerLockHolder::Testing).await;
let layer_map = tline.layers.read().await;
let level0_deltas = layer_map
.layer_map()?
.level0_deltas()
@@ -7212,7 +7206,7 @@ mod tests {
let lsn = Lsn(0x10);
let inserted = bulk_insert_compact_gc(&tenant, &tline, &ctx, lsn, 50, 10000).await?;
let guard = tline.layers.read(LayerManagerLockHolder::Testing).await;
let guard = tline.layers.read().await;
let lm = guard.layer_map()?;
lm.dump(true, &ctx).await?;
@@ -8240,23 +8234,12 @@ mod tests {
tline.freeze_and_flush().await?; // force create a delta layer
}
let before_num_l0_delta_files = tline
.layers
.read(LayerManagerLockHolder::Testing)
.await
.layer_map()?
.level0_deltas()
.len();
let before_num_l0_delta_files =
tline.layers.read().await.layer_map()?.level0_deltas().len();
tline.compact(&cancel, EnumSet::default(), &ctx).await?;
let after_num_l0_delta_files = tline
.layers
.read(LayerManagerLockHolder::Testing)
.await
.layer_map()?
.level0_deltas()
.len();
let after_num_l0_delta_files = tline.layers.read().await.layer_map()?.level0_deltas().len();
assert!(
after_num_l0_delta_files < before_num_l0_delta_files,

View File

@@ -61,8 +61,8 @@ pub(crate) struct LocationConf {
/// The detailed shard identity. This structure is already scoped within
/// a TenantShardId, but we need the full ShardIdentity to enable calculating
/// key->shard mappings.
// TODO(vlad): Remove this default once all configs have a shard identity on disk.
#[serde(default = "ShardIdentity::unsharded")]
#[serde(skip_serializing_if = "ShardIdentity::is_unsharded")]
pub(crate) shard: ShardIdentity,
/// The pan-cluster tenant configuration, the same on all locations
@@ -149,12 +149,7 @@ impl LocationConf {
/// For use when attaching/re-attaching: update the generation stored in this
/// structure. If we were in a secondary state, promote to attached (posession
/// of a fresh generation implies this).
pub(crate) fn attach_in_generation(
&mut self,
mode: AttachmentMode,
generation: Generation,
stripe_size: ShardStripeSize,
) {
pub(crate) fn attach_in_generation(&mut self, mode: AttachmentMode, generation: Generation) {
match &mut self.mode {
LocationMode::Attached(attach_conf) => {
attach_conf.generation = generation;
@@ -168,8 +163,6 @@ impl LocationConf {
})
}
}
self.shard.stripe_size = stripe_size;
}
pub(crate) fn try_from(conf: &'_ models::LocationConfig) -> anyhow::Result<Self> {

View File

@@ -51,7 +51,6 @@ use crate::tenant::config::{
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::storage_layer::inmemory_layer;
use crate::tenant::timeline::ShutdownMode;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::{
AttachedTenantConf, GcError, LoadConfigError, SpawnMode, TenantShard, TenantState,
};
@@ -129,7 +128,7 @@ pub(crate) enum ShardSelector {
///
/// This represents the subset of a LocationConfig that we receive during re-attach.
pub(crate) enum TenantStartupMode {
Attached((AttachmentMode, Generation, ShardStripeSize)),
Attached((AttachmentMode, Generation)),
Secondary,
}
@@ -143,21 +142,15 @@ impl TenantStartupMode {
match (rart.mode, rart.r#gen) {
(LocationConfigMode::Detached, _) => None,
(LocationConfigMode::Secondary, _) => Some(Self::Secondary),
(LocationConfigMode::AttachedMulti, Some(g)) => Some(Self::Attached((
AttachmentMode::Multi,
Generation::new(g),
rart.stripe_size,
))),
(LocationConfigMode::AttachedSingle, Some(g)) => Some(Self::Attached((
AttachmentMode::Single,
Generation::new(g),
rart.stripe_size,
))),
(LocationConfigMode::AttachedStale, Some(g)) => Some(Self::Attached((
AttachmentMode::Stale,
Generation::new(g),
rart.stripe_size,
))),
(LocationConfigMode::AttachedMulti, Some(g)) => {
Some(Self::Attached((AttachmentMode::Multi, Generation::new(g))))
}
(LocationConfigMode::AttachedSingle, Some(g)) => {
Some(Self::Attached((AttachmentMode::Single, Generation::new(g))))
}
(LocationConfigMode::AttachedStale, Some(g)) => {
Some(Self::Attached((AttachmentMode::Stale, Generation::new(g))))
}
_ => {
tracing::warn!(
"Received invalid re-attach state for tenant {}: {rart:?}",
@@ -325,11 +318,9 @@ fn emergency_generations(
Some((
*tid,
match &lc.mode {
LocationMode::Attached(alc) => TenantStartupMode::Attached((
alc.attach_mode,
alc.generation,
ShardStripeSize::default(),
)),
LocationMode::Attached(alc) => {
TenantStartupMode::Attached((alc.attach_mode, alc.generation))
}
LocationMode::Secondary(_) => TenantStartupMode::Secondary,
},
))
@@ -373,7 +364,7 @@ async fn init_load_generations(
.iter()
.flat_map(|(id, start_mode)| {
match start_mode {
TenantStartupMode::Attached((_mode, generation, _stripe_size)) => Some(generation),
TenantStartupMode::Attached((_mode, generation)) => Some(generation),
TenantStartupMode::Secondary => None,
}
.map(|gen_| (*id, *gen_))
@@ -593,7 +584,7 @@ pub async fn init_tenant_mgr(
location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
}
}
Some(TenantStartupMode::Attached((attach_mode, generation, stripe_size))) => {
Some(TenantStartupMode::Attached((attach_mode, generation))) => {
let old_gen_higher = match &location_conf.mode {
LocationMode::Attached(AttachedLocationConfig {
generation: old_generation,
@@ -617,7 +608,7 @@ pub async fn init_tenant_mgr(
// local disk content: demote to secondary rather than detaching.
location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
} else {
location_conf.attach_in_generation(*attach_mode, *generation, *stripe_size);
location_conf.attach_in_generation(*attach_mode, *generation);
}
}
}
@@ -1667,10 +1658,7 @@ impl TenantManager {
let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
for timeline in timelines.values() {
tracing::info!(timeline_id=%timeline.timeline_id, "Loading list of layers to hardlink");
let layers = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let layers = timeline.layers.read().await;
for layer in layers.likely_resident_layers() {
let relative_path = layer

View File

@@ -1,7 +1,6 @@
//! Helper functions to upload files to remote storage with a RemoteStorage
use std::io::{ErrorKind, SeekFrom};
use std::num::NonZeroU32;
use std::time::SystemTime;
use anyhow::{Context, bail};
@@ -229,25 +228,11 @@ pub(crate) async fn time_travel_recover_tenant(
let timelines_path = super::remote_timelines_path(tenant_shard_id);
prefixes.push(timelines_path);
}
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
for prefix in &prefixes {
backoff::retry(
|| async {
storage
.time_travel_recover(
Some(prefix),
timestamp,
done_if_after,
cancel,
COMPLEXITY_LIMIT,
)
.time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
.await
},
|e| !matches!(e, TimeTravelError::Other(_)),

View File

@@ -1635,7 +1635,6 @@ pub(crate) mod test {
use crate::tenant::disk_btree::tests::TestDisk;
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::{TenantShard, Timeline};
/// Construct an index for a fictional delta layer and and then
@@ -2003,7 +2002,7 @@ pub(crate) mod test {
let initdb_layer = timeline
.layers
.read(crate::tenant::timeline::layer_manager::LayerManagerLockHolder::Testing)
.read()
.await
.likely_resident_layers()
.next()
@@ -2079,7 +2078,7 @@ pub(crate) mod test {
let new_layer = timeline
.layers
.read(LayerManagerLockHolder::Testing)
.read()
.await
.likely_resident_layers()
.find(|&x| x != &initdb_layer)

View File

@@ -10,7 +10,6 @@ use super::*;
use crate::context::DownloadBehavior;
use crate::tenant::harness::{TenantHarness, test_img};
use crate::tenant::storage_layer::{IoConcurrency, LayerVisibilityHint};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
/// Used in tests to advance a future to wanted await point, and not futher.
const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
@@ -60,7 +59,7 @@ async fn smoke_test() {
// there to avoid the timeline being illegally empty
let (layer, dummy_layer) = {
let mut layers = {
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -216,7 +215,7 @@ async fn smoke_test() {
// Simulate GC removing our test layer.
{
let mut g = timeline.layers.write(LayerManagerLockHolder::Testing).await;
let mut g = timeline.layers.write().await;
let layers = &[layer];
g.open_mut().unwrap().finish_gc_timeline(layers);
@@ -262,7 +261,7 @@ async fn evict_and_wait_on_wanted_deleted() {
let layer = {
let mut layers = {
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -306,7 +305,7 @@ async fn evict_and_wait_on_wanted_deleted() {
// assert that once we remove the `layer` from the layer map and drop our reference,
// the deletion of the layer in remote_storage happens.
{
let mut layers = timeline.layers.write(LayerManagerLockHolder::Testing).await;
let mut layers = timeline.layers.write().await;
layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
}
@@ -348,7 +347,7 @@ fn read_wins_pending_eviction() {
let layer = {
let mut layers = {
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -481,7 +480,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
let layer = {
let mut layers = {
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -656,7 +655,7 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
let layer = {
let mut layers = {
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -742,7 +741,7 @@ async fn evict_and_wait_does_not_wait_for_download() {
let layer = {
let mut layers = {
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -863,7 +862,7 @@ async fn eviction_cancellation_on_drop() {
let (evicted_layer, not_evicted) = {
let mut layers = {
let mut guard = timeline.layers.write(LayerManagerLockHolder::Testing).await;
let mut guard = timeline.layers.write().await;
let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
// remove the layers from layermap
guard.open_mut().unwrap().finish_gc_timeline(&layers);

View File

@@ -35,11 +35,7 @@ use fail::fail_point;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use handle::ShardTimelineId;
use layer_manager::{
LayerManagerLockHolder, LayerManagerReadGuard, LayerManagerWriteGuard, LockedLayerManager,
Shutdown,
};
use layer_manager::Shutdown;
use offload::OffloadError;
use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
@@ -86,6 +82,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use self::delete::DeleteTimelineFlow;
pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::remote_timeline_client::RemoteTimelineClient;
@@ -184,13 +181,13 @@ impl std::fmt::Display for ImageLayerCreationMode {
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_layer_manager_rlock(rlock: LayerManagerReadGuard<'_>) {
fn drop_rlock<T>(rlock: tokio::sync::RwLockReadGuard<T>) {
drop(rlock)
}
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_layer_manager_wlock(rlock: LayerManagerWriteGuard<'_>) {
fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
drop(rlock)
}
@@ -244,7 +241,7 @@ pub struct Timeline {
///
/// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
/// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
pub(crate) layers: LockedLayerManager,
pub(crate) layers: tokio::sync::RwLock<LayerManager>,
last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
@@ -1538,10 +1535,7 @@ impl Timeline {
/// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**.
pub(crate) async fn layer_size_sum(&self) -> u64 {
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = self.layers.read().await;
guard.layer_size_sum()
}
@@ -1851,7 +1845,7 @@ impl Timeline {
// time, and this was missed.
// if write_guard.is_none() { return; }
let Ok(layers_guard) = self.layers.try_read(LayerManagerLockHolder::TryFreezeLayer) else {
let Ok(layers_guard) = self.layers.try_read() else {
// Don't block if the layer lock is busy
return;
};
@@ -2164,7 +2158,7 @@ impl Timeline {
if let ShutdownMode::FreezeAndFlush = mode {
let do_flush = if let Some((open, frozen)) = self
.layers
.read(LayerManagerLockHolder::Shutdown)
.read()
.await
.layer_map()
.map(|lm| (lm.open_layer.is_some(), lm.frozen_layers.len()))
@@ -2268,10 +2262,7 @@ impl Timeline {
// Allow any remaining in-memory layers to do cleanup -- until that, they hold the gate
// open.
let mut write_guard = self.write_lock.lock().await;
self.layers
.write(LayerManagerLockHolder::Shutdown)
.await
.shutdown(&mut write_guard);
self.layers.write().await.shutdown(&mut write_guard);
}
// Finally wait until any gate-holders are complete.
@@ -2374,10 +2365,7 @@ impl Timeline {
&self,
reset: LayerAccessStatsReset,
) -> Result<LayerMapInfo, layer_manager::Shutdown> {
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
@@ -3244,7 +3232,7 @@ impl Timeline {
/// Initialize with an empty layer map. Used when creating a new timeline.
pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
let mut layers = self.layers.try_write(LayerManagerLockHolder::Init).expect(
let mut layers = self.layers.try_write().expect(
"in the context where we call this function, no other task has access to the object",
);
layers
@@ -3264,10 +3252,7 @@ impl Timeline {
use init::Decision::*;
use init::{Discovered, DismissedLayer};
let mut guard = self
.layers
.write(LayerManagerLockHolder::LoadLayerMap)
.await;
let mut guard = self.layers.write().await;
let timer = self.metrics.load_layer_map_histo.start_timer();
@@ -3884,10 +3869,7 @@ impl Timeline {
&self,
layer_name: &LayerName,
) -> Result<Option<Layer>, layer_manager::Shutdown> {
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = self.layers.read().await;
let layer = guard
.layer_map()?
.iter_historic_layers()
@@ -3920,10 +3902,7 @@ impl Timeline {
return None;
}
let guard = self
.layers
.read(LayerManagerLockHolder::GenerateHeatmap)
.await;
let guard = self.layers.read().await;
// Firstly, if there's any heatmap left over from when this location
// was a secondary, take that into account. Keep layers that are:
@@ -4021,10 +4000,7 @@ impl Timeline {
}
pub(super) async fn generate_unarchival_heatmap(&self, end_lsn: Lsn) -> PreviousHeatmap {
let guard = self
.layers
.read(LayerManagerLockHolder::GenerateHeatmap)
.await;
let guard = self.layers.read().await;
let now = SystemTime::now();
let mut heatmap_layers = Vec::default();
@@ -4366,7 +4342,7 @@ impl Timeline {
query: &VersionedKeySpaceQuery,
) -> Result<LayerFringe, GetVectoredError> {
let mut fringe = LayerFringe::new();
let guard = self.layers.read(LayerManagerLockHolder::GetPage).await;
let guard = self.layers.read().await;
match query {
VersionedKeySpaceQuery::Uniform { keyspace, lsn } => {
@@ -4469,7 +4445,7 @@ impl Timeline {
// required for correctness, but avoids visiting extra layers
// which turns out to be a perf bottleneck in some cases.
if !unmapped_keyspace.is_empty() {
let guard = timeline.layers.read(LayerManagerLockHolder::GetPage).await;
let guard = timeline.layers.read().await;
guard.update_search_fringe(&unmapped_keyspace, cont_lsn, &mut fringe)?;
// It's safe to drop the layer map lock after planning the next round of reads.
@@ -4579,10 +4555,7 @@ impl Timeline {
_guard: &tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self
.layers
.write(LayerManagerLockHolder::GetLayerForWrite)
.await;
let mut guard = self.layers.write().await;
let last_record_lsn = self.get_last_record_lsn();
ensure!(
@@ -4624,10 +4597,7 @@ impl Timeline {
write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
) -> Result<u64, FlushLayerError> {
let frozen = {
let mut guard = self
.layers
.write(LayerManagerLockHolder::TryFreezeLayer)
.await;
let mut guard = self.layers.write().await;
guard
.open_mut()?
.try_freeze_in_memory_layer(at, &self.last_freeze_at, write_lock, &self.metrics)
@@ -4668,12 +4638,7 @@ impl Timeline {
ctx: &RequestContext,
) {
// Subscribe to L0 delta layer updates, for compaction backpressure.
let mut watch_l0 = match self
.layers
.read(LayerManagerLockHolder::FlushLoop)
.await
.layer_map()
{
let mut watch_l0 = match self.layers.read().await.layer_map() {
Ok(lm) => lm.watch_level0_deltas(),
Err(Shutdown) => return,
};
@@ -4710,7 +4675,7 @@ impl Timeline {
// Fetch the next layer to flush, if any.
let (layer, l0_count, frozen_count, frozen_size) = {
let layers = self.layers.read(LayerManagerLockHolder::FlushLoop).await;
let layers = self.layers.read().await;
let Ok(lm) = layers.layer_map() else {
info!("dropping out of flush loop for timeline shutdown");
return;
@@ -5006,10 +4971,7 @@ impl Timeline {
// in-memory layer from the map now. The flushed layer is stored in
// the mapping in `create_delta_layer`.
{
let mut guard = self
.layers
.write(LayerManagerLockHolder::FlushFrozenLayer)
.await;
let mut guard = self.layers.write().await;
guard.open_mut()?.finish_flush_l0_layer(
delta_layer_to_add.as_ref(),
@@ -5224,7 +5186,7 @@ impl Timeline {
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
let threshold = self.get_image_creation_threshold();
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let guard = self.layers.read().await;
let Ok(layers) = guard.layer_map() else {
return false;
};
@@ -5642,7 +5604,7 @@ impl Timeline {
if let ImageLayerCreationMode::Force = mode {
// When forced to create image layers, we might try and create them where they already
// exist. This mode is only used in tests/debug.
let layers = self.layers.read(LayerManagerLockHolder::Compaction).await;
let layers = self.layers.read().await;
if layers.contains_key(&PersistentLayerKey {
key_range: img_range.clone(),
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
@@ -5767,7 +5729,7 @@ impl Timeline {
let image_layers = batch_image_writer.finish(self, ctx).await?;
let mut guard = self.layers.write(LayerManagerLockHolder::Compaction).await;
let mut guard = self.layers.write().await;
// FIXME: we could add the images to be uploaded *before* returning from here, but right
// now they are being scheduled outside of write lock; current way is inconsistent with
@@ -5775,7 +5737,7 @@ impl Timeline {
guard
.open_mut()?
.track_new_image_layers(&image_layers, &self.metrics);
drop_layer_manager_wlock(guard);
drop_wlock(guard);
let duration = timer.stop_and_record();
// Creating image layers may have caused some previously visible layers to be covered
@@ -6145,7 +6107,7 @@ impl Timeline {
layers_to_remove: &[Layer],
) -> Result<(), CompactionError> {
let mut guard = tokio::select! {
guard = self.layers.write(LayerManagerLockHolder::Compaction) => guard,
guard = self.layers.write() => guard,
_ = self.cancel.cancelled() => {
return Err(CompactionError::ShuttingDown);
}
@@ -6194,7 +6156,7 @@ impl Timeline {
self.remote_client
.schedule_compaction_update(&remove_layers, new_deltas)?;
drop_layer_manager_wlock(guard);
drop_wlock(guard);
Ok(())
}
@@ -6204,7 +6166,7 @@ impl Timeline {
mut replace_layers: Vec<(Layer, ResidentLayer)>,
mut drop_layers: Vec<Layer>,
) -> Result<(), CompactionError> {
let mut guard = self.layers.write(LayerManagerLockHolder::Compaction).await;
let mut guard = self.layers.write().await;
// Trim our lists in case our caller (compaction) raced with someone else (GC) removing layers: we want
// to avoid double-removing, and avoid rewriting something that was removed.
@@ -6555,10 +6517,7 @@ impl Timeline {
// 5. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too agressive and avoidable
let mut guard = self
.layers
.write(LayerManagerLockHolder::GarbageCollection)
.await;
let mut guard = self.layers.write().await;
let layers = guard.layer_map()?;
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -6860,10 +6819,7 @@ impl Timeline {
use pageserver_api::models::DownloadRemoteLayersTaskState;
let remaining = {
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = self.layers.read().await;
let Ok(lm) = guard.layer_map() else {
// technically here we could look into iterating accessible layers, but downloading
// all layers of a shutdown timeline makes no sense regardless.
@@ -6969,7 +6925,7 @@ impl Timeline {
impl Timeline {
/// Returns non-remote layers for eviction.
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let guard = self.layers.read(LayerManagerLockHolder::Eviction).await;
let guard = self.layers.read().await;
let mut max_layer_size: Option<u64> = None;
let resident_layers = guard
@@ -7070,7 +7026,7 @@ impl Timeline {
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created image layer {}", image_layer.local_path());
{
let mut guard = self.layers.write(LayerManagerLockHolder::Testing).await;
let mut guard = self.layers.write().await;
guard
.open_mut()
.unwrap()
@@ -7133,7 +7089,7 @@ impl Timeline {
let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created delta layer {}", delta_layer.local_path());
{
let mut guard = self.layers.write(LayerManagerLockHolder::Testing).await;
let mut guard = self.layers.write().await;
guard
.open_mut()
.unwrap()
@@ -7228,7 +7184,7 @@ impl Timeline {
// Link the layer to the layer map
{
let mut guard = self.layers.write(LayerManagerLockHolder::Testing).await;
let mut guard = self.layers.write().await;
let layer_map = guard.open_mut().unwrap();
layer_map.force_insert_in_memory_layer(Arc::new(layer));
}
@@ -7245,7 +7201,7 @@ impl Timeline {
io_concurrency: IoConcurrency,
) -> anyhow::Result<Vec<(Key, Bytes)>> {
let mut all_data = Vec::new();
let guard = self.layers.read(LayerManagerLockHolder::Testing).await;
let guard = self.layers.read().await;
for layer in guard.layer_map()?.iter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
let layer = guard.get_from_desc(&layer);
@@ -7274,7 +7230,7 @@ impl Timeline {
self: &Arc<Timeline>,
) -> anyhow::Result<Vec<super::storage_layer::PersistentLayerKey>> {
let mut layers = Vec::new();
let guard = self.layers.read(LayerManagerLockHolder::Testing).await;
let guard = self.layers.read().await;
for layer in guard.layer_map()?.iter_historic_layers() {
layers.push(layer.key());
}
@@ -7386,7 +7342,7 @@ impl TimelineWriter<'_> {
let l0_count = self
.tl
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.read()
.await
.layer_map()?
.level0_deltas()
@@ -7605,7 +7561,6 @@ mod tests {
use crate::tenant::harness::{TenantHarness, test_img};
use crate::tenant::layer_map::LayerMap;
use crate::tenant::storage_layer::{Layer, LayerName, LayerVisibilityHint};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::timeline::{DeltaLayerTestDesc, EvictionError};
use crate::tenant::{PreviousHeatmap, Timeline};
@@ -7713,7 +7668,7 @@ mod tests {
// Evict all the layers and stash the old heatmap in the timeline.
// This simulates a migration to a cold secondary location.
let guard = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let guard = timeline.layers.read().await;
let mut all_layers = Vec::new();
let forever = std::time::Duration::from_secs(120);
for layer in guard.likely_resident_layers() {
@@ -7835,7 +7790,7 @@ mod tests {
})));
// Evict all the layers in the previous heatmap
let guard = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let guard = timeline.layers.read().await;
let forever = std::time::Duration::from_secs(120);
for layer in guard.likely_resident_layers() {
layer.evict_and_wait(forever).await.unwrap();
@@ -7898,10 +7853,7 @@ mod tests {
}
async fn find_some_layer(timeline: &Timeline) -> Layer {
let layers = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let layers = timeline.layers.read().await;
let desc = layers
.layer_map()
.unwrap()

View File

@@ -4,7 +4,6 @@ use std::ops::Range;
use utils::lsn::Lsn;
use super::Timeline;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
#[derive(serde::Serialize)]
pub(crate) struct RangeAnalysis {
@@ -25,10 +24,7 @@ impl Timeline {
let num_of_l0;
let all_layer_files = {
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = self.layers.read().await;
num_of_l0 = guard.layer_map().unwrap().level0_deltas().len();
guard.all_persistent_layers()
};

View File

@@ -9,7 +9,7 @@ use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::{Duration, Instant};
use super::layer_manager::{LayerManagerLockHolder, LayerManagerReadGuard};
use super::layer_manager::LayerManager;
use super::{
CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
@@ -62,7 +62,7 @@ use crate::tenant::storage_layer::{
use crate::tenant::tasks::log_compaction_error;
use crate::tenant::timeline::{
DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
ResidentLayer, drop_layer_manager_rlock,
ResidentLayer, drop_rlock,
};
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
@@ -314,10 +314,7 @@ impl GcCompactionQueue {
.unwrap_or(Lsn::INVALID);
let layers = {
let guard = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = timeline.layers.read().await;
let layer_map = guard.layer_map()?;
layer_map.iter_historic_layers().collect_vec()
};
@@ -411,10 +408,7 @@ impl GcCompactionQueue {
timeline: &Arc<Timeline>,
lsn: Lsn,
) -> Result<u64, CompactionError> {
let guard = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = timeline.layers.read().await;
let layer_map = guard.layer_map()?;
let layers = layer_map.iter_historic_layers().collect_vec();
let mut size = 0;
@@ -857,7 +851,7 @@ impl KeyHistoryRetention {
}
let layer_generation;
{
let guard = tline.layers.read(LayerManagerLockHolder::Compaction).await;
let guard = tline.layers.read().await;
if !guard.contains_key(key) {
return false;
}
@@ -1288,10 +1282,7 @@ impl Timeline {
// We do the repartition on the L0-L1 boundary. All data below the boundary
// are compacted by L0 with low read amplification, thus making the `repartition`
// function run fast.
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = self.layers.read().await;
guard
.all_persistent_layers()
.iter()
@@ -1470,7 +1461,7 @@ impl Timeline {
let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
let layers = self.layers.read(LayerManagerLockHolder::Compaction).await;
let layers = self.layers.read().await;
let layers_iter = layers.layer_map()?.iter_historic_layers();
let (layers_total, mut layers_checked) = (layers_iter.len(), 0);
for layer_desc in layers_iter {
@@ -1731,10 +1722,7 @@ impl Timeline {
// are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
// Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
// they will be subject to L0->L1 compaction in the near future.
let layer_manager = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let layer_manager = self.layers.read().await;
let layer_map = layer_manager.layer_map()?;
let readable_points = {
@@ -1787,7 +1775,7 @@ impl Timeline {
};
let begin = tokio::time::Instant::now();
let phase1_layers_locked = self.layers.read(LayerManagerLockHolder::Compaction).await;
let phase1_layers_locked = self.layers.read().await;
let now = tokio::time::Instant::now();
stats.read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
@@ -1815,7 +1803,7 @@ impl Timeline {
/// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
async fn compact_level0_phase1<'a>(
self: &'a Arc<Self>,
guard: LayerManagerReadGuard<'a>,
guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64,
force_compaction_ignore_threshold: bool,
@@ -2041,7 +2029,7 @@ impl Timeline {
holes
};
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
drop_layer_manager_rlock(guard);
drop_rlock(guard);
if self.cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
@@ -2481,7 +2469,7 @@ impl Timeline {
// Find the top of the historical layers
let end_lsn = {
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let guard = self.layers.read().await;
let layers = guard.layer_map()?;
let l0_deltas = layers.level0_deltas();
@@ -3020,7 +3008,7 @@ impl Timeline {
}
split_key_ranges.sort();
let all_layers = {
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
layer_map.iter_historic_layers().collect_vec()
};
@@ -3124,12 +3112,12 @@ impl Timeline {
.await?;
let jobs_len = jobs.len();
for (idx, job) in jobs.into_iter().enumerate() {
let sub_compaction_progress = format!("{}/{}", idx + 1, jobs_len);
info!(
"running enhanced gc bottom-most compaction, sub-compaction {}/{}",
idx + 1,
jobs_len
);
self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
.instrument(info_span!(
"sub_compaction",
sub_compaction_progress = sub_compaction_progress
))
.await?;
}
if jobs_len == 0 {
@@ -3197,10 +3185,7 @@ impl Timeline {
// 1. If a layer is in the selection, all layers below it are in the selection.
// 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
let job_desc = {
let guard = self
.layers
.read(LayerManagerLockHolder::GarbageCollection)
.await;
let guard = self.layers.read().await;
let layers = guard.layer_map()?;
let gc_info = self.gc_info.read().unwrap();
let mut retain_lsns_below_horizon = Vec::new();
@@ -3971,10 +3956,7 @@ impl Timeline {
// First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
let all_layers = {
let guard = self
.layers
.read(LayerManagerLockHolder::GarbageCollection)
.await;
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
layer_map.iter_historic_layers().collect_vec()
};
@@ -4038,10 +4020,7 @@ impl Timeline {
let update_guard = self.gc_compaction_layer_update_lock.write().await;
// Acquiring the update guard ensures current read operations end and new read operations are blocked.
// TODO: can we use `latest_gc_cutoff` Rcu to achieve the same effect?
let mut guard = self
.layers
.write(LayerManagerLockHolder::GarbageCollection)
.await;
let mut guard = self.layers.write().await;
guard
.open_mut()?
.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics);
@@ -4109,11 +4088,7 @@ impl TimelineAdaptor {
pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
let layers_to_delete = {
let guard = self
.timeline
.layers
.read(LayerManagerLockHolder::Compaction)
.await;
let guard = self.timeline.layers.read().await;
self.layers_to_delete
.iter()
.map(|x| guard.get_from_desc(x))
@@ -4158,11 +4133,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
self.flush_updates().await?;
let guard = self
.timeline
.layers
.read(LayerManagerLockHolder::Compaction)
.await;
let guard = self.timeline.layers.read().await;
let layer_map = guard.layer_map()?;
let result = layer_map
@@ -4201,11 +4172,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
// this is a lot more complex than a simple downcast...
if layer.is_delta() {
let l = {
let guard = self
.timeline
.layers
.read(LayerManagerLockHolder::Compaction)
.await;
let guard = self.timeline.layers.read().await;
guard.get_from_desc(layer)
};
let result = l.download_and_keep_resident(ctx).await?;

View File

@@ -19,7 +19,7 @@ use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::sync::gate::GateError;
use super::layer_manager::{LayerManager, LayerManagerLockHolder};
use super::layer_manager::LayerManager;
use super::{FlushLayerError, Timeline};
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::TaskKind;
@@ -199,10 +199,7 @@ pub(crate) async fn generate_tombstone_image_layer(
let image_lsn = ancestor_lsn;
{
let layers = detached
.layers
.read(LayerManagerLockHolder::DetachAncestor)
.await;
let layers = detached.layers.read().await;
for layer in layers.all_persistent_layers() {
if !layer.is_delta
&& layer.lsn_range.start == image_lsn
@@ -426,7 +423,7 @@ pub(super) async fn prepare(
// we do not need to start from our layers, because they can only be layers that come
// *after* ancestor_lsn
let layers = tokio::select! {
guard = ancestor.layers.read(LayerManagerLockHolder::DetachAncestor) => guard,
guard = ancestor.layers.read() => guard,
_ = detached.cancel.cancelled() => {
return Err(ShuttingDown);
}
@@ -872,12 +869,7 @@ async fn remote_copy(
// Double check that the file is orphan (probably from an earlier attempt), then delete it
let key = file_name.clone().into();
if adoptee
.layers
.read(LayerManagerLockHolder::DetachAncestor)
.await
.contains_key(&key)
{
if adoptee.layers.read().await.contains_key(&key) {
// We are supposed to filter out such cases before coming to this function
return Err(Error::Prepare(anyhow::anyhow!(
"layer file {file_name} already present and inside layer map"

View File

@@ -33,7 +33,6 @@ use crate::tenant::size::CalculateSyntheticSizeError;
use crate::tenant::storage_layer::LayerVisibilityHint;
use crate::tenant::tasks::{BackgroundLoopKind, BackgroundLoopSemaphorePermit, sleep_random};
use crate::tenant::timeline::EvictionError;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::{LogicalSizeCalculationCause, TenantShard};
#[derive(Default)]
@@ -209,7 +208,7 @@ impl Timeline {
let mut js = tokio::task::JoinSet::new();
{
let guard = self.layers.read(LayerManagerLockHolder::Eviction).await;
let guard = self.layers.read().await;
guard
.likely_resident_layers()

View File

@@ -15,7 +15,6 @@ use super::{Timeline, TimelineDeleteProgress};
use crate::context::RequestContext;
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
mod flow;
mod importbucket_client;
@@ -164,10 +163,7 @@ async fn prepare_import(
info!("wipe the slate clean");
{
// TODO: do we need to hold GC lock for this?
let mut guard = timeline
.layers
.write(LayerManagerLockHolder::ImportPgData)
.await;
let mut guard = timeline.layers.write().await;
assert!(
guard.layer_map()?.open_layer.is_none(),
"while importing, there should be no in-memory layer" // this just seems like a good place to assert it

View File

@@ -56,7 +56,6 @@ use crate::pgdatadir_mapping::{
};
use crate::task_mgr::TaskKind;
use crate::tenant::storage_layer::{AsLayerDesc, ImageLayerWriter, Layer};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
pub async fn run(
timeline: Arc<Timeline>,
@@ -985,10 +984,7 @@ impl ChunkProcessingJob {
let (desc, path) = writer.finish(ctx).await?;
{
let guard = timeline
.layers
.read(LayerManagerLockHolder::ImportPgData)
.await;
let guard = timeline.layers.read().await;
let existing_layer = guard.try_get_from_key(&desc.key());
if let Some(layer) = existing_layer {
if layer.metadata().generation == timeline.generation {
@@ -1011,10 +1007,7 @@ impl ChunkProcessingJob {
// certain that the existing layer is identical to the new one, so in that case
// we replace the old layer with the one we just generated.
let mut guard = timeline
.layers
.write(LayerManagerLockHolder::ImportPgData)
.await;
let mut guard = timeline.layers.write().await;
let existing_layer = guard
.try_get_from_key(&resident_layer.layer_desc().key())
@@ -1043,7 +1036,7 @@ impl ChunkProcessingJob {
}
}
crate::tenant::timeline::drop_layer_manager_wlock(guard);
crate::tenant::timeline::drop_wlock(guard);
timeline
.remote_client

View File

@@ -1,8 +1,5 @@
use std::collections::HashMap;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, bail, ensure};
use itertools::Itertools;
@@ -23,155 +20,6 @@ use crate::tenant::storage_layer::{
PersistentLayerKey, ReadableLayerWeak, ResidentLayer,
};
/// Warn if the lock was held for longer than this threshold.
/// It's very generous and we should bring this value down over time.
const LAYER_MANAGER_LOCK_WARN_THRESHOLD: Duration = Duration::from_secs(5);
const LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD: Duration = Duration::from_secs(30);
/// Describes the operation that is holding the layer manager lock
#[derive(Debug, Clone, Copy, strum_macros::Display)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum LayerManagerLockHolder {
GetLayerMapInfo,
GenerateHeatmap,
GetPage,
Init,
LoadLayerMap,
GetLayerForWrite,
TryFreezeLayer,
FlushFrozenLayer,
FlushLoop,
Compaction,
GarbageCollection,
Shutdown,
ImportPgData,
DetachAncestor,
Eviction,
#[cfg(test)]
Testing,
}
/// Wrapper for the layer manager that tracks the amount of time during which
/// it was held under read or write lock
#[derive(Default)]
pub(crate) struct LockedLayerManager {
locked: tokio::sync::RwLock<LayerManager>,
}
pub(crate) struct LayerManagerReadGuard<'a> {
guard: ManuallyDrop<tokio::sync::RwLockReadGuard<'a, LayerManager>>,
acquired_at: std::time::Instant,
holder: LayerManagerLockHolder,
}
pub(crate) struct LayerManagerWriteGuard<'a> {
guard: ManuallyDrop<tokio::sync::RwLockWriteGuard<'a, LayerManager>>,
acquired_at: std::time::Instant,
holder: LayerManagerLockHolder,
}
impl Drop for LayerManagerReadGuard<'_> {
fn drop(&mut self) {
// Drop the lock first, before potentially warning if it was held for too long.
// SAFETY: ManuallyDrop in Drop implementation
unsafe { ManuallyDrop::drop(&mut self.guard) };
let held_for = self.acquired_at.elapsed();
if held_for >= LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD {
tracing::warn!(
holder=%self.holder,
"Layer manager read lock held for {}s",
held_for.as_secs_f64(),
);
}
}
}
impl Drop for LayerManagerWriteGuard<'_> {
fn drop(&mut self) {
// Drop the lock first, before potentially warning if it was held for too long.
// SAFETY: ManuallyDrop in Drop implementation
unsafe { ManuallyDrop::drop(&mut self.guard) };
let held_for = self.acquired_at.elapsed();
if held_for >= LAYER_MANAGER_LOCK_WARN_THRESHOLD {
tracing::warn!(
holder=%self.holder,
"Layer manager write lock held for {}s",
held_for.as_secs_f64(),
);
}
}
}
impl Deref for LayerManagerReadGuard<'_> {
type Target = LayerManager;
fn deref(&self) -> &Self::Target {
self.guard.deref()
}
}
impl Deref for LayerManagerWriteGuard<'_> {
type Target = LayerManager;
fn deref(&self) -> &Self::Target {
self.guard.deref()
}
}
impl DerefMut for LayerManagerWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.guard.deref_mut()
}
}
impl LockedLayerManager {
pub(crate) async fn read(&self, holder: LayerManagerLockHolder) -> LayerManagerReadGuard {
let guard = ManuallyDrop::new(self.locked.read().await);
LayerManagerReadGuard {
guard,
acquired_at: std::time::Instant::now(),
holder,
}
}
pub(crate) fn try_read(
&self,
holder: LayerManagerLockHolder,
) -> Result<LayerManagerReadGuard, tokio::sync::TryLockError> {
let guard = ManuallyDrop::new(self.locked.try_read()?);
Ok(LayerManagerReadGuard {
guard,
acquired_at: std::time::Instant::now(),
holder,
})
}
pub(crate) async fn write(&self, holder: LayerManagerLockHolder) -> LayerManagerWriteGuard {
let guard = ManuallyDrop::new(self.locked.write().await);
LayerManagerWriteGuard {
guard,
acquired_at: std::time::Instant::now(),
holder,
}
}
pub(crate) fn try_write(
&self,
holder: LayerManagerLockHolder,
) -> Result<LayerManagerWriteGuard, tokio::sync::TryLockError> {
let guard = ManuallyDrop::new(self.locked.try_write()?);
Ok(LayerManagerWriteGuard {
guard,
acquired_at: std::time::Instant::now(),
holder,
})
}
}
/// Provides semantic APIs to manipulate the layer map.
pub(crate) enum LayerManager {
/// Open as in not shutdown layer manager; we still have in-memory layers and we can manipulate

View File

@@ -2,6 +2,6 @@ DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1);

View File

@@ -11,6 +11,9 @@
#include "utils/guc.h"
#include "utils/hsearch.h"
#if PG_MAJORVERSION_NUM > 14
#include "access/xlogrecovery.h"
#endif
typedef struct LastWrittenLsnCacheEntry
@@ -24,14 +27,20 @@ typedef struct LastWrittenLsnCacheEntry
typedef struct LwLsnCacheCtl {
int lastWrittenLsnCacheSize;
/*
* Maximal last written LSN for pages not present in lastWrittenLsnCache
*/
XLogRecPtr maxLastWrittenLsn;
* Highest (most recent) last written LSN, for pages not present in
* lastWrittenLsnCache
*/
XLogRecPtr maxLastWrittenLsnData;
/*
* Double linked list to implement LRU replacement policy for last written LSN cache.
* Access to this list as well as to last written LSN cache is protected by 'LastWrittenLsnLock'.
*/
* Maximal last written LSN for metadata, not present in lastWrittenLsnCache
*/
XLogRecPtr maxLastWrittenLsnMetadata;
/*
* Double linked list to implement LRU replacement policy for last written LSN cache.
* Access to this list as well as to last written LSN cache is protected by 'LastWrittenLsnLock'.
*/
dlist_head lastWrittenLsnLRU;
} LwLsnCacheCtl;
@@ -108,19 +117,20 @@ init_lwlsncache(void)
#else
shmemrequest();
#endif
prev_set_lwlsn_block_range_hook = set_lwlsn_block_range_hook;
set_lwlsn_block_range_hook = neon_set_lwlsn_block_range;
prev_set_lwlsn_block_v_hook = set_lwlsn_block_v_hook;
set_lwlsn_block_v_hook = neon_set_lwlsn_block_v;
prev_set_lwlsn_block_hook = set_lwlsn_block_hook;
set_lwlsn_block_hook = neon_set_lwlsn_block;
prev_set_max_lwlsn_hook = set_max_lwlsn_hook;
set_max_lwlsn_hook = neon_set_max_lwlsn;
prev_set_lwlsn_relation_hook = set_lwlsn_relation_hook;
set_lwlsn_relation_hook = neon_set_lwlsn_relation;
prev_set_lwlsn_db_hook = set_lwlsn_db_hook;
set_lwlsn_db_hook = neon_set_lwlsn_db;
#define SET_HOOK(name) do { \
prev_##name##_hook = name##_hook; \
name##_hook = neon_##name; \
} while (false)
SET_HOOK(set_lwlsn_block_range);
SET_HOOK(set_lwlsn_block_v);
SET_HOOK(set_lwlsn_block);
SET_HOOK(set_max_lwlsn);
SET_HOOK(set_lwlsn_relation);
SET_HOOK(set_lwlsn_db);
#undef SET_HOOK
}
@@ -139,24 +149,34 @@ static void shmemrequest(void) {
static void shmeminit(void) {
static HASHCTL info;
bool found;
bool found = true;
if (lwlsn_cache_size > 0)
{
info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(LastWrittenLsnCacheEntry);
lastWrittenLsnCache = ShmemInitHash("last_written_lsn_cache",
lwlsn_cache_size, lwlsn_cache_size,
&info,
HASH_ELEM | HASH_BLOBS);
LwLsnCache = ShmemInitStruct("neon/LwLsnCacheCtl", sizeof(LwLsnCacheCtl), &found);
// Now set the size in the struct
LwLsnCache->lastWrittenLsnCacheSize = lwlsn_cache_size;
if (found) {
return;
}
lwlsn_cache_size, lwlsn_cache_size,
&info,
HASH_ELEM | HASH_BLOBS);
LwLsnCache = ShmemInitStruct("neon/LwLsnCacheCtl",
sizeof(LwLsnCacheCtl), &found);
}
dlist_init(&LwLsnCache->lastWrittenLsnLRU);
LwLsnCache->maxLastWrittenLsn = GetRedoRecPtr();
/* initialize the shmem struct if we allocated it */
if (!found) {
XLogRecPtr redoPtr;
LwLsnCache->lastWrittenLsnCacheSize = lwlsn_cache_size;
dlist_init(&LwLsnCache->lastWrittenLsnLRU);
redoPtr = GetRedoRecPtr();
LwLsnCache->maxLastWrittenLsnMetadata = redoPtr;
LwLsnCache->maxLastWrittenLsnData = redoPtr;
}
if (prev_shmem_startup_hook) {
prev_shmem_startup_hook();
}
@@ -180,17 +200,18 @@ neon_get_lwlsn(NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno)
LWLockAcquire(LastWrittenLsnLock, LW_SHARED);
/* Maximal last written LSN among all non-cached pages */
lsn = LwLsnCache->maxLastWrittenLsn;
if (NInfoGetRelNumber(rlocator) != InvalidOid)
if (NInfoGetRelNumber(rlocator) != InvalidOid) /* data page*/
{
BufferTag key;
Oid spcOid = NInfoGetSpcOid(rlocator);
Oid dbOid = NInfoGetDbOid(rlocator);
Oid relNumber = NInfoGetRelNumber(rlocator);
BufTagInit(key, relNumber, forknum, blkno, spcOid, dbOid);
/* Maximal last written LSN among all non-cached data pages */
lsn = LwLsnCache->maxLastWrittenLsnData;
entry = hash_search(lastWrittenLsnCache, &key, HASH_FIND, NULL);
if (entry != NULL)
lsn = entry->lsn;
@@ -212,9 +233,13 @@ neon_get_lwlsn(NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno)
lsn = SetLastWrittenLSNForBlockRangeInternal(lsn, rlocator, forknum, blkno, 1);
}
}
else
else /* metadata */
{
HASH_SEQ_STATUS seq;
/* Maximal last written LSN for metadata */
lsn = Max(LwLsnCache->maxLastWrittenLsnMetadata,
LwLsnCache->maxLastWrittenLsnData);
/* Find maximum of all cached LSNs */
hash_seq_init(&seq, lastWrittenLsnCache);
while ((entry = (LastWrittenLsnCacheEntry *) hash_seq_search(&seq)) != NULL)
@@ -230,7 +255,8 @@ neon_get_lwlsn(NRelFileInfo rlocator, ForkNumber forknum, BlockNumber blkno)
static void neon_set_max_lwlsn(XLogRecPtr lsn) {
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
LwLsnCache->maxLastWrittenLsn = lsn;
LwLsnCache->maxLastWrittenLsnMetadata = lsn;
LwLsnCache->maxLastWrittenLsnData = lsn;
LWLockRelease(LastWrittenLsnLock);
}
@@ -291,7 +317,7 @@ neon_get_lwlsn_v(NRelFileInfo relfilenode, ForkNumber forknum,
LWLockRelease(LastWrittenLsnLock);
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
lsn = LwLsnCache->maxLastWrittenLsn;
lsn = LwLsnCache->maxLastWrittenLsnData;
for (int i = 0; i < nblocks; i++)
{
@@ -306,7 +332,8 @@ neon_get_lwlsn_v(NRelFileInfo relfilenode, ForkNumber forknum,
else
{
HASH_SEQ_STATUS seq;
lsn = LwLsnCache->maxLastWrittenLsn;
Assert(nblocks == 1);
lsn = LwLsnCache->maxLastWrittenLsnMetadata;
/* Find maximum of all cached LSNs */
hash_seq_init(&seq, lastWrittenLsnCache);
while ((entry = (LastWrittenLsnCacheEntry *) hash_seq_search(&seq)) != NULL)
@@ -334,10 +361,10 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn,
{
if (NInfoGetRelNumber(rlocator) == InvalidOid)
{
if (lsn > LwLsnCache->maxLastWrittenLsn)
LwLsnCache->maxLastWrittenLsn = lsn;
if (lsn > LwLsnCache->maxLastWrittenLsnMetadata)
LwLsnCache->maxLastWrittenLsnMetadata = lsn;
else
lsn = LwLsnCache->maxLastWrittenLsn;
lsn = LwLsnCache->maxLastWrittenLsnMetadata;
}
else
{
@@ -369,10 +396,19 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn,
if (hash_get_num_entries(lastWrittenLsnCache) > LwLsnCache->lastWrittenLsnCacheSize)
{
/* Replace least recently used entry */
LastWrittenLsnCacheEntry* victim = dlist_container(LastWrittenLsnCacheEntry, lru_node, dlist_pop_head_node(&LwLsnCache->lastWrittenLsnLRU));
LastWrittenLsnCacheEntry* victim = NULL;
victim = dlist_container(LastWrittenLsnCacheEntry, lru_node, dlist_pop_head_node(&LwLsnCache->lastWrittenLsnLRU));
while (!XLogRecordReplayFinished(victim->lsn))
{
/* in recovery, we don't allow eviction of entries with the LSN of a record that has yet to be returned */
dlist_push_tail(&LwLsnCache->lastWrittenLsnLRU, &entry->lru_node);
victim = dlist_container(LastWrittenLsnCacheEntry, lru_node, dlist_pop_head_node(&LwLsnCache->lastWrittenLsnLRU));
}
/* Adjust max LSN for not cached relations/chunks if needed */
if (victim->lsn > LwLsnCache->maxLastWrittenLsn)
LwLsnCache->maxLastWrittenLsn = victim->lsn;
if (victim->lsn > LwLsnCache->maxLastWrittenLsnMetadata)
LwLsnCache->maxLastWrittenLsnMetadata = victim->lsn;
hash_search(lastWrittenLsnCache, victim, HASH_REMOVE, NULL);
}
@@ -433,6 +469,13 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
Oid dbOid = NInfoGetDbOid(relfilenode);
Oid relNumber = NInfoGetRelNumber(relfilenode);
/*
* We ignore the operation when the input is invalid:
* - we must have gotten LSNs to set
* - we must have pages to write
* - the cache must be enabled
* - we must be processing a data page, not a metadata request
*/
if (lsns == NULL || nblocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0 ||
NInfoGetRelNumber(relfilenode) == InvalidOid)
return InvalidXLogRecPtr;
@@ -466,10 +509,25 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
if (hash_get_num_entries(lastWrittenLsnCache) > LwLsnCache->lastWrittenLsnCacheSize)
{
/* Replace least recently used entry */
LastWrittenLsnCacheEntry* victim = dlist_container(LastWrittenLsnCacheEntry, lru_node, dlist_pop_head_node(&LwLsnCache->lastWrittenLsnLRU));
LastWrittenLsnCacheEntry* victim = dlist_container(LastWrittenLsnCacheEntry, lru_node,
dlist_pop_head_node(&LwLsnCache->lastWrittenLsnLRU));
/*
* If replay is still working on this LSN, we can't evict the
* page. Therefore, we must find a different victim, and return
* the one we just found to the pool.
*/
while (!XLogRecordReplayFinished(victim->lsn))
{
dlist_push_tail(&LwLsnCache->lastWrittenLsnLRU,
&entry->lru_node);
victim = dlist_container(LastWrittenLsnCacheEntry, lru_node,
dlist_pop_head_node(&LwLsnCache->lastWrittenLsnLRU));
}
/* Adjust max LSN for not cached relations/chunks if needed */
if (victim->lsn > LwLsnCache->maxLastWrittenLsn)
LwLsnCache->maxLastWrittenLsn = victim->lsn;
if (victim->lsn > LwLsnCache->maxLastWrittenLsnData)
LwLsnCache->maxLastWrittenLsnData = victim->lsn;
hash_search(lastWrittenLsnCache, victim, HASH_REMOVE, NULL);
}

10
poetry.lock generated
View File

@@ -3051,19 +3051,19 @@ files = [
[[package]]
name = "requests"
version = "2.32.4"
version = "2.32.3"
description = "Python HTTP for Humans."
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "requests-2.32.4-py3-none-any.whl", hash = "sha256:27babd3cda2a6d50b30443204ee89830707d396671944c998b5975b031ac2b2c"},
{file = "requests-2.32.4.tar.gz", hash = "sha256:27d0316682c8a29834d3264820024b62a36942083d52caf2f14c0591336d3422"},
{file = "requests-2.32.3-py3-none-any.whl", hash = "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6"},
{file = "requests-2.32.3.tar.gz", hash = "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760"},
]
[package.dependencies]
certifi = ">=2017.4.17"
charset_normalizer = ">=2,<4"
charset-normalizer = ">=2,<4"
idna = ">=2.5,<4"
urllib3 = ">=1.21.1,<3"
@@ -3846,4 +3846,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5"
content-hash = "7ab1e7b975af34b3271b7c6018fa22a261d3f73c7c0a0403b6b2bb86b5fbd36e"

View File

@@ -4,8 +4,6 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use arc_swap::ArcSwapOption;
use base64::Engine as _;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use clashmap::ClashMap;
use jose_jwk::crypto::KeyInfo;
use reqwest::{Client, redirect};
@@ -349,17 +347,17 @@ impl JwkCacheEntryLock {
.split_once('.')
.ok_or(JwtEncodingError::InvalidCompactForm)?;
let header = BASE64_URL_SAFE_NO_PAD.decode(header)?;
let header = base64::decode_config(header, base64::URL_SAFE_NO_PAD)?;
let header = serde_json::from_slice::<JwtHeader<'_>>(&header)?;
let payloadb = BASE64_URL_SAFE_NO_PAD.decode(payload)?;
let payloadb = base64::decode_config(payload, base64::URL_SAFE_NO_PAD)?;
let payload = serde_json::from_slice::<JwtPayload<'_>>(&payloadb)?;
if let Some(iss) = &payload.issuer {
ctx.set_jwt_issuer(iss.as_ref().to_owned());
}
let sig = BASE64_URL_SAFE_NO_PAD.decode(signature)?;
let sig = base64::decode_config(signature, base64::URL_SAFE_NO_PAD)?;
let kid = header.key_id.ok_or(JwtError::MissingKeyId)?;
@@ -798,6 +796,7 @@ mod tests {
use std::net::SocketAddr;
use std::time::SystemTime;
use base64::URL_SAFE_NO_PAD;
use bytes::Bytes;
use http::Response;
use http_body_util::Full;
@@ -872,8 +871,9 @@ mod tests {
key_id: Some(Cow::Owned(kid)),
};
let header = BASE64_URL_SAFE_NO_PAD.encode(serde_json::to_string(&header).unwrap());
let body = BASE64_URL_SAFE_NO_PAD.encode(serde_json::to_string(&body).unwrap());
let header =
base64::encode_config(serde_json::to_string(&header).unwrap(), URL_SAFE_NO_PAD);
let body = base64::encode_config(serde_json::to_string(&body).unwrap(), URL_SAFE_NO_PAD);
format!("{header}.{body}")
}
@@ -883,7 +883,7 @@ mod tests {
let payload = build_jwt_payload(kid, jose_jwa::Signing::Es256);
let sig: Signature = SigningKey::from(key).sign(payload.as_bytes());
let sig = BASE64_URL_SAFE_NO_PAD.encode(sig.to_bytes());
let sig = base64::encode_config(sig.to_bytes(), URL_SAFE_NO_PAD);
format!("{payload}.{sig}")
}
@@ -893,7 +893,7 @@ mod tests {
let payload = build_custom_jwt_payload(kid, body, jose_jwa::Signing::Es256);
let sig: Signature = SigningKey::from(key).sign(payload.as_bytes());
let sig = BASE64_URL_SAFE_NO_PAD.encode(sig.to_bytes());
let sig = base64::encode_config(sig.to_bytes(), URL_SAFE_NO_PAD);
format!("{payload}.{sig}")
}
@@ -904,7 +904,7 @@ mod tests {
let payload = build_jwt_payload(kid, jose_jwa::Signing::Rs256);
let sig = SigningKey::<sha2::Sha256>::new(key).sign(payload.as_bytes());
let sig = BASE64_URL_SAFE_NO_PAD.encode(sig.to_bytes());
let sig = base64::encode_config(sig.to_bytes(), URL_SAFE_NO_PAD);
format!("{payload}.{sig}")
}

View File

@@ -11,13 +11,11 @@ use anyhow::Context;
use anyhow::{bail, ensure};
use arc_swap::ArcSwapOption;
use futures::future::Either;
use itertools::{Itertools, Position};
use rand::{Rng, thread_rng};
use remote_storage::RemoteStorageConfig;
use tokio::net::TcpListener;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, error, info, warn};
use tracing::{Instrument, info, warn};
use utils::sentry_init::init_sentry;
use utils::{project_build_tag, project_git_version};
@@ -316,7 +314,7 @@ pub async fn run() -> anyhow::Result<()> {
let jemalloc = match crate::jemalloc::MetricRecorder::new() {
Ok(t) => Some(t),
Err(e) => {
error!(error = ?e, "could not start jemalloc metrics loop");
tracing::error!(error = ?e, "could not start jemalloc metrics loop");
None
}
};
@@ -522,44 +520,23 @@ pub async fn run() -> anyhow::Result<()> {
}
}
// Try to connect to Redis 3 times with 1 + (0..0.1) second interval.
// This prevents immediate exit and pod restart,
// which can cause hammering of the redis in case of connection issues.
if let Some(mut redis_kv_client) = redis_kv_client {
for attempt in (0..3).with_position() {
match redis_kv_client.try_connect().await {
Ok(()) => {
info!("Connected to Redis KV client");
maintenance_tasks.spawn(async move {
handle_cancel_messages(
&mut redis_kv_client,
rx_cancel,
args.cancellation_batch_size,
)
.await?;
maintenance_tasks.spawn(async move {
redis_kv_client.try_connect().await?;
handle_cancel_messages(
&mut redis_kv_client,
rx_cancel,
args.cancellation_batch_size,
)
.await?;
drop(redis_kv_client);
drop(redis_kv_client);
// `handle_cancel_messages` was terminated due to the tx_cancel
// being dropped. this is not worthy of an error, and this task can only return `Err`,
// so let's wait forever instead.
std::future::pending().await
});
break;
}
Err(e) => {
error!("Failed to connect to Redis KV client: {e}");
if matches!(attempt, Position::Last(_)) {
bail!(
"Failed to connect to Redis KV client after {} attempts",
attempt.into_inner()
);
}
let jitter = thread_rng().gen_range(0..100);
tokio::time::sleep(Duration::from_millis(1000 + jitter)).await;
}
}
}
// `handle_cancel_messages` was terminated due to the tx_cancel
// being dropped. this is not worthy of an error, and this task can only return `Err`,
// so let's wait forever instead.
std::future::pending().await
});
}
if let Some(regional_redis_client) = regional_redis_client {

View File

@@ -1,8 +1,5 @@
//! Definition and parser for channel binding flag (a part of the `GS2` header).
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
/// Channel binding flag (possibly with params).
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum ChannelBinding<T> {
@@ -58,7 +55,7 @@ impl<T: std::fmt::Display> ChannelBinding<T> {
let mut cbind_input = vec![];
write!(&mut cbind_input, "p={mode},,",).unwrap();
cbind_input.extend_from_slice(get_cbind_data(mode)?);
BASE64_STANDARD.encode(&cbind_input).into()
base64::encode(&cbind_input).into()
}
})
}
@@ -73,9 +70,9 @@ mod tests {
use ChannelBinding::*;
let cases = [
(NotSupportedClient, BASE64_STANDARD.encode("n,,")),
(NotSupportedServer, BASE64_STANDARD.encode("y,,")),
(Required("foo"), BASE64_STANDARD.encode("p=foo,,bar")),
(NotSupportedClient, base64::encode("n,,")),
(NotSupportedServer, base64::encode("y,,")),
(Required("foo"), base64::encode("p=foo,,bar")),
];
for (cb, input) in cases {

View File

@@ -2,8 +2,6 @@
use std::convert::Infallible;
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use hmac::{Hmac, Mac};
use sha2::Sha256;
@@ -107,7 +105,7 @@ pub(crate) async fn exchange(
secret: &ServerSecret,
password: &[u8],
) -> sasl::Result<sasl::Outcome<super::ScramKey>> {
let salt = BASE64_STANDARD.decode(&secret.salt_base64)?;
let salt = base64::decode(&secret.salt_base64)?;
let client_key = derive_client_key(pool, endpoint, password, &salt, secret.iterations).await;
if secret.is_password_invalid(&client_key).into() {

View File

@@ -3,9 +3,6 @@
use std::fmt;
use std::ops::Range;
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use super::base64_decode_array;
use super::key::{SCRAM_KEY_LEN, ScramKey};
use super::signature::SignatureBuilder;
@@ -91,7 +88,7 @@ impl<'a> ClientFirstMessage<'a> {
let mut message = String::new();
write!(&mut message, "r={}", self.nonce).unwrap();
BASE64_STANDARD.encode_string(nonce, &mut message);
base64::encode_config_buf(nonce, base64::STANDARD, &mut message);
let combined_nonce = 2..message.len();
write!(&mut message, ",s={salt_base64},i={iterations}").unwrap();
@@ -145,7 +142,11 @@ impl<'a> ClientFinalMessage<'a> {
server_key: &ScramKey,
) -> String {
let mut buf = String::from("v=");
BASE64_STANDARD.encode_string(signature_builder.build(server_key), &mut buf);
base64::encode_config_buf(
signature_builder.build(server_key),
base64::STANDARD,
&mut buf,
);
buf
}
@@ -250,7 +251,7 @@ mod tests {
"iiYEfS3rOgn8S3rtpSdrOsHtPLWvIkdgmHxA0hf3JNOAG4dU"
);
assert_eq!(
BASE64_STANDARD.encode(msg.proof),
base64::encode(msg.proof),
"SRpfsIVS4Gk11w1LqQ4QvCUBZYQmqXNSDEcHqbQ3CHI="
);
}

View File

@@ -15,8 +15,6 @@ mod secret;
mod signature;
pub mod threadpool;
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
pub(crate) use exchange::{Exchange, exchange};
use hmac::{Hmac, Mac};
pub(crate) use key::ScramKey;
@@ -34,7 +32,7 @@ pub(crate) const METHODS_WITHOUT_PLUS: &[&str] = &[SCRAM_SHA_256];
fn base64_decode_array<const N: usize>(input: impl AsRef<[u8]>) -> Option<[u8; N]> {
let mut bytes = [0u8; N];
let size = BASE64_STANDARD.decode_slice(input, &mut bytes).ok()?;
let size = base64::decode_config_slice(input, base64::STANDARD, &mut bytes).ok()?;
if size != N {
return None;
}

View File

@@ -1,7 +1,5 @@
//! Tools for SCRAM server secret management.
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use subtle::{Choice, ConstantTimeEq};
use super::base64_decode_array;
@@ -58,7 +56,7 @@ impl ServerSecret {
// iteration count 1 for our generated passwords going forward.
// PG16 users can set iteration count=1 already today.
iterations: 1,
salt_base64: BASE64_STANDARD.encode(nonce),
salt_base64: base64::encode(nonce),
stored_key: ScramKey::default(),
server_key: ScramKey::default(),
doomed: true,
@@ -90,7 +88,7 @@ mod tests {
assert_eq!(parsed.iterations, iterations);
assert_eq!(parsed.salt_base64, salt);
assert_eq!(BASE64_STANDARD.encode(parsed.stored_key), stored_key);
assert_eq!(BASE64_STANDARD.encode(parsed.server_key), server_key);
assert_eq!(base64::encode(parsed.stored_key), stored_key);
assert_eq!(base64::encode(parsed.server_key), server_key);
}
}

View File

@@ -16,8 +16,6 @@ use std::sync::atomic::AtomicUsize;
use std::task::{Poll, ready};
use std::time::Duration;
use base64::Engine as _;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use ed25519_dalek::{Signature, Signer, SigningKey};
use futures::Future;
use futures::future::poll_fn;
@@ -348,7 +346,7 @@ fn sign_jwt(sk: &SigningKey, payload: &[u8]) -> String {
jwt.push_str("eyJhbGciOiJFZERTQSJ9.");
// encode the jwt payload in-place
BASE64_URL_SAFE_NO_PAD.encode_string(payload, &mut jwt);
base64::encode_config_buf(payload, base64::URL_SAFE_NO_PAD, &mut jwt);
// create the signature from the encoded header || payload
let sig: Signature = sk.sign(jwt.as_bytes());
@@ -356,7 +354,7 @@ fn sign_jwt(sk: &SigningKey, payload: &[u8]) -> String {
jwt.push('.');
// encode the jwt signature in-place
BASE64_URL_SAFE_NO_PAD.encode_string(sig.to_bytes(), &mut jwt);
base64::encode_config_buf(sig.to_bytes(), base64::URL_SAFE_NO_PAD, &mut jwt);
debug_assert_eq!(
jwt.len(),

View File

@@ -3,8 +3,6 @@ pub mod postgres_rustls;
pub mod server_config;
use anyhow::Context;
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use rustls::pki_types::CertificateDer;
use sha2::{Digest, Sha256};
use tracing::{error, info};
@@ -60,7 +58,7 @@ impl TlsServerEndPoint {
let oid = certificate.signature_algorithm.oid;
if SHA256_OIDS.contains(&oid) {
let tls_server_end_point: [u8; 32] = Sha256::new().chain_update(cert).finalize().into();
info!(%subject, tls_server_end_point = %BASE64_STANDARD.encode(tls_server_end_point), "determined channel binding");
info!(%subject, tls_server_end_point = %base64::encode(tls_server_end_point), "determined channel binding");
Ok(Self::Sha256(tls_server_end_point))
} else {
error!(%subject, "unknown channel binding");

View File

@@ -9,7 +9,7 @@ pytest = "^7.4.4"
psycopg2-binary = "^2.9.10"
typing-extensions = "^4.12.2"
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
requests = "^2.32.4"
requests = "^2.32.3"
pytest-xdist = "^3.3.1"
asyncpg = "^0.30.0"
aiopg = "^1.4.0"

View File

@@ -676,7 +676,7 @@ async fn handle_tenant_timeline_passthrough(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_or_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
let tenant_or_shard_id: TenantShardId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_or_shard_id.tenant_id).await;
@@ -1398,31 +1398,6 @@ async fn handle_timeline_import(req: Request<Body>) -> Result<Response<Body>, Ap
)
}
async fn handle_tenant_timeline_locate(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
check_permissions(&req, Scope::Admin)?;
maybe_rate_limit(&req, tenant_id).await;
match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(_req) => {}
};
json_response(
StatusCode::OK,
service
.tenant_timeline_locate(tenant_id, timeline_id)
.await?,
)
}
async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -2164,16 +2139,6 @@ pub fn make_router(
)
},
)
.get(
"/debug/v1/tenant/:tenant_id/timeline/:timeline_id/locate",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_locate,
RequestName("v1_tenant_timeline_locate"),
)
},
)
.get("/debug/v1/scheduler", |r| {
named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
})
@@ -2470,7 +2435,7 @@ pub fn make_router(
},
)
// Tenant detail GET passthrough to shard zero:
.get("/v1/tenant/:tenant_shard_id", |r| {
.get("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(
r,
handle_tenant_timeline_passthrough,
@@ -2480,7 +2445,7 @@ pub fn make_router(
// The `*` in the URL is a wildcard: any tenant/timeline GET APIs on the pageserver
// are implicitly exposed here. This must be last in the list to avoid
// taking precedence over other GET methods we might implement by hand.
.get("/v1/tenant/:tenant_shard_id/*", |r| {
.get("/v1/tenant/:tenant_id/*", |r| {
tenant_service_handler(
r,
handle_tenant_timeline_passthrough,
@@ -2489,7 +2454,7 @@ pub fn make_router(
})
// Tenant timeline mark_invisible passthrough to shard zero
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/mark_invisible",
"/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
|r| {
tenant_service_handler(
r,

View File

@@ -2267,7 +2267,6 @@ impl Service {
// fail, and start from scratch, so it doesn't make sense for us to try and preserve
// the stale/multi states at this point.
mode: LocationConfigMode::AttachedSingle,
stripe_size: shard.shard.stripe_size,
});
shard.generation = std::cmp::max(shard.generation, Some(new_gen));
@@ -2301,7 +2300,6 @@ impl Service {
id: *tenant_shard_id,
r#gen: None,
mode: LocationConfigMode::Secondary,
stripe_size: shard.shard.stripe_size,
});
// We must not update observed, because we have no guarantee that our
@@ -6651,8 +6649,6 @@ impl Service {
/// This is for debug/support only: assuming tenant data is already present in S3, we "create" a
/// tenant with a very high generation number so that it will see the existing data.
/// It does not create timelines on safekeepers, because they might already exist on some
/// safekeeper set. So, the timelines are not storcon-managed after the import.
pub(crate) async fn tenant_import(
&self,
tenant_id: TenantId,

View File

@@ -107,7 +107,7 @@ impl ChaosInjector {
// - Skip shards doing a graceful migration already, so that we allow these to run to
// completion rather than only exercising the first part and then cancelling with
// some other chaos.
matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
!matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
&& shard.get_preferred_node().is_none()
}

View File

@@ -17,7 +17,7 @@ use pageserver_api::controller_api::{
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
};
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::membership::{MemberSet, SafekeeperGeneration, SafekeeperId};
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -26,13 +26,6 @@ use utils::lsn::Lsn;
use super::Service;
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct TimelineLocateResponse {
pub generation: SafekeeperGeneration,
pub sk_set: Vec<NodeId>,
pub new_sk_set: Option<Vec<NodeId>>,
}
impl Service {
/// Timeline creation on safekeepers
///
@@ -403,38 +396,6 @@ impl Service {
Ok(())
}
/// Locate safekeepers for a timeline.
/// Return the generation, sk_set and new_sk_set if present.
/// If the timeline is not storcon-managed, return NotFound.
pub(crate) async fn tenant_timeline_locate(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<TimelineLocateResponse, ApiError> {
let timeline = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(timeline) = timeline else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Timeline {}/{} not found", tenant_id, timeline_id).into(),
));
};
Ok(TimelineLocateResponse {
generation: SafekeeperGeneration::new(timeline.generation as u32),
sk_set: timeline
.sk_set
.iter()
.map(|id| NodeId(*id as u64))
.collect(),
new_sk_set: timeline
.new_sk_set
.map(|sk_set| sk_set.iter().map(|id| NodeId(*id as u64)).collect()),
})
}
/// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
pub(super) async fn tenant_timeline_delete_safekeepers(
self: &Arc<Self>,

View File

@@ -2223,17 +2223,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
shards: list[dict[str, Any]] = body["shards"]
return shards
def timeline_locate(self, tenant_id: TenantId, timeline_id: TimelineId):
"""
:return: dict {"generation": int, "sk_set": [int], "new_sk_set": [int]}
"""
response = self.request(
"GET",
f"{self.api}/debug/v1/tenant/{tenant_id}/timeline/{timeline_id}/locate",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
def tenant_describe(self, tenant_id: TenantId):
"""
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr: str, "listen_http_port: int, preferred_az_id: str}

View File

@@ -18,7 +18,6 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
Safekeeper,
flush_ep_to_pageserver,
)
from fixtures.pageserver.http import PageserverApiException
@@ -27,7 +26,6 @@ from fixtures.pageserver.utils import (
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind, S3Storage, s3_storage
from fixtures.safekeeper.http import MembershipConfiguration
from fixtures.workload import Workload
if TYPE_CHECKING:
@@ -544,24 +542,6 @@ def test_historic_storage_formats(
# All our artifacts should contain at least one timeline
assert len(timelines) > 0
# Import tenant does not create the timeline on safekeepers,
# because it is a debug handler and the timeline may have already been
# created on some set of safekeepers.
# Create the timeline on safekeepers manually.
# TODO(diko): when we have the script/storcon handler to migrate
# the timeline to storcon, we can replace this code with it.
mconf = MembershipConfiguration(
generation=1,
members=Safekeeper.sks_to_safekeeper_ids([env.safekeepers[0]]),
new_members=None,
)
members_sks = Safekeeper.mconf_sks(env, mconf)
for timeline in timelines:
Safekeeper.create_timeline(
dataset.tenant_id, timeline["timeline_id"], env.pageserver, mconf, members_sks
)
# TODO: ensure that the snapshots we're importing contain a sensible variety of content, at the very
# least they should include a mixture of deltas and image layers. Preferably they should also
# contain some "exotic" stuff like aux files from logical replication.

View File

@@ -430,7 +430,6 @@ def test_tenant_delete_stale_shards(neon_env_builder: NeonEnvBuilder, pg_bin: Pg
workload.init()
workload.write_rows(256)
workload.validate()
workload.stop()
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,

View File

@@ -20,7 +20,8 @@ anstream = { version = "0.6" }
anyhow = { version = "1", features = ["backtrace"] }
axum = { version = "0.8", features = ["ws"] }
axum-core = { version = "0.5", default-features = false, features = ["tracing"] }
base64 = { version = "0.21" }
base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] }
base64-647d43efb71741da = { package = "base64", version = "0.21" }
base64ct = { version = "1", default-features = false, features = ["std"] }
bytes = { version = "1", features = ["serde"] }
camino = { version = "1", default-features = false, features = ["serde1"] }