Compare commits

..

8 Commits

Author SHA1 Message Date
Christian Schwarz
1863a04fb0 WIP 2023-10-05 18:21:18 +02:00
Christian Schwarz
f83a71ca6a WIP 2023-10-05 18:13:54 +02:00
Christian Schwarz
74a634c9fa WIP 2023-10-05 18:06:26 +02:00
Christian Schwarz
fc3f8a65b3 WIP: provide permits in requestcontext 2023-10-05 18:02:22 +02:00
Christian Schwarz
9f03dd24c2 page_cache: find_victim: prevent starvation 2023-10-05 16:54:02 +02:00
Christian Schwarz
dc96a7604a page_cache: ensure forward progress on cache miss 2023-10-05 16:51:08 +02:00
Christian Schwarz
d7c94e67ce inline lock_for_write and try_lock_for_write into memorize_materialized_page
Motivation
==========

It's the only user, and the name of `_for_write` is wrong as of

    commit 7a63685cde
    Author: Christian Schwarz <christian@neon.tech>
    Date:   Fri Aug 18 19:31:03 2023 +0200

        simplify page-caching of EphemeralFile (#4994)

Notes
=====

This also allows us to get rid of the WriteBufResult type.

Also rename `search_mapping_for_write` to `search_mapping_exact`.
It makes more sense that way because there is `_for_write`-locking
anymore.
2023-10-05 16:01:29 +02:00
John Spray
baa5fa1e77 pageserver: location configuration API, attachment modes, secondary locations (#5299)
## Problem

These changes are part of building seamless tenant migration, as
described in the RFC:
- https://github.com/neondatabase/neon/pull/5029

## Summary of changes

- A new configuration type `LocationConf` supersedes `TenantConfOpt` for
storing a tenant's configuration in the pageserver repo dir. It contains
`TenantConfOpt`, as well as a new `mode` attribute that describes what
kind of location this is (secondary, attached, attachment mode etc). It
is written to a file called `config-v1` instead of `config` -- this
prepares us for neatly making any other profound changes to the format
of the file in future. Forward compat for existing pageserver code is
achieved by writing out both old and new style files. Backward compat is
achieved by checking for the old-style file if the new one isn't found.
- The `TenantMap` type changes, to hold `TenantSlot` instead of just
`Tenant`. The `Tenant` type continues to be used for attached tenants
only. Tenants in other states (such as secondaries) are represented by a
different variant of `TenantSlot`.
- Where `Tenant` & `Timeline` used to hold an Arc<Mutex<TenantConfOpt>>,
they now hold a reference to a AttachedTenantConf, which includes the
extra information from LocationConf. This enables them to know the
current attachment mode.
- The attachment mode is used as an advisory input to decide whether to
do compaction and GC (AttachedStale is meant to avoid doing uploads,
AttachedMulti is meant to avoid doing deletions).
- A new HTTP API is added at `PUT /tenants/<tenant_id>/location_config`
to drive new location configuration. This provides a superset of the
functionality of attach/detach/load/ignore:
  - Attaching a tenant is just configuring it in an attached state
  - Detaching a tenant is configuring it to a detached state
  - Loading a tenant is just the same as attaching it
- Ignoring a tenant is the same as configuring it into Secondary with
warm=false (i.e. retain the files on disk but do nothing else).

Caveats:
- AttachedMulti tenants don't do compaction in this PR, but they do in
the follow on #5397
- Concurrent updates to the `location_config` API are not handled
elegantly in this PR, a better mechanism is added in the follow on
https://github.com/neondatabase/neon/pull/5367
- Secondary mode is just a placeholder in this PR: the code to upload
heatmaps and do downloads on secondary locations will be added in a
later PR (but that shouldn't change any external interfaces)

Closes: https://github.com/neondatabase/neon/issues/5379

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-10-05 09:55:10 +01:00
23 changed files with 1275 additions and 639 deletions

27
Cargo.lock generated
View File

@@ -158,6 +158,17 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "async-channel"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-compression"
version = "0.4.0"
@@ -1031,6 +1042,15 @@ dependencies = [
"zstd",
]
[[package]]
name = "concurrent-queue"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "const_format"
version = "0.2.30"
@@ -1452,6 +1472,12 @@ dependencies = [
"libc",
]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "fail"
version = "0.5.1"
@@ -2674,6 +2700,7 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"async-compression",
"async-stream",
"async-trait",

View File

@@ -35,6 +35,7 @@ license = "Apache-2.0"
## All dependency versions, used in the project
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
async-channel = "1.9.0"
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
flate2 = "1.0.26"
async-stream = "0.3"

View File

@@ -10,6 +10,7 @@ use serde_with::{serde_as, DisplayFromStr};
use strum_macros;
use utils::{
completion,
generation::Generation,
history_buffer::HistoryBufferWithDropCounter,
id::{NodeId, TenantId, TimelineId},
lsn::Lsn,
@@ -218,6 +219,8 @@ impl std::ops::Deref for TenantCreateRequest {
}
}
/// An alternative representation of `pageserver::tenant::TenantConf` with
/// simpler types.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TenantConfig {
pub checkpoint_distance: Option<u64>,
@@ -243,6 +246,39 @@ pub struct TenantConfig {
pub gc_feedback: Option<bool>,
}
/// A flattened analog of a `pagesever::tenant::LocationMode`, which
/// lists out all possible states (and the virtual "Detached" state)
/// in a flat form rather than using rust-style enums.
#[derive(Serialize, Deserialize, Debug)]
pub enum LocationConfigMode {
AttachedSingle,
AttachedMulti,
AttachedStale,
Secondary,
Detached,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LocationConfigSecondary {
pub warm: bool,
}
/// An alternative representation of `pageserver::tenant::LocationConf`,
/// for use in external-facing APIs.
#[derive(Serialize, Deserialize, Debug)]
pub struct LocationConfig {
pub mode: LocationConfigMode,
/// If attaching, in what generation?
#[serde(default)]
pub generation: Option<Generation>,
#[serde(default)]
pub secondary_conf: Option<LocationConfigSecondary>,
// If requesting mode `Secondary`, configuration for that.
// Custom storage configuration for the tenant, if any
pub tenant_conf: TenantConfig,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
@@ -253,6 +289,16 @@ pub struct StatusResponse {
pub id: NodeId,
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLocationConfigRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde(flatten)]
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]

View File

@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
[dependencies]
anyhow.workspace = true
async-channel.workspace = true
async-compression.workspace = true
async-stream.workspace = true
async-trait.workspace = true

View File

@@ -37,8 +37,8 @@ use crate::tenant::{
TIMELINES_SEGMENT_NAME,
};
use crate::{
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX,
TIMELINE_UNINIT_MARK_SUFFIX,
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME,
TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
};
pub mod defaults {
@@ -631,10 +631,18 @@ impl PageServerConf {
/// Points to a place in pageserver's local directory,
/// where certain tenant's tenantconf file should be located.
///
/// Legacy: superseded by tenant_location_config_path. Eventually
/// remove this function.
pub fn tenant_config_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id).join(TENANT_CONFIG_NAME)
}
pub fn tenant_location_config_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id)
.join(TENANT_LOCATION_CONFIG_NAME)
}
pub fn timelines_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id).join(TIMELINES_SEGMENT_NAME)
}

View File

@@ -86,15 +86,18 @@
//! [`RequestContext`] argument. Functions in the middle of the call chain
//! only need to pass it on.
use std::sync::{Arc, Mutex, MutexGuard};
use crate::task_mgr::TaskKind;
// The main structure of this module, see module-level comment.
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct RequestContext {
task_kind: TaskKind,
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
page_cache_permit: Option<Arc<crate::page_cache::PinnedSlotsPermit>>,
}
/// The kind of access to the page cache.
@@ -150,6 +153,7 @@ impl RequestContextBuilder {
download_behavior: DownloadBehavior::Download,
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
page_cache_permit: None,
},
}
}
@@ -163,6 +167,7 @@ impl RequestContextBuilder {
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
page_cache_permit: original.page_cache_permit.clone(),
},
}
}
@@ -186,6 +191,11 @@ impl RequestContextBuilder {
self
}
pub(crate) fn page_cache_permit(mut self, p: Arc<crate::page_cache::PinnedSlotsPermit>) -> Self {
self.inner.page_cache_permit = Some(p);
self
}
pub fn build(self) -> RequestContext {
self.inner
}
@@ -286,4 +296,8 @@ impl RequestContext {
pub(crate) fn page_content_kind(&self) -> PageContentKind {
self.page_content_kind
}
pub(crate) fn permit(&self) -> Option<&crate::page_cache::PinnedSlotsPermit> {
self.page_cache_permit.as_ref().map(|p| &**p)
}
}

View File

@@ -34,8 +34,6 @@ use crate::deletion_queue::TEMP_SUFFIX;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::storage_layer::LayerFileName;
use crate::virtual_file;
use crate::virtual_file::on_fatal_io_error;
// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to deliver objects of the order
@@ -197,7 +195,7 @@ impl ListWriter {
debug!("Deletion header {header_path} not found, first start?");
Ok(None)
} else {
on_fatal_io_error(&e);
Err(anyhow::anyhow!(e))
}
}
}
@@ -223,9 +221,9 @@ impl ListWriter {
Err(e) => {
warn!("Failed to open deletion list directory {deletion_directory}: {e:#}");
// This is fatal: any failure to read this local directory indicates a
// storage problem or configuration problem of the node.
virtual_file::on_fatal_io_error(&e);
// Give up: if we can't read the deletion list directory, we probably can't
// write lists into it later, so the queue won't work.
return Err(e.into());
}
};
@@ -251,8 +249,6 @@ impl ListWriter {
// Non-fatal error: we will just leave the file behind but not
// try and load it.
warn!("Failed to clean up temporary file {absolute_path}: {e:#}");
virtual_file::on_fatal_io_error(&e);
}
continue;
@@ -265,7 +261,7 @@ impl ListWriter {
.expect("Non optional group should be present")
.as_str()
} else {
warn!("Unexpected filename in deletion queue: {basename}");
warn!("Unexpected key in deletion queue: {basename}");
metrics::DELETION_QUEUE.unexpected_errors.inc();
continue;
};
@@ -293,12 +289,7 @@ impl ListWriter {
for s in seqs {
let list_path = self.conf.deletion_list_path(s);
let list_bytes = match tokio::fs::read(&list_path).await {
Ok(b) => b,
Err(e) => {
virtual_file::on_fatal_io_error(&e);
}
};
let list_bytes = tokio::fs::read(&list_path).await?;
let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
Ok(l) => l,

View File

@@ -28,7 +28,6 @@ use crate::config::PageServerConf;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::control_plane_client::RetryForeverError;
use crate::metrics;
use crate::virtual_file::on_fatal_io_error;
use super::deleter::DeleterMessage;
use super::DeletionHeader;
@@ -117,11 +116,6 @@ where
/// Valid LSN updates propagate back to Timelines immediately, valid DeletionLists
/// go into the queue of ready-to-execute lists.
async fn validate(&mut self) -> Result<(), DeletionQueueError> {
// Figure out for each tenant which generation number to validate.
//
// It is sufficient to validate the max generation number of each tenant because only the
// highest generation number can possibly be valid. Hence this map will collect the
// highest generation pending validation for each tenant.
let mut tenant_generations = HashMap::new();
for list in &self.pending_lists {
for (tenant_id, tenant_list) in &list.tenants {
@@ -252,11 +246,6 @@ where
}
}
// Assert monotonicity of the list sequence numbers we are processing
if let Some(validated) = validated_sequence {
assert!(list.sequence >= validated)
}
validated_sequence = Some(list.sequence);
}
@@ -304,8 +293,7 @@ where
// issue (probably permissions) has been fixed by then.
tracing::error!("Failed to delete {list_path}: {e:#}");
metrics::DELETION_QUEUE.unexpected_errors.inc();
on_fatal_io_error(&e);
break;
}
}
}

View File

@@ -10,7 +10,8 @@ use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest, TenantLoadRequest,
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
};
use remote_storage::GenericRemoteStorage;
use tenant_size_model::{SizeResult, StorageModel};
@@ -29,7 +30,7 @@ use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
};
@@ -150,7 +151,10 @@ impl From<TenantMapInsertError> for ApiError {
TenantMapInsertError::TenantAlreadyExists(id, state) => {
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
}
TenantMapInsertError::Closure(e) => ApiError::InternalServerError(e),
TenantMapInsertError::TenantExistsSecondary(id) => {
ApiError::Conflict(format!("tenant {id} already exists as secondary"))
}
TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
}
}
}
@@ -1011,6 +1015,48 @@ async fn update_tenant_config_handler(
json_response(StatusCode::OK, ())
}
async fn put_tenant_location_config_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
let conf = state.conf;
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
mgr::detach_tenant(conf, tenant_id, true)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
return json_response(StatusCode::OK, ());
}
let location_conf =
LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
mgr::upsert_location(
state.conf,
tenant_id,
location_conf,
state.broker_client.clone(),
state.remote_storage.clone(),
state.deletion_queue_client.clone(),
&ctx,
)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
// which is not a 400 but a 409.
.map_err(ApiError::BadRequest)?;
json_response(StatusCode::OK, ())
}
/// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
async fn handle_tenant_break(
r: Request<Body>,
@@ -1464,6 +1510,9 @@ pub fn make_router(
.get("/v1/tenant/:tenant_id/config", |r| {
api_handler(r, get_tenant_config_handler)
})
.put("/v1/tenant/:tenant_id/location_config", |r| {
api_handler(r, put_tenant_location_config_handler)
})
.get("/v1/tenant/:tenant_id/timeline", |r| {
api_handler(r, timeline_list_handler)
})

View File

@@ -112,6 +112,10 @@ pub const METADATA_FILE_NAME: &str = "metadata";
/// Full path: `tenants/<tenant_id>/config`.
pub const TENANT_CONFIG_NAME: &str = "config";
/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config`.
pub const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";
/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub const TEMP_FILE_SUFFIX: &str = "___temp";

View File

@@ -314,7 +314,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
#[strum(serialize_all = "kebab_case")]
pub(crate) enum PageCacheErrorKind {
AcquirePinnedSlotTimeout,
EvictIterLimit,
}
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {

View File

@@ -66,8 +66,7 @@
//! inserted to the mapping, but you must hold the write-lock on the slot until
//! the contents are valid. If you need to release the lock without initializing
//! the contents, you must remove the mapping first. We make that easy for the
//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
//! page, the caller must explicitly call guard.mark_valid() after it has
//! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
//! initialized it. If the guard is dropped without calling mark_valid(), the
//! mapping is automatically removed and the slot is marked free.
//!
@@ -79,6 +78,7 @@ use std::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
Arc, Weak,
},
task::Poll,
time::Duration,
};
@@ -215,16 +215,21 @@ impl Slot {
impl SlotInner {
/// If there is aready a reader, drop our permit and share its permit, just like we share read access.
fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
let mut guard = self.permit.lock().unwrap();
if let Some(existing_permit) = guard.upgrade() {
drop(guard);
drop(permit);
existing_permit
} else {
let permit = Arc::new(permit);
*guard = Arc::downgrade(&permit);
permit
fn coalesce_readers_permit<'c>(&self, permit: PermitKind<'c>) -> PermitKindReadGuard<'c> {
match permit {
PermitKind::CtxProvided(permit) => PermitKindReadGuard::CtxProvided(permit),
PermitKind::Acquired(permit) => {
let mut guard = self.permit.lock().unwrap();
if let Some(existing_permit) = guard.upgrade() {
drop(guard);
drop(permit);
existing_permit
} else {
let permit = Arc::new(permit);
*guard = Arc::downgrade(&permit);
permit
}
}
}
}
}
@@ -252,21 +257,36 @@ pub struct PageCache {
/// This is interpreted modulo the page cache size.
next_evict_slot: AtomicUsize,
find_victim_sender:
async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
find_victim_waiters:
async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
size_metrics: &'static PageCacheSizeMetrics,
}
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
pub(crate) struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
enum PermitKind<'c> {
CtxProvided(&'c PinnedSlotsPermit),
Acquired(PinnedSlotsPermit),
}
enum PermitKindReadGuard<'c> {
CtxProvided(&'c PinnedSlotsPermit),
Coalesced(Arc<PinnedSlotsPermit>),
}
///
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
/// until the guard is dropped.
///
pub struct PageReadGuard<'i> {
_permit: Arc<PinnedSlotsPermit>,
pub struct PageReadGuard<'c, 'i> {
_permit: PermitKindReadGuard<'c>,
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
}
impl std::ops::Deref for PageReadGuard<'_> {
impl std::ops::Deref for PageReadGuard<'_, '_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
@@ -274,7 +294,7 @@ impl std::ops::Deref for PageReadGuard<'_> {
}
}
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_, '_> {
fn as_ref(&self) -> &[u8; PAGE_SZ] {
self.slot_guard.buf
}
@@ -286,78 +306,89 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
///
/// Counterintuitively, this is used even for a read, if the requested page is not
/// currently found in the page cache. In that case, the caller of lock_for_read()
/// is expected to fill in the page contents and call mark_valid(). Similarly
/// lock_for_write() can return an invalid buffer that the caller is expected to
/// to initialize.
///
pub struct PageWriteGuard<'i> {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
_permit: PinnedSlotsPermit,
// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
valid: bool,
/// is expected to fill in the page contents and call mark_valid().
pub struct PageWriteGuard<'c, 'i> {
state: PageWriteGuardState<'c, 'i>,
}
impl std::ops::DerefMut for PageWriteGuard<'_> {
enum PageWriteGuardState<'c, 'i> {
Invalid {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
_permit: PermitKindReadGuard<'c>,
},
Downgraded,
}
impl std::ops::DerefMut for PageWriteGuard<'_, '_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.buf
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
impl std::ops::Deref for PageWriteGuard<'_> {
impl std::ops::Deref for PageWriteGuard<'_, '_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.inner.buf
match &self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_, '_> {
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
self.inner.buf
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Downgraded => todo!(),
}
}
}
impl PageWriteGuard<'_> {
impl<'c, 'a> PageWriteGuard<'c, 'a> {
/// Mark that the buffer contents are now valid.
pub fn mark_valid(&mut self) {
assert!(self.inner.key.is_some());
assert!(
!self.valid,
"mark_valid called on a buffer that was already valid"
);
self.valid = true;
#[must_use]
pub fn mark_valid(mut self) -> PageReadGuard<'c, 'a> {
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
match prev {
PageWriteGuardState::Invalid { inner, _permit } => {
assert!(inner.key.is_some());
PageReadGuard {
_permit,
slot_guard: inner.downgrade(),
}
}
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
impl Drop for PageWriteGuard<'_> {
impl Drop for PageWriteGuard<'_, '_> {
///
/// If the buffer was allocated for a page that was not already in the
/// cache, but the lock_for_read/write() caller dropped the buffer without
/// initializing it, remove the mapping from the page cache.
///
fn drop(&mut self) {
assert!(self.inner.key.is_some());
if !self.valid {
let self_key = self.inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
self.inner.key = None;
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => {
assert!(inner.key.is_some());
let self_key = inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
inner.key = None;
}
PageWriteGuardState::Downgraded => {}
}
}
}
/// lock_for_read() return value
pub enum ReadBufResult<'a> {
Found(PageReadGuard<'a>),
NotFound(PageWriteGuard<'a>),
}
/// lock_for_write() return value
pub enum WriteBufResult<'a> {
Found(PageWriteGuard<'a>),
NotFound(PageWriteGuard<'a>),
pub enum ReadBufResult<'c, 'a> {
Found(PageReadGuard<'c, 'a>),
NotFound(PageWriteGuard<'c, 'a>),
}
impl PageCache {
@@ -379,10 +410,9 @@ impl PageCache {
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, PageReadGuard)> {
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
let Ok(permit) = self.try_get_pinned_slot_permit(ctx).await else {
return None;
};
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_accesses_materialized_page
@@ -430,12 +460,13 @@ impl PageCache {
/// Store an image of the given page in the cache.
///
pub async fn memorize_materialized_page(
&self,
&'static self,
tenant_id: TenantId,
timeline_id: TimelineId,
key: Key,
lsn: Lsn,
img: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey {
@@ -446,30 +477,87 @@ impl PageCache {
lsn,
};
match self.lock_for_write(&cache_key).await? {
WriteBufResult::Found(write_guard) => {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(*write_guard == img);
let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
loop {
// First check if the key already exists in the cache.
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(&cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
debug_assert_eq!(inner.buf.len(), img.len());
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(inner.buf == img);
return Ok(());
}
}
WriteBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(img);
write_guard.mark_valid();
}
}
debug_assert!(permit.is_some());
Ok(())
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
// our victim buffer unnecessarily. Put it into the free list and
// continue with the slot that the other thread chose.
if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
// TODO: put to free list
// We now just loop back to start from beginning. This is not
// optimal, we'll perform the lookup in the mapping again, which
// is not really necessary because we already got
// 'existing_slot_idx'. But this shouldn't happen often enough
// to matter much.
continue;
}
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
// Create a write guard for the slot so we go through the expected motions.
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
let mut write_guard = PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
};
write_guard.copy_from_slice(img);
let _ = write_guard.mark_valid();
return Ok(());
}
}
// Section 1.2: Public interface functions for working with immutable file pages.
pub async fn read_immutable_buf(
&self,
pub async fn read_immutable_buf<'c>(
&'static self,
file_id: FileId,
blkno: u32,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
ctx: &'c RequestContext,
) -> anyhow::Result<ReadBufResult<'c, 'static>> {
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
self.lock_for_read(&mut cache_key, ctx).await
@@ -483,7 +571,22 @@ impl PageCache {
// "mappings" after this section. But the routines in this section should
// not require changes.
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
pub(crate) async fn get_permit(&self) -> Arc<PinnedSlotsPermit> {
Arc::new(PinnedSlotsPermit(
Arc::clone(&self.pinned_slots)
.acquire_owned()
.await
.expect("the semaphore is never closed"),
))
}
async fn try_get_pinned_slot_permit<'c>(
&self,
ctx: &'c RequestContext,
) -> anyhow::Result<PermitKind<'c>> {
if let Some(permit) = ctx.permit() {
return Ok(PermitKind::CtxProvided(permit));
};
let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
match tokio::time::timeout(
// Choose small timeout, neon_smgr does its own retries.
@@ -493,9 +596,9 @@ impl PageCache {
)
.await
{
Ok(res) => Ok(PinnedSlotsPermit(
Ok(res) => Ok(PermitKind::Acquired(PinnedSlotsPermit(
res.expect("this semaphore is never closed"),
)),
))),
Err(_timeout) => {
timer.stop_and_discard();
crate::metrics::page_cache_errors_inc(
@@ -515,10 +618,10 @@ impl PageCache {
///
/// If no page is found, returns None and *cache_key is left unmodified.
///
async fn try_lock_for_read(
async fn try_lock_for_read<'c>(
&self,
cache_key: &mut CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
permit: &mut Option<PermitKind<'c>>,
) -> Option<PageReadGuard> {
let cache_key_orig = cache_key.clone();
if let Some(slot_idx) = self.search_mapping(cache_key) {
@@ -571,11 +674,11 @@ impl PageCache {
/// ```
///
async fn lock_for_read(
&self,
&'static self,
cache_key: &mut CacheKey,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
@@ -638,99 +741,10 @@ impl PageCache {
);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: false,
}));
}
}
/// Look up a page in the cache and lock it in write mode. If it's not
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
async fn try_lock_for_write(
&self,
cache_key: &CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageWriteGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Some(PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
valid: true,
});
}
}
None
}
/// Return a write-locked buffer for given block.
///
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
loop {
// First check if the key already exists in the cache.
if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
debug_assert!(permit.is_none());
return Ok(WriteBufResult::Found(write_guard));
}
debug_assert!(permit.is_some());
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
// our victim buffer unnecessarily. Put it into the free list and
// continue with the slot that the other thread chose.
if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
// TODO: put to free list
// We now just loop back to start from beginning. This is not
// optimal, we'll perform the lookup in the mapping again, which
// is not really necessary because we already got
// 'existing_slot_idx'. But this shouldn't happen often enough
// to matter much.
continue;
}
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Ok(WriteBufResult::NotFound(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: false,
}));
}
}
@@ -775,7 +789,7 @@ impl PageCache {
///
/// Like 'search_mapping, but performs an "exact" search. Used for
/// allocating a new buffer.
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
match key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
@@ -882,10 +896,12 @@ impl PageCache {
///
/// On return, the slot is empty and write-locked.
async fn find_victim(
&self,
&'static self,
_permit_witness: &PinnedSlotsPermit,
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
// Get in line.
let receiver = self.find_victim_waiters.recv();
let mut iters = 0;
loop {
iters += 1;
@@ -897,41 +913,8 @@ impl PageCache {
let mut inner = match slot.inner.try_write() {
Ok(inner) => inner,
Err(_err) => {
if iters > iter_limit {
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
// any particular number of iterations: other threads might race ahead and acquire and
// release pins just as we're scanning the array.
//
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
// slots. There are two threads running concurrently, A and B. A has just
// acquired the permit from the semaphore.
//
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
// B: Acquire permit.
// B: Look at slot 2, decrement its usage_count to zero and continue the search
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
// B: Release pin and permit again
// B: Acquire permit.
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
// B: Release pin and permit again
//
// Now we're back in the starting situation that both slots have
// usage_count 1, but A has now been through one iteration of the
// find_victim() loop. This can repeat indefinitely and on each
// iteration, A's iteration count increases by one.
//
// So, even though the semaphore for the permits is fair, the victim search
// itself happens in parallel and is not fair.
// Hence even with a permit, a task can theoretically be starved.
// To avoid this, we'd need tokio to give priority to tasks that are holding
// permits for longer.
// Note that just yielding to tokio during iteration without such
// priority boosting is likely counter-productive. We'd just give more opportunities
// for B to bump usage count, further starving A.
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::EvictIterLimit,
);
anyhow::bail!("exceeded evict iter limit");
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
unreachable!("find_victim_waiters prevents starvation");
}
continue;
}
@@ -942,7 +925,16 @@ impl PageCache {
inner.key = None;
}
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
return Ok((slot_idx, inner));
self.find_victim_sender
.try_send((slot_idx, inner))
.expect("we always get in line first");
match futures::poll!(receiver) {
Poll::Ready(Ok(res)) => return Ok(res),
Poll::Ready(Err(_closed)) => unreachable!("we never close"),
Poll::Pending => {
unreachable!("we just sent to the channel and got in line earlier")
}
}
}
}
}
@@ -979,6 +971,7 @@ impl PageCache {
})
.collect();
let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages);
Self {
materialized_page_map: Default::default(),
immutable_page_map: Default::default(),
@@ -986,6 +979,8 @@ impl PageCache {
next_evict_slot: AtomicUsize::new(0),
size_metrics,
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
find_victim_sender,
find_victim_waiters,
}
}
}

View File

@@ -44,6 +44,8 @@ use std::sync::MutexGuard;
use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::config::AttachedLocationConfig;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
use self::metadata::LoadMetadataError;
@@ -64,6 +66,7 @@ use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETI
use crate::repository::GcResult;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationMode;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::metadata::load_metadata;
pub use crate::tenant::remote_timeline_client::index::IndexPart;
@@ -160,6 +163,28 @@ pub struct TenantSharedResources {
pub deletion_queue_client: DeletionQueueClient,
}
/// A [`Tenant`] is really an _attached_ tenant. The configuration
/// for an attached tenant is a subset of the [`LocationConf`], represented
/// in this struct.
pub(super) struct AttachedTenantConf {
tenant_conf: TenantConfOpt,
location: AttachedLocationConfig,
}
impl AttachedTenantConf {
fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
match &location_conf.mode {
LocationMode::Attached(attach_conf) => Ok(Self {
tenant_conf: location_conf.tenant_conf,
location: attach_conf.clone(),
}),
LocationMode::Secondary(_) => {
anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
}
}
}
}
///
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
@@ -177,12 +202,15 @@ pub struct Tenant {
// We keep TenantConfOpt sturct here to preserve the information
// about parameters that are not set.
// This is necessary to allow global config updates.
tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
tenant_id: TenantId,
/// The remote storage generation, used to protect S3 objects from split-brain.
/// Does not change over the lifetime of the [`Tenant`] object.
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
generation: Generation,
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
@@ -526,14 +554,13 @@ impl Tenant {
pub(crate) fn spawn_attach(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
resources: TenantSharedResources,
attached_conf: AttachedTenantConf,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
// TODO dedup with spawn_load
let tenant_conf =
Self::load_tenant_config(conf, &tenant_id).context("load tenant config")?;
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let TenantSharedResources {
broker_client,
@@ -541,14 +568,12 @@ impl Tenant {
deletion_queue_client,
} = resources;
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let tenant = Arc::new(Tenant::new(
TenantState::Attaching,
conf,
tenant_conf,
attached_conf,
wal_redo_manager,
tenant_id,
generation,
remote_storage.clone(),
deletion_queue_client,
));
@@ -859,10 +884,9 @@ impl Tenant {
backtrace: String::new(),
},
conf,
TenantConfOpt::default(),
AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
wal_redo_manager,
tenant_id,
Generation::broken(),
None,
DeletionQueueClient::broken(),
))
@@ -881,7 +905,7 @@ impl Tenant {
pub(crate) fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
attached_conf: AttachedTenantConf,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
@@ -889,14 +913,6 @@ impl Tenant {
) -> Arc<Tenant> {
span::debug_assert_current_span_has_tenant_id();
let tenant_conf = match Self::load_tenant_config(conf, &tenant_id) {
Ok(conf) => conf,
Err(e) => {
error!("load tenant config failed: {:?}", e);
return Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"));
}
};
let broker_client = resources.broker_client;
let remote_storage = resources.remote_storage;
@@ -904,10 +920,9 @@ impl Tenant {
let tenant = Tenant::new(
TenantState::Loading,
conf,
tenant_conf,
attached_conf,
wal_redo_manager,
tenant_id,
generation,
remote_storage.clone(),
resources.deletion_queue_client.clone(),
);
@@ -1646,6 +1661,15 @@ impl Tenant {
"Cannot run GC iteration on inactive tenant"
);
{
let conf = self.tenant_conf.read().unwrap();
if !conf.location.may_delete_layers_hint() {
info!("Skipping GC in location state {:?}", conf.location);
return Ok(GcResult::default());
}
}
self.gc_iteration_internal(target_timeline_id, horizon, pitr, ctx)
.await
}
@@ -1664,6 +1688,14 @@ impl Tenant {
"Cannot run compaction iteration on inactive tenant"
);
{
let conf = self.tenant_conf.read().unwrap();
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
info!("Skipping compaction in location state {:?}", conf.location);
return Ok(());
}
}
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
@@ -2089,7 +2121,7 @@ where
impl Tenant {
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
*self.tenant_conf.read().unwrap()
self.tenant_conf.read().unwrap().tenant_conf
}
pub fn effective_config(&self) -> TenantConf {
@@ -2098,84 +2130,95 @@ impl Tenant {
}
pub fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.checkpoint_distance
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
}
pub fn get_checkpoint_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.checkpoint_timeout
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
pub fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.compaction_target_size
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
}
pub fn get_compaction_period(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.compaction_period
.unwrap_or(self.conf.default_tenant_conf.compaction_period)
}
pub fn get_compaction_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.compaction_threshold
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
pub fn get_gc_horizon(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.gc_horizon
.unwrap_or(self.conf.default_tenant_conf.gc_horizon)
}
pub fn get_gc_period(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.gc_period
.unwrap_or(self.conf.default_tenant_conf.gc_period)
}
pub fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.image_creation_threshold
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
pub fn get_pitr_interval(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.pitr_interval
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
}
pub fn get_trace_read_requests(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.trace_read_requests
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
}
pub fn get_min_resident_size_override(&self) -> Option<u64> {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.min_resident_size_override
.or(self.conf.default_tenant_conf.min_resident_size_override)
}
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
*self.tenant_conf.write().unwrap() = new_tenant_conf;
self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
let timelines = self.list_timelines();
for timeline in timelines {
timeline.tenant_conf_updated();
}
}
pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
*self.tenant_conf.write().unwrap() = new_conf;
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
@@ -2245,10 +2288,9 @@ impl Tenant {
fn new(
state: TenantState,
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
attached_conf: AttachedTenantConf,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenant_id: TenantId,
generation: Generation,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
) -> Tenant {
@@ -2308,12 +2350,12 @@ impl Tenant {
Tenant {
tenant_id,
generation,
generation: attached_conf.location.generation,
conf,
// using now here is good enough approximation to catch tenants with really long
// activation times.
loading_started_at: Instant::now(),
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
tenant_conf: Arc::new(RwLock::new(attached_conf)),
timelines: Mutex::new(HashMap::new()),
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
@@ -2331,52 +2373,123 @@ impl Tenant {
pub(super) fn load_tenant_config(
conf: &'static PageServerConf,
tenant_id: &TenantId,
) -> anyhow::Result<TenantConfOpt> {
let target_config_path = conf.tenant_config_path(tenant_id);
) -> anyhow::Result<LocationConf> {
let legacy_config_path = conf.tenant_config_path(tenant_id);
let config_path = conf.tenant_location_config_path(tenant_id);
info!("loading tenantconf from {target_config_path}");
if config_path.exists() {
// New-style config takes precedence
let deserialized = Self::read_config(&config_path)?;
Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
} else if legacy_config_path.exists() {
// Upgrade path: found an old-style configuration only
let deserialized = Self::read_config(&legacy_config_path)?;
// FIXME If the config file is not found, assume that we're attaching
// a detached tenant and config is passed via attach command.
// https://github.com/neondatabase/neon/issues/1555
// OR: we're loading after incomplete deletion that managed to remove config.
if !target_config_path.exists() {
info!("tenant config not found in {target_config_path}");
return Ok(TenantConfOpt::default());
let mut tenant_conf = TenantConfOpt::default();
for (key, item) in deserialized.iter() {
match key {
"tenant_config" => {
tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| {
format!("Failed to parse config from file '{legacy_config_path}' as pageserver config")
})?;
}
_ => bail!(
"config file {legacy_config_path} has unrecognized pageserver option '{key}'"
),
}
}
// Legacy configs are implicitly in attached state
Ok(LocationConf::attached_single(
tenant_conf,
Generation::none(),
))
} else {
// FIXME If the config file is not found, assume that we're attaching
// a detached tenant and config is passed via attach command.
// https://github.com/neondatabase/neon/issues/1555
// OR: we're loading after incomplete deletion that managed to remove config.
info!(
"tenant config not found in {} or {}",
config_path, legacy_config_path
);
Ok(LocationConf::default())
}
}
fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
info!("loading tenant configuration from {path}");
// load and parse file
let config = fs::read_to_string(&target_config_path)
.with_context(|| format!("Failed to load config from path '{target_config_path}'"))?;
let config = fs::read_to_string(path)
.with_context(|| format!("Failed to load config from path '{path}'"))?;
let toml = config.parse::<toml_edit::Document>().with_context(|| {
format!("Failed to parse config from file '{target_config_path}' as toml file")
})?;
let mut tenant_conf = TenantConfOpt::default();
for (key, item) in toml.iter() {
match key {
"tenant_config" => {
tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| {
format!("Failed to parse config from file '{target_config_path}' as pageserver config")
})?;
}
_ => bail!(
"config file {target_config_path} has unrecognized pageserver option '{key}'"
),
}
}
Ok(tenant_conf)
config
.parse::<toml_edit::Document>()
.with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
}
#[tracing::instrument(skip_all, fields(%tenant_id))]
pub(super) async fn persist_tenant_config(
conf: &'static PageServerConf,
tenant_id: &TenantId,
location_conf: &LocationConf,
) -> anyhow::Result<()> {
let legacy_config_path = conf.tenant_config_path(tenant_id);
let config_path = conf.tenant_location_config_path(tenant_id);
Self::persist_tenant_config_at(tenant_id, &config_path, &legacy_config_path, location_conf)
.await
}
#[tracing::instrument(skip_all, fields(%tenant_id))]
pub(super) async fn persist_tenant_config_at(
tenant_id: &TenantId,
config_path: &Utf8Path,
legacy_config_path: &Utf8Path,
location_conf: &LocationConf,
) -> anyhow::Result<()> {
// Forward compat: write out an old-style configuration that old versions can read, in case we roll back
Self::persist_tenant_config_legacy(
tenant_id,
legacy_config_path,
&location_conf.tenant_conf,
)
.await?;
if let LocationMode::Attached(attach_conf) = &location_conf.mode {
// Once we use LocationMode, generations are mandatory. If we aren't using generations,
// then drop out after writing legacy-style config.
if attach_conf.generation.is_none() {
tracing::debug!("Running without generations, not writing new-style LocationConf");
return Ok(());
}
}
info!("persisting tenantconf to {config_path}");
let mut conf_content = r#"# This file contains a specific per-tenant's config.
# It is read in case of pageserver restart.
"#
.to_string();
// Convert the config to a toml file.
conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
let conf_content = conf_content.as_bytes();
let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
VirtualFile::crashsafe_overwrite(config_path, &temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_id} config to {config_path}"))?;
Ok(())
}
#[tracing::instrument(skip_all, fields(%tenant_id))]
async fn persist_tenant_config_legacy(
tenant_id: &TenantId,
target_config_path: &Utf8Path,
tenant_conf: TenantConfOpt,
tenant_conf: &TenantConfOpt,
) -> anyhow::Result<()> {
// imitate a try-block with a closure
info!("persisting tenantconf to {target_config_path}");
let mut conf_content = r#"# This file contains a specific per-tenant's config.
@@ -3076,7 +3189,7 @@ pub(crate) enum CreateTenantFilesMode {
pub(crate) async fn create_tenant_files(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
location_conf: &LocationConf,
tenant_id: &TenantId,
mode: CreateTenantFilesMode,
) -> anyhow::Result<Utf8PathBuf> {
@@ -3099,7 +3212,7 @@ pub(crate) async fn create_tenant_files(
let creation_result = try_create_target_tenant_dir(
conf,
tenant_conf,
location_conf,
tenant_id,
mode,
&temporary_tenant_dir,
@@ -3125,7 +3238,7 @@ pub(crate) async fn create_tenant_files(
async fn try_create_target_tenant_dir(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
location_conf: &LocationConf,
tenant_id: &TenantId,
mode: CreateTenantFilesMode,
temporary_tenant_dir: &Utf8Path,
@@ -3155,14 +3268,26 @@ async fn try_create_target_tenant_dir(
temporary_tenant_dir,
)
.with_context(|| format!("resolve tenant {tenant_id} temporary timelines dir"))?;
let temporary_tenant_config_path = rebase_directory(
let temporary_legacy_tenant_config_path = rebase_directory(
&conf.tenant_config_path(tenant_id),
target_tenant_directory,
temporary_tenant_dir,
)
.with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
let temporary_tenant_config_path = rebase_directory(
&conf.tenant_location_config_path(tenant_id),
target_tenant_directory,
temporary_tenant_dir,
)
.with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
Tenant::persist_tenant_config(tenant_id, &temporary_tenant_config_path, tenant_conf).await?;
Tenant::persist_tenant_config_at(
tenant_id,
&temporary_tenant_config_path,
&temporary_legacy_tenant_config_path,
location_conf,
)
.await?;
crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
format!(
@@ -3443,10 +3568,13 @@ pub mod harness {
let tenant = Arc::new(Tenant::new(
TenantState::Loading,
self.conf,
TenantConfOpt::from(self.tenant_conf),
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt::from(self.tenant_conf),
self.generation,
))
.unwrap(),
walredo_mgr,
self.tenant_id,
self.generation,
Some(self.remote_storage.clone()),
self.deletion_queue.new_client(),
));

View File

@@ -20,10 +20,10 @@ use std::io::{Error, ErrorKind};
impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
pub async fn read_blob(
pub async fn read_blob<'c>(
&self,
offset: u64,
ctx: &RequestContext,
ctx: &'c RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
@@ -31,11 +31,11 @@ impl<'a> BlockCursor<'a> {
}
/// Read blob into the given buffer. Any previous contents in the buffer
/// are overwritten.
pub async fn read_blob_into_buf(
pub async fn read_blob_into_buf<'c>(
&self,
offset: u64,
dstbuf: &mut Vec<u8>,
ctx: &RequestContext,
ctx: &'c RequestContext,
) -> Result<(), std::io::Error> {
let mut blknum = (offset / PAGE_SZ as u64) as u32;
let mut off = (offset % PAGE_SZ as u64) as usize;
@@ -234,10 +234,7 @@ impl BlobWriter<false> {
#[cfg(test)]
mod tests {
use super::*;
use crate::{
context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef,
virtual_file::Error,
};
use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
use rand::{Rng, SeedableRng};
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {

View File

@@ -6,7 +6,7 @@ use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::{self, VirtualFile};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::ops::{Deref, DerefMut};
@@ -34,27 +34,27 @@ where
}
/// Reference to an in-memory copy of an immutable on-disk block.
pub enum BlockLease<'a> {
PageReadGuard(PageReadGuard<'static>),
pub enum BlockLease<'c, 'a> {
PageReadGuard(PageReadGuard<'c, 'static>),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
}
impl From<PageReadGuard<'static>> for BlockLease<'static> {
fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
impl<'c, 'a> From<PageReadGuard<'c, 'a>> for BlockLease<'c, 'a> {
fn from(value: PageReadGuard<'c, 'a>) -> BlockLease<'c, 'a> {
BlockLease::PageReadGuard(value)
}
}
#[cfg(test)]
impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
impl<'c, 'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'c, 'a> {
fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
BlockLease::Arc(value)
}
}
impl<'a> Deref for BlockLease<'a> {
impl<'c, 'a> Deref for BlockLease<'c, 'a> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
@@ -83,11 +83,11 @@ pub(crate) enum BlockReaderRef<'a> {
impl<'a> BlockReaderRef<'a> {
#[inline(always)]
async fn read_blk(
async fn read_blk<'c>(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
ctx: &'c RequestContext,
) -> Result<BlockLease<'c, '_>, std::io::Error> {
use BlockReaderRef::*;
match self {
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
@@ -96,7 +96,7 @@ impl<'a> BlockReaderRef<'a> {
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
VirtualFile(r) => r.read_blk(blknum).await.map_err(virtual_file::Error::into),
VirtualFile(r) => r.read_blk(blknum).await,
}
}
}
@@ -141,11 +141,11 @@ impl<'a> BlockCursor<'a> {
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
#[inline(always)]
pub async fn read_blk(
pub async fn read_blk<'c>(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
ctx: &'c RequestContext,
) -> Result<BlockLease<'c, '_>, std::io::Error> {
self.reader.read_blk(blknum, ctx).await
}
}
@@ -174,39 +174,33 @@ impl FileBlockReader {
self.file
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
.await
.map_err(virtual_file::Error::into)
}
/// Read a block.
///
/// Returns a "lease" object that can be used to
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
pub async fn read_blk(
pub async fn read_blk<'c>(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
ctx: &'c RequestContext,
) -> Result<BlockLease<'c, 'static>, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
Ok(write_guard.mark_valid().into())
}
}
}
}

View File

@@ -13,6 +13,7 @@ use pageserver_api::models;
use serde::{Deserialize, Serialize};
use std::num::NonZeroU64;
use std::time::Duration;
use utils::generation::Generation;
pub mod defaults {
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
@@ -44,7 +45,211 @@ pub mod defaults {
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
}
/// Per-tenant configuration options
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum AttachmentMode {
/// Our generation is current as far as we know, and as far as we know we are the only attached
/// pageserver. This is the "normal" attachment mode.
Single,
/// Our generation number is current as far as we know, but we are advised that another
/// pageserver is still attached, and therefore to avoid executing deletions. This is
/// the attachment mode of a pagesever that is the destination of a migration.
Multi,
/// Our generation number is superseded, or about to be superseded. We are advised
/// to avoid remote storage writes if possible, and to avoid sending billing data. This
/// is the attachment mode of a pageserver that is the origin of a migration.
Stale,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct AttachedLocationConfig {
pub(crate) generation: Generation,
pub(crate) attach_mode: AttachmentMode,
// TODO: add a flag to override AttachmentMode's policies under
// disk pressure (i.e. unblock uploads under disk pressure in Stale
// state, unblock deletions after timeout in Multi state)
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct SecondaryLocationConfig {
/// If true, keep the local cache warm by polling remote storage
pub(crate) warm: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum LocationMode {
Attached(AttachedLocationConfig),
Secondary(SecondaryLocationConfig),
}
/// Per-tenant, per-pageserver configuration. All pageservers use the same TenantConf,
/// but have distinct LocationConf.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct LocationConf {
/// The location-specific part of the configuration, describes the operating
/// mode of this pageserver for this tenant.
pub(crate) mode: LocationMode,
/// The pan-cluster tenant configuration, the same on all locations
pub(crate) tenant_conf: TenantConfOpt,
}
impl std::fmt::Debug for LocationConf {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.mode {
LocationMode::Attached(conf) => {
write!(
f,
"Attached {:?}, gen={:?}",
conf.attach_mode, conf.generation
)
}
LocationMode::Secondary(conf) => {
write!(f, "Secondary, warm={}", conf.warm)
}
}
}
}
impl AttachedLocationConfig {
/// Consult attachment mode to determine whether we are currently permitted
/// to delete layers. This is only advisory, not required for data safety.
/// See [`AttachmentMode`] for more context.
pub(crate) fn may_delete_layers_hint(&self) -> bool {
// TODO: add an override for disk pressure in AttachedLocationConfig,
// and respect it here.
match &self.attach_mode {
AttachmentMode::Single => true,
AttachmentMode::Multi | AttachmentMode::Stale => {
// In Multi mode we avoid doing deletions because some other
// attached pageserver might get 404 while trying to read
// a layer we delete which is still referenced in their metadata.
//
// In Stale mode, we avoid doing deletions because we expect
// that they would ultimately fail validation in the deletion
// queue due to our stale generation.
false
}
}
}
/// Whether we are currently hinted that it is worthwhile to upload layers.
/// This is only advisory, not required for data safety.
/// See [`AttachmentMode`] for more context.
pub(crate) fn may_upload_layers_hint(&self) -> bool {
// TODO: add an override for disk pressure in AttachedLocationConfig,
// and respect it here.
match &self.attach_mode {
AttachmentMode::Single | AttachmentMode::Multi => true,
AttachmentMode::Stale => {
// In Stale mode, we avoid doing uploads because we expect that
// our replacement pageserver will already have started its own
// IndexPart that will never reference layers we upload: it is
// wasteful.
false
}
}
}
}
impl LocationConf {
/// For use when loading from a legacy configuration: presence of a tenant
/// implies it is in AttachmentMode::Single, which used to be the only
/// possible state. This function should eventually be removed.
pub(crate) fn attached_single(tenant_conf: TenantConfOpt, generation: Generation) -> Self {
Self {
mode: LocationMode::Attached(AttachedLocationConfig {
generation,
attach_mode: AttachmentMode::Single,
}),
tenant_conf,
}
}
/// For use when attaching/re-attaching: update the generation stored in this
/// structure. If we were in a secondary state, promote to attached (posession
/// of a fresh generation implies this).
pub(crate) fn attach_in_generation(&mut self, generation: Generation) {
match &mut self.mode {
LocationMode::Attached(attach_conf) => {
attach_conf.generation = generation;
}
LocationMode::Secondary(_) => {
// We are promoted to attached by the control plane's re-attach response
self.mode = LocationMode::Attached(AttachedLocationConfig {
generation,
attach_mode: AttachmentMode::Single,
})
}
}
}
pub(crate) fn try_from(conf: &'_ models::LocationConfig) -> anyhow::Result<Self> {
let tenant_conf = TenantConfOpt::try_from(&conf.tenant_conf)?;
fn get_generation(conf: &'_ models::LocationConfig) -> Result<Generation, anyhow::Error> {
conf.generation
.ok_or_else(|| anyhow::anyhow!("Generation must be set when attaching"))
}
let mode = match &conf.mode {
models::LocationConfigMode::AttachedMulti => {
LocationMode::Attached(AttachedLocationConfig {
generation: get_generation(conf)?,
attach_mode: AttachmentMode::Multi,
})
}
models::LocationConfigMode::AttachedSingle => {
LocationMode::Attached(AttachedLocationConfig {
generation: get_generation(conf)?,
attach_mode: AttachmentMode::Single,
})
}
models::LocationConfigMode::AttachedStale => {
LocationMode::Attached(AttachedLocationConfig {
generation: get_generation(conf)?,
attach_mode: AttachmentMode::Stale,
})
}
models::LocationConfigMode::Secondary => {
anyhow::ensure!(conf.generation.is_none());
let warm = conf
.secondary_conf
.as_ref()
.map(|c| c.warm)
.unwrap_or(false);
LocationMode::Secondary(SecondaryLocationConfig { warm })
}
models::LocationConfigMode::Detached => {
// Should not have been called: API code should translate this mode
// into a detach rather than trying to decode it as a LocationConf
return Err(anyhow::anyhow!("Cannot decode a Detached configuration"));
}
};
Ok(Self { mode, tenant_conf })
}
}
impl Default for LocationConf {
// TODO: this should be removed once tenant loading can guarantee that we are never
// loading from a directory without a configuration.
// => tech debt since https://github.com/neondatabase/neon/issues/1555
fn default() -> Self {
Self {
mode: LocationMode::Attached(AttachedLocationConfig {
generation: Generation::none(),
attach_mode: AttachmentMode::Single,
}),
tenant_conf: TenantConfOpt::default(),
}
}
}
/// A tenant's calcuated configuration, which is the result of merging a
/// tenant's TenantConfOpt with the global TenantConf from PageServerConf.
///
/// For storing and transmitting individual tenant's configuration, see
/// TenantConfOpt.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct TenantConf {
// Flush out an inmemory layer, if it's holding WAL older than this

View File

@@ -197,6 +197,7 @@ async fn cleanup_remaining_fs_traces(
};
rm(conf.tenant_config_path(tenant_id), false).await?;
rm(conf.tenant_location_config_path(tenant_id), false).await?;
fail::fail_point!("tenant-delete-before-remove-timelines-dir", |_| {
Err(anyhow::anyhow!(

View File

@@ -64,44 +64,40 @@ impl EphemeralFile {
self.len
}
pub(crate) async fn read_blk(
pub(crate) async fn read_blk<'c>(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, io::Error> {
ctx: &'c RequestContext,
) -> Result<BlockLease<'c, '_>, io::Error> {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum, self.file.path, e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum, self.file.path, e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
}
};
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
@@ -171,7 +167,7 @@ impl EphemeralFile {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
write_guard.mark_valid();
let _ = write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {

View File

@@ -24,9 +24,11 @@ use crate::control_plane_client::{
};
use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::tenant::{
create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState,
};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
use utils::crashsafe::path_with_suffix_extension;
@@ -38,6 +40,39 @@ use super::delete::DeleteTenantError;
use super::timeline::delete::DeleteTimelineFlow;
use super::TenantSharedResources;
/// For a tenant that appears in TenantsMap, it may either be
/// - `Attached`: has a full Tenant object, is elegible to service
/// reads and ingest WAL.
/// - `Secondary`: is only keeping a local cache warm.
///
/// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
/// that way we avoid having to carefully switch a tenant's ingestion etc on and off during
/// its lifetime, and we can preserve some important safety invariants like `Tenant` always
/// having a properly acquired generation (Secondary doesn't need a generation)
#[derive(Clone)]
pub enum TenantSlot {
Attached(Arc<Tenant>),
Secondary,
}
impl TenantSlot {
/// Return the `Tenant` in this slot if attached, else None
fn get_attached(&self) -> Option<&Arc<Tenant>> {
match self {
Self::Attached(t) => Some(t),
Self::Secondary => None,
}
}
/// Consume self and return the `Tenant` that was in this slot if attached, else None
fn into_attached(self) -> Option<Arc<Tenant>> {
match self {
Self::Attached(t) => Some(t),
Self::Secondary => None,
}
}
}
/// The tenants known to the pageserver.
/// The enum variants are used to distinguish the different states that the pageserver can be in.
pub(crate) enum TenantsMap {
@@ -45,14 +80,27 @@ pub(crate) enum TenantsMap {
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
/// New tenants can be added using [`tenant_map_insert`].
Open(HashMap<TenantId, Arc<Tenant>>),
Open(HashMap<TenantId, TenantSlot>),
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
/// Existing tenants are still accessible, but no new tenants can be created.
ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
ShuttingDown(HashMap<TenantId, TenantSlot>),
}
impl TenantsMap {
/// Convenience function for typical usage, where we want to get a `Tenant` object, for
/// working with attached tenants. If the TenantId is in the map but in Secondary state,
/// None is returned.
pub(crate) fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
m.get(tenant_id).and_then(TenantSlot::get_attached)
}
}
}
/// Get the contents of the map at this tenant ID, even if it is in secondary state.
pub(crate) fn get_slot(&self, tenant_id: &TenantId) -> Option<&TenantSlot> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
@@ -61,7 +109,9 @@ impl TenantsMap {
pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
m.remove(tenant_id).and_then(TenantSlot::into_attached)
}
}
}
}
@@ -205,19 +255,59 @@ pub async fn init_tenant_mgr(
}
};
// Try loading the location configuration
let mut location_conf = match Tenant::load_tenant_config(conf, &tenant_id)
.context("load tenant config")
{
Ok(c) => c,
Err(e) => {
warn!("Marking tenant broken, failed to {e:#}");
tenants.insert(
tenant_id,
TenantSlot::Attached(Tenant::create_broken_tenant(
conf,
tenant_id,
"error loading tenant location configuration".to_string(),
)),
);
continue;
}
};
let generation = if let Some(generations) = &tenant_generations {
// We have a generation map: treat it as the authority for whether
// this tenant is really attached.
if let Some(gen) = generations.get(&tenant_id) {
*gen
} else {
info!("Detaching tenant {tenant_id}, control plane omitted it in re-attach response");
if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
error!(
"Failed to remove detached tenant directory '{}': {:?}",
tenant_dir_path, e
);
}
match &location_conf.mode {
LocationMode::Secondary(_) => {
// We do not require the control plane's permission for secondary mode
// tenants, because they do no remote writes and hence require no
// generation number
info!("Loaded tenant {tenant_id} in secondary mode");
tenants.insert(tenant_id, TenantSlot::Secondary);
}
LocationMode::Attached(_) => {
// TODO: augment re-attach API to enable the control plane to
// instruct us about secondary attachments. That way, instead of throwing
// away local state, we can gracefully fall back to secondary here, if the control
// plane tells us so.
// (https://github.com/neondatabase/neon/issues/5377)
info!("Detaching tenant {tenant_id}, control plane omitted it in re-attach response");
if let Err(e) =
safe_remove_tenant_dir_all(&tenant_dir_path).await
{
error!(
"Failed to remove detached tenant directory '{}': {:?}",
tenant_dir_path, e
);
}
}
};
continue;
}
} else {
@@ -230,18 +320,23 @@ pub async fn init_tenant_mgr(
Generation::none()
};
// Presence of a generation number implies attachment: attach the tenant
// if it wasn't already, and apply the generation number.
location_conf.attach_in_generation(generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
match schedule_local_tenant_processing(
conf,
tenant_id,
&tenant_dir_path,
generation,
AttachedTenantConf::try_from(location_conf)?,
resources.clone(),
Some(init_order.clone()),
&TENANTS,
&ctx,
) {
Ok(tenant) => {
tenants.insert(tenant.tenant_id(), tenant);
tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant));
}
Err(e) => {
error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
@@ -273,7 +368,7 @@ pub(crate) fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_path: &Utf8Path,
generation: Generation,
location_conf: AttachedTenantConf,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
@@ -310,7 +405,7 @@ pub(crate) fn schedule_local_tenant_processing(
"attaching mark file present but no remote storage configured".to_string(),
)
} else {
match Tenant::spawn_attach(conf, tenant_id, generation, resources, tenants, ctx) {
match Tenant::spawn_attach(conf, tenant_id, resources, location_conf, tenants, ctx) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
@@ -322,7 +417,13 @@ pub(crate) fn schedule_local_tenant_processing(
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(
conf, tenant_id, generation, resources, init_order, tenants, ctx,
conf,
tenant_id,
location_conf,
resources,
init_order,
tenants,
ctx,
)
};
Ok(tenant)
@@ -378,7 +479,16 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
let res = {
let (_guard, shutdown_progress) = completion::channel();
tenant.shutdown(shutdown_progress, freeze_and_flush).await
match tenant {
TenantSlot::Attached(t) => {
t.shutdown(shutdown_progress, freeze_and_flush).await
}
TenantSlot::Secondary => {
// TODO: once secondary mode downloads are implemented,
// ensure they have all stopped before we reach this point.
Ok(())
}
}
};
if let Err(other_progress) = res {
@@ -451,16 +561,19 @@ pub async fn create_tenant(
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
tenant_map_insert(tenant_id, || async {
let location_conf = LocationConf::attached_single(tenant_conf, generation);
// We're holding the tenants lock in write mode while doing local IO.
// If this section ever becomes contentious, introduce a new `TenantState::Creating`
// and do the work in that state.
let tenant_directory = super::create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
let tenant_directory = super::create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, tenant_id, &tenant_directory,
generation, resources, None, &TENANTS, ctx)?;
AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -489,14 +602,126 @@ pub async fn set_new_tenant_config(
info!("configuring tenant {tenant_id}");
let tenant = get_tenant(tenant_id, true).await?;
let tenant_config_path = conf.tenant_config_path(&tenant_id);
Tenant::persist_tenant_config(&tenant_id, &tenant_config_path, new_tenant_conf)
// This is a legacy API that only operates on attached tenants: the preferred
// API to use is the location_config/ endpoint, which lets the caller provide
// the full LocationConf.
let location_conf = LocationConf::attached_single(new_tenant_conf, tenant.generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf)
.await
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_tenant_config(new_tenant_conf);
Ok(())
}
#[instrument(skip_all, fields(tenant_id, new_location_config))]
pub(crate) async fn upsert_location(
conf: &'static PageServerConf,
tenant_id: TenantId,
new_location_config: LocationConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> Result<(), anyhow::Error> {
info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
let mut existing_tenant = match get_tenant(tenant_id, false).await {
Ok(t) => Some(t),
Err(GetTenantError::NotFound(_)) => None,
Err(e) => anyhow::bail!(e),
};
// If we need to shut down a Tenant, do that first
let shutdown_tenant = match (&new_location_config.mode, &existing_tenant) {
(LocationMode::Secondary(_), Some(t)) => Some(t),
(LocationMode::Attached(attach_conf), Some(t)) => {
if attach_conf.generation != t.generation {
Some(t)
} else {
None
}
}
_ => None,
};
// TODO: currently we risk concurrent operations interfering with the tenant
// while we await shutdown, but we also should not hold the TenantsMap lock
// across the whole operation. Before we start using this function in production,
// a follow-on change will revise how concurrency is handled in TenantsMap.
// (https://github.com/neondatabase/neon/issues/5378)
if let Some(tenant) = shutdown_tenant {
let (_guard, progress) = utils::completion::channel();
info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
Ok(()) => {}
Err(barrier) => {
info!("Shutdown already in progress, waiting for it to complete");
barrier.wait().await;
}
}
existing_tenant = None;
}
if let Some(tenant) = existing_tenant {
// Update the existing tenant
Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_location_config(AttachedTenantConf::try_from(new_location_config)?);
} else {
// Upsert a fresh TenantSlot into TenantsMap. Do it within the map write lock,
// and re-check that the state of anything we are replacing is as expected.
tenant_map_upsert_slot(tenant_id, |old_value| async move {
if let Some(TenantSlot::Attached(t)) = old_value {
if !matches!(t.current_state(), TenantState::Stopping { .. }) {
anyhow::bail!("Tenant state changed during location configuration update");
}
}
let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => TenantSlot::Secondary,
LocationMode::Attached(_attach_config) => {
// Do a schedule_local_tenant_processing
// FIXME: should avoid doing this disk I/O inside the TenantsMap lock,
// we have the same problem in load_tenant/attach_tenant. Probably
// need a lock in TenantSlot to fix this.
Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
let tenant_path = conf.tenant_path(&tenant_id);
let resources = TenantSharedResources {
broker_client,
remote_storage,
deletion_queue_client,
};
let new_tenant = schedule_local_tenant_processing(
conf,
tenant_id,
&tenant_path,
AttachedTenantConf::try_from(new_location_config)?,
resources,
None,
&TENANTS,
ctx,
)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
TenantSlot::Attached(new_tenant)
}
};
Ok(new_slot)
})
.await?;
}
Ok(())
}
#[derive(Debug, thiserror::Error)]
pub enum GetTenantError {
#[error("Tenant {0} not found")]
@@ -657,7 +882,12 @@ pub async fn load_tenant(
remote_storage,
deletion_queue_client
};
let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, generation, resources, None, &TENANTS, ctx)
let mut location_conf = Tenant::load_tenant_config(conf, &tenant_id).map_err( TenantMapInsertError::Other)?;
location_conf.attach_in_generation(generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -710,7 +940,10 @@ pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapLis
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
};
Ok(m.iter()
.map(|(id, tenant)| (*id, tenant.current_state()))
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
TenantSlot::Secondary => None,
})
.collect())
}
@@ -727,7 +960,8 @@ pub async fn attach_tenant(
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, || async {
let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
let location_conf = LocationConf::attached_single(tenant_conf, generation);
let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
@@ -738,8 +972,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, generation, resources, None, &TENANTS, ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -762,8 +995,10 @@ pub enum TenantMapInsertError {
ShuttingDown,
#[error("tenant {0} already exists, state: {1:?}")]
TenantAlreadyExists(TenantId, TenantState),
#[error("tenant {0} already exists in secondary state")]
TenantExistsSecondary(TenantId),
#[error(transparent)]
Closure(#[from] anyhow::Error),
Other(#[from] anyhow::Error),
}
/// Give the given closure access to the tenants map entry for the given `tenant_id`, iff that
@@ -787,20 +1022,47 @@ where
TenantsMap::Open(m) => m,
};
match m.entry(tenant_id) {
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
tenant_id,
e.get().current_state(),
)),
hash_map::Entry::Occupied(e) => match e.get() {
TenantSlot::Attached(t) => Err(TenantMapInsertError::TenantAlreadyExists(
tenant_id,
t.current_state(),
)),
TenantSlot::Secondary => Err(TenantMapInsertError::TenantExistsSecondary(tenant_id)),
},
hash_map::Entry::Vacant(v) => match insert_fn().await {
Ok(tenant) => {
v.insert(tenant.clone());
v.insert(TenantSlot::Attached(tenant.clone()));
Ok(tenant)
}
Err(e) => Err(TenantMapInsertError::Closure(e)),
Err(e) => Err(TenantMapInsertError::Other(e)),
},
}
}
async fn tenant_map_upsert_slot<'a, F, R>(
tenant_id: TenantId,
upsert_fn: F,
) -> Result<(), TenantMapInsertError>
where
F: FnOnce(Option<TenantSlot>) -> R,
R: std::future::Future<Output = anyhow::Result<TenantSlot>>,
{
let mut guard = TENANTS.write().await;
let m = match &mut *guard {
TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
TenantsMap::Open(m) => m,
};
match upsert_fn(m.remove(&tenant_id)).await {
Ok(upsert_val) => {
m.insert(tenant_id, upsert_val);
Ok(())
}
Err(e) => Err(TenantMapInsertError::Other(e)),
}
}
/// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
/// Allows to remove other tenant resources manually, via `tenant_cleanup`.
/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
@@ -820,28 +1082,40 @@ where
// tenant-wde cleanup operations may take some time (removing the entire tenant directory), we want to
// avoid holding the lock for the entire process.
let tenant = {
tenants
match tenants
.write()
.await
.get(&tenant_id)
.cloned()
.get_slot(&tenant_id)
.ok_or(TenantStateError::NotFound(tenant_id))?
{
TenantSlot::Attached(t) => Some(t.clone()),
TenantSlot::Secondary => None,
}
};
// allow pageserver shutdown to await for our completion
let (_guard, progress) = completion::channel();
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
let freeze_and_flush = false;
// If the tenant was attached, shut it down gracefully. For secondary
// locations this part is not necessary
match tenant {
Some(attached_tenant) => {
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
let freeze_and_flush = false;
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
// that we can continue safely to cleanup.
match tenant.shutdown(progress, freeze_and_flush).await {
Ok(()) => {}
Err(_other) => {
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
return Err(TenantStateError::IsStopping(tenant_id));
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
// that we can continue safely to cleanup.
match attached_tenant.shutdown(progress, freeze_and_flush).await {
Ok(()) => {}
Err(_other) => {
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
return Err(TenantStateError::IsStopping(tenant_id));
}
}
}
None => {
// Nothing to wait on when not attached, proceed.
}
}
@@ -932,6 +1206,8 @@ mod tests {
use std::sync::Arc;
use tracing::{info_span, Instrument};
use crate::tenant::mgr::TenantSlot;
use super::{super::harness::TenantHarness, TenantsMap};
#[tokio::test(start_paused = true)]
@@ -953,7 +1229,7 @@ mod tests {
// tenant harness configures the logging and we cannot escape it
let _e = info_span!("testing", tenant_id = %id).entered();
let tenants = HashMap::from([(id, t.clone())]);
let tenants = HashMap::from([(id, TenantSlot::Attached(t.clone()))]);
let tenants = Arc::new(tokio::sync::RwLock::new(TenantsMap::Open(tenants)));
let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();

View File

@@ -549,7 +549,7 @@ impl DeltaLayer {
/// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
///
/// The value can be obtained via the [`ValueRef::load`] function.
pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
pub(crate) async fn load_keys<'c>(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
let inner = self
.load(LayerAccessKind::KeyIter, ctx)
.await
@@ -1038,9 +1038,9 @@ pub struct ValueRef<'a> {
reader: BlockCursor<'a>,
}
impl<'a> ValueRef<'a> {
impl<'c, 'a> ValueRef<'a> {
/// Loads the value from disk
pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
pub async fn load(&self, ctx: &'c RequestContext) -> Result<Value> {
// theoretically we *could* record an access time for each, but it does not really matter
let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
let val = Value::des(&buf)?;
@@ -1051,11 +1051,11 @@ impl<'a> ValueRef<'a> {
pub(crate) struct Adapter<T>(T);
impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
pub(crate) async fn read_blk(
pub(crate) async fn read_blk<'c>(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
ctx: &'c RequestContext,
) -> Result<BlockLease<'c, '_>, std::io::Error> {
self.0.as_ref().file.read_blk(blknum, ctx).await
}
}

View File

@@ -91,12 +91,12 @@ use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::debug_assert_current_span_has_tenant_and_timeline_id;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::storage_layer::{
AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStatsReset, PersistentLayerDesc,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState {
@@ -149,7 +149,7 @@ pub struct TimelineResources {
pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
myself: Weak<Self>,
@@ -158,6 +158,9 @@ pub struct Timeline {
/// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
/// Never changes for the lifetime of this [`Timeline`] object.
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
generation: Generation,
pub pg_version: u32,
@@ -502,7 +505,7 @@ impl Timeline {
timer.stop_and_record();
let start = Instant::now();
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
let res = self.reconstruct_value(key, lsn, reconstruct_state, ctx).await;
let elapsed = start.elapsed();
crate::metrics::RECONSTRUCT_TIME
.for_result(&res)
@@ -1378,42 +1381,42 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
// Private functions
impl Timeline {
fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.checkpoint_distance
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
}
fn get_checkpoint_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.checkpoint_timeout
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.compaction_target_size
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
}
fn get_compaction_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.compaction_threshold
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.image_creation_threshold
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
fn get_eviction_policy(&self) -> EvictionPolicy {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.eviction_policy
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
@@ -1429,7 +1432,7 @@ impl Timeline {
}
fn get_gc_feedback(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.gc_feedback
.unwrap_or(self.conf.default_tenant_conf.gc_feedback)
@@ -1442,7 +1445,7 @@ impl Timeline {
// The threshold is embedded in the metric. So, we need to update it.
{
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
&self.tenant_conf.read().unwrap(),
&self.tenant_conf.read().unwrap().tenant_conf,
&self.conf.default_tenant_conf,
);
let tenant_id_str = self.tenant_id.to_string();
@@ -1461,7 +1464,7 @@ impl Timeline {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
metadata: &TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
timeline_id: TimelineId,
@@ -1484,7 +1487,7 @@ impl Timeline {
let evictions_low_residence_duration_metric_threshold =
Self::get_evictions_low_residence_duration_metric_threshold(
&tenant_conf_guard,
&tenant_conf_guard.tenant_conf,
&conf.default_tenant_conf,
);
drop(tenant_conf_guard);
@@ -1649,12 +1652,15 @@ impl Timeline {
let tenant_conf_guard = self.tenant_conf.read().unwrap();
let wal_connect_timeout = tenant_conf_guard
.tenant_conf
.walreceiver_connect_timeout
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
let lagging_wal_timeout = tenant_conf_guard
.tenant_conf
.lagging_wal_timeout
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
let max_lsn_wal_lag = tenant_conf_guard
.tenant_conf
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);
@@ -4273,6 +4279,7 @@ impl Timeline {
key: Key,
request_lsn: Lsn,
mut data: ValueReconstructState,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
// Perform WAL redo if needed
data.records.reverse();
@@ -4336,6 +4343,7 @@ impl Timeline {
key,
last_rec_lsn,
&img,
ctx,
)
.await
.context("Materialized page memoization failed")

View File

@@ -16,7 +16,7 @@
use std::{
collections::HashMap,
ops::ControlFlow,
sync::Arc,
sync::{Arc, Mutex},
time::{Duration, SystemTime},
};
@@ -25,7 +25,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use crate::{
context::{DownloadBehavior, RequestContext},
context::{DownloadBehavior, RequestContext, RequestContextBuilder},
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
@@ -397,9 +397,14 @@ impl Timeline {
}
}
let permit = crate::page_cache::get().get_permit().await;
let ctx = RequestContextBuilder::extend(ctx)
.page_cache_permit(permit)
.build();
// imitiate repartiting on first compactation
if let Err(e) = self
.collect_keyspace(lsn, ctx)
.collect_keyspace(lsn, &ctx)
.instrument(info_span!("collect_keyspace"))
.await
{

View File

@@ -15,7 +15,7 @@ use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::io::{ErrorKind, Seek, SeekFrom};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
@@ -173,148 +173,55 @@ impl OpenFiles {
}
}
/// Call this when the local filesystem gives us an error with an external
/// cause: this includes EIO, EROFS, and EACCESS: all these indicate either
/// bad storage or bad configuration, and we can't fix that from inside
/// a running process.
pub(crate) fn on_fatal_io_error(e: &std::io::Error) -> ! {
tracing::error!("Fatal I/O error: {}", &e);
std::process::abort();
#[derive(Debug, thiserror::Error)]
pub enum CrashsafeOverwriteError {
#[error("final path has no parent dir")]
FinalPathHasNoParentDir,
#[error("remove tempfile")]
RemovePreviousTempfile(#[source] std::io::Error),
#[error("create tempfile")]
CreateTempfile(#[source] std::io::Error),
#[error("write tempfile")]
WriteContents(#[source] std::io::Error),
#[error("sync tempfile")]
SyncTempfile(#[source] std::io::Error),
#[error("rename tempfile to final path")]
RenameTempfileToFinalPath(#[source] std::io::Error),
#[error("open final path parent dir")]
OpenFinalPathParentDir(#[source] std::io::Error),
#[error("sync final path parent dir")]
SyncFinalPathParentDir(#[source] std::io::Error),
}
/// Identify error types that should alwways terminate the process. Other
/// error types may be elegible for retry.
pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
use nix::errno::Errno::*;
match e.raw_os_error().map(nix::errno::from_i32) {
Some(EIO) => {
// Terminate on EIO because we no longer trust the device to store
// data safely, or to uphold persistence guarantees on fsync.
true
}
Some(EROFS) => {
// Terminate on EROFS because a filesystem is usually remounted
// readonly when it has experienced some critical issue, so the same
// logic as EIO applies.
true
}
Some(EACCES) => {
// Terminate on EACCESS because we should always have permissions
// for our own data dir: if we don't, then we can't do our job and
// need administrative intervention to fix permissions. Terminating
// is the best way to make sure we stop cleanly rather than going
// into infinite retry loops, and will make it clear to the outside
// world that we need help.
true
}
_ => {
// Treat all other local file I/O errors are retryable. This includes:
// - ENOSPC: we stay up and wait for eviction to free some space
// - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
// - WriteZero, Interrupted: these are used internally VirtualFile
false
}
}
}
/// Wrap std::io::Error with a behavior where we will terminate the process
/// on most I/O errors from local storage. The rational for terminating is:
/// - EIO means we can't trust the drive any more
/// - EROFS means the local filesystem or drive is damaged, we shouldn't use it any more
/// - EACCESS means something is fatally misconfigured about the pageserver, such
/// as running the process as the wrong user, or the filesystem having the wrong
/// ownership or permission bits. We terminate so that it's obvious to
/// the operator why the pageserver isn't working, and they can restart it when
/// they've fixed the problem.
#[derive(thiserror::Error, Debug)]
pub struct Error {
inner: std::io::Error,
context: Option<String>,
}
impl Error {
/// Wrap a io::Error with some context & terminate
/// the process if the io::Error matches our policy for termination
fn new_with_context(e: std::io::Error, context: &str) -> Self {
Self::build(e, Some(context.to_string()))
}
fn context(e: Self, context: &str) -> Self {
Self {
inner: e.inner,
context: Some(context.to_string()),
}
}
fn new(e: std::io::Error) -> Self {
Self::build(e, None)
}
fn invalid(reason: &str) -> Self {
Self::new(std::io::Error::new(ErrorKind::InvalidInput, reason))
}
fn build(e: std::io::Error, context: Option<String>) -> Self {
// Construct instance early so that we have it for
// using Display in termination message.
let instance = Self { inner: e, context };
// Maybe terminate: this violates the usual expectation that callers
// should make their own decisions about how to handle an Error, but
// it's worthwhile to avoid every single user of the local filesystem
// having to apply the same "terminate on errors" behavior.
if is_fatal_io_error(&instance.inner) {
on_fatal_io_error(&instance.inner);
}
instance
}
fn kind(&self) -> ErrorKind {
self.inner.kind()
}
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Self::build(e, None)
}
}
impl From<Error> for std::io::Error {
fn from(e: Error) -> std::io::Error {
e.inner
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.context {
Some(context) => {
write!(f, "{}: {}", context, self.inner)
}
None => self.inner.fmt(f),
impl CrashsafeOverwriteError {
/// Returns true iff the new contents are durably stored.
pub fn are_new_contents_durable(&self) -> bool {
match self {
Self::FinalPathHasNoParentDir => false,
Self::RemovePreviousTempfile(_) => false,
Self::CreateTempfile(_) => false,
Self::WriteContents(_) => false,
Self::SyncTempfile(_) => false,
Self::RenameTempfileToFinalPath(_) => false,
Self::OpenFinalPathParentDir(_) => false,
Self::SyncFinalPathParentDir(_) => true,
}
}
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, Error> {
Self::open_with_options(path, OpenOptions::new().read(true))
.await
.map_err(Error::from)
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true)).await
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, Error> {
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
)
.await
.map_err(Error::from)
}
/// Open a file with given options.
@@ -325,7 +232,7 @@ impl VirtualFile {
pub async fn open_with_options(
path: &Utf8Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, Error> {
) -> Result<VirtualFile, std::io::Error> {
let path_str = path.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
let tenant_id;
@@ -377,16 +284,14 @@ impl VirtualFile {
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
) -> Result<(), Error> {
let final_path_parent = final_path.parent().ok_or(std::io::Error::new(
ErrorKind::InvalidInput,
"Path must be absolute",
))?;
) -> Result<(), CrashsafeOverwriteError> {
let Some(final_path_parent) = final_path.parent() else {
return Err(CrashsafeOverwriteError::FinalPathHasNoParentDir);
};
match std::fs::remove_file(tmp_path) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(Error::new_with_context(e, "removing tempfile")),
Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)),
}
let mut file = Self::open_with_options(
tmp_path,
@@ -397,17 +302,17 @@ impl VirtualFile {
.create_new(true),
)
.await
.map_err(|e| Error::context(e, "create tempfile"))?;
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
file.write_all(content)
.await
.map_err(|e| Error::context(e, "write contents"))?;
.map_err(CrashsafeOverwriteError::WriteContents)?;
file.sync_all()
.await
.map_err(|e| Error::context(e, "sync tempfile"))?;
.map_err(CrashsafeOverwriteError::SyncTempfile)?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)
.map_err(|e| Error::new_with_context(e, "rename tempfile to final path"))?;
.map_err(CrashsafeOverwriteError::RenameTempfileToFinalPath)?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
@@ -416,11 +321,11 @@ impl VirtualFile {
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
.await
.map_err(|e| Error::context(e, "open final path parent"))?;
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
final_parent_dirfd
.sync_all()
.await
.map_err(|e| Error::context(e, "sync final path parent"))?;
.map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?;
Ok(())
}
@@ -428,13 +333,11 @@ impl VirtualFile {
pub async fn sync_all(&self) -> Result<(), Error> {
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
.await?
.map_err(Error::new)
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
.await?
.map_err(Error::new)
}
/// Helper function that looks up the underlying File for this VirtualFile,
@@ -529,10 +432,13 @@ impl VirtualFile {
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
if pos < 0 {
return Err(Error::invalid("offset would be negative"));
return Err(Error::new(
ErrorKind::InvalidInput,
"offset would be negative",
));
}
if pos > u64::MAX as i128 {
return Err(Error::invalid("offset overflow"));
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
}
self.pos = pos as u64;
}
@@ -545,11 +451,10 @@ impl VirtualFile {
while !buf.is_empty() {
match self.read_at(buf, offset).await {
Ok(0) => {
return Err(std::io::Error::new(
return Err(Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
)
.into())
))
}
Ok(n) => {
buf = &mut buf[n..];
@@ -567,11 +472,10 @@ impl VirtualFile {
while !buf.is_empty() {
match self.write_at(buf, offset).await {
Ok(0) => {
return Err(std::io::Error::new(
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)
.into());
));
}
Ok(n) => {
buf = &buf[n..];
@@ -588,11 +492,10 @@ impl VirtualFile {
while !buf.is_empty() {
match self.write(buf).await {
Ok(0) => {
return Err(std::io::Error::new(
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)
.into());
));
}
Ok(n) => {
buf = &buf[n..];
@@ -604,7 +507,7 @@ impl VirtualFile {
Ok(())
}
async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
let n = self.write_at(buf, pos).await?;
self.pos += n as u64;
@@ -620,7 +523,7 @@ impl VirtualFile {
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result.map_err(Error::new)
result
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
@@ -632,7 +535,7 @@ impl VirtualFile {
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result.map_err(Error::new)
result
}
}
@@ -641,7 +544,7 @@ impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
) -> Result<crate::tenant::block_io::BlockLease<'_>, Error> {
) -> Result<crate::tenant::block_io::BlockLease<'_, '_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let mut buf = [0; PAGE_SZ];
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
@@ -757,25 +660,25 @@ mod tests {
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset).map_err(Error::new),
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
}
}
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset).map_err(Error::new),
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
MaybeVirtualFile::File(file) => file.seek(pos).map_err(Error::new),
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
MaybeVirtualFile::File(file) => file.write_all(buf).map_err(Error::new),
MaybeVirtualFile::File(file) => file.write_all(buf),
}
}
@@ -984,7 +887,7 @@ mod tests {
hdls.push(hdl);
}
for hdl in hdls {
hdl.await.expect("joining")
hdl.await?;
}
std::mem::forget(rt);