Compare commits

...

18 Commits

Author SHA1 Message Date
John Spray
93e069ceae refactors 2023-12-11 15:29:37 +00:00
John Spray
90e27f1800 fix doc link 2023-12-07 10:29:36 +00:00
John Spray
0f533194d2 Avoid implict Weak ugprade in local_path 2023-12-07 10:05:06 +00:00
John Spray
5b181443b3 Merge remote-tracking branch 'upstream/main' into jcsp/layer-tenant-id 2023-12-07 09:49:12 +00:00
Joonas Koivunen
52718bb8ff fix(layer): metric splitting, span rename (#5902)
Per [feedback], split the Layer metrics, also finally account for lost
and [re-submitted feedback] on `layer_gc` by renaming it to
`layer_delete`, `Layer::garbage_collect_on_drop` renamed to
`Layer::delete_on_drop`. References to "gc" dropped from metric names
and elsewhere.

Also fixes how the cancellations were tracked: there was one rare
counter. Now there is a top level metric for cancelled inits, and the
rare "download failed but failed to communicate" counter is kept.

Fixes: #6027

[feedback]: https://github.com/neondatabase/neon/pull/5809#pullrequestreview-1720043251
[re-submitted feedback]: https://github.com/neondatabase/neon/pull/5108#discussion_r1401867311
2023-12-07 11:39:40 +02:00
Joonas Koivunen
10c77cb410 temp: increase the wait tenant activation timeout (#6058)
5s is causing way too much noise; this is of course a temporary fix, we
should prioritize tenants for which there are pagestream openings the
highest, second highest the basebackups.

Deployment thread for context:
https://neondb.slack.com/archives/C03H1K0PGKH/p1701935048144479?thread_ts=1701765158.926659&cid=C03H1K0PGKH
2023-12-07 09:01:08 +00:00
Heikki Linnakangas
31be301ef3 Make simple_rcu::RcuWaitList::wait() async (#6046)
The gc_timeline() function is async, but it calls the synchronous wait()
function. In the worst case, that could lead to a deadlock by using up
all tokio executor threads.

In the passing, fix a few typos in comments.

Fixes issue #6045.

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-12-07 10:20:40 +02:00
Joonas Koivunen
a3c7d400b4 fix: avoid allocations with logging a slug (#6047)
to_string forces allocating a less than pointer sized string (costing on
stack 4 usize), using a Display formattable slug saves that. the
difference seems small, but at the same time, we log these a lot.
2023-12-07 07:25:22 +00:00
Vadim Kharitonov
7501ca6efb Revert timescaledb for pg14 and pg15 (#6056)
```
could not start the compute node: compute is in state "failed": db error: ERROR: could not access file "$libdir/timescaledb-2.10.1": No such file or directory Caused by: ERROR: could not access file "$libdir/timescaledb-2.10.1": No such file or directory
```
2023-12-06 15:12:36 +00:00
Christian Schwarz
987c9aaea0 virtual_file: fix the metric for close() calls done by VirtualFile::drop (#6051)
Before this PR we would inc() the counter for `Close` even though the
slot's FD had already been closed.

Especially visible when subtracting `open` from `close+close-by-replace`
on a system that does a lot of attach and detach.

refs https://github.com/neondatabase/cloud/issues/8440
refs https://github.com/neondatabase/cloud/issues/8351
2023-12-06 12:05:28 +00:00
John Spray
41401ea2b8 pageserver: make Timeline::shutdown safe against Layer::drop 2023-12-05 17:41:28 +00:00
John Spray
2a8197b7ce pageserver: avoid storing path in layer
We can build it on-demand via Timeline.
2023-12-05 17:41:28 +00:00
John Spray
d07bc7ba01 pageserver: don't store tenant/timeline ID on layers 2023-12-05 17:41:28 +00:00
Konstantin Knizhnik
7fab731f65 Track size of FSM fork while applying records at replica (#5901)
## Problem

See https://neondb.slack.com/archives/C04DGM6SMTM/p1700560921471619

## Summary of changes

Update relation size cache for FSM fork in WAL records filter

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2023-12-05 18:49:24 +02:00
John Spray
483caa22c6 pageserver: logging tweaks (#6039)
- The `Attaching tenant` log message omitted some useful information
like the generation and mode
- info-level messages about writing configuration files were
unnecessarily verbose
- During process shutdown, we don't emit logs about the various phases:
this is very cheap to log since we do it once per process lifetime, and
is helpful when figuring out where something got stuck during a hang.
2023-12-05 16:11:15 +00:00
John Spray
da5e03b0d8 pageserver: add a /reset API for tenants (#6014)
## Problem

Traditionally we would detach/attach directly with curl if we wanted to
"reboot" a single tenant. That's kind of inconvenient these days,
because one needs to know a generation number to issue an attach
request.

Closes: https://github.com/neondatabase/neon/issues/6011

## Summary of changes

- Introduce a new `/reset` API, which remembers the LocationConf from
the current attachment so that callers do not have to work out the
correct configuration/generation to use.
- As an additional support tool, allow an optional `drop_cache` query
parameter, for situations where we are concerned that some on-disk state
might be bad and want to clear that as well as the in-memory state.

One might wonder why I didn't call this "reattach" -- it's because
there's already a PS->CP API of that name and it could get confusing.
2023-12-05 15:38:27 +00:00
John Spray
be885370f6 pageserver: remove redundant unsafe_create_dir_all (#6040)
This non-fsyncing analog to our safe directory creation function was
just duplicating what tokio's fs::create_dir_all does.
2023-12-05 15:03:07 +00:00
Alexey Kondratov
bc1020f965 compute_ctl: Notify waiters when Postgres failed to start (#6034)
In case of configuring the empty compute, API handler is waiting on
condvar for compute state change. Yet, previously if Postgres failed to
start we were just setting compute status to `Failed` without notifying.
It causes a timeout on control plane side, although we can return a
proper error from compute earlier.

With this commit API handler should be properly notified.
2023-12-05 13:38:45 +01:00
26 changed files with 673 additions and 499 deletions

View File

@@ -387,10 +387,20 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ARG PG_VERSION
ENV PATH "/usr/local/pgsql/bin:$PATH"
RUN apt-get update && \
RUN case "${PG_VERSION}" in \
"v14" | "v15") \
export TIMESCALEDB_VERSION=2.10.1 \
export TIMESCALEDB_CHECKSUM=6fca72a6ed0f6d32d2b3523951ede73dc5f9b0077b38450a029a5f411fdb8c73 \
;; \
*) \
export TIMESCALEDB_VERSION=2.13.0 \
export TIMESCALEDB_CHECKSUM=584a351c7775f0e067eaa0e7277ea88cab9077cc4c455cbbf09a5d9723dce95d \
;; \
esac && \
apt-get update && \
apt-get install -y cmake && \
wget https://github.com/timescale/timescaledb/archive/refs/tags/2.13.0.tar.gz -O timescaledb.tar.gz && \
echo "584a351c7775f0e067eaa0e7277ea88cab9077cc4c455cbbf09a5d9723dce95d timescaledb.tar.gz" | sha256sum --check && \
wget https://github.com/timescale/timescaledb/archive/refs/tags/${TIMESCALEDB_VERSION}.tar.gz -O timescaledb.tar.gz && \
echo "${TIMESCALEDB_CHECKSUM} timescaledb.tar.gz" | sha256sum --check && \
mkdir timescaledb-src && cd timescaledb-src && tar xvzf ../timescaledb.tar.gz --strip-components=1 -C . && \
./bootstrap -DSEND_TELEMETRY_DEFAULT:BOOL=OFF -DUSE_TELEMETRY:BOOL=OFF -DAPACHE_ONLY:BOOL=ON -DCMAKE_BUILD_TYPE=Release && \
cd build && \

View File

@@ -274,7 +274,13 @@ fn main() -> Result<()> {
let mut state = compute.state.lock().unwrap();
state.error = Some(format!("{:?}", err));
state.status = ComputeStatus::Failed;
drop(state);
// Notify others that Postgres failed to start. In case of configuring the
// empty compute, it's likely that API handler is still waiting for compute
// state change. With this we will notify it that compute is in Failed state,
// so control plane will know about it earlier and record proper error instead
// of timeout.
compute.state_changed.notify_all();
drop(state); // unlock
delay_exit = true;
None
}

View File

@@ -73,19 +73,28 @@ impl TenantShardId {
)
}
pub fn shard_slug(&self) -> String {
format!("{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
ShardSlug(self)
}
}
/// Formatting helper
struct ShardSlug<'a>(&'a TenantShardId);
impl<'a> std::fmt::Display for ShardSlug<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{:02x}{:02x}",
self.0.shard_number.0, self.0.shard_count.0
)
}
}
impl std::fmt::Display for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.shard_count != ShardCount(0) {
write!(
f,
"{}-{:02x}{:02x}",
self.tenant_id, self.shard_number.0, self.shard_count.0
)
write!(f, "{}-{}", self.tenant_id, self.shard_slug())
} else {
// Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
// is distinct from the normal single shard case (shard count == 1).

View File

@@ -1,10 +1,10 @@
//!
//! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
//! similar to a lock, but it allows readers to "hold on" to an old value of RCU
//! without blocking writers, and allows writing a new values without blocking
//! readers. When you update the new value, the new value is immediately visible
//! without blocking writers, and allows writing a new value without blocking
//! readers. When you update the value, the new value is immediately visible
//! to new readers, but the update waits until all existing readers have
//! finishe, so that no one sees the old value anymore.
//! finished, so that on return, no one sees the old value anymore.
//!
//! This implementation isn't wait-free; it uses an RwLock that is held for a
//! short duration when the value is read or updated.
@@ -26,6 +26,7 @@
//! Increment the value by one, and wait for old readers to finish:
//!
//! ```
//! # async fn dox() {
//! # let rcu = utils::simple_rcu::Rcu::new(1);
//! let write_guard = rcu.lock_for_write();
//!
@@ -36,15 +37,17 @@
//!
//! // Concurrent reads and writes are now possible again. Wait for all the readers
//! // that still observe the old value to finish.
//! waitlist.wait();
//! waitlist.wait().await;
//! # }
//! ```
//!
#![warn(missing_docs)]
use std::ops::Deref;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Weak};
use std::sync::{Mutex, RwLock, RwLockWriteGuard};
use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::watch;
///
/// Rcu allows multiple readers to read and hold onto a value without blocking
@@ -68,22 +71,21 @@ struct RcuCell<V> {
value: V,
/// A dummy channel. We never send anything to this channel. The point is
/// that when the RcuCell is dropped, any cloned Senders will be notified
/// that when the RcuCell is dropped, any subscribed Receivers will be notified
/// that the channel is closed. Updaters can use this to wait out until the
/// RcuCell has been dropped, i.e. until the old value is no longer in use.
///
/// We never do anything with the receiver, we just need to hold onto it so
/// that the Senders will be notified when it's dropped. But because it's
/// not Sync, we need a Mutex on it.
watch: (SyncSender<()>, Mutex<Receiver<()>>),
/// We never send anything to this, we just need to hold onto it so that the
/// Receivers will be notified when it's dropped.
watch: watch::Sender<()>,
}
impl<V> RcuCell<V> {
fn new(value: V) -> Self {
let (watch_sender, watch_receiver) = sync_channel(0);
let (watch_sender, _) = watch::channel(());
RcuCell {
value,
watch: (watch_sender, Mutex::new(watch_receiver)),
watch: watch_sender,
}
}
}
@@ -141,10 +143,10 @@ impl<V> Deref for RcuReadGuard<V> {
///
/// Write guard returned by `write`
///
/// NB: Holding this guard blocks all concurrent `read` and `write` calls, so
/// it should only be held for a short duration!
/// NB: Holding this guard blocks all concurrent `read` and `write` calls, so it should only be
/// held for a short duration!
///
/// Calling `store` consumes the guard, making new reads and new writes possible
/// Calling [`Self::store_and_unlock`] consumes the guard, making new reads and new writes possible
/// again.
///
pub struct RcuWriteGuard<'a, V> {
@@ -179,7 +181,7 @@ impl<'a, V> RcuWriteGuard<'a, V> {
// the watches for any that do.
self.inner.old_cells.retain(|weak| {
if let Some(cell) = weak.upgrade() {
watches.push(cell.watch.0.clone());
watches.push(cell.watch.subscribe());
true
} else {
false
@@ -193,20 +195,20 @@ impl<'a, V> RcuWriteGuard<'a, V> {
///
/// List of readers who can still see old values.
///
pub struct RcuWaitList(Vec<SyncSender<()>>);
pub struct RcuWaitList(Vec<watch::Receiver<()>>);
impl RcuWaitList {
///
/// Wait for old readers to finish.
///
pub fn wait(mut self) {
pub async fn wait(mut self) {
// after all the old_cells are no longer in use, we're done
for w in self.0.iter_mut() {
// This will block until the Receiver is closed. That happens when
// the RcuCell is dropped.
#[allow(clippy::single_match)]
match w.send(()) {
Ok(_) => panic!("send() unexpectedly succeeded on dummy channel"),
match w.changed().await {
Ok(_) => panic!("changed() unexpectedly succeeded on dummy channel"),
Err(_) => {
// closed, which means that the cell has been dropped, and
// its value is no longer in use
@@ -220,11 +222,10 @@ impl RcuWaitList {
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use std::thread::{sleep, spawn};
use std::time::Duration;
#[test]
fn two_writers() {
#[tokio::test]
async fn two_writers() {
let rcu = Rcu::new(1);
let read1 = rcu.read();
@@ -248,33 +249,35 @@ mod tests {
assert_eq!(*read1, 1);
let log = Arc::new(Mutex::new(Vec::new()));
// Wait for the old readers to finish in separate threads.
// Wait for the old readers to finish in separate tasks.
let log_clone = Arc::clone(&log);
let thread2 = spawn(move || {
wait2.wait();
let task2 = tokio::spawn(async move {
wait2.wait().await;
log_clone.lock().unwrap().push("wait2 done");
});
let log_clone = Arc::clone(&log);
let thread3 = spawn(move || {
wait3.wait();
let task3 = tokio::spawn(async move {
wait3.wait().await;
log_clone.lock().unwrap().push("wait3 done");
});
// without this sleep the test can pass on accident if the writer is slow
sleep(Duration::from_millis(500));
tokio::time::sleep(Duration::from_millis(100)).await;
// Release first reader. This allows first write to finish, but calling
// wait() on the second one would still block.
// wait() on the 'task3' would still block.
log.lock().unwrap().push("dropping read1");
drop(read1);
thread2.join().unwrap();
task2.await.unwrap();
sleep(Duration::from_millis(500));
assert!(!task3.is_finished());
tokio::time::sleep(Duration::from_millis(100)).await;
// Release second reader, and finish second writer.
log.lock().unwrap().push("dropping read2");
drop(read2);
thread3.join().unwrap();
task3.await.unwrap();
assert_eq!(
log.lock().unwrap().as_slice(),

View File

@@ -3,7 +3,6 @@ use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::storage_layer::PersistentLayerDesc;
use pageserver_api::shard::TenantShardId;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use std::cmp::{max, min};
use std::fs::File;
@@ -11,7 +10,6 @@ use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Instant;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -211,13 +209,8 @@ fn bench_sequential(c: &mut Criterion) {
for i in 0..100_000 {
let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = PersistentLayerDesc::new_img(
TenantShardId::unsharded(TenantId::generate()),
TimelineId::generate(),
zero.add(10 * i32)..zero.add(10 * i32 + 1),
Lsn(i),
0,
);
let layer =
PersistentLayerDesc::new_img(zero.add(10 * i32)..zero.add(10 * i32 + 1), Lsn(i), 0);
updates.insert_historic(layer);
}
updates.flush();

View File

@@ -310,8 +310,8 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
.unwrap()
.as_micros(),
partition,
desc.tenant_shard_id,
desc.timeline_id,
candidate.timeline.tenant_shard_id,
candidate.timeline.timeline_id,
candidate.layer,
);
}

View File

@@ -709,6 +709,26 @@ async fn tenant_detach_handler(
json_response(StatusCode::OK, ())
}
async fn tenant_reset_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
state
.tenant_manager
.reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn tenant_load_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
@@ -824,7 +844,7 @@ async fn tenant_delete_handler(
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_shard_id)
.instrument(info_span!("tenant_delete_handler",
tenant_id = %tenant_shard_id.tenant_id,
shard = tenant_shard_id.shard_slug()
shard = %tenant_shard_id.shard_slug()
))
.await?;
@@ -1173,7 +1193,7 @@ async fn put_tenant_location_config_handler(
mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach",
tenant_id = %tenant_shard_id.tenant_id,
shard = tenant_shard_id.shard_slug()
shard = %tenant_shard_id.shard_slug()
))
.await
{
@@ -1828,6 +1848,9 @@ pub fn make_router(
.post("/v1/tenant/:tenant_id/detach", |r| {
api_handler(r, tenant_detach_handler)
})
.post("/v1/tenant/:tenant_shard_id/reset", |r| {
api_handler(r, tenant_reset_handler)
})
.post("/v1/tenant/:tenant_id/load", |r| {
api_handler(r, tenant_load_handler)
})

View File

@@ -205,7 +205,7 @@ async fn timed<Fut: std::future::Future>(
match tokio::time::timeout(warn_at, &mut fut).await {
Ok(ret) => {
tracing::info!(
task = name,
stage = name,
elapsed_ms = started.elapsed().as_millis(),
"completed"
);
@@ -213,7 +213,7 @@ async fn timed<Fut: std::future::Future>(
}
Err(_) => {
tracing::info!(
task = name,
stage = name,
elapsed_ms = started.elapsed().as_millis(),
"still waiting, taking longer than expected..."
);
@@ -222,7 +222,7 @@ async fn timed<Fut: std::future::Future>(
// this has a global allowed_errors
tracing::warn!(
task = name,
stage = name,
elapsed_ms = started.elapsed().as_millis(),
"completed, took longer than expected"
);

View File

@@ -2094,6 +2094,8 @@ pub fn preinitialize_metrics() {
// Tenant manager stats
Lazy::force(&TENANT_MANAGER);
Lazy::force(&crate::tenant::storage_layer::layer::LAYER_IMPL_METRICS);
// countervecs
[&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT]
.into_iter()

View File

@@ -67,9 +67,9 @@ use crate::trace::Tracer;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
// How long we may block waiting for a [`TenantSlot::InProgress`]` and/or a [`Tenant`] which
// How long we may wait for a [`TenantSlot::InProgress`]` and/or a [`Tenant`] which
// is not yet in state [`TenantState::Active`].
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
/// Read the end of a tar archive.
///

View File

@@ -2515,7 +2515,7 @@ impl Tenant {
}
}
info!("persisting tenantconf to {config_path}");
debug!("persisting tenantconf to {config_path}");
let mut conf_content = r#"# This file contains a specific per-tenant's config.
# It is read in case of pageserver restart.
@@ -2550,7 +2550,7 @@ impl Tenant {
target_config_path: &Utf8Path,
tenant_conf: &TenantConfOpt,
) -> anyhow::Result<()> {
info!("persisting tenantconf to {target_config_path}");
debug!("persisting tenantconf to {target_config_path}");
let mut conf_content = r#"# This file contains a specific per-tenant's config.
# It is read in case of pageserver restart.

View File

@@ -270,49 +270,6 @@ async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<U
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
/// Create a directory, including parents. This does no fsyncs and makes
/// no guarantees about the persistence of the resulting metadata: for
/// use when creating dirs for use as cache.
async fn unsafe_create_dir_all(path: &Utf8PathBuf) -> std::io::Result<()> {
let mut dirs_to_create = Vec::new();
let mut path: &Utf8Path = path.as_ref();
// Figure out which directories we need to create.
loop {
let meta = tokio::fs::metadata(path).await;
match meta {
Ok(metadata) if metadata.is_dir() => break,
Ok(_) => {
return Err(std::io::Error::new(
std::io::ErrorKind::AlreadyExists,
format!("non-directory found in path: {path}"),
));
}
Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(e),
}
dirs_to_create.push(path);
match path.parent() {
Some(parent) => path = parent,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("can't find parent of path '{path}'"),
));
}
}
}
// Create directories from parent to child.
for &path in dirs_to_create.iter().rev() {
tokio::fs::create_dir(path).await?;
}
Ok(())
}
/// The TenantManager is responsible for storing and mutating the collection of all tenants
/// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
/// lives inside the TenantManager.
@@ -646,7 +603,13 @@ pub(crate) fn tenant_spawn(
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
);
info!("Attaching tenant {tenant_shard_id}");
info!(
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug(),
generation = ?location_conf.location.generation,
attach_mode = ?location_conf.location.attach_mode,
"Attaching tenant"
);
let tenant = match Tenant::spawn(
conf,
tenant_shard_id,
@@ -1035,7 +998,7 @@ impl TenantManager {
LocationMode::Secondary(_) => {
// Directory doesn't need to be fsync'd because if we crash it can
// safely be recreated next time this tenant location is configured.
unsafe_create_dir_all(&tenant_path)
tokio::fs::create_dir_all(&tenant_path)
.await
.with_context(|| format!("Creating {tenant_path}"))?;
@@ -1051,7 +1014,7 @@ impl TenantManager {
// Directory doesn't need to be fsync'd because we do not depend on
// it to exist after crashes: it may be recreated when tenant is
// re-attached, see https://github.com/neondatabase/neon/issues/5550
unsafe_create_dir_all(&timelines_path)
tokio::fs::create_dir_all(&tenant_path)
.await
.with_context(|| format!("Creating {timelines_path}"))?;
@@ -1081,6 +1044,81 @@ impl TenantManager {
Ok(())
}
/// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
/// LocationConf that was last used to attach it. Optionally, the local file cache may be
/// dropped before re-attaching.
///
/// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
/// where an issue is identified that would go away with a restart of the tenant.
///
/// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
/// to respect the cancellation tokens used in normal shutdown().
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
pub(crate) async fn reset_tenant(
&self,
tenant_shard_id: TenantShardId,
drop_cache: bool,
ctx: RequestContext,
) -> anyhow::Result<()> {
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let Some(old_slot) = slot_guard.get_old_value() else {
anyhow::bail!("Tenant not found when trying to reset");
};
let Some(tenant) = old_slot.get_attached() else {
slot_guard.revert();
anyhow::bail!("Tenant is not in attached state");
};
let (_guard, progress) = utils::completion::channel();
match tenant.shutdown(progress, false).await {
Ok(()) => {
slot_guard.drop_old_value()?;
}
Err(_barrier) => {
slot_guard.revert();
anyhow::bail!("Cannot reset Tenant, already shutting down");
}
}
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
if drop_cache {
tracing::info!("Dropping local file cache");
match tokio::fs::read_dir(&timelines_path).await {
Err(e) => {
tracing::warn!("Failed to list timelines while dropping cache: {}", e);
}
Ok(mut entries) => {
while let Some(entry) = entries.next_entry().await? {
tokio::fs::remove_dir_all(entry.path()).await?;
}
}
}
}
let shard_identity = config.shard;
let tenant = tenant_spawn(
self.conf,
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(config)?,
shard_identity,
None,
self.tenants,
SpawnMode::Normal,
&ctx,
)?;
slot_guard.upsert(TenantSlot::Attached(tenant))?;
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
@@ -1289,8 +1327,7 @@ pub(crate) async fn delete_tenant(
// See https://github.com/neondatabase/neon/issues/5080
// TODO(sharding): make delete API sharding-aware
let mut slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
// unwrap is safe because we used MustExist mode when acquiring
let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
@@ -1617,9 +1654,10 @@ pub enum TenantSlotUpsertError {
MapState(#[from] TenantMapError),
}
#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
enum TenantSlotDropError {
/// It is only legal to drop a TenantSlot if its contents are fully shut down
#[error("Tenant was not shut down")]
NotShutdown,
}
@@ -1679,9 +1717,9 @@ impl SlotGuard {
}
}
/// Take any value that was present in the slot before we acquired ownership
/// Get any value that was present in the slot before we acquired ownership
/// of it: in state transitions, this will be the old state.
fn get_old_value(&mut self) -> &Option<TenantSlot> {
fn get_old_value(&self) -> &Option<TenantSlot> {
&self.old_value
}
@@ -1899,7 +1937,7 @@ fn tenant_map_acquire_slot_impl(
METRICS.tenant_slot_writes.inc();
let mut locked = tenants.write().unwrap();
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard=tenant_shard_id.shard_slug());
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard = %tenant_shard_id.shard_slug());
let _guard = span.enter();
let m = match &mut *locked {

View File

@@ -1271,11 +1271,12 @@ impl RemoteTimelineClient {
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
let path = layer.local_path();
let path = layer.local_path_from_id(&self.tenant_shard_id, &self.timeline_id);
upload::upload_timeline_layer(
self.conf,
&self.storage_impl,
path,
&path,
layer_metadata,
self.generation,
)

View File

@@ -4,7 +4,7 @@ pub mod delta_layer;
mod filename;
pub mod image_layer;
mod inmemory_layer;
mod layer;
pub(crate) mod layer;
mod layer_desc;
use crate::context::{AccessStatsBehavior, RequestContext};
@@ -24,7 +24,7 @@ use tracing::warn;
use utils::history_buffer::HistoryBufferWithDropCounter;
use utils::rate_limit::RateLimit;
use utils::{id::TimelineId, lsn::Lsn};
use utils::lsn::Lsn;
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
@@ -301,31 +301,17 @@ pub trait AsLayerDesc {
}
pub mod tests {
use pageserver_api::shard::TenantShardId;
use super::*;
impl From<DeltaFileName> for PersistentLayerDesc {
fn from(value: DeltaFileName) -> Self {
PersistentLayerDesc::new_delta(
TenantShardId::from([0; 18]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn_range,
233,
)
PersistentLayerDesc::new_delta(value.key_range, value.lsn_range, 233)
}
}
impl From<ImageFileName> for PersistentLayerDesc {
fn from(value: ImageFileName) -> Self {
PersistentLayerDesc::new_img(
TenantShardId::from([0; 18]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn,
233,
)
PersistentLayerDesc::new_img(value.key_range, value.lsn, 233)
}
}

View File

@@ -84,17 +84,6 @@ pub struct Summary {
pub index_root_blk: u32,
}
impl From<&DeltaLayer> for Summary {
fn from(layer: &DeltaLayer) -> Self {
Self::expected(
layer.desc.tenant_shard_id.tenant_id,
layer.desc.timeline_id,
layer.desc.key_range.clone(),
layer.desc.lsn_range.clone(),
)
}
}
impl Summary {
pub(super) fn expected(
tenant_id: TenantId,
@@ -320,15 +309,9 @@ impl DeltaLayer {
.metadata()
.context("get file metadata to determine size")?;
// TODO(sharding): we must get the TenantShardId from the path instead of reading the Summary.
// we should also validate the path against the Summary, as both should contain the same tenant, timeline, key, lsn.
let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
Ok(DeltaLayer {
path: path.to_path_buf(),
desc: PersistentLayerDesc::new_delta(
tenant_shard_id,
summary.timeline_id,
summary.key_range,
summary.lsn_range,
metadata.len(),
@@ -505,8 +488,6 @@ impl DeltaLayerWriterInner {
// set inner.file here. The first read will have to re-open it.
let desc = PersistentLayerDesc::new_delta(
self.tenant_shard_id,
self.timeline_id,
self.key_start..key_end,
self.lsn_range.clone(),
metadata.len(),
@@ -517,7 +498,7 @@ impl DeltaLayerWriterInner {
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
trace!("created delta layer {}", layer.local_path());
trace!("created delta layer {}", self.path);
Ok(layer)
}

View File

@@ -85,17 +85,6 @@ pub struct Summary {
// the 'values' part starts after the summary header, on block 1.
}
impl From<&ImageLayer> for Summary {
fn from(layer: &ImageLayer) -> Self {
Self::expected(
layer.desc.tenant_shard_id.tenant_id,
layer.desc.timeline_id,
layer.desc.key_range.clone(),
layer.lsn,
)
}
}
impl Summary {
pub(super) fn expected(
tenant_id: TenantId,
@@ -278,19 +267,9 @@ impl ImageLayer {
.metadata()
.context("get file metadata to determine size")?;
// TODO(sharding): we should get TenantShardId from path.
// OR, not at all: any layer we load from disk should also get reconciled with remote IndexPart.
let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
Ok(ImageLayer {
path: path.to_path_buf(),
desc: PersistentLayerDesc::new_img(
tenant_shard_id,
summary.timeline_id,
summary.key_range,
summary.lsn,
metadata.len(),
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
desc: PersistentLayerDesc::new_img(summary.key_range, summary.lsn, metadata.len()), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
lsn: summary.lsn,
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: OnceCell::new(),
@@ -581,13 +560,7 @@ impl ImageLayerWriterInner {
.await
.context("get metadata to determine file size")?;
let desc = PersistentLayerDesc::new_img(
self.tenant_shard_id,
self.timeline_id,
self.key_range.clone(),
self.lsn,
metadata.len(),
);
let desc = PersistentLayerDesc::new_img(self.key_range.clone(), self.lsn, metadata.len());
// Note: Because we open the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
@@ -599,7 +572,7 @@ impl ImageLayerWriterInner {
// FIXME: why not carry the virtualfile here, it supports renaming?
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
trace!("created image layer {}", layer.local_path());
trace!("created image layer {}", self.path);
Ok(layer)
}

View File

@@ -3,13 +3,15 @@ use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::{
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
};
use pageserver_api::shard::ShardIndex;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::SystemTime;
use tracing::Instrument;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::sync::gate::GateError;
use utils::sync::heavier_once_cell;
use crate::config::PageServerConf;
@@ -81,12 +83,7 @@ impl Layer {
file_name: LayerFileName,
metadata: LayerFileMetadata,
) -> Self {
let desc = PersistentLayerDesc::from_filename(
timeline.tenant_shard_id,
timeline.timeline_id,
file_name,
metadata.file_size(),
);
let desc = PersistentLayerDesc::from_filename(file_name, metadata.file_size());
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
@@ -100,7 +97,7 @@ impl Layer {
metadata.shard,
)));
debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
debug_assert!(owner.0.needs_download_blocking(timeline).unwrap().is_some());
owner
}
@@ -112,12 +109,7 @@ impl Layer {
file_name: LayerFileName,
metadata: LayerFileMetadata,
) -> ResidentLayer {
let desc = PersistentLayerDesc::from_filename(
timeline.tenant_shard_id,
timeline.timeline_id,
file_name,
metadata.file_size(),
);
let desc = PersistentLayerDesc::from_filename(file_name, metadata.file_size());
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
@@ -144,7 +136,7 @@ impl Layer {
let downloaded = resident.expect("just initialized");
debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
debug_assert!(owner.0.needs_download_blocking(timeline).unwrap().is_none());
timeline
.metrics
@@ -189,7 +181,7 @@ impl Layer {
let downloaded = resident.expect("just initialized");
// if the rename works, the path is as expected
std::fs::rename(temp_path, owner.local_path())
std::fs::rename(temp_path, owner.local_path(timeline))
.with_context(|| format!("rename temporary file as correct path for {owner}"))?;
Ok(ResidentLayer { downloaded, owner })
@@ -222,8 +214,8 @@ impl Layer {
///
/// [gc]: [`RemoteTimelineClient::schedule_gc_update`]
/// [compaction]: [`RemoteTimelineClient::schedule_compaction_update`]
pub(crate) fn garbage_collect_on_drop(&self) {
self.0.garbage_collect_on_drop();
pub(crate) fn delete_on_drop(&self) {
self.0.delete_on_drop();
}
/// Return data needed to reconstruct given page at LSN.
@@ -309,8 +301,12 @@ impl Layer {
&self.0.access_stats
}
pub(crate) fn local_path(&self) -> &Utf8Path {
&self.0.path
fn local_path(&self, timeline: &Timeline) -> Utf8PathBuf {
self.0.local_path(timeline)
}
pub(crate) fn filename(&self) -> LayerFileName {
self.0.desc.filename()
}
pub(crate) fn metadata(&self) -> LayerFileMetadata {
@@ -331,10 +327,10 @@ impl Layer {
Ok(())
}
/// Waits until this layer has been dropped (and if needed, local garbage collection and remote
/// Waits until this layer has been dropped (and if needed, local file deletion and remote
/// deletion scheduling has completed).
///
/// Does not start garbage collection, use [`Self::garbage_collect_on_drop`] for that
/// Does not start local deletion, use [`Self::delete_on_drop`] for that
/// separatedly.
#[cfg(feature = "testing")]
pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
@@ -402,13 +398,9 @@ impl ResidentOrWantedEvicted {
}
struct LayerInner {
/// Only needed to check ondemand_download_behavior_treat_error_as_warn and creation of
/// [`Self::path`].
/// Only needed to check ondemand_download_behavior_treat_error_as_warn and in [`Self::local_path_from_id`]
conf: &'static PageServerConf,
/// Full path to the file; unclear if this should exist anymore.
path: Utf8PathBuf,
desc: PersistentLayerDesc,
/// Timeline access is needed for remote timeline client and metrics.
@@ -423,8 +415,8 @@ struct LayerInner {
/// Initialization and deinitialization are done while holding a permit.
inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
/// Do we want to garbage collect this when `LayerInner` is dropped
wanted_garbage_collected: AtomicBool,
/// Do we want to delete locally and remotely this when `LayerInner` is dropped
wanted_deleted: AtomicBool,
/// Do we want to evict this layer as soon as possible? After being set to `true`, all accesses
/// will try to downgrade [`ResidentOrWantedEvicted`], which will eventually trigger
@@ -438,10 +430,6 @@ struct LayerInner {
version: AtomicUsize,
/// Allow subscribing to when the layer actually gets evicted.
///
/// If in future we need to implement "wait until layer instances are gone and done", carrying
/// this over to the gc spawn_blocking from LayerInner::drop will do the trick, and adding a
/// method for "wait_gc" which will wait to this being closed.
status: tokio::sync::broadcast::Sender<Status>,
/// Counter for exponential backoff with the download
@@ -483,19 +471,39 @@ enum Status {
impl Drop for LayerInner {
fn drop(&mut self) {
if !*self.wanted_garbage_collected.get_mut() {
if !*self.wanted_deleted.get_mut() {
// should we try to evict if the last wish was for eviction?
// feels like there's some hazard of overcrowding near shutdown near by, but we don't
// run drops during shutdown (yet)
return;
}
let span = tracing::info_span!(parent: None, "layer_gc", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);
// We will only do I/O on drop if our Timeline still exists. Otherwise, we may safely
// leave garbage layers behind to be cleaned up the next time this Timeline is instantiated.
let Some(timeline) = self.timeline.upgrade() else {
// no need to nag that timeline is gone: under normal situation on
// task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
return;
};
// We will only do I/O during drop if our Timeline's layer_gate is open: this avoids
// the risk that we would race with Timeline::shutdown and end up doing I/O to a timeline
// path for which the Timeline object has been torn down already.
let _gate_guard = match timeline.layer_gate.enter() {
Ok(g) => g,
Err(GateError::GateClosed) => {
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
return;
}
};
// If timeline is alive, we can construct a span with IDs for this function.
let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %timeline.tenant_shard_id.tenant_id, shard_id=%timeline.tenant_shard_id.shard_slug(), timeline_id = %timeline.timeline_id);
let path = self.local_path(&timeline);
let path = std::mem::take(&mut self.path);
let file_name = self.layer_desc().filename();
let file_size = self.layer_desc().file_size;
let timeline = self.timeline.clone();
let meta = self.metadata();
let status = self.status.clone();
@@ -517,38 +525,32 @@ impl Drop for LayerInner {
false
}
Err(e) => {
tracing::error!("failed to remove garbage collected layer: {e}");
LAYER_IMPL_METRICS.inc_gc_removes_failed();
tracing::error!("failed to remove wanted deleted layer: {e}");
LAYER_IMPL_METRICS.inc_delete_removes_failed();
false
}
};
if let Some(timeline) = timeline.upgrade() {
if removed {
timeline.metrics.resident_physical_size_sub(file_size);
}
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
if removed {
timeline.metrics.resident_physical_size_sub(file_size);
}
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
if let Err(e) = res {
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
// demonstrating this deadlock (without spawn_blocking): stop will drop
// queued items, which will have ResidentLayer's, and those drops would try
// to re-entrantly lock the RemoteTimelineClient inner state.
if !timeline.is_active() {
tracing::info!("scheduling deletion on drop failed: {e:#}");
} else {
tracing::warn!("scheduling deletion on drop failed: {e:#}");
}
LAYER_IMPL_METRICS.inc_gcs_failed(GcFailed::DeleteSchedulingFailed);
if let Err(e) = res {
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
// demonstrating this deadlock (without spawn_blocking): stop will drop
// queued items, which will have ResidentLayer's, and those drops would try
// to re-entrantly lock the RemoteTimelineClient inner state.
if !timeline.is_active() {
tracing::info!("scheduling deletion on drop failed: {e:#}");
} else {
LAYER_IMPL_METRICS.inc_completed_gcs();
tracing::warn!("scheduling deletion on drop failed: {e:#}");
}
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
} else {
LAYER_IMPL_METRICS.inc_completed_deletes();
}
} else {
// no need to nag that timeline is gone: under normal situation on
// task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
LAYER_IMPL_METRICS.inc_gcs_failed(GcFailed::TimelineGone);
}
});
}
@@ -564,10 +566,6 @@ impl LayerInner {
generation: Generation,
shard: ShardIndex,
) -> Self {
let path = conf
.timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id)
.join(desc.filename().to_string());
let (inner, version) = if let Some(inner) = downloaded {
let version = inner.version;
let resident = ResidentOrWantedEvicted::Resident(inner);
@@ -578,12 +576,11 @@ impl LayerInner {
LayerInner {
conf,
path,
desc,
timeline: Arc::downgrade(timeline),
have_remote_client: timeline.remote_client.is_some(),
access_stats,
wanted_garbage_collected: AtomicBool::new(false),
wanted_deleted: AtomicBool::new(false),
wanted_evicted: AtomicBool::new(false),
inner,
version: AtomicUsize::new(version),
@@ -594,16 +591,32 @@ impl LayerInner {
}
}
fn garbage_collect_on_drop(&self) {
let res = self.wanted_garbage_collected.compare_exchange(
false,
true,
Ordering::Release,
Ordering::Relaxed,
);
/// All call sites that need this function should already have a Timeline (e.g. from
/// upgrading the Self::timeline weak pointer) -- it doesn't make sense to try and
/// do anything with the local file if the Timeline isn't still alive.
fn local_path(&self, timeline: &Timeline) -> Utf8PathBuf {
self.local_path_from_id(&timeline.tenant_shard_id, &timeline.timeline_id)
}
/// Use this instead of `local_path` if you don't have a Timeline but do have its ID: this
/// is used by external callers such as [`crate::tenant::RemoteTimelineClient`]
pub(crate) fn local_path_from_id(
&self,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Utf8PathBuf {
self.conf
.timeline_path(tenant_shard_id, timeline_id)
.join(self.desc.filename().to_string())
}
fn delete_on_drop(&self) {
let res =
self.wanted_deleted
.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
if res.is_ok() {
LAYER_IMPL_METRICS.inc_started_gcs();
LAYER_IMPL_METRICS.inc_started_deletes();
}
}
@@ -671,6 +684,10 @@ impl LayerInner {
// disable any scheduled but not yet running eviction deletions for this
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
// count cancellations, which currently remain largely unexpected
let init_cancelled =
scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
// no need to make the evict_and_wait wait for the actual download to complete
drop(self.status.send(Status::Downloaded));
@@ -679,12 +696,14 @@ impl LayerInner {
.upgrade()
.ok_or_else(|| DownloadError::TimelineShutdown)?;
// FIXME: grab a gate
let can_ever_evict = timeline.remote_client.as_ref().is_some();
// check if we really need to be downloaded; could have been already downloaded by a
// cancelled previous attempt.
let needs_download = self
.needs_download()
.needs_download(&timeline)
.await
.map_err(DownloadError::PreStatFailed)?;
@@ -739,6 +758,8 @@ impl LayerInner {
tracing::info!(waiters, "completing the on-demand download for other tasks");
}
scopeguard::ScopeGuard::into_inner(init_cancelled);
Ok((ResidentOrWantedEvicted::Resident(res), permit))
};
@@ -832,12 +853,13 @@ impl LayerInner {
// block tenant::mgr::remove_tenant_from_memory.
let this: Arc<Self> = self.clone();
let timeline_clone = timeline.clone();
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
crate::task_mgr::TaskKind::RemoteDownloadTask,
Some(self.desc.tenant_shard_id.tenant_id),
Some(self.desc.timeline_id),
Some(timeline.tenant_shard_id.tenant_id),
Some(timeline.timeline_id),
&task_name,
false,
async move {
@@ -867,14 +889,13 @@ impl LayerInner {
match res {
(Ok(()), _) => {
// our caller is cancellation safe so this is fine; if someone
// else requests the layer, they'll find it already downloaded
// or redownload.
// else requests the layer, they'll find it already downloaded.
//
// however, could be that we should consider marking the layer
// for eviction? alas, cannot: because only DownloadedLayer
// will handle that.
tracing::info!("layer file download completed after requester had cancelled");
LAYER_IMPL_METRICS.inc_download_completed_without_requester();
// See counter [`LayerImplMetrics::inc_init_needed_no_download`]
//
// FIXME(#6028): however, could be that we should consider marking the
// layer for eviction? alas, cannot: because only DownloadedLayer will
// handle that.
},
(Err(e), _) => {
// our caller is cancellation safe, but we might be racing with
@@ -894,7 +915,7 @@ impl LayerInner {
match rx.await {
Ok((Ok(()), permit)) => {
if let Some(reason) = self
.needs_download()
.needs_download(&timeline_clone)
.await
.map_err(DownloadError::PostStatFailed)?
{
@@ -929,16 +950,26 @@ impl LayerInner {
}
}
async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
match tokio::fs::metadata(&self.path).await {
async fn needs_download(
&self,
timeline: &Timeline,
) -> Result<Option<NeedsDownload>, std::io::Error> {
let path = self.local_path(timeline);
match tokio::fs::metadata(path).await {
Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
Err(e) => Err(e),
}
}
fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
match self.path.metadata() {
fn needs_download_blocking(
&self,
timeline: &Timeline,
) -> Result<Option<NeedsDownload>, std::io::Error> {
let path = self.local_path(timeline);
match path.metadata() {
Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
Err(e) => Err(e),
@@ -994,14 +1025,20 @@ impl LayerInner {
/// `DownloadedLayer` is being dropped, so it calls this method.
fn on_downloaded_layer_drop(self: Arc<LayerInner>, version: usize) {
let gc = self.wanted_garbage_collected.load(Ordering::Acquire);
let delete = self.wanted_deleted.load(Ordering::Acquire);
let evict = self.wanted_evicted.load(Ordering::Acquire);
let can_evict = self.have_remote_client;
if gc {
// do nothing now, only in LayerInner::drop
if delete {
// do nothing now, only in LayerInner::drop -- this was originally implemented because
// we could had already scheduled the deletion at the time.
//
// FIXME: this is not true anymore, we can safely evict wanted deleted files.
} else if can_evict && evict {
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, %version);
// If timeline is alive, we can construct a span with IDs for this function.
let span = self.timeline.upgrade().map(|timeline| {
tracing::info_span!(parent: None, "layer_evict", tenant_id = %timeline.tenant_shard_id.tenant_id, shard_id=%timeline.tenant_shard_id.shard_slug(), timeline_id = %timeline.timeline_id)
});
// downgrade for queueing, in case there's a tear down already ongoing we should not
// hold it alive.
@@ -1012,9 +1049,9 @@ impl LayerInner {
// drop while the `self.inner` is being locked, leading to a deadlock.
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
let _g = span.entered();
let _g = span.map(|s| s.entered());
// if LayerInner is already dropped here, do nothing because the garbage collection
// if LayerInner is already dropped here, do nothing because the delete on drop
// has already ran while we were in queue
let Some(this) = this.upgrade() else {
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
@@ -1072,7 +1109,9 @@ impl LayerInner {
LayerResidenceEventReason::ResidenceChange,
);
let res = match capture_mtime_and_remove(&self.path) {
let local_path = self.local_path(&timeline);
let res = match capture_mtime_and_remove(&local_path) {
Ok(local_layer_mtime) => {
let duration = SystemTime::now().duration_since(local_layer_mtime);
match duration {
@@ -1224,6 +1263,11 @@ impl DownloadedLayer {
owner: &Arc<LayerInner>,
ctx: &RequestContext,
) -> anyhow::Result<&'a LayerKind> {
let timeline = owner
.timeline
.upgrade()
.ok_or(DownloadError::TimelineShutdown)?;
let init = || async {
assert_eq!(
Weak::as_ptr(&self.owner),
@@ -1233,23 +1277,23 @@ impl DownloadedLayer {
let res = if owner.desc.is_delta {
let summary = Some(delta_layer::Summary::expected(
owner.desc.tenant_shard_id.tenant_id,
owner.desc.timeline_id,
timeline.tenant_shard_id.tenant_id,
timeline.timeline_id,
owner.desc.key_range.clone(),
owner.desc.lsn_range.clone(),
));
delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx)
delta_layer::DeltaLayerInner::load(&owner.local_path(&timeline), summary, ctx)
.await
.map(|res| res.map(LayerKind::Delta))
} else {
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
owner.desc.tenant_shard_id.tenant_id,
owner.desc.timeline_id,
timeline.tenant_shard_id.tenant_id,
timeline.timeline_id,
owner.desc.key_range.clone(),
lsn,
));
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx)
image_layer::ImageLayerInner::load(&owner.local_path(&timeline), lsn, summary, ctx)
.await
.map(|res| res.map(LayerKind::Image))
};
@@ -1373,8 +1417,14 @@ impl ResidentLayer {
}
}
pub(crate) fn local_path(&self) -> &Utf8Path {
&self.owner.0.path
pub(crate) fn local_path_from_id(
&self,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Utf8PathBuf {
self.owner
.0
.local_path_from_id(tenant_shard_id, timeline_id)
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
@@ -1405,35 +1455,37 @@ impl From<ResidentLayer> for Layer {
}
}
use metrics::{IntCounter, IntCounterVec};
use metrics::IntCounter;
struct LayerImplMetrics {
pub(crate) struct LayerImplMetrics {
started_evictions: IntCounter,
completed_evictions: IntCounter,
cancelled_evictions: IntCounterVec,
cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,
started_gcs: IntCounter,
completed_gcs: IntCounter,
failed_gcs: IntCounterVec,
started_deletes: IntCounter,
completed_deletes: IntCounter,
failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,
rare_counters: IntCounterVec,
rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
}
impl Default for LayerImplMetrics {
fn default() -> Self {
let evictions = metrics::register_int_counter_vec!(
"pageserver_layer_evictions_count",
"Evictions started and completed in the Layer implementation",
&["state"]
use enum_map::Enum;
// reminder: these will be pageserver_layer_* with "_total" suffix
let started_evictions = metrics::register_int_counter!(
"pageserver_layer_started_evictions",
"Evictions started in the Layer implementation"
)
.unwrap();
let completed_evictions = metrics::register_int_counter!(
"pageserver_layer_completed_evictions",
"Evictions completed in the Layer implementation"
)
.unwrap();
let started_evictions = evictions
.get_metric_with_label_values(&["started"])
.unwrap();
let completed_evictions = evictions
.get_metric_with_label_values(&["completed"])
.unwrap();
let cancelled_evictions = metrics::register_int_counter_vec!(
"pageserver_layer_cancelled_evictions_count",
@@ -1442,24 +1494,36 @@ impl Default for LayerImplMetrics {
)
.unwrap();
// reminder: this will be pageserver_layer_gcs_count_total with "_total" suffix
let gcs = metrics::register_int_counter_vec!(
"pageserver_layer_gcs_count",
"Garbage collections started and completed in the Layer implementation",
&["state"]
let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
let reason = EvictionCancelled::from_usize(i);
let s = reason.as_str();
cancelled_evictions.with_label_values(&[s])
}));
let started_deletes = metrics::register_int_counter!(
"pageserver_layer_started_deletes",
"Deletions on drop pending in the Layer implementation"
)
.unwrap();
let completed_deletes = metrics::register_int_counter!(
"pageserver_layer_completed_deletes",
"Deletions on drop completed in the Layer implementation"
)
.unwrap();
let started_gcs = gcs.get_metric_with_label_values(&["pending"]).unwrap();
let completed_gcs = gcs.get_metric_with_label_values(&["completed"]).unwrap();
let failed_gcs = metrics::register_int_counter_vec!(
"pageserver_layer_failed_gcs_count",
"Different reasons for garbage collections to have failed",
let failed_deletes = metrics::register_int_counter_vec!(
"pageserver_layer_failed_deletes_count",
"Different reasons for deletions on drop to have failed",
&["reason"]
)
.unwrap();
let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
let reason = DeleteFailed::from_usize(i);
let s = reason.as_str();
failed_deletes.with_label_values(&[s])
}));
let rare_counters = metrics::register_int_counter_vec!(
"pageserver_layer_assumed_rare_count",
"Times unexpected or assumed rare event happened",
@@ -1467,16 +1531,29 @@ impl Default for LayerImplMetrics {
)
.unwrap();
let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
let event = RareEvent::from_usize(i);
let s = event.as_str();
rare_counters.with_label_values(&[s])
}));
let inits_cancelled = metrics::register_int_counter!(
"pageserver_layer_inits_cancelled_count",
"Times Layer initialization was cancelled",
)
.unwrap();
Self {
started_evictions,
completed_evictions,
cancelled_evictions,
started_gcs,
completed_gcs,
failed_gcs,
started_deletes,
completed_deletes,
failed_deletes,
rare_counters,
inits_cancelled,
}
}
}
@@ -1489,57 +1566,33 @@ impl LayerImplMetrics {
self.completed_evictions.inc();
}
fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
self.cancelled_evictions
.get_metric_with_label_values(&[reason.as_str()])
.unwrap()
.inc()
self.cancelled_evictions[reason].inc()
}
fn inc_started_gcs(&self) {
self.started_gcs.inc();
fn inc_started_deletes(&self) {
self.started_deletes.inc();
}
fn inc_completed_gcs(&self) {
self.completed_gcs.inc();
fn inc_completed_deletes(&self) {
self.completed_deletes.inc();
}
fn inc_gcs_failed(&self, reason: GcFailed) {
self.failed_gcs
.get_metric_with_label_values(&[reason.as_str()])
.unwrap()
.inc();
fn inc_deletes_failed(&self, reason: DeleteFailed) {
self.failed_deletes[reason].inc();
}
/// Counted separatedly from failed gcs because we will complete the gc attempt regardless of
/// failure to delete local file.
fn inc_gc_removes_failed(&self) {
self.rare_counters
.get_metric_with_label_values(&["gc_remove_failed"])
.unwrap()
.inc();
/// Counted separatedly from failed layer deletes because we will complete the layer deletion
/// attempt regardless of failure to delete local file.
fn inc_delete_removes_failed(&self) {
self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
}
/// Expected rare because requires a race with `evict_blocking` and
/// `get_or_maybe_download`.
/// Expected rare because requires a race with `evict_blocking` and `get_or_maybe_download`.
fn inc_retried_get_or_maybe_download(&self) {
self.rare_counters
.get_metric_with_label_values(&["retried_gomd"])
.unwrap()
.inc();
self.rare_counters[RareEvent::RetriedGetOrMaybeDownload].inc();
}
/// Expected rare because cancellations are unexpected
fn inc_download_completed_without_requester(&self) {
self.rare_counters
.get_metric_with_label_values(&["download_completed_without"])
.unwrap()
.inc();
}
/// Expected rare because cancellations are unexpected
/// Expected rare because cancellations are unexpected, and failures are unexpected
fn inc_download_failed_without_requester(&self) {
self.rare_counters
.get_metric_with_label_values(&["download_failed_without"])
.unwrap()
.inc();
self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
}
/// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
@@ -1547,37 +1600,30 @@ impl LayerImplMetrics {
/// If this counter is always zero, we should replace ResidentOrWantedEvicted type with an
/// Option.
fn inc_raced_wanted_evicted_accesses(&self) {
self.rare_counters
.get_metric_with_label_values(&["raced_wanted_evicted"])
.unwrap()
.inc();
self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
}
/// These are only expected for [`Self::inc_download_completed_without_requester`] amount when
/// These are only expected for [`Self::inc_init_cancelled`] amount when
/// running with remote storage.
fn inc_init_needed_no_download(&self) {
self.rare_counters
.get_metric_with_label_values(&["init_needed_no_download"])
.unwrap()
.inc();
self.rare_counters[RareEvent::InitWithoutDownload].inc();
}
/// Expected rare because all layer files should be readable and good
fn inc_permanent_loading_failures(&self) {
self.rare_counters
.get_metric_with_label_values(&["permanent_loading_failure"])
.unwrap()
.inc();
self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
}
fn inc_broadcast_lagged(&self) {
self.rare_counters
.get_metric_with_label_values(&["broadcast_lagged"])
.unwrap()
.inc();
self.rare_counters[RareEvent::EvictAndWaitLagged].inc();
}
fn inc_init_cancelled(&self) {
self.inits_cancelled.inc()
}
}
#[derive(enum_map::Enum)]
enum EvictionCancelled {
LayerGone,
TimelineGone,
@@ -1606,19 +1652,47 @@ impl EvictionCancelled {
}
}
enum GcFailed {
#[derive(enum_map::Enum)]
enum DeleteFailed {
TimelineGone,
DeleteSchedulingFailed,
}
impl GcFailed {
impl DeleteFailed {
fn as_str(&self) -> &'static str {
match self {
GcFailed::TimelineGone => "timeline_gone",
GcFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
DeleteFailed::TimelineGone => "timeline_gone",
DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
}
}
}
static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
#[derive(enum_map::Enum)]
enum RareEvent {
RemoveOnDropFailed,
RetriedGetOrMaybeDownload,
DownloadFailedWithoutRequester,
UpgradedWantedEvicted,
InitWithoutDownload,
PermanentLoadingFailure,
EvictAndWaitLagged,
}
impl RareEvent {
fn as_str(&self) -> &'static str {
use RareEvent::*;
match self {
RemoveOnDropFailed => "remove_on_drop_failed",
RetriedGetOrMaybeDownload => "retried_gomd",
DownloadFailedWithoutRequester => "download_failed_without",
UpgradedWantedEvicted => "raced_wanted_evicted",
InitWithoutDownload => "init_needed_no_download",
PermanentLoadingFailure => "permanent_loading_failure",
EvictAndWaitLagged => "broadcast_lagged",
}
}
}
pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
once_cell::sync::Lazy::new(LayerImplMetrics::default);

View File

@@ -1,7 +1,6 @@
use core::fmt::Display;
use pageserver_api::shard::TenantShardId;
use std::ops::Range;
use utils::{id::TimelineId, lsn::Lsn};
use utils::lsn::Lsn;
use crate::repository::Key;
@@ -9,16 +8,11 @@ use super::{DeltaFileName, ImageFileName, LayerFileName};
use serde::{Deserialize, Serialize};
#[cfg(test)]
use utils::id::TenantId;
/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
/// a unified way to generate layer information like file name.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct PersistentLayerDesc {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,
/// Range of keys that this layer covers
pub key_range: Range<Key>,
/// Inclusive start, exclusive end of the LSN range that this layer holds.
@@ -57,8 +51,6 @@ impl PersistentLayerDesc {
#[cfg(test)]
pub fn new_test(key_range: Range<Key>) -> Self {
Self {
tenant_shard_id: TenantShardId::unsharded(TenantId::generate()),
timeline_id: TimelineId::generate(),
key_range,
lsn_range: Lsn(0)..Lsn(1),
is_delta: false,
@@ -66,16 +58,8 @@ impl PersistentLayerDesc {
}
}
pub fn new_img(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn: Lsn,
file_size: u64,
) -> Self {
pub fn new_img(key_range: Range<Key>, lsn: Lsn, file_size: u64) -> Self {
Self {
tenant_shard_id,
timeline_id,
key_range,
lsn_range: Self::image_layer_lsn_range(lsn),
is_delta: false,
@@ -83,16 +67,8 @@ impl PersistentLayerDesc {
}
}
pub fn new_delta(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
file_size: u64,
) -> Self {
pub fn new_delta(key_range: Range<Key>, lsn_range: Range<Lsn>, file_size: u64) -> Self {
Self {
tenant_shard_id,
timeline_id,
key_range,
lsn_range,
is_delta: true,
@@ -100,23 +76,10 @@ impl PersistentLayerDesc {
}
}
pub fn from_filename(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
filename: LayerFileName,
file_size: u64,
) -> Self {
pub fn from_filename(filename: LayerFileName, file_size: u64) -> Self {
match filename {
LayerFileName::Image(i) => {
Self::new_img(tenant_shard_id, timeline_id, i.key_range, i.lsn, file_size)
}
LayerFileName::Delta(d) => Self::new_delta(
tenant_shard_id,
timeline_id,
d.key_range,
d.lsn_range,
file_size,
),
LayerFileName::Image(i) => Self::new_img(i.key_range, i.lsn, file_size),
LayerFileName::Delta(d) => Self::new_delta(d.key_range, d.lsn_range, file_size),
}
}
@@ -173,10 +136,6 @@ impl PersistentLayerDesc {
self.key_range.clone()
}
pub fn get_timeline_id(&self) -> TimelineId {
self.timeline_id
}
/// Does this layer only contain some data for the key-range (incremental),
/// or does it contain a version of every page? This is important to know
/// for garbage collecting old layers: an incremental layer depends on
@@ -192,9 +151,7 @@ impl PersistentLayerDesc {
pub fn dump(&self) {
if self.is_delta {
println!(
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} is_incremental {} size {} ----",
self.tenant_shard_id,
self.timeline_id,
"----- delta layer keys {}-{} lsn {}-{} is_incremental {} size {} ----",
self.key_range.start,
self.key_range.end,
self.lsn_range.start,
@@ -204,9 +161,7 @@ impl PersistentLayerDesc {
);
} else {
println!(
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
self.tenant_shard_id,
self.timeline_id,
"----- image layer key {}-{} at {} is_incremental {} size {} ----",
self.key_range.start,
self.key_range.end,
self.image_layer_lsn(),

View File

@@ -313,6 +313,10 @@ pub struct Timeline {
/// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
pub(crate) gate: Gate,
/// Gate to prevent shutdown completing until all Layers for this Timeline have finished
/// doing any background I/O such as deleting files on drop.
pub(crate) layer_gate: Gate,
/// Cancellation token scoped to this timeline: anything doing long-running work relating
/// to the timeline should drop out when this token fires.
pub(crate) cancel: CancellationToken,
@@ -478,7 +482,7 @@ impl Timeline {
.map(|ancestor| ancestor.timeline_id)
}
/// Lock and get timeline's GC cuttof
/// Lock and get timeline's GC cutoff
pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
self.latest_gc_cutoff_lsn.read()
}
@@ -1002,8 +1006,15 @@ impl Timeline {
)
.await;
// Finally wait until any gate-holders are complete
// Wait until any normal gate-holders such as page_service requests are complete
self.gate.close().await;
// Drop our references to layers: this should permit all layers to be dropped, and any I/O
// in their drop() method to complete.
self.layers.write().await.clear();
// Wait until any Layer gate holders such as LayerInner::drop are complete
self.layer_gate.close().await;
}
pub fn set_state(&self, new_state: TimelineState) {
@@ -1445,6 +1456,7 @@ impl Timeline {
cancel,
gate: Gate::new(format!("Timeline<{tenant_shard_id}/{timeline_id}>")),
layer_gate: Gate::new(format!("TimelineLayers<{tenant_shard_id}/{timeline_id}>")),
compaction_lock: tokio::sync::Mutex::default(),
gc_lock: tokio::sync::Mutex::default(),
@@ -2176,7 +2188,7 @@ trait TraversalLayerExt {
impl TraversalLayerExt for Layer {
fn traversal_id(&self) -> TraversalId {
self.local_path().to_string()
self.filename().to_string()
}
}
@@ -2890,7 +2902,8 @@ impl Timeline {
let _g = span.entered();
let new_delta =
Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
let new_delta_path = new_delta.local_path().to_owned();
let new_delta_path = new_delta
.local_path_from_id(&self_clone.tenant_shard_id, &self_clone.timeline_id);
// Sync it to disk.
//
@@ -3134,7 +3147,7 @@ impl Timeline {
// and fsync them all in parallel.
let all_paths = image_layers
.iter()
.map(|layer| layer.local_path().to_owned())
.map(|layer| layer.local_path_from_id(&self.tenant_shard_id, &self.timeline_id))
.collect::<Vec<_>>();
par_fsync::par_fsync_async(&all_paths)
@@ -3683,7 +3696,7 @@ impl Timeline {
// FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
let layer_paths: Vec<Utf8PathBuf> = new_layers
.iter()
.map(|l| l.local_path().to_owned())
.map(|l| l.local_path_from_id(&self.tenant_shard_id, &self.timeline_id))
.collect();
// Fsync all the layer files and directory using multiple threads to
@@ -3971,7 +3984,7 @@ impl Timeline {
// for details. This will block until the old value is no longer in use.
//
// The GC cutoff should only ever move forwards.
{
let waitlist = {
let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
ensure!(
*write_guard <= new_gc_cutoff,
@@ -3979,8 +3992,9 @@ impl Timeline {
*write_guard,
new_gc_cutoff
);
write_guard.store_and_unlock(new_gc_cutoff).wait();
}
write_guard.store_and_unlock(new_gc_cutoff)
};
waitlist.wait().await;
info!("GC starting");

View File

@@ -33,6 +33,11 @@ impl LayerManager {
}
}
pub(crate) fn clear(&mut self) {
self.layer_map = LayerMap::default();
self.layer_fmgr.clear();
}
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
self.layer_fmgr.get_from_desc(desc)
}
@@ -243,7 +248,7 @@ impl LayerManager {
// map index without actually rebuilding the index.
updates.remove_historic(desc);
mapping.remove(layer);
layer.garbage_collect_on_drop();
layer.delete_on_drop();
}
pub(crate) fn contains(&self, layer: &Layer) -> bool {
@@ -271,6 +276,10 @@ impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
}
}
pub(crate) fn clear(&mut self) {
self.0.clear();
}
pub(crate) fn contains(&self, layer: &T) -> bool {
self.0.contains_key(&layer.layer_desc().key())
}

View File

@@ -610,9 +610,11 @@ impl Drop for VirtualFile {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(slot_guard.file.take()));
if let Some(fd) = slot_guard.file.take() {
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(fd));
}
}
}
}

View File

@@ -59,6 +59,7 @@
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/buf_internals.h"
#include "storage/fsm_internals.h"
#include "storage/smgr.h"
#include "storage/md.h"
#include "pgstat.h"
@@ -2722,6 +2723,86 @@ smgr_init_neon(void)
}
static void
neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, XLogRecPtr end_recptr)
{
BlockNumber relsize;
/* Extend the relation if we know its size */
if (get_cached_relsize(rinfo, forknum, &relsize))
{
if (relsize < blkno + 1)
{
update_cached_relsize(rinfo, forknum, blkno + 1);
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
}
}
else
{
/*
* Size was not cached. We populate the cache now, with the size of the
* relation measured after this WAL record is applied.
*
* This length is later reused when we open the smgr to read the block,
* which is fine and expected.
*/
NeonResponse *response;
NeonNblocksResponse *nbresponse;
NeonNblocksRequest request = {
.req = (NeonRequest) {
.lsn = end_recptr,
.latest = false,
.tag = T_NeonNblocksRequest,
},
.rinfo = rinfo,
.forknum = forknum,
};
response = page_server_request(&request);
Assert(response->tag == T_NeonNblocksResponse);
nbresponse = (NeonNblocksResponse *) response;
relsize = Max(nbresponse->n_blocks, blkno+1);
set_cached_relsize(rinfo, forknum, relsize);
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
elog(SmgrTrace, "Set length to %d", relsize);
}
}
#define FSM_TREE_DEPTH ((SlotsPerFSMPage >= 1626) ? 3 : 4)
/*
* TODO: May be it is better to make correspondent fgunctio from freespace.c public?
*/
static BlockNumber
get_fsm_physical_block(BlockNumber heapblk)
{
BlockNumber pages;
int leafno;
int l;
/*
* Calculate the logical page number of the first leaf page below the
* given page.
*/
leafno = heapblk / SlotsPerFSMPage;
/* Count upper level nodes required to address the leaf page */
pages = 0;
for (l = 0; l < FSM_TREE_DEPTH; l++)
{
pages += leafno + 1;
leafno /= SlotsPerFSMPage;
}
/* Turn the page count into 0-based block number */
return pages - 1;
}
/*
* Return whether we can skip the redo for this block.
*
@@ -2769,7 +2850,6 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
LWLock *partitionLock;
Buffer buffer;
bool no_redo_needed;
BlockNumber relsize;
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
return true;
@@ -2819,49 +2899,10 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
LWLockRelease(partitionLock);
/* Extend the relation if we know its size */
if (get_cached_relsize(rinfo, forknum, &relsize))
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
if (forknum == MAIN_FORKNUM)
{
if (relsize < blkno + 1)
{
update_cached_relsize(rinfo, forknum, blkno + 1);
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
}
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
}
else
{
/*
* Size was not cached. We populate the cache now, with the size of the
* relation measured after this WAL record is applied.
*
* This length is later reused when we open the smgr to read the block,
* which is fine and expected.
*/
NeonResponse *response;
NeonNblocksResponse *nbresponse;
NeonNblocksRequest request = {
.req = (NeonRequest) {
.lsn = end_recptr,
.latest = false,
.tag = T_NeonNblocksRequest,
},
.rinfo = rinfo,
.forknum = forknum,
};
response = page_server_request(&request);
Assert(response->tag == T_NeonNblocksResponse);
nbresponse = (NeonNblocksResponse *) response;
Assert(nbresponse->n_blocks > blkno);
set_cached_relsize(rinfo, forknum, nbresponse->n_blocks);
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
}
return no_redo_needed;
}

View File

@@ -260,6 +260,14 @@ class PageserverHttpClient(requests.Session):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
self.verbose_error(res)
def tenant_reset(self, tenant_id: TenantId, drop_cache: bool):
params = {}
if drop_cache:
params["drop_cache"] = "true"
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/reset", params=params)
self.verbose_error(res)
def tenant_delete(self, tenant_id: TenantId):
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
self.verbose_error(res)

View File

@@ -0,0 +1,29 @@
import random
import time
from fixtures.neon_fixtures import NeonEnv
def test_physical_replication(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 100000
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
with primary.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute(
"CREATE TABLE t(pk bigint primary key, payload text default repeat('?',200))"
)
time.sleep(1)
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
with primary.connect() as p_con:
with p_con.cursor() as p_cur:
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
for pk in range(n_records):
p_cur.execute("insert into t (pk) values (%s)", (pk,))
s_cur.execute(
"select * from t where pk=%s", (random.randrange(1, n_records),)
)

View File

@@ -840,7 +840,7 @@ def test_compaction_waits_for_upload(
), "there should be one L1 after L0 => L1 compaction (without #5863 being fixed)"
def layer_deletes_completed():
m = client.get_metric_value("pageserver_layer_gcs_count_total", {"state": "completed"})
m = client.get_metric_value("pageserver_layer_completed_deletes_total")
if m is None:
return 0
return int(m)

View File

@@ -1,4 +1,5 @@
import asyncio
import enum
import random
import time
from threading import Thread
@@ -51,11 +52,20 @@ def do_gc_target(
log.info("gc http thread returning")
class ReattachMode(str, enum.Enum):
REATTACH_EXPLICIT = "explicit"
REATTACH_RESET = "reset"
REATTACH_RESET_DROP = "reset"
# Basic detach and re-attach test
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@pytest.mark.parametrize(
"mode",
[ReattachMode.REATTACH_EXPLICIT, ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP],
)
def test_tenant_reattach(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, mode: str
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
@@ -100,8 +110,15 @@ def test_tenant_reattach(
ps_metrics.query_one("pageserver_last_record_lsn", filter=tenant_metric_filter).value
)
pageserver_http.tenant_detach(tenant_id)
pageserver_http.tenant_attach(tenant_id)
if mode == ReattachMode.REATTACH_EXPLICIT:
# Explicitly detach then attach the tenant as two separate API calls
pageserver_http.tenant_detach(tenant_id)
pageserver_http.tenant_attach(tenant_id)
elif mode in (ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP):
# Use the reset API to detach/attach in one shot
pageserver_http.tenant_reset(tenant_id, mode == ReattachMode.REATTACH_RESET_DROP)
else:
raise NotImplementedError(mode)
time.sleep(1) # for metrics propagation