mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 12:40:36 +00:00
Compare commits
18 Commits
jcsp/shard
...
jcsp/layer
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
93e069ceae | ||
|
|
90e27f1800 | ||
|
|
0f533194d2 | ||
|
|
5b181443b3 | ||
|
|
52718bb8ff | ||
|
|
10c77cb410 | ||
|
|
31be301ef3 | ||
|
|
a3c7d400b4 | ||
|
|
7501ca6efb | ||
|
|
987c9aaea0 | ||
|
|
41401ea2b8 | ||
|
|
2a8197b7ce | ||
|
|
d07bc7ba01 | ||
|
|
7fab731f65 | ||
|
|
483caa22c6 | ||
|
|
da5e03b0d8 | ||
|
|
be885370f6 | ||
|
|
bc1020f965 |
@@ -387,10 +387,20 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
ARG PG_VERSION
|
||||
ENV PATH "/usr/local/pgsql/bin:$PATH"
|
||||
|
||||
RUN apt-get update && \
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14" | "v15") \
|
||||
export TIMESCALEDB_VERSION=2.10.1 \
|
||||
export TIMESCALEDB_CHECKSUM=6fca72a6ed0f6d32d2b3523951ede73dc5f9b0077b38450a029a5f411fdb8c73 \
|
||||
;; \
|
||||
*) \
|
||||
export TIMESCALEDB_VERSION=2.13.0 \
|
||||
export TIMESCALEDB_CHECKSUM=584a351c7775f0e067eaa0e7277ea88cab9077cc4c455cbbf09a5d9723dce95d \
|
||||
;; \
|
||||
esac && \
|
||||
apt-get update && \
|
||||
apt-get install -y cmake && \
|
||||
wget https://github.com/timescale/timescaledb/archive/refs/tags/2.13.0.tar.gz -O timescaledb.tar.gz && \
|
||||
echo "584a351c7775f0e067eaa0e7277ea88cab9077cc4c455cbbf09a5d9723dce95d timescaledb.tar.gz" | sha256sum --check && \
|
||||
wget https://github.com/timescale/timescaledb/archive/refs/tags/${TIMESCALEDB_VERSION}.tar.gz -O timescaledb.tar.gz && \
|
||||
echo "${TIMESCALEDB_CHECKSUM} timescaledb.tar.gz" | sha256sum --check && \
|
||||
mkdir timescaledb-src && cd timescaledb-src && tar xvzf ../timescaledb.tar.gz --strip-components=1 -C . && \
|
||||
./bootstrap -DSEND_TELEMETRY_DEFAULT:BOOL=OFF -DUSE_TELEMETRY:BOOL=OFF -DAPACHE_ONLY:BOOL=ON -DCMAKE_BUILD_TYPE=Release && \
|
||||
cd build && \
|
||||
|
||||
@@ -274,7 +274,13 @@ fn main() -> Result<()> {
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
state.error = Some(format!("{:?}", err));
|
||||
state.status = ComputeStatus::Failed;
|
||||
drop(state);
|
||||
// Notify others that Postgres failed to start. In case of configuring the
|
||||
// empty compute, it's likely that API handler is still waiting for compute
|
||||
// state change. With this we will notify it that compute is in Failed state,
|
||||
// so control plane will know about it earlier and record proper error instead
|
||||
// of timeout.
|
||||
compute.state_changed.notify_all();
|
||||
drop(state); // unlock
|
||||
delay_exit = true;
|
||||
None
|
||||
}
|
||||
|
||||
@@ -73,19 +73,28 @@ impl TenantShardId {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn shard_slug(&self) -> String {
|
||||
format!("{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
|
||||
pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
|
||||
ShardSlug(self)
|
||||
}
|
||||
}
|
||||
|
||||
/// Formatting helper
|
||||
struct ShardSlug<'a>(&'a TenantShardId);
|
||||
|
||||
impl<'a> std::fmt::Display for ShardSlug<'a> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{:02x}{:02x}",
|
||||
self.0.shard_number.0, self.0.shard_count.0
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TenantShardId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
if self.shard_count != ShardCount(0) {
|
||||
write!(
|
||||
f,
|
||||
"{}-{:02x}{:02x}",
|
||||
self.tenant_id, self.shard_number.0, self.shard_count.0
|
||||
)
|
||||
write!(f, "{}-{}", self.tenant_id, self.shard_slug())
|
||||
} else {
|
||||
// Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
|
||||
// is distinct from the normal single shard case (shard count == 1).
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
//!
|
||||
//! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
|
||||
//! similar to a lock, but it allows readers to "hold on" to an old value of RCU
|
||||
//! without blocking writers, and allows writing a new values without blocking
|
||||
//! readers. When you update the new value, the new value is immediately visible
|
||||
//! without blocking writers, and allows writing a new value without blocking
|
||||
//! readers. When you update the value, the new value is immediately visible
|
||||
//! to new readers, but the update waits until all existing readers have
|
||||
//! finishe, so that no one sees the old value anymore.
|
||||
//! finished, so that on return, no one sees the old value anymore.
|
||||
//!
|
||||
//! This implementation isn't wait-free; it uses an RwLock that is held for a
|
||||
//! short duration when the value is read or updated.
|
||||
@@ -26,6 +26,7 @@
|
||||
//! Increment the value by one, and wait for old readers to finish:
|
||||
//!
|
||||
//! ```
|
||||
//! # async fn dox() {
|
||||
//! # let rcu = utils::simple_rcu::Rcu::new(1);
|
||||
//! let write_guard = rcu.lock_for_write();
|
||||
//!
|
||||
@@ -36,15 +37,17 @@
|
||||
//!
|
||||
//! // Concurrent reads and writes are now possible again. Wait for all the readers
|
||||
//! // that still observe the old value to finish.
|
||||
//! waitlist.wait();
|
||||
//! waitlist.wait().await;
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use std::ops::Deref;
|
||||
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::sync::{Mutex, RwLock, RwLockWriteGuard};
|
||||
use std::sync::{RwLock, RwLockWriteGuard};
|
||||
|
||||
use tokio::sync::watch;
|
||||
|
||||
///
|
||||
/// Rcu allows multiple readers to read and hold onto a value without blocking
|
||||
@@ -68,22 +71,21 @@ struct RcuCell<V> {
|
||||
value: V,
|
||||
|
||||
/// A dummy channel. We never send anything to this channel. The point is
|
||||
/// that when the RcuCell is dropped, any cloned Senders will be notified
|
||||
/// that when the RcuCell is dropped, any subscribed Receivers will be notified
|
||||
/// that the channel is closed. Updaters can use this to wait out until the
|
||||
/// RcuCell has been dropped, i.e. until the old value is no longer in use.
|
||||
///
|
||||
/// We never do anything with the receiver, we just need to hold onto it so
|
||||
/// that the Senders will be notified when it's dropped. But because it's
|
||||
/// not Sync, we need a Mutex on it.
|
||||
watch: (SyncSender<()>, Mutex<Receiver<()>>),
|
||||
/// We never send anything to this, we just need to hold onto it so that the
|
||||
/// Receivers will be notified when it's dropped.
|
||||
watch: watch::Sender<()>,
|
||||
}
|
||||
|
||||
impl<V> RcuCell<V> {
|
||||
fn new(value: V) -> Self {
|
||||
let (watch_sender, watch_receiver) = sync_channel(0);
|
||||
let (watch_sender, _) = watch::channel(());
|
||||
RcuCell {
|
||||
value,
|
||||
watch: (watch_sender, Mutex::new(watch_receiver)),
|
||||
watch: watch_sender,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -141,10 +143,10 @@ impl<V> Deref for RcuReadGuard<V> {
|
||||
///
|
||||
/// Write guard returned by `write`
|
||||
///
|
||||
/// NB: Holding this guard blocks all concurrent `read` and `write` calls, so
|
||||
/// it should only be held for a short duration!
|
||||
/// NB: Holding this guard blocks all concurrent `read` and `write` calls, so it should only be
|
||||
/// held for a short duration!
|
||||
///
|
||||
/// Calling `store` consumes the guard, making new reads and new writes possible
|
||||
/// Calling [`Self::store_and_unlock`] consumes the guard, making new reads and new writes possible
|
||||
/// again.
|
||||
///
|
||||
pub struct RcuWriteGuard<'a, V> {
|
||||
@@ -179,7 +181,7 @@ impl<'a, V> RcuWriteGuard<'a, V> {
|
||||
// the watches for any that do.
|
||||
self.inner.old_cells.retain(|weak| {
|
||||
if let Some(cell) = weak.upgrade() {
|
||||
watches.push(cell.watch.0.clone());
|
||||
watches.push(cell.watch.subscribe());
|
||||
true
|
||||
} else {
|
||||
false
|
||||
@@ -193,20 +195,20 @@ impl<'a, V> RcuWriteGuard<'a, V> {
|
||||
///
|
||||
/// List of readers who can still see old values.
|
||||
///
|
||||
pub struct RcuWaitList(Vec<SyncSender<()>>);
|
||||
pub struct RcuWaitList(Vec<watch::Receiver<()>>);
|
||||
|
||||
impl RcuWaitList {
|
||||
///
|
||||
/// Wait for old readers to finish.
|
||||
///
|
||||
pub fn wait(mut self) {
|
||||
pub async fn wait(mut self) {
|
||||
// after all the old_cells are no longer in use, we're done
|
||||
for w in self.0.iter_mut() {
|
||||
// This will block until the Receiver is closed. That happens when
|
||||
// the RcuCell is dropped.
|
||||
#[allow(clippy::single_match)]
|
||||
match w.send(()) {
|
||||
Ok(_) => panic!("send() unexpectedly succeeded on dummy channel"),
|
||||
match w.changed().await {
|
||||
Ok(_) => panic!("changed() unexpectedly succeeded on dummy channel"),
|
||||
Err(_) => {
|
||||
// closed, which means that the cell has been dropped, and
|
||||
// its value is no longer in use
|
||||
@@ -220,11 +222,10 @@ impl RcuWaitList {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread::{sleep, spawn};
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
fn two_writers() {
|
||||
#[tokio::test]
|
||||
async fn two_writers() {
|
||||
let rcu = Rcu::new(1);
|
||||
|
||||
let read1 = rcu.read();
|
||||
@@ -248,33 +249,35 @@ mod tests {
|
||||
assert_eq!(*read1, 1);
|
||||
|
||||
let log = Arc::new(Mutex::new(Vec::new()));
|
||||
// Wait for the old readers to finish in separate threads.
|
||||
// Wait for the old readers to finish in separate tasks.
|
||||
let log_clone = Arc::clone(&log);
|
||||
let thread2 = spawn(move || {
|
||||
wait2.wait();
|
||||
let task2 = tokio::spawn(async move {
|
||||
wait2.wait().await;
|
||||
log_clone.lock().unwrap().push("wait2 done");
|
||||
});
|
||||
let log_clone = Arc::clone(&log);
|
||||
let thread3 = spawn(move || {
|
||||
wait3.wait();
|
||||
let task3 = tokio::spawn(async move {
|
||||
wait3.wait().await;
|
||||
log_clone.lock().unwrap().push("wait3 done");
|
||||
});
|
||||
|
||||
// without this sleep the test can pass on accident if the writer is slow
|
||||
sleep(Duration::from_millis(500));
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
// Release first reader. This allows first write to finish, but calling
|
||||
// wait() on the second one would still block.
|
||||
// wait() on the 'task3' would still block.
|
||||
log.lock().unwrap().push("dropping read1");
|
||||
drop(read1);
|
||||
thread2.join().unwrap();
|
||||
task2.await.unwrap();
|
||||
|
||||
sleep(Duration::from_millis(500));
|
||||
assert!(!task3.is_finished());
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
// Release second reader, and finish second writer.
|
||||
log.lock().unwrap().push("dropping read2");
|
||||
drop(read2);
|
||||
thread3.join().unwrap();
|
||||
task3.await.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
log.lock().unwrap().as_slice(),
|
||||
|
||||
@@ -3,7 +3,6 @@ use pageserver::repository::Key;
|
||||
use pageserver::tenant::layer_map::LayerMap;
|
||||
use pageserver::tenant::storage_layer::LayerFileName;
|
||||
use pageserver::tenant::storage_layer::PersistentLayerDesc;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use std::cmp::{max, min};
|
||||
use std::fs::File;
|
||||
@@ -11,7 +10,6 @@ use std::io::{BufRead, BufReader};
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::time::Instant;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -211,13 +209,8 @@ fn bench_sequential(c: &mut Criterion) {
|
||||
for i in 0..100_000 {
|
||||
let i32 = (i as u32) % 100;
|
||||
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
|
||||
let layer = PersistentLayerDesc::new_img(
|
||||
TenantShardId::unsharded(TenantId::generate()),
|
||||
TimelineId::generate(),
|
||||
zero.add(10 * i32)..zero.add(10 * i32 + 1),
|
||||
Lsn(i),
|
||||
0,
|
||||
);
|
||||
let layer =
|
||||
PersistentLayerDesc::new_img(zero.add(10 * i32)..zero.add(10 * i32 + 1), Lsn(i), 0);
|
||||
updates.insert_historic(layer);
|
||||
}
|
||||
updates.flush();
|
||||
|
||||
@@ -310,8 +310,8 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
.unwrap()
|
||||
.as_micros(),
|
||||
partition,
|
||||
desc.tenant_shard_id,
|
||||
desc.timeline_id,
|
||||
candidate.timeline.tenant_shard_id,
|
||||
candidate.timeline.timeline_id,
|
||||
candidate.layer,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -709,6 +709,26 @@ async fn tenant_detach_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_reset_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
let state = get_state(&request);
|
||||
state
|
||||
.tenant_manager
|
||||
.reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_load_handler(
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -824,7 +844,7 @@ async fn tenant_delete_handler(
|
||||
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_shard_id)
|
||||
.instrument(info_span!("tenant_delete_handler",
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard = tenant_shard_id.shard_slug()
|
||||
shard = %tenant_shard_id.shard_slug()
|
||||
))
|
||||
.await?;
|
||||
|
||||
@@ -1173,7 +1193,7 @@ async fn put_tenant_location_config_handler(
|
||||
mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
|
||||
.instrument(info_span!("tenant_detach",
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard = tenant_shard_id.shard_slug()
|
||||
shard = %tenant_shard_id.shard_slug()
|
||||
))
|
||||
.await
|
||||
{
|
||||
@@ -1828,6 +1848,9 @@ pub fn make_router(
|
||||
.post("/v1/tenant/:tenant_id/detach", |r| {
|
||||
api_handler(r, tenant_detach_handler)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_shard_id/reset", |r| {
|
||||
api_handler(r, tenant_reset_handler)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_id/load", |r| {
|
||||
api_handler(r, tenant_load_handler)
|
||||
})
|
||||
|
||||
@@ -205,7 +205,7 @@ async fn timed<Fut: std::future::Future>(
|
||||
match tokio::time::timeout(warn_at, &mut fut).await {
|
||||
Ok(ret) => {
|
||||
tracing::info!(
|
||||
task = name,
|
||||
stage = name,
|
||||
elapsed_ms = started.elapsed().as_millis(),
|
||||
"completed"
|
||||
);
|
||||
@@ -213,7 +213,7 @@ async fn timed<Fut: std::future::Future>(
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::info!(
|
||||
task = name,
|
||||
stage = name,
|
||||
elapsed_ms = started.elapsed().as_millis(),
|
||||
"still waiting, taking longer than expected..."
|
||||
);
|
||||
@@ -222,7 +222,7 @@ async fn timed<Fut: std::future::Future>(
|
||||
|
||||
// this has a global allowed_errors
|
||||
tracing::warn!(
|
||||
task = name,
|
||||
stage = name,
|
||||
elapsed_ms = started.elapsed().as_millis(),
|
||||
"completed, took longer than expected"
|
||||
);
|
||||
|
||||
@@ -2094,6 +2094,8 @@ pub fn preinitialize_metrics() {
|
||||
// Tenant manager stats
|
||||
Lazy::force(&TENANT_MANAGER);
|
||||
|
||||
Lazy::force(&crate::tenant::storage_layer::layer::LAYER_IMPL_METRICS);
|
||||
|
||||
// countervecs
|
||||
[&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT]
|
||||
.into_iter()
|
||||
|
||||
@@ -67,9 +67,9 @@ use crate::trace::Tracer;
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
// How long we may block waiting for a [`TenantSlot::InProgress`]` and/or a [`Tenant`] which
|
||||
// How long we may wait for a [`TenantSlot::InProgress`]` and/or a [`Tenant`] which
|
||||
// is not yet in state [`TenantState::Active`].
|
||||
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
|
||||
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
|
||||
|
||||
/// Read the end of a tar archive.
|
||||
///
|
||||
|
||||
@@ -2515,7 +2515,7 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
info!("persisting tenantconf to {config_path}");
|
||||
debug!("persisting tenantconf to {config_path}");
|
||||
|
||||
let mut conf_content = r#"# This file contains a specific per-tenant's config.
|
||||
# It is read in case of pageserver restart.
|
||||
@@ -2550,7 +2550,7 @@ impl Tenant {
|
||||
target_config_path: &Utf8Path,
|
||||
tenant_conf: &TenantConfOpt,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("persisting tenantconf to {target_config_path}");
|
||||
debug!("persisting tenantconf to {target_config_path}");
|
||||
|
||||
let mut conf_content = r#"# This file contains a specific per-tenant's config.
|
||||
# It is read in case of pageserver restart.
|
||||
|
||||
@@ -270,49 +270,6 @@ async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<U
|
||||
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
|
||||
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
|
||||
|
||||
/// Create a directory, including parents. This does no fsyncs and makes
|
||||
/// no guarantees about the persistence of the resulting metadata: for
|
||||
/// use when creating dirs for use as cache.
|
||||
async fn unsafe_create_dir_all(path: &Utf8PathBuf) -> std::io::Result<()> {
|
||||
let mut dirs_to_create = Vec::new();
|
||||
let mut path: &Utf8Path = path.as_ref();
|
||||
|
||||
// Figure out which directories we need to create.
|
||||
loop {
|
||||
let meta = tokio::fs::metadata(path).await;
|
||||
match meta {
|
||||
Ok(metadata) if metadata.is_dir() => break,
|
||||
Ok(_) => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::AlreadyExists,
|
||||
format!("non-directory found in path: {path}"),
|
||||
));
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
|
||||
dirs_to_create.push(path);
|
||||
|
||||
match path.parent() {
|
||||
Some(parent) => path = parent,
|
||||
None => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
format!("can't find parent of path '{path}'"),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create directories from parent to child.
|
||||
for &path in dirs_to_create.iter().rev() {
|
||||
tokio::fs::create_dir(path).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// The TenantManager is responsible for storing and mutating the collection of all tenants
|
||||
/// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
|
||||
/// lives inside the TenantManager.
|
||||
@@ -646,7 +603,13 @@ pub(crate) fn tenant_spawn(
|
||||
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
|
||||
);
|
||||
|
||||
info!("Attaching tenant {tenant_shard_id}");
|
||||
info!(
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug(),
|
||||
generation = ?location_conf.location.generation,
|
||||
attach_mode = ?location_conf.location.attach_mode,
|
||||
"Attaching tenant"
|
||||
);
|
||||
let tenant = match Tenant::spawn(
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
@@ -1035,7 +998,7 @@ impl TenantManager {
|
||||
LocationMode::Secondary(_) => {
|
||||
// Directory doesn't need to be fsync'd because if we crash it can
|
||||
// safely be recreated next time this tenant location is configured.
|
||||
unsafe_create_dir_all(&tenant_path)
|
||||
tokio::fs::create_dir_all(&tenant_path)
|
||||
.await
|
||||
.with_context(|| format!("Creating {tenant_path}"))?;
|
||||
|
||||
@@ -1051,7 +1014,7 @@ impl TenantManager {
|
||||
// Directory doesn't need to be fsync'd because we do not depend on
|
||||
// it to exist after crashes: it may be recreated when tenant is
|
||||
// re-attached, see https://github.com/neondatabase/neon/issues/5550
|
||||
unsafe_create_dir_all(&timelines_path)
|
||||
tokio::fs::create_dir_all(&tenant_path)
|
||||
.await
|
||||
.with_context(|| format!("Creating {timelines_path}"))?;
|
||||
|
||||
@@ -1081,6 +1044,81 @@ impl TenantManager {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
|
||||
/// LocationConf that was last used to attach it. Optionally, the local file cache may be
|
||||
/// dropped before re-attaching.
|
||||
///
|
||||
/// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
|
||||
/// where an issue is identified that would go away with a restart of the tenant.
|
||||
///
|
||||
/// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
|
||||
/// to respect the cancellation tokens used in normal shutdown().
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
|
||||
pub(crate) async fn reset_tenant(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
drop_cache: bool,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
let Some(old_slot) = slot_guard.get_old_value() else {
|
||||
anyhow::bail!("Tenant not found when trying to reset");
|
||||
};
|
||||
|
||||
let Some(tenant) = old_slot.get_attached() else {
|
||||
slot_guard.revert();
|
||||
anyhow::bail!("Tenant is not in attached state");
|
||||
};
|
||||
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {
|
||||
slot_guard.drop_old_value()?;
|
||||
}
|
||||
Err(_barrier) => {
|
||||
slot_guard.revert();
|
||||
anyhow::bail!("Cannot reset Tenant, already shutting down");
|
||||
}
|
||||
}
|
||||
|
||||
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
|
||||
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
|
||||
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
|
||||
|
||||
if drop_cache {
|
||||
tracing::info!("Dropping local file cache");
|
||||
|
||||
match tokio::fs::read_dir(&timelines_path).await {
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to list timelines while dropping cache: {}", e);
|
||||
}
|
||||
Ok(mut entries) => {
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
tokio::fs::remove_dir_all(entry.path()).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let shard_identity = config.shard;
|
||||
let tenant = tenant_spawn(
|
||||
self.conf,
|
||||
tenant_shard_id,
|
||||
&tenant_path,
|
||||
self.resources.clone(),
|
||||
AttachedTenantConf::try_from(config)?,
|
||||
shard_identity,
|
||||
None,
|
||||
self.tenants,
|
||||
SpawnMode::Normal,
|
||||
&ctx,
|
||||
)?;
|
||||
|
||||
slot_guard.upsert(TenantSlot::Attached(tenant))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -1289,8 +1327,7 @@ pub(crate) async fn delete_tenant(
|
||||
// See https://github.com/neondatabase/neon/issues/5080
|
||||
|
||||
// TODO(sharding): make delete API sharding-aware
|
||||
let mut slot_guard =
|
||||
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
|
||||
let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
|
||||
|
||||
// unwrap is safe because we used MustExist mode when acquiring
|
||||
let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
|
||||
@@ -1617,9 +1654,10 @@ pub enum TenantSlotUpsertError {
|
||||
MapState(#[from] TenantMapError),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum TenantSlotDropError {
|
||||
/// It is only legal to drop a TenantSlot if its contents are fully shut down
|
||||
#[error("Tenant was not shut down")]
|
||||
NotShutdown,
|
||||
}
|
||||
|
||||
@@ -1679,9 +1717,9 @@ impl SlotGuard {
|
||||
}
|
||||
}
|
||||
|
||||
/// Take any value that was present in the slot before we acquired ownership
|
||||
/// Get any value that was present in the slot before we acquired ownership
|
||||
/// of it: in state transitions, this will be the old state.
|
||||
fn get_old_value(&mut self) -> &Option<TenantSlot> {
|
||||
fn get_old_value(&self) -> &Option<TenantSlot> {
|
||||
&self.old_value
|
||||
}
|
||||
|
||||
@@ -1899,7 +1937,7 @@ fn tenant_map_acquire_slot_impl(
|
||||
METRICS.tenant_slot_writes.inc();
|
||||
|
||||
let mut locked = tenants.write().unwrap();
|
||||
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard=tenant_shard_id.shard_slug());
|
||||
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard = %tenant_shard_id.shard_slug());
|
||||
let _guard = span.enter();
|
||||
|
||||
let m = match &mut *locked {
|
||||
|
||||
@@ -1271,11 +1271,12 @@ impl RemoteTimelineClient {
|
||||
|
||||
let upload_result: anyhow::Result<()> = match &task.op {
|
||||
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
|
||||
let path = layer.local_path();
|
||||
let path = layer.local_path_from_id(&self.tenant_shard_id, &self.timeline_id);
|
||||
|
||||
upload::upload_timeline_layer(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
path,
|
||||
&path,
|
||||
layer_metadata,
|
||||
self.generation,
|
||||
)
|
||||
|
||||
@@ -4,7 +4,7 @@ pub mod delta_layer;
|
||||
mod filename;
|
||||
pub mod image_layer;
|
||||
mod inmemory_layer;
|
||||
mod layer;
|
||||
pub(crate) mod layer;
|
||||
mod layer_desc;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
@@ -24,7 +24,7 @@ use tracing::warn;
|
||||
use utils::history_buffer::HistoryBufferWithDropCounter;
|
||||
use utils::rate_limit::RateLimit;
|
||||
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
|
||||
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
|
||||
@@ -301,31 +301,17 @@ pub trait AsLayerDesc {
|
||||
}
|
||||
|
||||
pub mod tests {
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
|
||||
use super::*;
|
||||
|
||||
impl From<DeltaFileName> for PersistentLayerDesc {
|
||||
fn from(value: DeltaFileName) -> Self {
|
||||
PersistentLayerDesc::new_delta(
|
||||
TenantShardId::from([0; 18]),
|
||||
TimelineId::from_array([0; 16]),
|
||||
value.key_range,
|
||||
value.lsn_range,
|
||||
233,
|
||||
)
|
||||
PersistentLayerDesc::new_delta(value.key_range, value.lsn_range, 233)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ImageFileName> for PersistentLayerDesc {
|
||||
fn from(value: ImageFileName) -> Self {
|
||||
PersistentLayerDesc::new_img(
|
||||
TenantShardId::from([0; 18]),
|
||||
TimelineId::from_array([0; 16]),
|
||||
value.key_range,
|
||||
value.lsn,
|
||||
233,
|
||||
)
|
||||
PersistentLayerDesc::new_img(value.key_range, value.lsn, 233)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -84,17 +84,6 @@ pub struct Summary {
|
||||
pub index_root_blk: u32,
|
||||
}
|
||||
|
||||
impl From<&DeltaLayer> for Summary {
|
||||
fn from(layer: &DeltaLayer) -> Self {
|
||||
Self::expected(
|
||||
layer.desc.tenant_shard_id.tenant_id,
|
||||
layer.desc.timeline_id,
|
||||
layer.desc.key_range.clone(),
|
||||
layer.desc.lsn_range.clone(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Summary {
|
||||
pub(super) fn expected(
|
||||
tenant_id: TenantId,
|
||||
@@ -320,15 +309,9 @@ impl DeltaLayer {
|
||||
.metadata()
|
||||
.context("get file metadata to determine size")?;
|
||||
|
||||
// TODO(sharding): we must get the TenantShardId from the path instead of reading the Summary.
|
||||
// we should also validate the path against the Summary, as both should contain the same tenant, timeline, key, lsn.
|
||||
let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
|
||||
|
||||
Ok(DeltaLayer {
|
||||
path: path.to_path_buf(),
|
||||
desc: PersistentLayerDesc::new_delta(
|
||||
tenant_shard_id,
|
||||
summary.timeline_id,
|
||||
summary.key_range,
|
||||
summary.lsn_range,
|
||||
metadata.len(),
|
||||
@@ -505,8 +488,6 @@ impl DeltaLayerWriterInner {
|
||||
// set inner.file here. The first read will have to re-open it.
|
||||
|
||||
let desc = PersistentLayerDesc::new_delta(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
self.key_start..key_end,
|
||||
self.lsn_range.clone(),
|
||||
metadata.len(),
|
||||
@@ -517,7 +498,7 @@ impl DeltaLayerWriterInner {
|
||||
|
||||
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
|
||||
|
||||
trace!("created delta layer {}", layer.local_path());
|
||||
trace!("created delta layer {}", self.path);
|
||||
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
@@ -85,17 +85,6 @@ pub struct Summary {
|
||||
// the 'values' part starts after the summary header, on block 1.
|
||||
}
|
||||
|
||||
impl From<&ImageLayer> for Summary {
|
||||
fn from(layer: &ImageLayer) -> Self {
|
||||
Self::expected(
|
||||
layer.desc.tenant_shard_id.tenant_id,
|
||||
layer.desc.timeline_id,
|
||||
layer.desc.key_range.clone(),
|
||||
layer.lsn,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Summary {
|
||||
pub(super) fn expected(
|
||||
tenant_id: TenantId,
|
||||
@@ -278,19 +267,9 @@ impl ImageLayer {
|
||||
.metadata()
|
||||
.context("get file metadata to determine size")?;
|
||||
|
||||
// TODO(sharding): we should get TenantShardId from path.
|
||||
// OR, not at all: any layer we load from disk should also get reconciled with remote IndexPart.
|
||||
let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
|
||||
|
||||
Ok(ImageLayer {
|
||||
path: path.to_path_buf(),
|
||||
desc: PersistentLayerDesc::new_img(
|
||||
tenant_shard_id,
|
||||
summary.timeline_id,
|
||||
summary.key_range,
|
||||
summary.lsn,
|
||||
metadata.len(),
|
||||
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
|
||||
desc: PersistentLayerDesc::new_img(summary.key_range, summary.lsn, metadata.len()), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
|
||||
lsn: summary.lsn,
|
||||
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
inner: OnceCell::new(),
|
||||
@@ -581,13 +560,7 @@ impl ImageLayerWriterInner {
|
||||
.await
|
||||
.context("get metadata to determine file size")?;
|
||||
|
||||
let desc = PersistentLayerDesc::new_img(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
self.key_range.clone(),
|
||||
self.lsn,
|
||||
metadata.len(),
|
||||
);
|
||||
let desc = PersistentLayerDesc::new_img(self.key_range.clone(), self.lsn, metadata.len());
|
||||
|
||||
// Note: Because we open the file in write-only mode, we cannot
|
||||
// reuse the same VirtualFile for reading later. That's why we don't
|
||||
@@ -599,7 +572,7 @@ impl ImageLayerWriterInner {
|
||||
// FIXME: why not carry the virtualfile here, it supports renaming?
|
||||
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
|
||||
|
||||
trace!("created image layer {}", layer.local_path());
|
||||
trace!("created image layer {}", self.path);
|
||||
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
@@ -3,13 +3,15 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::models::{
|
||||
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
};
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use pageserver_api::shard::{ShardIndex, TenantShardId};
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::SystemTime;
|
||||
use tracing::Instrument;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::GateError;
|
||||
use utils::sync::heavier_once_cell;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
@@ -81,12 +83,7 @@ impl Layer {
|
||||
file_name: LayerFileName,
|
||||
metadata: LayerFileMetadata,
|
||||
) -> Self {
|
||||
let desc = PersistentLayerDesc::from_filename(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
file_name,
|
||||
metadata.file_size(),
|
||||
);
|
||||
let desc = PersistentLayerDesc::from_filename(file_name, metadata.file_size());
|
||||
|
||||
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
|
||||
|
||||
@@ -100,7 +97,7 @@ impl Layer {
|
||||
metadata.shard,
|
||||
)));
|
||||
|
||||
debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
|
||||
debug_assert!(owner.0.needs_download_blocking(timeline).unwrap().is_some());
|
||||
|
||||
owner
|
||||
}
|
||||
@@ -112,12 +109,7 @@ impl Layer {
|
||||
file_name: LayerFileName,
|
||||
metadata: LayerFileMetadata,
|
||||
) -> ResidentLayer {
|
||||
let desc = PersistentLayerDesc::from_filename(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
file_name,
|
||||
metadata.file_size(),
|
||||
);
|
||||
let desc = PersistentLayerDesc::from_filename(file_name, metadata.file_size());
|
||||
|
||||
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
|
||||
|
||||
@@ -144,7 +136,7 @@ impl Layer {
|
||||
|
||||
let downloaded = resident.expect("just initialized");
|
||||
|
||||
debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
|
||||
debug_assert!(owner.0.needs_download_blocking(timeline).unwrap().is_none());
|
||||
|
||||
timeline
|
||||
.metrics
|
||||
@@ -189,7 +181,7 @@ impl Layer {
|
||||
let downloaded = resident.expect("just initialized");
|
||||
|
||||
// if the rename works, the path is as expected
|
||||
std::fs::rename(temp_path, owner.local_path())
|
||||
std::fs::rename(temp_path, owner.local_path(timeline))
|
||||
.with_context(|| format!("rename temporary file as correct path for {owner}"))?;
|
||||
|
||||
Ok(ResidentLayer { downloaded, owner })
|
||||
@@ -222,8 +214,8 @@ impl Layer {
|
||||
///
|
||||
/// [gc]: [`RemoteTimelineClient::schedule_gc_update`]
|
||||
/// [compaction]: [`RemoteTimelineClient::schedule_compaction_update`]
|
||||
pub(crate) fn garbage_collect_on_drop(&self) {
|
||||
self.0.garbage_collect_on_drop();
|
||||
pub(crate) fn delete_on_drop(&self) {
|
||||
self.0.delete_on_drop();
|
||||
}
|
||||
|
||||
/// Return data needed to reconstruct given page at LSN.
|
||||
@@ -309,8 +301,12 @@ impl Layer {
|
||||
&self.0.access_stats
|
||||
}
|
||||
|
||||
pub(crate) fn local_path(&self) -> &Utf8Path {
|
||||
&self.0.path
|
||||
fn local_path(&self, timeline: &Timeline) -> Utf8PathBuf {
|
||||
self.0.local_path(timeline)
|
||||
}
|
||||
|
||||
pub(crate) fn filename(&self) -> LayerFileName {
|
||||
self.0.desc.filename()
|
||||
}
|
||||
|
||||
pub(crate) fn metadata(&self) -> LayerFileMetadata {
|
||||
@@ -331,10 +327,10 @@ impl Layer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Waits until this layer has been dropped (and if needed, local garbage collection and remote
|
||||
/// Waits until this layer has been dropped (and if needed, local file deletion and remote
|
||||
/// deletion scheduling has completed).
|
||||
///
|
||||
/// Does not start garbage collection, use [`Self::garbage_collect_on_drop`] for that
|
||||
/// Does not start local deletion, use [`Self::delete_on_drop`] for that
|
||||
/// separatedly.
|
||||
#[cfg(feature = "testing")]
|
||||
pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
|
||||
@@ -402,13 +398,9 @@ impl ResidentOrWantedEvicted {
|
||||
}
|
||||
|
||||
struct LayerInner {
|
||||
/// Only needed to check ondemand_download_behavior_treat_error_as_warn and creation of
|
||||
/// [`Self::path`].
|
||||
/// Only needed to check ondemand_download_behavior_treat_error_as_warn and in [`Self::local_path_from_id`]
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
/// Full path to the file; unclear if this should exist anymore.
|
||||
path: Utf8PathBuf,
|
||||
|
||||
desc: PersistentLayerDesc,
|
||||
|
||||
/// Timeline access is needed for remote timeline client and metrics.
|
||||
@@ -423,8 +415,8 @@ struct LayerInner {
|
||||
/// Initialization and deinitialization are done while holding a permit.
|
||||
inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
|
||||
|
||||
/// Do we want to garbage collect this when `LayerInner` is dropped
|
||||
wanted_garbage_collected: AtomicBool,
|
||||
/// Do we want to delete locally and remotely this when `LayerInner` is dropped
|
||||
wanted_deleted: AtomicBool,
|
||||
|
||||
/// Do we want to evict this layer as soon as possible? After being set to `true`, all accesses
|
||||
/// will try to downgrade [`ResidentOrWantedEvicted`], which will eventually trigger
|
||||
@@ -438,10 +430,6 @@ struct LayerInner {
|
||||
version: AtomicUsize,
|
||||
|
||||
/// Allow subscribing to when the layer actually gets evicted.
|
||||
///
|
||||
/// If in future we need to implement "wait until layer instances are gone and done", carrying
|
||||
/// this over to the gc spawn_blocking from LayerInner::drop will do the trick, and adding a
|
||||
/// method for "wait_gc" which will wait to this being closed.
|
||||
status: tokio::sync::broadcast::Sender<Status>,
|
||||
|
||||
/// Counter for exponential backoff with the download
|
||||
@@ -483,19 +471,39 @@ enum Status {
|
||||
|
||||
impl Drop for LayerInner {
|
||||
fn drop(&mut self) {
|
||||
if !*self.wanted_garbage_collected.get_mut() {
|
||||
if !*self.wanted_deleted.get_mut() {
|
||||
// should we try to evict if the last wish was for eviction?
|
||||
// feels like there's some hazard of overcrowding near shutdown near by, but we don't
|
||||
// run drops during shutdown (yet)
|
||||
return;
|
||||
}
|
||||
|
||||
let span = tracing::info_span!(parent: None, "layer_gc", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);
|
||||
// We will only do I/O on drop if our Timeline still exists. Otherwise, we may safely
|
||||
// leave garbage layers behind to be cleaned up the next time this Timeline is instantiated.
|
||||
let Some(timeline) = self.timeline.upgrade() else {
|
||||
// no need to nag that timeline is gone: under normal situation on
|
||||
// task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
|
||||
return;
|
||||
};
|
||||
|
||||
// We will only do I/O during drop if our Timeline's layer_gate is open: this avoids
|
||||
// the risk that we would race with Timeline::shutdown and end up doing I/O to a timeline
|
||||
// path for which the Timeline object has been torn down already.
|
||||
let _gate_guard = match timeline.layer_gate.enter() {
|
||||
Ok(g) => g,
|
||||
Err(GateError::GateClosed) => {
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// If timeline is alive, we can construct a span with IDs for this function.
|
||||
let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %timeline.tenant_shard_id.tenant_id, shard_id=%timeline.tenant_shard_id.shard_slug(), timeline_id = %timeline.timeline_id);
|
||||
let path = self.local_path(&timeline);
|
||||
|
||||
let path = std::mem::take(&mut self.path);
|
||||
let file_name = self.layer_desc().filename();
|
||||
let file_size = self.layer_desc().file_size;
|
||||
let timeline = self.timeline.clone();
|
||||
let meta = self.metadata();
|
||||
let status = self.status.clone();
|
||||
|
||||
@@ -517,38 +525,32 @@ impl Drop for LayerInner {
|
||||
false
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("failed to remove garbage collected layer: {e}");
|
||||
LAYER_IMPL_METRICS.inc_gc_removes_failed();
|
||||
tracing::error!("failed to remove wanted deleted layer: {e}");
|
||||
LAYER_IMPL_METRICS.inc_delete_removes_failed();
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(timeline) = timeline.upgrade() {
|
||||
if removed {
|
||||
timeline.metrics.resident_physical_size_sub(file_size);
|
||||
}
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
|
||||
if removed {
|
||||
timeline.metrics.resident_physical_size_sub(file_size);
|
||||
}
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
|
||||
|
||||
if let Err(e) = res {
|
||||
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
|
||||
// demonstrating this deadlock (without spawn_blocking): stop will drop
|
||||
// queued items, which will have ResidentLayer's, and those drops would try
|
||||
// to re-entrantly lock the RemoteTimelineClient inner state.
|
||||
if !timeline.is_active() {
|
||||
tracing::info!("scheduling deletion on drop failed: {e:#}");
|
||||
} else {
|
||||
tracing::warn!("scheduling deletion on drop failed: {e:#}");
|
||||
}
|
||||
LAYER_IMPL_METRICS.inc_gcs_failed(GcFailed::DeleteSchedulingFailed);
|
||||
if let Err(e) = res {
|
||||
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
|
||||
// demonstrating this deadlock (without spawn_blocking): stop will drop
|
||||
// queued items, which will have ResidentLayer's, and those drops would try
|
||||
// to re-entrantly lock the RemoteTimelineClient inner state.
|
||||
if !timeline.is_active() {
|
||||
tracing::info!("scheduling deletion on drop failed: {e:#}");
|
||||
} else {
|
||||
LAYER_IMPL_METRICS.inc_completed_gcs();
|
||||
tracing::warn!("scheduling deletion on drop failed: {e:#}");
|
||||
}
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
|
||||
} else {
|
||||
LAYER_IMPL_METRICS.inc_completed_deletes();
|
||||
}
|
||||
} else {
|
||||
// no need to nag that timeline is gone: under normal situation on
|
||||
// task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
|
||||
LAYER_IMPL_METRICS.inc_gcs_failed(GcFailed::TimelineGone);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -564,10 +566,6 @@ impl LayerInner {
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
) -> Self {
|
||||
let path = conf
|
||||
.timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id)
|
||||
.join(desc.filename().to_string());
|
||||
|
||||
let (inner, version) = if let Some(inner) = downloaded {
|
||||
let version = inner.version;
|
||||
let resident = ResidentOrWantedEvicted::Resident(inner);
|
||||
@@ -578,12 +576,11 @@ impl LayerInner {
|
||||
|
||||
LayerInner {
|
||||
conf,
|
||||
path,
|
||||
desc,
|
||||
timeline: Arc::downgrade(timeline),
|
||||
have_remote_client: timeline.remote_client.is_some(),
|
||||
access_stats,
|
||||
wanted_garbage_collected: AtomicBool::new(false),
|
||||
wanted_deleted: AtomicBool::new(false),
|
||||
wanted_evicted: AtomicBool::new(false),
|
||||
inner,
|
||||
version: AtomicUsize::new(version),
|
||||
@@ -594,16 +591,32 @@ impl LayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
fn garbage_collect_on_drop(&self) {
|
||||
let res = self.wanted_garbage_collected.compare_exchange(
|
||||
false,
|
||||
true,
|
||||
Ordering::Release,
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
/// All call sites that need this function should already have a Timeline (e.g. from
|
||||
/// upgrading the Self::timeline weak pointer) -- it doesn't make sense to try and
|
||||
/// do anything with the local file if the Timeline isn't still alive.
|
||||
fn local_path(&self, timeline: &Timeline) -> Utf8PathBuf {
|
||||
self.local_path_from_id(&timeline.tenant_shard_id, &timeline.timeline_id)
|
||||
}
|
||||
|
||||
/// Use this instead of `local_path` if you don't have a Timeline but do have its ID: this
|
||||
/// is used by external callers such as [`crate::tenant::RemoteTimelineClient`]
|
||||
pub(crate) fn local_path_from_id(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> Utf8PathBuf {
|
||||
self.conf
|
||||
.timeline_path(tenant_shard_id, timeline_id)
|
||||
.join(self.desc.filename().to_string())
|
||||
}
|
||||
|
||||
fn delete_on_drop(&self) {
|
||||
let res =
|
||||
self.wanted_deleted
|
||||
.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
|
||||
|
||||
if res.is_ok() {
|
||||
LAYER_IMPL_METRICS.inc_started_gcs();
|
||||
LAYER_IMPL_METRICS.inc_started_deletes();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -671,6 +684,10 @@ impl LayerInner {
|
||||
// disable any scheduled but not yet running eviction deletions for this
|
||||
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// count cancellations, which currently remain largely unexpected
|
||||
let init_cancelled =
|
||||
scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
|
||||
// no need to make the evict_and_wait wait for the actual download to complete
|
||||
drop(self.status.send(Status::Downloaded));
|
||||
|
||||
@@ -679,12 +696,14 @@ impl LayerInner {
|
||||
.upgrade()
|
||||
.ok_or_else(|| DownloadError::TimelineShutdown)?;
|
||||
|
||||
// FIXME: grab a gate
|
||||
|
||||
let can_ever_evict = timeline.remote_client.as_ref().is_some();
|
||||
|
||||
// check if we really need to be downloaded; could have been already downloaded by a
|
||||
// cancelled previous attempt.
|
||||
let needs_download = self
|
||||
.needs_download()
|
||||
.needs_download(&timeline)
|
||||
.await
|
||||
.map_err(DownloadError::PreStatFailed)?;
|
||||
|
||||
@@ -739,6 +758,8 @@ impl LayerInner {
|
||||
tracing::info!(waiters, "completing the on-demand download for other tasks");
|
||||
}
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
Ok((ResidentOrWantedEvicted::Resident(res), permit))
|
||||
};
|
||||
|
||||
@@ -832,12 +853,13 @@ impl LayerInner {
|
||||
// block tenant::mgr::remove_tenant_from_memory.
|
||||
|
||||
let this: Arc<Self> = self.clone();
|
||||
let timeline_clone = timeline.clone();
|
||||
|
||||
crate::task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
crate::task_mgr::TaskKind::RemoteDownloadTask,
|
||||
Some(self.desc.tenant_shard_id.tenant_id),
|
||||
Some(self.desc.timeline_id),
|
||||
Some(timeline.tenant_shard_id.tenant_id),
|
||||
Some(timeline.timeline_id),
|
||||
&task_name,
|
||||
false,
|
||||
async move {
|
||||
@@ -867,14 +889,13 @@ impl LayerInner {
|
||||
match res {
|
||||
(Ok(()), _) => {
|
||||
// our caller is cancellation safe so this is fine; if someone
|
||||
// else requests the layer, they'll find it already downloaded
|
||||
// or redownload.
|
||||
// else requests the layer, they'll find it already downloaded.
|
||||
//
|
||||
// however, could be that we should consider marking the layer
|
||||
// for eviction? alas, cannot: because only DownloadedLayer
|
||||
// will handle that.
|
||||
tracing::info!("layer file download completed after requester had cancelled");
|
||||
LAYER_IMPL_METRICS.inc_download_completed_without_requester();
|
||||
// See counter [`LayerImplMetrics::inc_init_needed_no_download`]
|
||||
//
|
||||
// FIXME(#6028): however, could be that we should consider marking the
|
||||
// layer for eviction? alas, cannot: because only DownloadedLayer will
|
||||
// handle that.
|
||||
},
|
||||
(Err(e), _) => {
|
||||
// our caller is cancellation safe, but we might be racing with
|
||||
@@ -894,7 +915,7 @@ impl LayerInner {
|
||||
match rx.await {
|
||||
Ok((Ok(()), permit)) => {
|
||||
if let Some(reason) = self
|
||||
.needs_download()
|
||||
.needs_download(&timeline_clone)
|
||||
.await
|
||||
.map_err(DownloadError::PostStatFailed)?
|
||||
{
|
||||
@@ -929,16 +950,26 @@ impl LayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
|
||||
match tokio::fs::metadata(&self.path).await {
|
||||
async fn needs_download(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
) -> Result<Option<NeedsDownload>, std::io::Error> {
|
||||
let path = self.local_path(timeline);
|
||||
|
||||
match tokio::fs::metadata(path).await {
|
||||
Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
|
||||
match self.path.metadata() {
|
||||
fn needs_download_blocking(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
) -> Result<Option<NeedsDownload>, std::io::Error> {
|
||||
let path = self.local_path(timeline);
|
||||
|
||||
match path.metadata() {
|
||||
Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
|
||||
Err(e) => Err(e),
|
||||
@@ -994,14 +1025,20 @@ impl LayerInner {
|
||||
|
||||
/// `DownloadedLayer` is being dropped, so it calls this method.
|
||||
fn on_downloaded_layer_drop(self: Arc<LayerInner>, version: usize) {
|
||||
let gc = self.wanted_garbage_collected.load(Ordering::Acquire);
|
||||
let delete = self.wanted_deleted.load(Ordering::Acquire);
|
||||
let evict = self.wanted_evicted.load(Ordering::Acquire);
|
||||
let can_evict = self.have_remote_client;
|
||||
|
||||
if gc {
|
||||
// do nothing now, only in LayerInner::drop
|
||||
if delete {
|
||||
// do nothing now, only in LayerInner::drop -- this was originally implemented because
|
||||
// we could had already scheduled the deletion at the time.
|
||||
//
|
||||
// FIXME: this is not true anymore, we can safely evict wanted deleted files.
|
||||
} else if can_evict && evict {
|
||||
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, %version);
|
||||
// If timeline is alive, we can construct a span with IDs for this function.
|
||||
let span = self.timeline.upgrade().map(|timeline| {
|
||||
tracing::info_span!(parent: None, "layer_evict", tenant_id = %timeline.tenant_shard_id.tenant_id, shard_id=%timeline.tenant_shard_id.shard_slug(), timeline_id = %timeline.timeline_id)
|
||||
});
|
||||
|
||||
// downgrade for queueing, in case there's a tear down already ongoing we should not
|
||||
// hold it alive.
|
||||
@@ -1012,9 +1049,9 @@ impl LayerInner {
|
||||
// drop while the `self.inner` is being locked, leading to a deadlock.
|
||||
|
||||
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
let _g = span.map(|s| s.entered());
|
||||
|
||||
// if LayerInner is already dropped here, do nothing because the garbage collection
|
||||
// if LayerInner is already dropped here, do nothing because the delete on drop
|
||||
// has already ran while we were in queue
|
||||
let Some(this) = this.upgrade() else {
|
||||
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
|
||||
@@ -1072,7 +1109,9 @@ impl LayerInner {
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
);
|
||||
|
||||
let res = match capture_mtime_and_remove(&self.path) {
|
||||
let local_path = self.local_path(&timeline);
|
||||
|
||||
let res = match capture_mtime_and_remove(&local_path) {
|
||||
Ok(local_layer_mtime) => {
|
||||
let duration = SystemTime::now().duration_since(local_layer_mtime);
|
||||
match duration {
|
||||
@@ -1224,6 +1263,11 @@ impl DownloadedLayer {
|
||||
owner: &Arc<LayerInner>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<&'a LayerKind> {
|
||||
let timeline = owner
|
||||
.timeline
|
||||
.upgrade()
|
||||
.ok_or(DownloadError::TimelineShutdown)?;
|
||||
|
||||
let init = || async {
|
||||
assert_eq!(
|
||||
Weak::as_ptr(&self.owner),
|
||||
@@ -1233,23 +1277,23 @@ impl DownloadedLayer {
|
||||
|
||||
let res = if owner.desc.is_delta {
|
||||
let summary = Some(delta_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
timeline.tenant_shard_id.tenant_id,
|
||||
timeline.timeline_id,
|
||||
owner.desc.key_range.clone(),
|
||||
owner.desc.lsn_range.clone(),
|
||||
));
|
||||
delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx)
|
||||
delta_layer::DeltaLayerInner::load(&owner.local_path(&timeline), summary, ctx)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Delta))
|
||||
} else {
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
timeline.tenant_shard_id.tenant_id,
|
||||
timeline.timeline_id,
|
||||
owner.desc.key_range.clone(),
|
||||
lsn,
|
||||
));
|
||||
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx)
|
||||
image_layer::ImageLayerInner::load(&owner.local_path(&timeline), lsn, summary, ctx)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Image))
|
||||
};
|
||||
@@ -1373,8 +1417,14 @@ impl ResidentLayer {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn local_path(&self) -> &Utf8Path {
|
||||
&self.owner.0.path
|
||||
pub(crate) fn local_path_from_id(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> Utf8PathBuf {
|
||||
self.owner
|
||||
.0
|
||||
.local_path_from_id(tenant_shard_id, timeline_id)
|
||||
}
|
||||
|
||||
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
|
||||
@@ -1405,35 +1455,37 @@ impl From<ResidentLayer> for Layer {
|
||||
}
|
||||
}
|
||||
|
||||
use metrics::{IntCounter, IntCounterVec};
|
||||
use metrics::IntCounter;
|
||||
|
||||
struct LayerImplMetrics {
|
||||
pub(crate) struct LayerImplMetrics {
|
||||
started_evictions: IntCounter,
|
||||
completed_evictions: IntCounter,
|
||||
cancelled_evictions: IntCounterVec,
|
||||
cancelled_evictions: enum_map::EnumMap<EvictionCancelled, IntCounter>,
|
||||
|
||||
started_gcs: IntCounter,
|
||||
completed_gcs: IntCounter,
|
||||
failed_gcs: IntCounterVec,
|
||||
started_deletes: IntCounter,
|
||||
completed_deletes: IntCounter,
|
||||
failed_deletes: enum_map::EnumMap<DeleteFailed, IntCounter>,
|
||||
|
||||
rare_counters: IntCounterVec,
|
||||
rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
|
||||
inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
|
||||
}
|
||||
|
||||
impl Default for LayerImplMetrics {
|
||||
fn default() -> Self {
|
||||
let evictions = metrics::register_int_counter_vec!(
|
||||
"pageserver_layer_evictions_count",
|
||||
"Evictions started and completed in the Layer implementation",
|
||||
&["state"]
|
||||
use enum_map::Enum;
|
||||
|
||||
// reminder: these will be pageserver_layer_* with "_total" suffix
|
||||
|
||||
let started_evictions = metrics::register_int_counter!(
|
||||
"pageserver_layer_started_evictions",
|
||||
"Evictions started in the Layer implementation"
|
||||
)
|
||||
.unwrap();
|
||||
let completed_evictions = metrics::register_int_counter!(
|
||||
"pageserver_layer_completed_evictions",
|
||||
"Evictions completed in the Layer implementation"
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let started_evictions = evictions
|
||||
.get_metric_with_label_values(&["started"])
|
||||
.unwrap();
|
||||
let completed_evictions = evictions
|
||||
.get_metric_with_label_values(&["completed"])
|
||||
.unwrap();
|
||||
|
||||
let cancelled_evictions = metrics::register_int_counter_vec!(
|
||||
"pageserver_layer_cancelled_evictions_count",
|
||||
@@ -1442,24 +1494,36 @@ impl Default for LayerImplMetrics {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// reminder: this will be pageserver_layer_gcs_count_total with "_total" suffix
|
||||
let gcs = metrics::register_int_counter_vec!(
|
||||
"pageserver_layer_gcs_count",
|
||||
"Garbage collections started and completed in the Layer implementation",
|
||||
&["state"]
|
||||
let cancelled_evictions = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
|
||||
let reason = EvictionCancelled::from_usize(i);
|
||||
let s = reason.as_str();
|
||||
cancelled_evictions.with_label_values(&[s])
|
||||
}));
|
||||
|
||||
let started_deletes = metrics::register_int_counter!(
|
||||
"pageserver_layer_started_deletes",
|
||||
"Deletions on drop pending in the Layer implementation"
|
||||
)
|
||||
.unwrap();
|
||||
let completed_deletes = metrics::register_int_counter!(
|
||||
"pageserver_layer_completed_deletes",
|
||||
"Deletions on drop completed in the Layer implementation"
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let started_gcs = gcs.get_metric_with_label_values(&["pending"]).unwrap();
|
||||
let completed_gcs = gcs.get_metric_with_label_values(&["completed"]).unwrap();
|
||||
|
||||
let failed_gcs = metrics::register_int_counter_vec!(
|
||||
"pageserver_layer_failed_gcs_count",
|
||||
"Different reasons for garbage collections to have failed",
|
||||
let failed_deletes = metrics::register_int_counter_vec!(
|
||||
"pageserver_layer_failed_deletes_count",
|
||||
"Different reasons for deletions on drop to have failed",
|
||||
&["reason"]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let failed_deletes = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
|
||||
let reason = DeleteFailed::from_usize(i);
|
||||
let s = reason.as_str();
|
||||
failed_deletes.with_label_values(&[s])
|
||||
}));
|
||||
|
||||
let rare_counters = metrics::register_int_counter_vec!(
|
||||
"pageserver_layer_assumed_rare_count",
|
||||
"Times unexpected or assumed rare event happened",
|
||||
@@ -1467,16 +1531,29 @@ impl Default for LayerImplMetrics {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let rare_counters = enum_map::EnumMap::from_array(std::array::from_fn(|i| {
|
||||
let event = RareEvent::from_usize(i);
|
||||
let s = event.as_str();
|
||||
rare_counters.with_label_values(&[s])
|
||||
}));
|
||||
|
||||
let inits_cancelled = metrics::register_int_counter!(
|
||||
"pageserver_layer_inits_cancelled_count",
|
||||
"Times Layer initialization was cancelled",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
started_evictions,
|
||||
completed_evictions,
|
||||
cancelled_evictions,
|
||||
|
||||
started_gcs,
|
||||
completed_gcs,
|
||||
failed_gcs,
|
||||
started_deletes,
|
||||
completed_deletes,
|
||||
failed_deletes,
|
||||
|
||||
rare_counters,
|
||||
inits_cancelled,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1489,57 +1566,33 @@ impl LayerImplMetrics {
|
||||
self.completed_evictions.inc();
|
||||
}
|
||||
fn inc_eviction_cancelled(&self, reason: EvictionCancelled) {
|
||||
self.cancelled_evictions
|
||||
.get_metric_with_label_values(&[reason.as_str()])
|
||||
.unwrap()
|
||||
.inc()
|
||||
self.cancelled_evictions[reason].inc()
|
||||
}
|
||||
|
||||
fn inc_started_gcs(&self) {
|
||||
self.started_gcs.inc();
|
||||
fn inc_started_deletes(&self) {
|
||||
self.started_deletes.inc();
|
||||
}
|
||||
fn inc_completed_gcs(&self) {
|
||||
self.completed_gcs.inc();
|
||||
fn inc_completed_deletes(&self) {
|
||||
self.completed_deletes.inc();
|
||||
}
|
||||
fn inc_gcs_failed(&self, reason: GcFailed) {
|
||||
self.failed_gcs
|
||||
.get_metric_with_label_values(&[reason.as_str()])
|
||||
.unwrap()
|
||||
.inc();
|
||||
fn inc_deletes_failed(&self, reason: DeleteFailed) {
|
||||
self.failed_deletes[reason].inc();
|
||||
}
|
||||
|
||||
/// Counted separatedly from failed gcs because we will complete the gc attempt regardless of
|
||||
/// failure to delete local file.
|
||||
fn inc_gc_removes_failed(&self) {
|
||||
self.rare_counters
|
||||
.get_metric_with_label_values(&["gc_remove_failed"])
|
||||
.unwrap()
|
||||
.inc();
|
||||
/// Counted separatedly from failed layer deletes because we will complete the layer deletion
|
||||
/// attempt regardless of failure to delete local file.
|
||||
fn inc_delete_removes_failed(&self) {
|
||||
self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
|
||||
}
|
||||
|
||||
/// Expected rare because requires a race with `evict_blocking` and
|
||||
/// `get_or_maybe_download`.
|
||||
/// Expected rare because requires a race with `evict_blocking` and `get_or_maybe_download`.
|
||||
fn inc_retried_get_or_maybe_download(&self) {
|
||||
self.rare_counters
|
||||
.get_metric_with_label_values(&["retried_gomd"])
|
||||
.unwrap()
|
||||
.inc();
|
||||
self.rare_counters[RareEvent::RetriedGetOrMaybeDownload].inc();
|
||||
}
|
||||
|
||||
/// Expected rare because cancellations are unexpected
|
||||
fn inc_download_completed_without_requester(&self) {
|
||||
self.rare_counters
|
||||
.get_metric_with_label_values(&["download_completed_without"])
|
||||
.unwrap()
|
||||
.inc();
|
||||
}
|
||||
|
||||
/// Expected rare because cancellations are unexpected
|
||||
/// Expected rare because cancellations are unexpected, and failures are unexpected
|
||||
fn inc_download_failed_without_requester(&self) {
|
||||
self.rare_counters
|
||||
.get_metric_with_label_values(&["download_failed_without"])
|
||||
.unwrap()
|
||||
.inc();
|
||||
self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
|
||||
}
|
||||
|
||||
/// The Weak in ResidentOrWantedEvicted::WantedEvicted was successfully upgraded.
|
||||
@@ -1547,37 +1600,30 @@ impl LayerImplMetrics {
|
||||
/// If this counter is always zero, we should replace ResidentOrWantedEvicted type with an
|
||||
/// Option.
|
||||
fn inc_raced_wanted_evicted_accesses(&self) {
|
||||
self.rare_counters
|
||||
.get_metric_with_label_values(&["raced_wanted_evicted"])
|
||||
.unwrap()
|
||||
.inc();
|
||||
self.rare_counters[RareEvent::UpgradedWantedEvicted].inc();
|
||||
}
|
||||
|
||||
/// These are only expected for [`Self::inc_download_completed_without_requester`] amount when
|
||||
/// These are only expected for [`Self::inc_init_cancelled`] amount when
|
||||
/// running with remote storage.
|
||||
fn inc_init_needed_no_download(&self) {
|
||||
self.rare_counters
|
||||
.get_metric_with_label_values(&["init_needed_no_download"])
|
||||
.unwrap()
|
||||
.inc();
|
||||
self.rare_counters[RareEvent::InitWithoutDownload].inc();
|
||||
}
|
||||
|
||||
/// Expected rare because all layer files should be readable and good
|
||||
fn inc_permanent_loading_failures(&self) {
|
||||
self.rare_counters
|
||||
.get_metric_with_label_values(&["permanent_loading_failure"])
|
||||
.unwrap()
|
||||
.inc();
|
||||
self.rare_counters[RareEvent::PermanentLoadingFailure].inc();
|
||||
}
|
||||
|
||||
fn inc_broadcast_lagged(&self) {
|
||||
self.rare_counters
|
||||
.get_metric_with_label_values(&["broadcast_lagged"])
|
||||
.unwrap()
|
||||
.inc();
|
||||
self.rare_counters[RareEvent::EvictAndWaitLagged].inc();
|
||||
}
|
||||
|
||||
fn inc_init_cancelled(&self) {
|
||||
self.inits_cancelled.inc()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(enum_map::Enum)]
|
||||
enum EvictionCancelled {
|
||||
LayerGone,
|
||||
TimelineGone,
|
||||
@@ -1606,19 +1652,47 @@ impl EvictionCancelled {
|
||||
}
|
||||
}
|
||||
|
||||
enum GcFailed {
|
||||
#[derive(enum_map::Enum)]
|
||||
enum DeleteFailed {
|
||||
TimelineGone,
|
||||
DeleteSchedulingFailed,
|
||||
}
|
||||
|
||||
impl GcFailed {
|
||||
impl DeleteFailed {
|
||||
fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
GcFailed::TimelineGone => "timeline_gone",
|
||||
GcFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
|
||||
DeleteFailed::TimelineGone => "timeline_gone",
|
||||
DeleteFailed::DeleteSchedulingFailed => "delete_scheduling_failed",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
|
||||
#[derive(enum_map::Enum)]
|
||||
enum RareEvent {
|
||||
RemoveOnDropFailed,
|
||||
RetriedGetOrMaybeDownload,
|
||||
DownloadFailedWithoutRequester,
|
||||
UpgradedWantedEvicted,
|
||||
InitWithoutDownload,
|
||||
PermanentLoadingFailure,
|
||||
EvictAndWaitLagged,
|
||||
}
|
||||
|
||||
impl RareEvent {
|
||||
fn as_str(&self) -> &'static str {
|
||||
use RareEvent::*;
|
||||
|
||||
match self {
|
||||
RemoveOnDropFailed => "remove_on_drop_failed",
|
||||
RetriedGetOrMaybeDownload => "retried_gomd",
|
||||
DownloadFailedWithoutRequester => "download_failed_without",
|
||||
UpgradedWantedEvicted => "raced_wanted_evicted",
|
||||
InitWithoutDownload => "init_needed_no_download",
|
||||
PermanentLoadingFailure => "permanent_loading_failure",
|
||||
EvictAndWaitLagged => "broadcast_lagged",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static LAYER_IMPL_METRICS: once_cell::sync::Lazy<LayerImplMetrics> =
|
||||
once_cell::sync::Lazy::new(LayerImplMetrics::default);
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use core::fmt::Display;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::ops::Range;
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::repository::Key;
|
||||
|
||||
@@ -9,16 +8,11 @@ use super::{DeltaFileName, ImageFileName, LayerFileName};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[cfg(test)]
|
||||
use utils::id::TenantId;
|
||||
|
||||
/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
|
||||
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
|
||||
/// a unified way to generate layer information like file name.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct PersistentLayerDesc {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub timeline_id: TimelineId,
|
||||
/// Range of keys that this layer covers
|
||||
pub key_range: Range<Key>,
|
||||
/// Inclusive start, exclusive end of the LSN range that this layer holds.
|
||||
@@ -57,8 +51,6 @@ impl PersistentLayerDesc {
|
||||
#[cfg(test)]
|
||||
pub fn new_test(key_range: Range<Key>) -> Self {
|
||||
Self {
|
||||
tenant_shard_id: TenantShardId::unsharded(TenantId::generate()),
|
||||
timeline_id: TimelineId::generate(),
|
||||
key_range,
|
||||
lsn_range: Lsn(0)..Lsn(1),
|
||||
is_delta: false,
|
||||
@@ -66,16 +58,8 @@ impl PersistentLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_img(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn: Lsn,
|
||||
file_size: u64,
|
||||
) -> Self {
|
||||
pub fn new_img(key_range: Range<Key>, lsn: Lsn, file_size: u64) -> Self {
|
||||
Self {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
key_range,
|
||||
lsn_range: Self::image_layer_lsn_range(lsn),
|
||||
is_delta: false,
|
||||
@@ -83,16 +67,8 @@ impl PersistentLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_delta(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
file_size: u64,
|
||||
) -> Self {
|
||||
pub fn new_delta(key_range: Range<Key>, lsn_range: Range<Lsn>, file_size: u64) -> Self {
|
||||
Self {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
key_range,
|
||||
lsn_range,
|
||||
is_delta: true,
|
||||
@@ -100,23 +76,10 @@ impl PersistentLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_filename(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
filename: LayerFileName,
|
||||
file_size: u64,
|
||||
) -> Self {
|
||||
pub fn from_filename(filename: LayerFileName, file_size: u64) -> Self {
|
||||
match filename {
|
||||
LayerFileName::Image(i) => {
|
||||
Self::new_img(tenant_shard_id, timeline_id, i.key_range, i.lsn, file_size)
|
||||
}
|
||||
LayerFileName::Delta(d) => Self::new_delta(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
d.key_range,
|
||||
d.lsn_range,
|
||||
file_size,
|
||||
),
|
||||
LayerFileName::Image(i) => Self::new_img(i.key_range, i.lsn, file_size),
|
||||
LayerFileName::Delta(d) => Self::new_delta(d.key_range, d.lsn_range, file_size),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,10 +136,6 @@ impl PersistentLayerDesc {
|
||||
self.key_range.clone()
|
||||
}
|
||||
|
||||
pub fn get_timeline_id(&self) -> TimelineId {
|
||||
self.timeline_id
|
||||
}
|
||||
|
||||
/// Does this layer only contain some data for the key-range (incremental),
|
||||
/// or does it contain a version of every page? This is important to know
|
||||
/// for garbage collecting old layers: an incremental layer depends on
|
||||
@@ -192,9 +151,7 @@ impl PersistentLayerDesc {
|
||||
pub fn dump(&self) {
|
||||
if self.is_delta {
|
||||
println!(
|
||||
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} is_incremental {} size {} ----",
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
"----- delta layer keys {}-{} lsn {}-{} is_incremental {} size {} ----",
|
||||
self.key_range.start,
|
||||
self.key_range.end,
|
||||
self.lsn_range.start,
|
||||
@@ -204,9 +161,7 @@ impl PersistentLayerDesc {
|
||||
);
|
||||
} else {
|
||||
println!(
|
||||
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
"----- image layer key {}-{} at {} is_incremental {} size {} ----",
|
||||
self.key_range.start,
|
||||
self.key_range.end,
|
||||
self.image_layer_lsn(),
|
||||
|
||||
@@ -313,6 +313,10 @@ pub struct Timeline {
|
||||
/// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
|
||||
pub(crate) gate: Gate,
|
||||
|
||||
/// Gate to prevent shutdown completing until all Layers for this Timeline have finished
|
||||
/// doing any background I/O such as deleting files on drop.
|
||||
pub(crate) layer_gate: Gate,
|
||||
|
||||
/// Cancellation token scoped to this timeline: anything doing long-running work relating
|
||||
/// to the timeline should drop out when this token fires.
|
||||
pub(crate) cancel: CancellationToken,
|
||||
@@ -478,7 +482,7 @@ impl Timeline {
|
||||
.map(|ancestor| ancestor.timeline_id)
|
||||
}
|
||||
|
||||
/// Lock and get timeline's GC cuttof
|
||||
/// Lock and get timeline's GC cutoff
|
||||
pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
|
||||
self.latest_gc_cutoff_lsn.read()
|
||||
}
|
||||
@@ -1002,8 +1006,15 @@ impl Timeline {
|
||||
)
|
||||
.await;
|
||||
|
||||
// Finally wait until any gate-holders are complete
|
||||
// Wait until any normal gate-holders such as page_service requests are complete
|
||||
self.gate.close().await;
|
||||
|
||||
// Drop our references to layers: this should permit all layers to be dropped, and any I/O
|
||||
// in their drop() method to complete.
|
||||
self.layers.write().await.clear();
|
||||
|
||||
// Wait until any Layer gate holders such as LayerInner::drop are complete
|
||||
self.layer_gate.close().await;
|
||||
}
|
||||
|
||||
pub fn set_state(&self, new_state: TimelineState) {
|
||||
@@ -1445,6 +1456,7 @@ impl Timeline {
|
||||
|
||||
cancel,
|
||||
gate: Gate::new(format!("Timeline<{tenant_shard_id}/{timeline_id}>")),
|
||||
layer_gate: Gate::new(format!("TimelineLayers<{tenant_shard_id}/{timeline_id}>")),
|
||||
|
||||
compaction_lock: tokio::sync::Mutex::default(),
|
||||
gc_lock: tokio::sync::Mutex::default(),
|
||||
@@ -2176,7 +2188,7 @@ trait TraversalLayerExt {
|
||||
|
||||
impl TraversalLayerExt for Layer {
|
||||
fn traversal_id(&self) -> TraversalId {
|
||||
self.local_path().to_string()
|
||||
self.filename().to_string()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2890,7 +2902,8 @@ impl Timeline {
|
||||
let _g = span.entered();
|
||||
let new_delta =
|
||||
Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
|
||||
let new_delta_path = new_delta.local_path().to_owned();
|
||||
let new_delta_path = new_delta
|
||||
.local_path_from_id(&self_clone.tenant_shard_id, &self_clone.timeline_id);
|
||||
|
||||
// Sync it to disk.
|
||||
//
|
||||
@@ -3134,7 +3147,7 @@ impl Timeline {
|
||||
// and fsync them all in parallel.
|
||||
let all_paths = image_layers
|
||||
.iter()
|
||||
.map(|layer| layer.local_path().to_owned())
|
||||
.map(|layer| layer.local_path_from_id(&self.tenant_shard_id, &self.timeline_id))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
par_fsync::par_fsync_async(&all_paths)
|
||||
@@ -3683,7 +3696,7 @@ impl Timeline {
|
||||
// FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
|
||||
let layer_paths: Vec<Utf8PathBuf> = new_layers
|
||||
.iter()
|
||||
.map(|l| l.local_path().to_owned())
|
||||
.map(|l| l.local_path_from_id(&self.tenant_shard_id, &self.timeline_id))
|
||||
.collect();
|
||||
|
||||
// Fsync all the layer files and directory using multiple threads to
|
||||
@@ -3971,7 +3984,7 @@ impl Timeline {
|
||||
// for details. This will block until the old value is no longer in use.
|
||||
//
|
||||
// The GC cutoff should only ever move forwards.
|
||||
{
|
||||
let waitlist = {
|
||||
let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
|
||||
ensure!(
|
||||
*write_guard <= new_gc_cutoff,
|
||||
@@ -3979,8 +3992,9 @@ impl Timeline {
|
||||
*write_guard,
|
||||
new_gc_cutoff
|
||||
);
|
||||
write_guard.store_and_unlock(new_gc_cutoff).wait();
|
||||
}
|
||||
write_guard.store_and_unlock(new_gc_cutoff)
|
||||
};
|
||||
waitlist.wait().await;
|
||||
|
||||
info!("GC starting");
|
||||
|
||||
|
||||
@@ -33,6 +33,11 @@ impl LayerManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clear(&mut self) {
|
||||
self.layer_map = LayerMap::default();
|
||||
self.layer_fmgr.clear();
|
||||
}
|
||||
|
||||
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
|
||||
self.layer_fmgr.get_from_desc(desc)
|
||||
}
|
||||
@@ -243,7 +248,7 @@ impl LayerManager {
|
||||
// map index without actually rebuilding the index.
|
||||
updates.remove_historic(desc);
|
||||
mapping.remove(layer);
|
||||
layer.garbage_collect_on_drop();
|
||||
layer.delete_on_drop();
|
||||
}
|
||||
|
||||
pub(crate) fn contains(&self, layer: &Layer) -> bool {
|
||||
@@ -271,6 +276,10 @@ impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clear(&mut self) {
|
||||
self.0.clear();
|
||||
}
|
||||
|
||||
pub(crate) fn contains(&self, layer: &T) -> bool {
|
||||
self.0.contains_key(&layer.layer_desc().key())
|
||||
}
|
||||
|
||||
@@ -610,9 +610,11 @@ impl Drop for VirtualFile {
|
||||
slot.recently_used.store(false, Ordering::Relaxed);
|
||||
// there is also operation "close-by-replace" for closes done on eviction for
|
||||
// comparison.
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::Close)
|
||||
.observe_closure_duration(|| drop(slot_guard.file.take()));
|
||||
if let Some(fd) = slot_guard.file.take() {
|
||||
STORAGE_IO_TIME_METRIC
|
||||
.get(StorageIoOperation::Close)
|
||||
.observe_closure_duration(|| drop(fd));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,6 +59,7 @@
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/fsm_internals.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "storage/md.h"
|
||||
#include "pgstat.h"
|
||||
@@ -2722,6 +2723,86 @@ smgr_init_neon(void)
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, XLogRecPtr end_recptr)
|
||||
{
|
||||
BlockNumber relsize;
|
||||
/* Extend the relation if we know its size */
|
||||
if (get_cached_relsize(rinfo, forknum, &relsize))
|
||||
{
|
||||
if (relsize < blkno + 1)
|
||||
{
|
||||
update_cached_relsize(rinfo, forknum, blkno + 1);
|
||||
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Size was not cached. We populate the cache now, with the size of the
|
||||
* relation measured after this WAL record is applied.
|
||||
*
|
||||
* This length is later reused when we open the smgr to read the block,
|
||||
* which is fine and expected.
|
||||
*/
|
||||
|
||||
NeonResponse *response;
|
||||
NeonNblocksResponse *nbresponse;
|
||||
NeonNblocksRequest request = {
|
||||
.req = (NeonRequest) {
|
||||
.lsn = end_recptr,
|
||||
.latest = false,
|
||||
.tag = T_NeonNblocksRequest,
|
||||
},
|
||||
.rinfo = rinfo,
|
||||
.forknum = forknum,
|
||||
};
|
||||
|
||||
response = page_server_request(&request);
|
||||
|
||||
Assert(response->tag == T_NeonNblocksResponse);
|
||||
nbresponse = (NeonNblocksResponse *) response;
|
||||
|
||||
relsize = Max(nbresponse->n_blocks, blkno+1);
|
||||
|
||||
set_cached_relsize(rinfo, forknum, relsize);
|
||||
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
|
||||
|
||||
elog(SmgrTrace, "Set length to %d", relsize);
|
||||
}
|
||||
}
|
||||
|
||||
#define FSM_TREE_DEPTH ((SlotsPerFSMPage >= 1626) ? 3 : 4)
|
||||
|
||||
/*
|
||||
* TODO: May be it is better to make correspondent fgunctio from freespace.c public?
|
||||
*/
|
||||
static BlockNumber
|
||||
get_fsm_physical_block(BlockNumber heapblk)
|
||||
{
|
||||
BlockNumber pages;
|
||||
int leafno;
|
||||
int l;
|
||||
|
||||
/*
|
||||
* Calculate the logical page number of the first leaf page below the
|
||||
* given page.
|
||||
*/
|
||||
leafno = heapblk / SlotsPerFSMPage;
|
||||
|
||||
/* Count upper level nodes required to address the leaf page */
|
||||
pages = 0;
|
||||
for (l = 0; l < FSM_TREE_DEPTH; l++)
|
||||
{
|
||||
pages += leafno + 1;
|
||||
leafno /= SlotsPerFSMPage;
|
||||
}
|
||||
|
||||
/* Turn the page count into 0-based block number */
|
||||
return pages - 1;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Return whether we can skip the redo for this block.
|
||||
*
|
||||
@@ -2769,7 +2850,6 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
LWLock *partitionLock;
|
||||
Buffer buffer;
|
||||
bool no_redo_needed;
|
||||
BlockNumber relsize;
|
||||
|
||||
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
|
||||
return true;
|
||||
@@ -2819,49 +2899,10 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
|
||||
LWLockRelease(partitionLock);
|
||||
|
||||
/* Extend the relation if we know its size */
|
||||
if (get_cached_relsize(rinfo, forknum, &relsize))
|
||||
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
|
||||
if (forknum == MAIN_FORKNUM)
|
||||
{
|
||||
if (relsize < blkno + 1)
|
||||
{
|
||||
update_cached_relsize(rinfo, forknum, blkno + 1);
|
||||
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
|
||||
}
|
||||
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Size was not cached. We populate the cache now, with the size of the
|
||||
* relation measured after this WAL record is applied.
|
||||
*
|
||||
* This length is later reused when we open the smgr to read the block,
|
||||
* which is fine and expected.
|
||||
*/
|
||||
|
||||
NeonResponse *response;
|
||||
NeonNblocksResponse *nbresponse;
|
||||
NeonNblocksRequest request = {
|
||||
.req = (NeonRequest) {
|
||||
.lsn = end_recptr,
|
||||
.latest = false,
|
||||
.tag = T_NeonNblocksRequest,
|
||||
},
|
||||
.rinfo = rinfo,
|
||||
.forknum = forknum,
|
||||
};
|
||||
|
||||
response = page_server_request(&request);
|
||||
|
||||
Assert(response->tag == T_NeonNblocksResponse);
|
||||
nbresponse = (NeonNblocksResponse *) response;
|
||||
|
||||
Assert(nbresponse->n_blocks > blkno);
|
||||
|
||||
set_cached_relsize(rinfo, forknum, nbresponse->n_blocks);
|
||||
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
|
||||
|
||||
elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
|
||||
}
|
||||
|
||||
return no_redo_needed;
|
||||
}
|
||||
|
||||
@@ -260,6 +260,14 @@ class PageserverHttpClient(requests.Session):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_reset(self, tenant_id: TenantId, drop_cache: bool):
|
||||
params = {}
|
||||
if drop_cache:
|
||||
params["drop_cache"] = "true"
|
||||
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/reset", params=params)
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_delete(self, tenant_id: TenantId):
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
self.verbose_error(res)
|
||||
|
||||
29
test_runner/regress/test_physical_replication.py
Normal file
29
test_runner/regress/test_physical_replication.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import random
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_physical_replication(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
n_records = 100000
|
||||
with env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
) as primary:
|
||||
with primary.connect() as p_con:
|
||||
with p_con.cursor() as p_cur:
|
||||
p_cur.execute(
|
||||
"CREATE TABLE t(pk bigint primary key, payload text default repeat('?',200))"
|
||||
)
|
||||
time.sleep(1)
|
||||
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
|
||||
with primary.connect() as p_con:
|
||||
with p_con.cursor() as p_cur:
|
||||
with secondary.connect() as s_con:
|
||||
with s_con.cursor() as s_cur:
|
||||
for pk in range(n_records):
|
||||
p_cur.execute("insert into t (pk) values (%s)", (pk,))
|
||||
s_cur.execute(
|
||||
"select * from t where pk=%s", (random.randrange(1, n_records),)
|
||||
)
|
||||
@@ -840,7 +840,7 @@ def test_compaction_waits_for_upload(
|
||||
), "there should be one L1 after L0 => L1 compaction (without #5863 being fixed)"
|
||||
|
||||
def layer_deletes_completed():
|
||||
m = client.get_metric_value("pageserver_layer_gcs_count_total", {"state": "completed"})
|
||||
m = client.get_metric_value("pageserver_layer_completed_deletes_total")
|
||||
if m is None:
|
||||
return 0
|
||||
return int(m)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
import enum
|
||||
import random
|
||||
import time
|
||||
from threading import Thread
|
||||
@@ -51,11 +52,20 @@ def do_gc_target(
|
||||
log.info("gc http thread returning")
|
||||
|
||||
|
||||
class ReattachMode(str, enum.Enum):
|
||||
REATTACH_EXPLICIT = "explicit"
|
||||
REATTACH_RESET = "reset"
|
||||
REATTACH_RESET_DROP = "reset"
|
||||
|
||||
|
||||
# Basic detach and re-attach test
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
|
||||
@pytest.mark.parametrize(
|
||||
"mode",
|
||||
[ReattachMode.REATTACH_EXPLICIT, ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP],
|
||||
)
|
||||
def test_tenant_reattach(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, mode: str
|
||||
):
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
@@ -100,8 +110,15 @@ def test_tenant_reattach(
|
||||
ps_metrics.query_one("pageserver_last_record_lsn", filter=tenant_metric_filter).value
|
||||
)
|
||||
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
if mode == ReattachMode.REATTACH_EXPLICIT:
|
||||
# Explicitly detach then attach the tenant as two separate API calls
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
elif mode in (ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP):
|
||||
# Use the reset API to detach/attach in one shot
|
||||
pageserver_http.tenant_reset(tenant_id, mode == ReattachMode.REATTACH_RESET_DROP)
|
||||
else:
|
||||
raise NotImplementedError(mode)
|
||||
|
||||
time.sleep(1) # for metrics propagation
|
||||
|
||||
|
||||
Reference in New Issue
Block a user