Compare commits

..

18 Commits

Author SHA1 Message Date
Christian Schwarz
c5ec255ce8 implement a standalone no-op server
usable by getpage_bench_libpq by running it on a different
port than the pageserver libpq listener, and overriding
connstring for getpage_bench_libpq to point to the
noop_server
2023-11-03 11:59:51 +00:00
Christian Schwarz
ebf956115c getpage_bench_libpq: support for the no-op mode 2023-11-03 11:59:11 +00:00
Christian Schwarz
34cffd2c43 no-op pagestream request/response type (server-side impl) 2023-11-03 11:57:52 +00:00
Christian Schwarz
2d37857351 pq bench: avoid repeated conversion to_i128 2023-11-02 17:43:59 +00:00
Christian Schwarz
ddfce0cfa5 per-second RPS 2023-11-02 17:11:37 +00:00
Christian Schwarz
d52a622115 pq bench: proper shutdown 2023-11-02 17:07:00 +00:00
Christian Schwarz
a066eecda0 http bench: sligthly improved stats 2023-11-02 17:06:36 +00:00
Christian Schwarz
94e94af6c7 WIP: libpq-based client
depends on https://github.com/neondatabase/rust-postgres/pull/25
2023-11-02 16:28:16 +01:00
Christian Schwarz
df7346eaff Revert "CP tokio_epoll_uring for read path"
This reverts commit 82d9c68667.
2023-11-02 11:32:48 +01:00
Christian Schwarz
76efb1b79b Revert "CP: use hacked-together open_at for async VirtualFile open calls instead of spawn_blocking"
This reverts commit 55cdf6c7ff.
2023-11-02 11:32:41 +01:00
Christian Schwarz
2f656c6691 rename getpage_bench to getpage_bench_http 2023-11-02 10:59:54 +01:00
Christian Schwarz
bb5b5cbdac WIP: benchmark that does random getpage requests against the keyspace
backup of pageserver.toml

    d =1
    pg_distrib_dir ='/home/admin/neon-main/pg_install'
    http_auth_type ='Trust'
    pg_auth_type ='Trust'
    listen_http_addr ='127.0.0.1:9898'
    listen_pg_addr ='127.0.0.1:64000'
    broker_endpoint ='http://127.0.0.1:50051/'
    #control_plane_api ='http://127.0.0.1:1234/'

    # Initial configuration file created by 'pageserver --init'
    #listen_pg_addr = '127.0.0.1:64000'
    #listen_http_addr = '127.0.0.1:9898'

    #wait_lsn_timeout = '60 s'
    #wal_redo_timeout = '60 s'

    #max_file_descriptors = 10000
    #page_cache_size = 160000

    # initial superuser role name to use when creating a new tenant
    #initial_superuser_name = 'cloud_admin'

    #broker_endpoint = 'http://127.0.0.1:50051'

    #log_format = 'plain'

    #concurrent_tenant_size_logical_size_queries = '1'

    #metric_collection_interval = '10 min'
    #cached_metric_collection_interval = '0s'
    #synthetic_size_calculation_interval = '10 min'

    #disk_usage_based_eviction = { max_usage_pct = .., min_avail_bytes = .., period = "10s"}

    #background_task_maximum_delay = '10s'

    [tenant_config]
    #checkpoint_distance = 268435456 # in bytes
    #checkpoint_timeout = 10 m
    #compaction_target_size = 134217728 # in bytes
    #compaction_period = '20 s'
    #compaction_threshold = 10

    #gc_period = '1 hr'
    #gc_horizon = 67108864
    #image_creation_threshold = 3
    #pitr_interval = '7 days'

    #min_resident_size_override = .. # in bytes
    #evictions_low_residence_duration_metric_threshold = '24 hour'
    #gc_feedback = false

    # make it determinsitic
    gc_period = '0s'
    checkpoint_timeout = '3650 day'
    compaction_period = '20 s'
    compaction_threshold = 10
    compaction_target_size = 134217728
    checkpoint_distance = 268435456
    image_creation_threshold = 3

    [remote_storage]
    local_path = '/home/admin/neon-main/bench_repo_dir/repo/remote_storage_local_fs'
2023-10-26 17:53:03 +00:00
Christian Schwarz
55cdf6c7ff CP: use hacked-together open_at for async VirtualFile open calls instead of spawn_blocking
This makes Delta/Image ::load fns fully tokio-epoll-uring
2023-10-26 17:40:33 +00:00
Christian Schwarz
82d9c68667 CP tokio_epoll_uring for read path 2023-10-26 17:22:23 +00:00
Christian Schwarz
bc91c40f56 Revert "revert recent VirtualFile asyncification changes (#5291)"
This reverts commit ab1f37e908.
2023-10-26 17:22:10 +00:00
Christian Schwarz
c5f58ef3f7 API to duplicate a tenant 2023-10-26 16:30:11 +00:00
Christian Schwarz
bb8531d920 Revert "WIP cleanup unused RemoteStorage fields + half-baked copy_file impl"
This reverts commit 7553bbe3f5.
2023-10-26 15:44:26 +00:00
Christian Schwarz
7553bbe3f5 WIP cleanup unused RemoteStorage fields + half-baked copy_file impl 2023-10-26 15:44:03 +00:00
61 changed files with 4260 additions and 5979 deletions

29
Cargo.lock generated
View File

@@ -2932,6 +2932,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -3198,6 +3208,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "pagectl"
version = "0.1.0"
@@ -3283,10 +3299,12 @@ dependencies = [
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
"tracing",
"tracing-subscriber",
"url",
"utils",
"walkdir",
@@ -3561,7 +3579,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=7434d9388965a17a6d113e5dfc0e65666a03b4c2#7434d9388965a17a6d113e5dfc0e65666a03b4c2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=problame/copy-both-duplex-public#5c462bd3500e657c014ef087e4eef2c1a8f0ebda"
dependencies = [
"bytes",
"fallible-iterator",
@@ -3574,7 +3592,7 @@ dependencies = [
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=7434d9388965a17a6d113e5dfc0e65666a03b4c2#7434d9388965a17a6d113e5dfc0e65666a03b4c2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=problame/copy-both-duplex-public#5c462bd3500e657c014ef087e4eef2c1a8f0ebda"
dependencies = [
"native-tls",
"tokio",
@@ -3585,7 +3603,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=7434d9388965a17a6d113e5dfc0e65666a03b4c2#7434d9388965a17a6d113e5dfc0e65666a03b4c2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=problame/copy-both-duplex-public#5c462bd3500e657c014ef087e4eef2c1a8f0ebda"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -3603,7 +3621,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=7434d9388965a17a6d113e5dfc0e65666a03b4c2#7434d9388965a17a6d113e5dfc0e65666a03b4c2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=problame/copy-both-duplex-public#5c462bd3500e657c014ef087e4eef2c1a8f0ebda"
dependencies = [
"bytes",
"fallible-iterator",
@@ -5407,7 +5425,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=7434d9388965a17a6d113e5dfc0e65666a03b4c2#7434d9388965a17a6d113e5dfc0e65666a03b4c2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=problame/copy-both-duplex-public#5c462bd3500e657c014ef087e4eef2c1a8f0ebda"
dependencies = [
"async-trait",
"byteorder",
@@ -5764,6 +5782,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",

View File

@@ -161,11 +161,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="7434d9388965a17a6d113e5dfc0e65666a03b4c2" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="7434d9388965a17a6d113e5dfc0e65666a03b4c2" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="7434d9388965a17a6d113e5dfc0e65666a03b4c2" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="7434d9388965a17a6d113e5dfc0e65666a03b4c2" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="7434d9388965a17a6d113e5dfc0e65666a03b4c2" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="problame/copy-both-duplex-public" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="problame/copy-both-duplex-public" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="problame/copy-both-duplex-public" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="problame/copy-both-duplex-public" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="problame/copy-both-duplex-public" }
## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -202,7 +202,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="7434d9388965a17a6d113e5dfc0e65666a03b4c2" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="problame/copy-both-duplex-public" }
################# Binary contents sections

View File

@@ -221,21 +221,8 @@ async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, Ap
generation: 0,
});
if let Some(attaching_pageserver) = attach_req.pageserver_id.as_ref() {
if attach_req.pageserver_id.is_some() {
tenant_state.generation += 1;
tracing::info!(
"attach_hook: issuing generation {} to pageserver {}",
attaching_pageserver,
tenant_state.generation
);
} else if let Some(ps_id) = tenant_state.pageserver {
tracing::info!(
"attach_hook: dropping pageserver {} in generation {}",
ps_id,
tenant_state.generation
);
} else {
tracing::info!("attach_hook: no-op: tenant already has no pageserver");
}
tenant_state.pageserver = attach_req.pageserver_id;
let generation = tenant_state.generation;

View File

@@ -18,7 +18,7 @@ use utils::{
use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
/// The state of a tenant in this pageserver.
///
@@ -612,15 +612,18 @@ pub enum PagestreamFeMessage {
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
NoOp,
}
// Wrapped in libpq CopyData
#[derive(Debug)]
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
NoOp,
}
#[derive(Debug, PartialEq, Eq)]
@@ -719,6 +722,10 @@ impl PagestreamFeMessage {
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
Self::NoOp => {
bytes.put_u8(4);
}
}
bytes.into()
@@ -769,6 +776,7 @@ impl PagestreamFeMessage {
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::NoOp),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
@@ -803,10 +811,46 @@ impl PagestreamBeMessage {
bytes.put_u8(104); /* tag from pagestore_client.h */
bytes.put_i64(resp.db_size);
}
Self::NoOp => {
bytes.put_u8(105);
}
}
bytes.into()
}
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
let mut buf = buf.reader();
let msg_tag = buf.read_u8()?;
match msg_tag {
100 => todo!(),
101 => todo!(),
102 => {
let buf = buf.get_ref();
/* TODO use constant */
if buf.len() == 8192 {
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page: buf.clone(),
}))
} else {
anyhow::bail!("invalid page size: {}", buf.len());
}
}
103 => {
let buf = buf.get_ref();
let cstr = std::ffi::CStr::from_bytes_until_nul(&buf)?;
let rust_str = cstr.to_str()?;
Ok(PagestreamBeMessage::Error(PagestreamErrorResponse {
message: rust_str.to_owned(),
}))
}
104 => todo!(),
105 => {
Ok(PagestreamBeMessage::NoOp)
},
_ => bail!("unknown tag: {:?}", msg_tag),
}
}
}
#[cfg(test)]

View File

@@ -112,7 +112,7 @@ impl RemotePath {
self.0.file_name()
}
pub fn join(&self, segment: &Utf8Path) -> Self {
pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
Self(self.0.join(segment))
}

View File

@@ -73,8 +73,6 @@ pub mod completion;
/// Reporting utilities
pub mod error;
pub mod sync;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

View File

@@ -1 +0,0 @@
pub mod heavier_once_cell;

View File

@@ -1,306 +0,0 @@
use std::sync::{Arc, Mutex, MutexGuard};
use tokio::sync::Semaphore;
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
/// `SemaphorePermit`, allowing use of `take` which does not require holding an outer mutex guard
/// for the duration of initialization.
///
/// Has no unsafe, builds upon [`tokio::sync::Semaphore`] and [`std::sync::Mutex`].
///
/// [`OwnedSemaphorePermit`]: tokio::sync::OwnedSemaphorePermit
pub struct OnceCell<T> {
inner: Mutex<Inner<T>>,
}
impl<T> Default for OnceCell<T> {
/// Create new uninitialized [`OnceCell`].
fn default() -> Self {
Self {
inner: Default::default(),
}
}
}
/// Semaphore is the current state:
/// - open semaphore means the value is `None`, not yet initialized
/// - closed semaphore means the value has been initialized
#[derive(Debug)]
struct Inner<T> {
init_semaphore: Arc<Semaphore>,
value: Option<T>,
}
impl<T> Default for Inner<T> {
fn default() -> Self {
Self {
init_semaphore: Arc::new(Semaphore::new(1)),
value: None,
}
}
}
impl<T> OnceCell<T> {
/// Creates an already initialized `OnceCell` with the given value.
pub fn new(value: T) -> Self {
let sem = Semaphore::new(1);
sem.close();
Self {
inner: Mutex::new(Inner {
init_semaphore: Arc::new(sem),
value: Some(value),
}),
}
}
/// Returns a guard to an existing initialized value, or uniquely initializes the value before
/// returning the guard.
///
/// Initializing might wait on any existing [`Guard::take_and_deinit`] deinitialization.
///
/// Initialization is panic-safe and cancellation-safe.
pub async fn get_or_init<F, Fut, E>(&self, factory: F) -> Result<Guard<'_, T>, E>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
{
let sem = {
let guard = self.inner.lock().unwrap();
if guard.value.is_some() {
return Ok(Guard(guard));
}
guard.init_semaphore.clone()
};
let permit = sem.acquire_owned().await;
if permit.is_err() {
let guard = self.inner.lock().unwrap();
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(Guard(guard));
} else {
// now we try
let value = factory().await?;
let mut guard = self.inner.lock().unwrap();
assert!(
guard.value.is_none(),
"we won permit, must not be initialized"
);
guard.value = Some(value);
guard.init_semaphore.close();
Ok(Guard(guard))
}
}
/// Returns a guard to an existing initialized value, if any.
pub fn get(&self) -> Option<Guard<'_, T>> {
let guard = self.inner.lock().unwrap();
if guard.value.is_some() {
Some(Guard(guard))
} else {
None
}
}
}
/// Uninteresting guard object to allow short-lived access to inspect or clone the held,
/// initialized value.
#[derive(Debug)]
pub struct Guard<'a, T>(MutexGuard<'a, Inner<T>>);
impl<T> std::ops::Deref for Guard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.0
.value
.as_ref()
.expect("guard is not created unless value has been initialized")
}
}
impl<T> std::ops::DerefMut for Guard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0
.value
.as_mut()
.expect("guard is not created unless value has been initialized")
}
}
impl<'a, T> Guard<'a, T> {
/// Take the current value, and a new permit for it's deinitialization.
///
/// The permit will be on a semaphore part of the new internal value, and any following
/// [`OnceCell::get_or_init`] will wait on it to complete.
pub fn take_and_deinit(&mut self) -> (T, tokio::sync::OwnedSemaphorePermit) {
let mut swapped = Inner::default();
let permit = swapped
.init_semaphore
.clone()
.try_acquire_owned()
.expect("we just created this");
std::mem::swap(&mut *self.0, &mut swapped);
swapped
.value
.map(|v| (v, permit))
.expect("guard is not created unless value has been initialized")
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::{
convert::Infallible,
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
#[tokio::test]
async fn many_initializers() {
#[derive(Default, Debug)]
struct Counters {
factory_got_to_run: AtomicUsize,
future_polled: AtomicUsize,
winners: AtomicUsize,
}
let initializers = 100;
let cell = Arc::new(OnceCell::default());
let counters = Arc::new(Counters::default());
let barrier = Arc::new(tokio::sync::Barrier::new(initializers + 1));
let mut js = tokio::task::JoinSet::new();
for i in 0..initializers {
js.spawn({
let cell = cell.clone();
let counters = counters.clone();
let barrier = barrier.clone();
async move {
barrier.wait().await;
let won = {
let g = cell
.get_or_init(|| {
counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed);
async {
counters.future_polled.fetch_add(1, Ordering::Relaxed);
Ok::<_, Infallible>(i)
}
})
.await
.unwrap();
*g == i
};
if won {
counters.winners.fetch_add(1, Ordering::Relaxed);
}
}
});
}
barrier.wait().await;
while let Some(next) = js.join_next().await {
next.expect("no panics expected");
}
let mut counters = Arc::try_unwrap(counters).unwrap();
assert_eq!(*counters.factory_got_to_run.get_mut(), 1);
assert_eq!(*counters.future_polled.get_mut(), 1);
assert_eq!(*counters.winners.get_mut(), 1);
}
#[tokio::test(start_paused = true)]
async fn reinit_waits_for_deinit() {
// with the tokio::time paused, we will "sleep" for 1s while holding the reinitialization
let sleep_for = Duration::from_secs(1);
let initial = 42;
let reinit = 1;
let cell = Arc::new(OnceCell::new(initial));
let deinitialization_started = Arc::new(tokio::sync::Barrier::new(2));
let jh = tokio::spawn({
let cell = cell.clone();
let deinitialization_started = deinitialization_started.clone();
async move {
let (answer, _permit) = cell.get().expect("initialized to value").take_and_deinit();
assert_eq!(answer, initial);
deinitialization_started.wait().await;
tokio::time::sleep(sleep_for).await;
}
});
deinitialization_started.wait().await;
let started_at = tokio::time::Instant::now();
cell.get_or_init(|| async { Ok::<_, Infallible>(reinit) })
.await
.unwrap();
let elapsed = started_at.elapsed();
assert!(
elapsed >= sleep_for,
"initialization should had taken at least the time time slept with permit"
);
jh.await.unwrap();
assert_eq!(*cell.get().unwrap(), reinit);
}
#[tokio::test]
async fn initialization_attemptable_until_ok() {
let cell = OnceCell::default();
for _ in 0..10 {
cell.get_or_init(|| async { Err("whatever error") })
.await
.unwrap_err();
}
let g = cell
.get_or_init(|| async { Ok::<_, Infallible>("finally success") })
.await
.unwrap();
assert_eq!(*g, "finally success");
}
#[tokio::test]
async fn initialization_is_cancellation_safe() {
let cell = OnceCell::default();
let barrier = tokio::sync::Barrier::new(2);
let initializer = cell.get_or_init(|| async {
barrier.wait().await;
futures::future::pending::<()>().await;
Ok::<_, Infallible>("never reached")
});
tokio::select! {
_ = initializer => { unreachable!("cannot complete; stuck in pending().await") },
_ = barrier.wait() => {}
};
// now initializer is dropped
assert!(cell.get().is_none());
let g = cell
.get_or_init(|| async { Ok::<_, Infallible>("now initialized") })
.await
.unwrap();
assert_eq!(*g, "now initialized");
}
}

View File

@@ -82,6 +82,8 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
tokio-stream.workspace = true
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
[dev-dependencies]
criterion.workspace = true

View File

@@ -0,0 +1,245 @@
use clap::Parser;
use hyper::client::conn::Parts;
use hyper::client::HttpConnector;
use hyper::{Body, Client, Uri};
use pageserver::{repository, tenant};
use rand::prelude::*;
use std::env::args;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinHandle;
struct Key(repository::Key);
impl std::str::FromStr for Key {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
repository::Key::from_hex(s).map(Key)
}
}
struct KeyRange {
start: Key,
end: Key,
}
impl KeyRange {
fn len(&self) -> i128 {
self.end.0.to_i128() - self.start.0.to_i128()
}
}
#[derive(clap::Parser)]
struct Args {
#[clap(long, default_value = "http://localhost:9898")]
ps_endpoint: String,
// tenant_id: String,
// timeline_id: String,
num_tasks: usize,
num_requests: usize,
tenants: Option<Vec<String>>,
#[clap(long)]
pick_n_tenants: Option<usize>,
}
#[derive(Debug, Default)]
struct Stats {
completed_requests: AtomicU64,
}
impl Stats {
fn inc(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
}
#[tokio::main]
async fn main() {
let args: &'static Args = Box::leak(Box::new(Args::parse()));
let client = Client::new();
let tenants = if let Some(tenants) = &args.tenants {
tenants.clone()
} else {
// let tenant_id = "b97965931096047b2d54958756baee7b";
// let timeline_id = "2868f84a8d166779e4c651b116c45059";
let resp = client
.get(Uri::try_from(&format!("{}/v1/tenant", args.ps_endpoint)).unwrap())
.await
.unwrap();
let body = hyper::body::to_bytes(resp).await.unwrap();
let tenants: serde_json::Value = serde_json::from_slice(&body).unwrap();
let mut out = Vec::new();
for t in tenants.as_array().unwrap() {
if let Some(limit) = args.pick_n_tenants {
if out.len() >= limit {
break;
}
}
out.push(t.get("id").unwrap().as_str().unwrap().to_owned());
}
if let Some(limit) = args.pick_n_tenants {
assert_eq!(out.len(), limit);
}
out
};
let mut tenant_timelines = Vec::new();
for tenant_id in tenants {
let resp = client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline",
args.ps_endpoint, tenant_id
))
.unwrap(),
)
.await
.unwrap();
let body = hyper::body::to_bytes(resp).await.unwrap();
let timelines: serde_json::Value = serde_json::from_slice(&body).unwrap();
for t in timelines.as_array().unwrap() {
let timeline_id = t.get("timeline_id").unwrap().as_str().unwrap().to_owned();
tenant_timelines.push((tenant_id.clone(), timeline_id));
}
}
println!("tenant_timelines:\n{:?}", tenant_timelines);
let mut stats = Arc::new(Stats::default());
tokio::spawn({
let stats = Arc::clone(&stats);
async move {
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
println!(
"RPS: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64()
);
}
}
});
let mut tasks = Vec::new();
for (tenant_id, timeline_id) in tenant_timelines {
let t = tokio::spawn(timeline(
args,
client.clone(),
tenant_id,
timeline_id,
Arc::clone(&stats),
));
tasks.push(t);
}
for t in tasks {
t.await.unwrap();
}
}
fn timeline(
args: &'static Args,
client: Client<HttpConnector, Body>,
tenant_id: String,
timeline_id: String,
stats: Arc<Stats>,
) -> impl Future<Output = ()> {
async move {
let mut resp = client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline/{}/keyspace",
args.ps_endpoint, tenant_id, timeline_id
))
.unwrap(),
)
.await
.unwrap();
if !resp.status().is_success() {
panic!("Failed to get keyspace: {resp:?}");
}
let body = hyper::body::to_bytes(resp).await.unwrap();
let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap();
let lsn = Arc::new(keyspace["at_lsn"].as_str().unwrap().to_owned());
let ranges = keyspace["keys"]
.as_array()
.unwrap()
.iter()
.map(|r| {
let r = r.as_array().unwrap();
assert_eq!(r.len(), 2);
let start = Key::from_str(r[0].as_str().unwrap()).unwrap();
let end = Key::from_str(r[1].as_str().unwrap()).unwrap();
KeyRange { start, end }
})
.collect::<Vec<_>>();
// weighted ranges
let weights = ranges.iter().map(|r| r.len()).collect::<Vec<_>>();
let ranges = Arc::new(ranges);
let weights = Arc::new(weights);
let (tx, mut rx) = channel::<i32>(1000);
let tx = Arc::new(AsyncMutex::new(tx));
let mut tasks = Vec::<JoinHandle<()>>::new();
let start = std::time::Instant::now();
for i in 0..args.num_tasks {
let ranges = ranges.clone();
let weights = weights.clone();
let lsn = lsn.clone();
let client = client.clone();
let tenant_id = tenant_id.clone();
let timeline_id = timeline_id.clone();
let stats = Arc::clone(&stats);
let task = tokio::spawn(async move {
for i in 0..args.num_requests {
let key = {
let mut rng = rand::thread_rng();
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
let key = rng.gen_range((r.start.0.to_i128()..r.end.0.to_i128()));
key
};
let url = format!(
"{}/v1/tenant/{}/timeline/{}/getpage?key={:036x}&lsn={}",
args.ps_endpoint, tenant_id, timeline_id, key, lsn
);
let uri = url.parse::<Uri>().unwrap();
let resp = client.get(uri).await.unwrap();
stats.inc();
}
});
tasks.push(task);
}
drop(tx);
for task in tasks {
task.await.unwrap();
}
let elapsed = start.elapsed();
println!(
"RPS: {:.0}",
(args.num_requests * args.num_tasks) as f64 / elapsed.as_secs_f64()
);
}
}

View File

@@ -0,0 +1,411 @@
use anyhow::Context;
use clap::Parser;
use futures::{SinkExt, TryStreamExt};
use hyper::client::conn::Parts;
use hyper::client::HttpConnector;
use hyper::{Client, Uri};
use pageserver::page_cache::PAGE_SZ;
use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block};
use pageserver::{repository, tenant};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
};
use pageserver_api::reltag::RelTag;
use rand::prelude::*;
use scopeguard::defer;
use std::env::args;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinHandle;
use tokio_stream::{Stream, StreamExt};
use utils::completion;
use utils::lsn::Lsn;
struct Key(repository::Key);
impl std::str::FromStr for Key {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
repository::Key::from_hex(s).map(Key)
}
}
struct KeyRange {
start: i128,
end: i128,
}
impl KeyRange {
fn len(&self) -> i128 {
self.end - self.start
}
}
struct RelTagBlockNo {
rel_tag: RelTag,
block_no: u32,
}
#[derive(clap::Parser)]
struct Args {
#[clap(long, default_value = "http://localhost:9898")]
ps_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
pq_client_connstring: String,
// tenant_id: String,
// timeline_id: String,
num_tasks: usize,
num_requests: usize,
tenants: Option<Vec<String>>,
#[clap(long)]
pick_n_tenants: Option<usize>,
#[clap(subcommand)]
mode: Mode,
}
#[derive(clap::Parser, Clone)]
enum Mode {
GetPage,
NoOp,
}
#[derive(Debug, Default)]
struct Stats {
completed_requests: AtomicU64,
}
impl Stats {
fn inc(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
}
#[tokio::main]
async fn main() {
let args: &'static Args = Box::leak(Box::new(Args::parse()));
// std::env::set_var("RUST_LOG", "info,tokio_postgres=trace");
// tracing_subscriber::fmt::init();
let client = Client::new();
let tenants = if let Some(tenants) = &args.tenants {
tenants.clone()
} else {
// let tenant_id = "b97965931096047b2d54958756baee7b";
// let timeline_id = "2868f84a8d166779e4c651b116c45059";
let resp = client
.get(Uri::try_from(&format!("{}/v1/tenant", args.ps_endpoint)).unwrap())
.await
.unwrap();
let body = hyper::body::to_bytes(resp).await.unwrap();
let tenants: serde_json::Value = serde_json::from_slice(&body).unwrap();
let mut out = Vec::new();
for t in tenants.as_array().unwrap() {
if let Some(limit) = args.pick_n_tenants {
if out.len() >= limit {
break;
}
}
out.push(t.get("id").unwrap().as_str().unwrap().to_owned());
}
if let Some(limit) = args.pick_n_tenants {
assert_eq!(out.len(), limit);
}
out
};
let mut tenant_timelines = Vec::new();
for tenant_id in tenants {
let resp = client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline",
args.ps_endpoint, tenant_id
))
.unwrap(),
)
.await
.unwrap();
let body = hyper::body::to_bytes(resp).await.unwrap();
let timelines: serde_json::Value = serde_json::from_slice(&body).unwrap();
for t in timelines.as_array().unwrap() {
let timeline_id = t.get("timeline_id").unwrap().as_str().unwrap().to_owned();
tenant_timelines.push((tenant_id.clone(), timeline_id));
}
}
println!("tenant_timelines:\n{:?}", tenant_timelines);
let mut stats = Arc::new(Stats::default());
tokio::spawn({
let stats = Arc::clone(&stats);
async move {
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
println!(
"RPS: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64()
);
}
}
});
let mut tasks = Vec::new();
for (tenant_id, timeline_id) in tenant_timelines {
let stats = Arc::clone(&stats);
let t = tokio::spawn(timeline(
args,
client.clone(),
tenant_id,
timeline_id,
stats,
));
tasks.push(t);
}
for t in tasks {
t.await.unwrap();
}
}
fn timeline(
args: &'static Args,
http_client: Client<HttpConnector, hyper::Body>,
tenant_id: String,
timeline_id: String,
stats: Arc<Stats>,
) -> impl Future<Output = ()> + Send + Sync {
async move {
let mut resp = http_client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline/{}/keyspace",
args.ps_endpoint, tenant_id, timeline_id
))
.unwrap(),
)
.await
.unwrap();
if !resp.status().is_success() {
panic!("Failed to get keyspace: {resp:?}");
}
let body = hyper::body::to_bytes(resp).await.unwrap();
let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap();
let lsn: Lsn = keyspace["at_lsn"].as_str().unwrap().parse().unwrap();
let ranges = keyspace["keys"]
.as_array()
.unwrap()
.iter()
.filter_map(|r| {
let r = r.as_array().unwrap();
assert_eq!(r.len(), 2);
let start = Key::from_str(r[0].as_str().unwrap()).unwrap();
let end = Key::from_str(r[1].as_str().unwrap()).unwrap();
// filter out non-relblock keys
match (is_rel_block_key(start.0), is_rel_block_key(end.0)) {
(true, true) => Some(KeyRange {
start: start.0.to_i128(),
end: end.0.to_i128(),
}),
(true, false) | (false, true) => {
unimplemented!("split up range")
}
(false, false) => None,
}
})
.collect::<Vec<_>>();
// weighted ranges
let weights = ranges.iter().map(|r| r.len()).collect::<Vec<_>>();
let ranges = Arc::new(ranges);
let weights = Arc::new(weights);
let mut tasks = Vec::<JoinHandle<()>>::new();
let start = std::time::Instant::now();
for i in 0..args.num_tasks {
let ranges = ranges.clone();
let weights = weights.clone();
let client = http_client.clone();
let tenant_id = tenant_id.clone();
let timeline_id = timeline_id.clone();
let task = tokio::spawn({
let stats = Arc::clone(&stats);
async move {
let mut client = getpage_client::Client::new(
args.pq_client_connstring.clone(),
tenant_id.clone(),
timeline_id.clone(),
)
.await
.unwrap();
for i in 0..args.num_requests {
match args.mode {
Mode::GetPage => {
let key = {
let mut rng = rand::thread_rng();
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
let key: i128 = rng.gen_range((r.start..r.end));
let key = repository::Key::from_i128(key);
// XXX filter these out when we iterate the keyspace
assert!(
is_rel_block_key(key),
"we filter non-relblock keys out above"
);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we just checked");
RelTagBlockNo { rel_tag, block_no }
};
client
.getpage(key, lsn)
.await
.with_context(|| {
format!(
"getpage for tenant {} timeline {}",
tenant_id, timeline_id
)
})
.unwrap();
}
Mode::NoOp => {
client.noop().await.unwrap();
}
}
stats.inc();
}
client.shutdown().await;
}
});
tasks.push(task);
}
for task in tasks {
task.await.unwrap();
}
}
}
mod getpage_client {
use std::pin::Pin;
use futures::SinkExt;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
PagestreamGetPageResponse,
};
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::lsn::Lsn;
use crate::RelTagBlockNo;
pub(crate) struct Client {
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
impl Client {
pub fn new(
connstring: String,
tenant_id: String,
timeline_id: String,
) -> impl std::future::Future<Output = anyhow::Result<Self>> + Send {
async move {
let (client, connection) =
tokio_postgres::connect(&connstring, postgres::NoTls).await?;
let conn_task_cancel = CancellationToken::new();
let conn_task = tokio::spawn({
let conn_task_cancel = conn_task_cancel.clone();
async move {
tokio::select! {
_ = conn_task_cancel.cancelled() => {
return;
}
res = connection => {
res.unwrap();
}
}
}
});
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = client
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
.await?;
Ok(Self {
copy_both: Box::pin(copy_both),
conn_task,
cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
})
}
}
pub async fn shutdown(mut self) {
let _ = self.cancel_on_client_drop.take();
self.conn_task.await.unwrap();
}
pub async fn getpage(
&mut self,
key: RelTagBlockNo,
lsn: Lsn,
) -> anyhow::Result<PagestreamGetPageResponse> {
let req = PagestreamGetPageRequest {
latest: false,
rel: key.rel_tag,
blkno: key.block_no,
lsn,
};
let req = PagestreamFeMessage::GetPage(req);
match self.do_request(req).await? {
PagestreamBeMessage::GetPage(p) => Ok(p),
x => anyhow::bail!("Unexpected response: {:?}", x),
}
}
pub async fn noop(&mut self) -> anyhow::Result<()> {
match self.do_request(PagestreamFeMessage::NoOp).await? {
PagestreamBeMessage::NoOp => Ok(()),
x => anyhow::bail!("Unexpected response: {:?}", x),
}
}
async fn do_request(
&mut self,
req: PagestreamFeMessage,
) -> Result<PagestreamBeMessage, anyhow::Error> {
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.copy_both.send_all(&mut req).await?;
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next = next.unwrap().unwrap();
match PagestreamBeMessage::deserialize(next)? {
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
x => Ok(x),
}
}
}
}

View File

@@ -0,0 +1,109 @@
use anyhow::Context;
use bytes::Buf;
use clap::Parser;
use pageserver_api::models::{PagestreamBeMessage, PagestreamErrorResponse, PagestreamFeMessage};
use postgres_backend::{AuthType, PostgresBackend, QueryError};
use pq_proto::{BeMessage, FeMessage};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
#[derive(clap::Parser)]
struct Args {
bind: String,
}
#[tokio::main]
async fn main() {
let args = Args::parse();
let listener = tokio::net::TcpListener::bind(&args.bind).await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
handle_connection(socket).await.unwrap();
});
}
}
async fn handle_connection(socket: tokio::net::TcpStream) -> anyhow::Result<()> {
socket
.set_nodelay(true)
.context("could not set TCP_NODELAY")?;
let peer_addr = socket.peer_addr().context("get peer address")?;
let socket = tokio_io_timeout::TimeoutReader::new(socket);
tokio::pin!(socket);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, AuthType::Trust, None)?;
let mut conn_handler = NoOpHandler;
let cancel = CancellationToken::new();
pgbackend
.run(&mut conn_handler, || {
let cancel = cancel.clone();
async move { cancel.cancelled().await }
})
.await?;
anyhow::Ok(())
}
struct NoOpHandler;
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for NoOpHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
fn startup(
&mut self,
_pgb: &mut PostgresBackend<IO>,
_sm: &pq_proto::FeStartupPacket,
) -> Result<(), QueryError> {
Ok(())
}
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError> {
if !query_string.starts_with("pagestream ") {
return Err(QueryError::Other(anyhow::anyhow!("not a pagestream query")));
}
// switch client to COPYBOTH
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
pgb.flush().await?;
loop {
let msg = pgb.read_message().await?;
let copy_data_bytes = match msg {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => return Ok(()),
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => return Ok(()), // client disconnected
};
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let response = match neon_fe_msg {
PagestreamFeMessage::NoOp => Ok(PagestreamBeMessage::NoOp),
x => Err(QueryError::Other(anyhow::anyhow!(
"this server only supports no-op: {x:?}"
))),
};
let response = response.unwrap_or_else(|e| {
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
pgb.flush().await?;
}
}
}

View File

@@ -14,7 +14,7 @@ use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::tenant::{secondary, TenantSharedResources};
use pageserver::tenant::TenantSharedResources;
use remote_storage::GenericRemoteStorage;
use tokio::time::Instant;
use tracing::*;
@@ -408,7 +408,7 @@ fn start_pageserver(
// Scan the local 'tenants/' directory and start loading the tenants
let deletion_queue_client = deletion_queue.new_client();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
broker_client: broker_client.clone(),
@@ -418,7 +418,6 @@ fn start_pageserver(
order,
shutdown_pageserver.clone(),
))?;
let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx;
@@ -524,18 +523,6 @@ fn start_pageserver(
}
});
let secondary_controller = if let Some(remote_storage) = &remote_storage {
secondary::spawn_tasks(
conf,
tenant_manager.clone(),
remote_storage.clone(),
background_jobs_barrier.clone(),
shutdown_pageserver.clone(),
)
} else {
secondary::null_controller()
};
// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
// is still accessible even if background task is not configured as long as remote storage has
@@ -547,7 +534,6 @@ fn start_pageserver(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
tenant_manager.clone(),
background_jobs_barrier.clone(),
)?;
}
@@ -560,13 +546,11 @@ fn start_pageserver(
let router_state = Arc::new(
http::routes::State::new(
conf,
tenant_manager,
http_auth.clone(),
remote_storage.clone(),
broker_client.clone(),
disk_usage_eviction_state,
deletion_queue.new_client(),
secondary_controller,
)
.context("Failed to initialize router state")?,
);

View File

@@ -266,7 +266,7 @@ async fn calculate_synthetic_size_worker(
continue;
}
if let Ok(tenant) = mgr::get_tenant(tenant_id, true) {
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await {
// TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
// We can put in some prioritization for consumption metrics.
// Same for the loop that fetches computed metrics.

View File

@@ -206,6 +206,7 @@ pub(super) async fn collect_all_metrics(
None
} else {
crate::tenant::mgr::get_tenant(id, true)
.await
.ok()
.map(|tenant| (id, tenant))
}

View File

@@ -1,2 +0,0 @@
Checking pageserver v0.1.0 (/home/neon/neon/pageserver)
Finished dev [optimized + debuginfo] target(s) in 7.62s

View File

@@ -48,26 +48,19 @@ use std::{
};
use anyhow::Context;
use camino::Utf8Path;
use remote_storage::GenericRemoteStorage;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument};
use utils::{
completion,
id::{TenantId, TenantTimelineId},
};
use utils::{id::TimelineId, serde_percent::Percent};
use utils::completion;
use utils::serde_percent::Percent;
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
mgr::TenantManager,
secondary::SecondaryTenant,
storage_layer::{AsLayerDesc, EvictionError, Layer},
Timeline,
},
tenant::{self, storage_layer::PersistentLayer, timeline::EvictionError, Timeline},
};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -90,7 +83,6 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
tenant_manager: Arc<TenantManager>,
background_jobs_barrier: completion::Barrier,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
@@ -116,7 +108,8 @@ pub fn launch_disk_usage_global_eviction_task(
_ = background_jobs_barrier.wait() => { }
};
disk_usage_eviction_task(&state, task_config, &storage, tenant_manager, cancel).await;
disk_usage_eviction_task(&state, task_config, storage, &conf.tenants_path(), cancel)
.await;
Ok(())
},
);
@@ -128,8 +121,8 @@ pub fn launch_disk_usage_global_eviction_task(
async fn disk_usage_eviction_task(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
_storage: &GenericRemoteStorage,
tenant_manager: Arc<TenantManager>,
storage: GenericRemoteStorage,
tenants_dir: &Utf8Path,
cancel: CancellationToken,
) {
scopeguard::defer! {
@@ -152,9 +145,14 @@ async fn disk_usage_eviction_task(
let start = Instant::now();
async {
let res =
disk_usage_eviction_task_iteration(state, task_config, &tenant_manager, &cancel)
.await;
let res = disk_usage_eviction_task_iteration(
state,
task_config,
&storage,
tenants_dir,
&cancel,
)
.await;
match res {
Ok(()) => {}
@@ -185,14 +183,13 @@ pub trait Usage: Clone + Copy + std::fmt::Debug {
async fn disk_usage_eviction_task_iteration(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
tenant_manager: &Arc<TenantManager>,
storage: &GenericRemoteStorage,
tenants_dir: &Utf8Path,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let tenants_dir = tenant_manager.get_conf().tenants_path();
let usage_pre = filesystem_level_usage::get(&tenants_dir, task_config)
let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
.context("get filesystem-level disk usage before evictions")?;
let res =
disk_usage_eviction_task_iteration_impl(state, usage_pre, tenant_manager, cancel).await;
let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await;
match res {
Ok(outcome) => {
debug!(?outcome, "disk_usage_eviction_iteration finished");
@@ -202,7 +199,7 @@ async fn disk_usage_eviction_task_iteration(
}
IterationOutcome::Finished(outcome) => {
// Verify with statvfs whether we made any real progress
let after = filesystem_level_usage::get(&tenants_dir, task_config)
let after = filesystem_level_usage::get(tenants_dir, task_config)
// It's quite unlikely to hit the error here. Keep the code simple and bail out.
.context("get filesystem-level disk usage after evictions")?;
@@ -276,8 +273,8 @@ struct LayerCount {
pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
state: &State,
storage: &GenericRemoteStorage,
usage_pre: U,
tenant_manager: &Arc<TenantManager>,
cancel: &CancellationToken,
) -> anyhow::Result<IterationOutcome<U>> {
// use tokio's mutex to get a Sync guard (instead of std::sync::Mutex)
@@ -297,7 +294,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
"running disk usage based eviction due to pressure"
);
let candidates = match collect_eviction_candidates(tenant_manager, cancel).await? {
let candidates = match collect_eviction_candidates(cancel).await? {
EvictionCandidates::Cancelled => {
return Ok(IterationOutcome::Cancelled);
}
@@ -333,16 +330,9 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
// Evictions for attached tenants, batched by timeline
let mut batched: HashMap<_, Vec<_>> = HashMap::new();
// Evictions for secondary locations, batched by tenant
let mut secondary_by_tenant: HashMap<TenantId, Vec<(TimelineId, Layer)>> = HashMap::new();
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
let mut warned = None;
let mut usage_planned = usage_pre;
let mut max_batch_size = 0;
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
if !usage_planned.has_pressure() {
debug!(
@@ -359,26 +349,10 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);
// FIXME: batching makes no sense anymore because of no layermap locking, should just spawn
// tasks to evict all seen layers until we have evicted enough
match candidate.source {
EvictionCandidateSource::Attached(timeline) => {
let batch = batched.entry(TimelineKey(timeline)).or_default();
// semaphore will later be used to limit eviction concurrency, and we can express at
// most u32 number of permits. unlikely we would have u32::MAX layers to be evicted,
// but fail gracefully by not making batches larger.
if batch.len() < u32::MAX as usize {
batch.push(candidate.layer);
max_batch_size = max_batch_size.max(batch.len());
}
}
EvictionCandidateSource::Secondary(ttid) => {
let batch = secondary_by_tenant.entry(ttid.tenant_id).or_default();
batch.push((ttid.timeline_id, candidate.layer));
}
}
batched
.entry(TimelineKey(candidate.timeline))
.or_default()
.push(candidate.layer);
}
let usage_planned = match warned {
@@ -393,116 +367,71 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
};
debug!(?usage_planned, "usage planned");
// phase2 (secondary tenants): evict victims batched by tenant
for (tenant_id, timeline_layers) in secondary_by_tenant {
// Q: Why do we go via TenantManager again rather than just deleting files, or keeping
// an Arc ref to the secondary state?
// A: It's because a given tenant's local storage **belongs** to whoever is currently
// live in the TenantManager. We must avoid a race where we might plan an eviction
// for secondary, and then execute it when the tenant is actually in an attached state.
tenant_manager
.evict_tenant_layers(&tenant_id, timeline_layers)
.instrument(tracing::info_span!("evict_batch", %tenant_id))
.await;
}
// phase2 (attached tenants): evict victims batched by timeline
let mut js = tokio::task::JoinSet::new();
// ratelimit to 1k files or any higher max batch size
let limit = Arc::new(tokio::sync::Semaphore::new(1000.max(max_batch_size)));
// phase2: evict victims batched by timeline
// After the loop, `usage_assumed` is the post-eviction usage,
// according to internal accounting.
let mut usage_assumed = usage_pre;
let mut evictions_failed = LayerCount::default();
for (timeline, batch) in batched {
let tenant_id = timeline.tenant_id;
let timeline_id = timeline.timeline_id;
let batch_size =
u32::try_from(batch.len()).expect("batch size limited to u32::MAX during partitioning");
// I dislike naming of `available_permits` but it means current total amount of permits
// because permits can be added
assert!(batch_size as usize <= limit.available_permits());
let batch_size = batch.len();
debug!(%timeline_id, "evicting batch for timeline");
let evict = {
let limit = limit.clone();
let cancel = cancel.clone();
async move {
let mut evicted_bytes = 0;
let mut evictions_failed = LayerCount::default();
async {
let results = timeline.evict_layers(storage, &batch, cancel.clone()).await;
let Ok(_permit) = limit.acquire_many_owned(batch_size).await else {
// semaphore closing means cancelled
return (evicted_bytes, evictions_failed);
};
let results = timeline.evict_layers(&batch, &cancel).await;
match results {
Ok(results) => {
assert_eq!(results.len(), batch.len());
for (result, layer) in results.into_iter().zip(batch.iter()) {
let file_size = layer.layer_desc().file_size;
match result {
Some(Ok(())) => {
evicted_bytes += file_size;
}
Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
None => {
assert!(cancel.is_cancelled());
}
match results {
Err(e) => {
warn!("failed to evict batch: {:#}", e);
}
Ok(results) => {
assert_eq!(results.len(), batch.len());
for (result, layer) in results.into_iter().zip(batch.iter()) {
let file_size = layer.layer_desc().file_size;
match result {
Some(Ok(())) => {
usage_assumed.add_available_bytes(file_size);
}
Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
unreachable!("get_local_layers_for_disk_usage_eviction finds only local layers")
}
Some(Err(EvictionError::FileNotFound)) => {
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
Some(Err(
e @ EvictionError::LayerNotFound(_)
| e @ EvictionError::StatFailed(_),
)) => {
let e = utils::error::report_compact_sources(&e);
warn!(%layer, "failed to evict layer: {e}");
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
Some(Err(EvictionError::MetadataInconsistency(detail))) => {
warn!(%layer, "failed to evict layer: {detail}");
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
None => {
assert!(cancel.is_cancelled());
return;
}
}
}
Err(e) => {
warn!("failed to evict batch: {:#}", e);
}
}
(evicted_bytes, evictions_failed)
}
}
.instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size));
.instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size))
.await;
js.spawn(evict);
// spwaning multiple thousands of these is essentially blocking, so give already spawned a
// chance of making progress
tokio::task::yield_now().await;
}
let join_all = async move {
// After the evictions, `usage_assumed` is the post-eviction usage,
// according to internal accounting.
let mut usage_assumed = usage_pre;
let mut evictions_failed = LayerCount::default();
while let Some(res) = js.join_next().await {
match res {
Ok((evicted_bytes, failed)) => {
usage_assumed.add_available_bytes(evicted_bytes);
evictions_failed.file_sizes += failed.file_sizes;
evictions_failed.count += failed.count;
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => { /* already logged */ }
Err(je) => tracing::error!("unknown JoinError: {je:?}"),
}
}
(usage_assumed, evictions_failed)
};
let (usage_assumed, evictions_failed) = tokio::select! {
tuple = join_all => { tuple },
_ = cancel.cancelled() => {
// close the semaphore to stop any pending acquires
limit.close();
if cancel.is_cancelled() {
return Ok(IterationOutcome::Cancelled);
}
};
}
Ok(IterationOutcome::Finished(IterationOutcomeFinished {
before: usage_pre,
@@ -514,19 +443,10 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
}))
}
// An eviction candidate might originate from either an attached tenant
// with a [`Tenant`] and [`Timeline`] object, or from a secondary tenant
// location. These differ in how we will execute the eviction.
#[derive(Clone)]
enum EvictionCandidateSource {
Attached(Arc<Timeline>),
Secondary(TenantTimelineId),
}
#[derive(Clone)]
struct EvictionCandidate {
source: EvictionCandidateSource,
layer: Layer,
timeline: Arc<Timeline>,
layer: Arc<dyn PersistentLayer>,
last_activity_ts: SystemTime,
}
@@ -575,18 +495,27 @@ enum EvictionCandidates {
/// after exhauting the `Above` partition.
/// So, we did not respect each tenant's min_resident_size.
async fn collect_eviction_candidates(
tenant_manager: &Arc<TenantManager>,
cancel: &CancellationToken,
) -> anyhow::Result<EvictionCandidates> {
// get a snapshot of the list of tenants
let tenants = tenant::mgr::list_tenants()
.await
.context("get list of tenants")?;
let mut candidates = Vec::new();
let tenants = tenant_manager.get_attached_tenants();
for tenant in tenants {
for (tenant_id, _state) in &tenants {
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}
let tenant = match tenant::mgr::get_tenant(*tenant_id, true).await {
Ok(tenant) => tenant,
Err(e) => {
// this can happen if tenant has lifecycle transition after we fetched it
debug!("failed to get tenant: {e:#}");
continue;
}
};
// collect layers from all timelines in this tenant
//
@@ -649,7 +578,7 @@ async fn collect_eviction_candidates(
for (timeline, layer_info) in tenant_candidates.into_iter() {
let file_size = layer_info.file_size();
let candidate = EvictionCandidate {
source: EvictionCandidateSource::Attached(timeline),
timeline,
last_activity_ts: layer_info.last_activity_ts,
layer: layer_info.layer,
};
@@ -663,43 +592,6 @@ async fn collect_eviction_candidates(
}
}
// FIXME: this is a long loop over all secondary locations. At the least, respect
// cancellation here, but really we need to break up the loop. We could extract the
// Arc<SecondaryTenant>s and iterate over them with some tokio yields in there. Ideally
// though we should just reduce the total amount of work: our eviction goals do not require
// listing absolutely every layer in every tenant: we could sample this.
tenant_manager.foreach_secondary_tenants(
|tenant_id: &TenantId, state: &Arc<SecondaryTenant>| {
let mut tenant_candidates = Vec::new();
for (timeline_id, layer_info) in state.get_layers_for_eviction() {
debug!(tenant_id=%tenant_id, timeline_id=%timeline_id, "timeline resident layers (secondary) count: {}", layer_info.resident_layers.len());
tenant_candidates.extend(
layer_info.resident_layers
.into_iter()
.map(|layer_infos| (timeline_id, layer_infos)),
);
}
tenant_candidates
.sort_unstable_by_key(|(_, layer_info)| std::cmp::Reverse(layer_info.last_activity_ts));
candidates.extend(tenant_candidates.into_iter().map(|(timeline_id, candidate)| {
(
// Secondary locations' layers are always considered above the min resident size,
// i.e. secondary locations are permitted to be trimmed to zero layers if all
// the layers have sufficiently old access times.
MinResidentSizePartition::Above,
EvictionCandidate {
source: EvictionCandidateSource::Secondary(TenantTimelineId { tenant_id: *tenant_id, timeline_id}),
last_activity_ts: candidate.last_activity_ts,
layer: candidate.layer,
}
)
}));
},
);
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
"as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
candidates

View File

@@ -4,7 +4,6 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Context, Result};
use futures::TryFutureExt;
@@ -37,10 +36,8 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
TenantSlotError, TenantSlotUpsertError, TenantStateError,
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
};
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::timeline::Timeline;
@@ -66,27 +63,22 @@ use super::models::ConfigureFailpointsRequest;
pub struct State {
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
}
impl State {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
.iter()
@@ -94,14 +86,12 @@ impl State {
.collect::<Vec<_>>();
Ok(Self {
conf,
tenant_manager,
auth,
allowlist_routes,
remote_storage,
broker_client,
disk_usage_eviction_state,
deletion_queue_client,
secondary_controller,
})
}
@@ -157,60 +147,28 @@ impl From<PageReconstructError> for ApiError {
impl From<TenantMapInsertError> for ApiError {
fn from(tmie: TenantMapInsertError) -> ApiError {
match tmie {
TenantMapInsertError::SlotError(e) => e.into(),
TenantMapInsertError::SlotUpsertError(e) => e.into(),
TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
ApiError::ResourceUnavailable(format!("{tmie}").into())
}
TenantMapInsertError::TenantAlreadyExists(id, state) => {
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
}
TenantMapInsertError::TenantExistsSecondary(id) => {
ApiError::Conflict(format!("tenant {id} already exists as secondary"))
}
TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
}
}
}
impl From<TenantSlotError> for ApiError {
fn from(e: TenantSlotError) -> ApiError {
use TenantSlotError::*;
match e {
NotFound(tenant_id) => {
ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into())
}
e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")),
e @ Conflict(_) => ApiError::Conflict(format!("{e}")),
InProgress => {
ApiError::ResourceUnavailable("Tenant is being modified concurrently".into())
}
MapState(e) => e.into(),
}
}
}
impl From<TenantSlotUpsertError> for ApiError {
fn from(e: TenantSlotUpsertError) -> ApiError {
use TenantSlotUpsertError::*;
match e {
InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")),
MapState(e) => e.into(),
}
}
}
impl From<TenantMapError> for ApiError {
fn from(e: TenantMapError) -> ApiError {
use TenantMapError::*;
match e {
StillInitializing | ShuttingDown => {
ApiError::ResourceUnavailable(format!("{e}").into())
}
}
}
}
impl From<TenantStateError> for ApiError {
fn from(tse: TenantStateError) -> ApiError {
match tse {
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
TenantStateError::IsStopping(_) => {
ApiError::ResourceUnavailable("Tenant is stopping".into())
}
TenantStateError::SlotError(e) => e.into(),
TenantStateError::SlotUpsertError(e) => e.into(),
TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)),
_ => ApiError::InternalServerError(anyhow::Error::new(tse)),
}
}
}
@@ -285,9 +243,6 @@ impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
Get(g) => ApiError::from(g),
e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
Timeline(t) => ApiError::from(t),
NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
SlotError(e) => e.into(),
SlotUpsertError(e) => e.into(),
Other(o) => ApiError::InternalServerError(o),
e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
}
@@ -414,7 +369,7 @@ async fn timeline_create_handler(
let state = get_state(&request);
async {
let tenant = mgr::get_tenant(tenant_id, true)?;
let tenant = mgr::get_tenant(tenant_id, true).await?;
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -461,7 +416,7 @@ async fn timeline_list_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let response_data = async {
let tenant = mgr::get_tenant(tenant_id, true)?;
let tenant = mgr::get_tenant(tenant_id, true).await?;
let timelines = tenant.list_timelines();
let mut response_data = Vec::with_capacity(timelines.len());
@@ -500,7 +455,7 @@ async fn timeline_detail_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline_info = async {
let tenant = mgr::get_tenant(tenant_id, true)?;
let tenant = mgr::get_tenant(tenant_id, true).await?;
let timeline = tenant
.get_timeline(timeline_id, false)
@@ -726,6 +681,45 @@ async fn tenant_ignore_handler(
json_response(StatusCode::OK, ())
}
async fn tenant_duplicate_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let src_tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let new_tenant_id = request_data.new_tenant_id;
check_permission(&request, None)?;
let _timer = STORAGE_TIME_GLOBAL
.get_metric_with_label_values(&[StorageTimeOperation::DuplicateTenant.into()])
.expect("bug")
.start_timer();
let tenant_conf =
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
let state = get_state(&request);
let generation = get_request_generation(state, request_data.generation)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
mgr::duplicate_tenant(
state.conf,
tenant_conf,
src_tenant_id,
new_tenant_id,
generation,
state.tenant_resources(),
&ctx,
)
.instrument(info_span!("tenant_duplicate", %src_tenant_id, tenant_id = %new_tenant_id))
.await?;
json_response(StatusCode::CREATED, TenantCreateResponse(new_tenant_id))
}
async fn tenant_list_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -758,7 +752,7 @@ async fn tenant_status(
check_permission(&request, Some(tenant_id))?;
let tenant_info = async {
let tenant = mgr::get_tenant(tenant_id, false)?;
let tenant = mgr::get_tenant(tenant_id, false).await?;
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
@@ -821,7 +815,7 @@ async fn tenant_size_handler(
let headers = request.headers();
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant = mgr::get_tenant(tenant_id, true)?;
let tenant = mgr::get_tenant(tenant_id, true).await?;
// this can be long operation
let inputs = tenant
@@ -1080,7 +1074,7 @@ async fn get_tenant_config_handler(
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant = mgr::get_tenant(tenant_id, false)?;
let tenant = mgr::get_tenant(tenant_id, false).await?;
let response = HashMap::from([
(
@@ -1124,9 +1118,6 @@ async fn put_tenant_location_config_handler(
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis);
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
@@ -1142,7 +1133,7 @@ async fn put_tenant_location_config_handler(
.await
{
match e {
TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
TenantStateError::NotFound(_) => {
// This API is idempotent: a NotFound on a detach is fine.
}
_ => return Err(e.into()),
@@ -1154,14 +1145,20 @@ async fn put_tenant_location_config_handler(
let location_conf =
LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
state
.tenant_manager
.upsert_location(tenant_id, location_conf, flush, &ctx)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
// which is not a 400 but a 409.
.map_err(ApiError::BadRequest)?;
mgr::upsert_location(
state.conf,
tenant_id,
location_conf,
state.broker_client.clone(),
state.remote_storage.clone(),
state.deletion_queue_client.clone(),
&ctx,
)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
// which is not a 400 but a 409.
.map_err(ApiError::BadRequest)?;
json_response(StatusCode::OK, ())
}
@@ -1174,6 +1171,7 @@ async fn handle_tenant_break(
let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?;
let tenant = crate::tenant::mgr::get_tenant(tenant_id, true)
.await
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
tenant.set_broken("broken from test".to_owned()).await;
@@ -1246,7 +1244,7 @@ async fn timeline_compact_handler(
timeline
.compact(&cancel, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
.instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
@@ -1271,7 +1269,7 @@ async fn timeline_checkpoint_handler(
timeline
.compact(&cancel, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
@@ -1478,7 +1476,7 @@ async fn active_timeline_of_active_tenant(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<Arc<Timeline>, ApiError> {
let tenant = mgr::get_tenant(tenant_id, true)?;
let tenant = mgr::get_tenant(tenant_id, true).await?;
tenant
.get_timeline(timeline_id, true)
.map_err(|e| ApiError::NotFound(e.into()))
@@ -1541,18 +1539,17 @@ async fn disk_usage_eviction_run(
let state = get_state(&r);
if state.remote_storage.as_ref().is_none() {
let Some(storage) = state.remote_storage.clone() else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"remote storage not configured, cannot run eviction iteration"
)));
}
};
let eviction_state = state.disk_usage_eviction_state.clone();
let state = state.disk_usage_eviction_state.clone();
let cancel = CancellationToken::new();
let child_cancel = cancel.clone();
let _g = cancel.drop_guard();
let tenant_manager = state.tenant_manager.clone();
crate::task_mgr::spawn(
crate::task_mgr::BACKGROUND_RUNTIME.handle(),
@@ -1563,9 +1560,9 @@ async fn disk_usage_eviction_run(
false,
async move {
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&eviction_state,
&state,
&storage,
usage,
&tenant_manager,
&child_cancel,
)
.await;
@@ -1583,36 +1580,6 @@ async fn disk_usage_eviction_run(
json_response(StatusCode::OK, response)
}
async fn secondary_download_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let state = get_state(&request);
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
state
.secondary_controller
.download_tenant(tenant_id)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn secondary_upload_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let state = get_state(&request);
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
state
.secondary_controller
.upload_tenant(tenant_id)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(
StatusCode::NOT_FOUND,
@@ -1800,6 +1767,9 @@ pub fn make_router(
.post("/v1/tenant/:tenant_id/ignore", |r| {
api_handler(r, tenant_ignore_handler)
})
.post("/v1/tenant/:tenant_id/duplicate", |r| {
api_handler(r, tenant_duplicate_handler)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
api_handler(r, timeline_detail_handler)
})
@@ -1849,16 +1819,6 @@ pub fn make_router(
.put("/v1/deletion_queue/flush", |r| {
api_handler(r, deletion_queue_flush)
})
.post("/v1/secondary/:tenant_id/upload", |r| {
testing_api_handler("force heatmap upload", r, secondary_upload_handler)
})
.post("/v1/secondary/:tenant_id/download", |r| {
testing_api_handler(
"force secondary layer download",
r,
secondary_download_handler,
)
})
.put("/v1/tenant/:tenant_id/break", |r| {
testing_api_handler("set tenant state to broken", r, handle_tenant_break)
})

View File

@@ -51,6 +51,9 @@ pub enum StorageTimeOperation {
#[strum(serialize = "create tenant")]
CreateTenant,
#[strum(serialize = "duplicate tenant")]
DuplicateTenant,
}
pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
@@ -757,6 +760,7 @@ pub enum SmgrQueryType {
GetRelSize,
GetPageAtLsn,
GetDbSize,
NoOp,
}
#[derive(Debug)]
@@ -1404,7 +1408,7 @@ impl TimelineMetrics {
crate::metrics::RESIDENT_PHYSICAL_SIZE_GLOBAL.add(sz);
}
pub(crate) fn resident_physical_size_get(&self) -> u64 {
pub fn resident_physical_size_get(&self) -> u64 {
self.resident_physical_size_gauge.get()
}
}

View File

@@ -488,6 +488,11 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::NoOp => {
let _timer = metrics.start_timer(metrics::SmgrQueryType::NoOp);
let span = tracing::info_span!("no_op");
(Ok(PagestreamBeMessage::NoOp), span)
}
};
let response = response.unwrap_or_else(|e| {
@@ -1314,7 +1319,7 @@ async fn get_active_tenant_with_timeout(
tenant_id: TenantId,
_ctx: &RequestContext, /* require get a context to support cancellation in the future */
) -> Result<Arc<Tenant>, GetActiveTenantError> {
let tenant = match mgr::get_tenant(tenant_id, false) {
let tenant = match mgr::get_tenant(tenant_id, false).await {
Ok(tenant) => tenant,
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
Err(GetTenantError::NotActive(_)) => {

View File

@@ -1202,8 +1202,7 @@ impl<'a> DatadirModification<'a> {
let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
Ok(buf) => AuxFilesDirectory::des(&buf)?,
Err(e) => {
// This is expected: historical databases do not have the key.
debug!("Failed to get info about AUX files: {}", e);
warn!("Failed to get info about AUX files: {}", e);
AuxFilesDirectory {
files: HashMap::new(),
}
@@ -1695,6 +1694,7 @@ const AUX_FILES_KEY: Key = Key {
// Reverse mappings for a few Keys.
// These are needed by WAL redo manager.
/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`.
pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
Ok(match key.field1 {
0x00 => (
@@ -1710,7 +1710,8 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
})
}
fn is_rel_block_key(key: Key) -> bool {
/// See [[key_to_rel_block]].
pub fn is_rel_block_key(key: Key) -> bool {
key.field1 == 0x00 && key.field4 != 0
}

View File

@@ -257,12 +257,6 @@ pub enum TaskKind {
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,
/// See [`crate::tenant::secondary`].
SecondaryDownloads,
/// See [`crate::tenant::secondary`].
SecondaryUploads,
// Initial logical size calculation
InitialLogicalSizeCalculation,

View File

@@ -63,7 +63,6 @@ use self::timeline::TimelineResources;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::deletion_queue::DeletionQueueError;
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::metrics::TENANT_ACTIVATION;
@@ -131,7 +130,6 @@ pub mod storage_layer;
pub mod config;
pub mod delete;
pub mod mgr;
pub mod secondary;
pub mod tasks;
pub mod upload_queue;
@@ -140,7 +138,9 @@ pub(crate) mod timeline;
pub mod size;
pub(crate) use timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
pub use timeline::{
LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
};
// re-export for use in remote_timeline_client.rs
pub use crate::tenant::metadata::save_metadata;
@@ -194,45 +194,6 @@ struct TimelinePreload {
index_part: Result<MaybeDeletedIndexPart, DownloadError>,
}
/// To include the HeatmapWriter in tenant shutdown, we provide a hook
/// for it to publish a barrier when upload is going on. We will take
/// this and wait on it during shutdown, ensuring that there is no
/// upload going on once shutdown() returns.
pub struct HeatmapHook {
// Mutually exclude shutdown and any in-flight uploads
//
// If this is None, we are shutting down
in_progress: Option<Arc<tokio::sync::Mutex<()>>>,
}
impl Default for HeatmapHook {
fn default() -> Self {
Self {
in_progress: Some(Arc::default()),
}
}
}
impl HeatmapHook {
pub(crate) fn enter(&self) -> Option<tokio::sync::OwnedMutexGuard<()>> {
self.in_progress.as_ref().map(|l| {
l.clone()
.try_lock_owned()
// expect: shutdown cannot have started yet or in_progress would have been None,
// so we expect that only one HeatmapWriter may take this lock at once.
// Depends on the invariant that HeatmapWriter is the only thing that calls
// enter(), and that it will never try and do uploads concurrently for the same
// tenant.
.expect("Tried to double-lock HeatmapHook")
})
}
/// Returns a lock that the caller should wait on before proceeding with shutdown
fn shutdown(&mut self) -> Arc<tokio::sync::Mutex<()>> {
self.in_progress.take().expect("Called shutdown twice")
}
}
///
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
@@ -284,16 +245,6 @@ pub struct Tenant {
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
pub(crate) heatmap_hook: Mutex<HeatmapHook>,
pub(crate) cancel: CancellationToken,
}
impl std::fmt::Debug for Tenant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.tenant_id, self.current_state())
}
}
pub(crate) enum WalRedoManager {
@@ -574,7 +525,7 @@ impl Tenant {
tenant_id: TenantId,
resources: TenantSharedResources,
attached_conf: AttachedTenantConf,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
expect_marker: AttachMarkerMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -941,7 +892,7 @@ impl Tenant {
attached_conf: AttachedTenantConf,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Arc<Tenant> {
span::debug_assert_current_span_has_tenant_id();
@@ -1873,11 +1824,6 @@ impl Tenant {
pitr: Duration,
ctx: &RequestContext,
) -> anyhow::Result<GcResult> {
// Don't start doing work during shutdown
if let TenantState::Stopping { .. } = self.current_state() {
return Ok(GcResult::default());
}
// there is a global allowed_error for this
anyhow::ensure!(
self.is_active(),
@@ -1906,12 +1852,6 @@ impl Tenant {
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Don't start doing work during shutdown
if let TenantState::Stopping { .. } = self.current_state() {
return Ok(());
}
// We should only be called once the tenant has activated.
anyhow::ensure!(
self.is_active(),
"Cannot run compaction iteration on inactive tenant"
@@ -2074,8 +2014,6 @@ impl Tenant {
// It's mesed up.
// we just ignore the failure to stop
tracing::debug!("shutting down...");
self.cancel.cancel();
match self.set_stopping(shutdown_progress, false, false).await {
Ok(()) => {}
Err(SetStoppingError::Broken) => {
@@ -2083,7 +2021,6 @@ impl Tenant {
}
Err(SetStoppingError::AlreadyStopping(other)) => {
// give caller the option to wait for this this shutdown
info!("Tenant::shutdown: AlreadyStopping");
return Err(other);
}
};
@@ -2097,7 +2034,6 @@ impl Tenant {
js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
})
};
tracing::debug!("shutdown waiting for timelines...");
while let Some(res) = js.join_next().await {
match res {
Ok(()) => {}
@@ -2107,23 +2043,12 @@ impl Tenant {
}
}
let heatmap_hook_lock = {
let mut hook = self.heatmap_hook.lock().unwrap();
hook.shutdown()
};
tracing::debug!("shutdown waiting heatmap uploads...");
// Take & drop lock to ensure any heatmap upload is complete.
drop(heatmap_hook_lock.lock().await);
tracing::debug!("shutdown waiting for tasks...");
// shutdown all tenant and timeline tasks: gc, compaction, page service
// No new tasks will be started for this tenant because it's in `Stopping` state.
//
// this will additionally shutdown and await all timeline tasks.
task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;
tracing::debug!("shutdown complete");
Ok(())
}
@@ -2373,9 +2298,6 @@ where
}
impl Tenant {
pub fn get_tenant_id(&self) -> TenantId {
self.tenant_id
}
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
self.tenant_conf.read().unwrap().tenant_conf
}
@@ -2622,8 +2544,6 @@ impl Tenant {
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
heatmap_hook: Mutex::default(),
cancel: CancellationToken::new(),
}
}
@@ -3433,30 +3353,6 @@ impl Tenant {
pub fn cached_synthetic_size(&self) -> u64 {
self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
}
/// Flush any in-progress layers, schedule uploads, and wait for uploads to complete.
///
/// This function can take a long time: callers should wrap it in a timeout if calling
/// from an external API handler.
pub async fn flush_remote(&self) -> anyhow::Result<()> {
let timelines = self.timelines.lock().unwrap().clone();
for (timeline_id, timeline) in timelines {
tracing::info!(%timeline_id, "Flushing...");
timeline.freeze_and_flush().await?;
tracing::info!(%timeline_id, "Waiting for uploads...");
if let Some(client) = &timeline.remote_client {
client.wait_completion().await?;
}
}
match self.deletion_queue_client.flush_execute().await {
Ok(_) => {}
Err(DeletionQueueError::ShuttingDown) => {}
}
Ok(())
}
}
fn remove_timeline_and_uninit_mark(
@@ -4432,7 +4328,6 @@ mod tests {
#[tokio::test]
async fn delta_layer_dumping() -> anyhow::Result<()> {
use storage_layer::AsLayerDesc;
let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -4440,18 +4335,16 @@ mod tests {
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
let layer_map = tline.layers.read().await;
let level0_deltas = layer_map
.layer_map()
.get_level0_deltas()?
.into_iter()
.map(|desc| layer_map.get_from_desc(&desc))
.collect::<Vec<_>>();
let level0_deltas = layer_map.layer_map().get_level0_deltas()?;
assert!(!level0_deltas.is_empty());
for delta in level0_deltas {
let delta = layer_map.get_from_desc(&delta);
// Ensure we are dumping a delta layer here
assert!(delta.layer_desc().is_delta);
let delta = delta.downcast_delta_layer().unwrap();
delta.dump(false, &ctx).await.unwrap();
delta.dump(true, &ctx).await.unwrap();
}
@@ -4486,7 +4379,7 @@ mod tests {
metadata_bytes[8] ^= 1;
std::fs::write(metadata_path, metadata_bytes)?;
let err = harness.try_load(&ctx).await.expect_err("should fail");
let err = harness.try_load(&ctx).await.err().expect("should fail");
// get all the stack with all .context, not only the last one
let message = format!("{err:#}");
let expected = "failed to load metadata";

View File

@@ -21,7 +21,7 @@ use crate::{
};
use super::{
mgr::{GetTenantError, TenantSlotError, TenantSlotUpsertError, TenantsMap},
mgr::{GetTenantError, TenantsMap},
remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
span,
timeline::delete::DeleteTimelineFlow,
@@ -35,21 +35,12 @@ pub(crate) enum DeleteTenantError {
#[error("GetTenant {0}")]
Get(#[from] GetTenantError),
#[error("Tenant not attached")]
NotAttached,
#[error("Invalid state {0}. Expected Active or Broken")]
InvalidState(TenantState),
#[error("Tenant deletion is already in progress")]
AlreadyInProgress,
#[error("Tenant map slot error {0}")]
SlotError(#[from] TenantSlotError),
#[error("Tenant map slot upsert error {0}")]
SlotUpsertError(#[from] TenantSlotUpsertError),
#[error("Timeline {0}")]
Timeline(#[from] DeleteTimelineError),
@@ -310,12 +301,12 @@ impl DeleteTenantFlow {
pub(crate) async fn run(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
) -> Result<(), DeleteTenantError> {
span::debug_assert_current_span_has_tenant_id();
let mut guard = Self::prepare(&tenant).await?;
let (tenant, mut guard) = Self::prepare(tenants, tenant_id).await?;
if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await {
tenant.set_broken(format!("{e:#}")).await;
@@ -420,7 +411,7 @@ impl DeleteTenantFlow {
guard: DeletionGuard,
tenant: &Arc<Tenant>,
init_order: Option<&InitializationOrder>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
@@ -457,7 +448,7 @@ impl DeleteTenantFlow {
pub(crate) async fn resume_from_attach(
guard: DeletionGuard,
tenant: &Arc<Tenant>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
@@ -483,8 +474,15 @@ impl DeleteTenantFlow {
}
async fn prepare(
tenant: &Arc<Tenant>,
) -> Result<tokio::sync::OwnedMutexGuard<Self>, DeleteTenantError> {
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
) -> Result<(Arc<Tenant>, tokio::sync::OwnedMutexGuard<Self>), DeleteTenantError> {
let m = tenants.read().await;
let tenant = m
.get(&tenant_id)
.ok_or(GetTenantError::NotFound(tenant_id))?;
// FIXME: unsure about active only. Our init jobs may not be cancellable properly,
// so at least for now allow deletions only for active tenants. TODO recheck
// Broken and Stopping is needed for retries.
@@ -518,14 +516,14 @@ impl DeleteTenantFlow {
)));
}
Ok(guard)
Ok((Arc::clone(tenant), guard))
}
fn schedule_background(
guard: OwnedMutexGuard<Self>,
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
) {
let tenant_id = tenant.tenant_id;
@@ -558,7 +556,7 @@ impl DeleteTenantFlow {
mut guard: OwnedMutexGuard<Self>,
conf: &PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenant: &Arc<Tenant>,
) -> Result<(), DeleteTenantError> {
// Tree sort timelines, schedule delete for them. Mention retries from the console side.
@@ -606,7 +604,7 @@ impl DeleteTenantFlow {
.await
.context("cleanup_remaining_fs_traces")?;
let mut locked = tenants.write().unwrap();
let mut locked = tenants.write().await;
if locked.remove(&tenant.tenant_id).is_none() {
warn!("Tenant got removed from tenants map during deletion");
};

View File

@@ -639,10 +639,147 @@ impl LayerMap {
}
println!("historic_layers:");
for desc in self.iter_historic_layers() {
desc.dump();
for layer in self.iter_historic_layers() {
layer.dump(verbose, ctx)?;
}
println!("End dump LayerMap");
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::LayerMap;
use crate::tenant::storage_layer::LayerFileName;
use std::str::FromStr;
use std::sync::Arc;
mod l0_delta_layers_updated {
use crate::tenant::{
storage_layer::{AsLayerDesc, PersistentLayerDesc},
timeline::layer_manager::LayerFileManager,
};
use super::*;
struct LayerObject(PersistentLayerDesc);
impl AsLayerDesc for LayerObject {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.0
}
}
impl LayerObject {
fn new(desc: PersistentLayerDesc) -> Self {
LayerObject(desc)
}
}
type TestLayerFileManager = LayerFileManager<LayerObject>;
#[test]
fn for_full_range_delta() {
// l0_delta_layers are used by compaction, and should observe all buffered updates
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
true
)
}
#[test]
fn for_non_full_range_delta() {
// has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
// because not full range
false
)
}
#[test]
fn for_image() {
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
// code only checks if it is a full range layer, doesn't care about images, which must
// mean we should in practice never have full range images
false
)
}
#[test]
fn replacing_missing_l0_is_notfound() {
// original impl had an oversight, and L0 was an anyhow::Error. anyhow::Error should
// however only happen for precondition failures.
let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
let layer = LayerFileName::from_str(layer).unwrap();
let layer = PersistentLayerDesc::from(layer);
// same skeletan construction; see scenario below
let not_found = Arc::new(LayerObject::new(layer.clone()));
let new_version = Arc::new(LayerObject::new(layer));
// after the immutable storage state refactor, the replace operation
// will not use layer map any more. We keep it here for consistency in test cases
// and can remove it in the future.
let _map = LayerMap::default();
let mut mapping = TestLayerFileManager::new();
mapping
.replace_and_verify(not_found, new_version)
.unwrap_err();
}
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
let name = LayerFileName::from_str(layer_name).unwrap();
let skeleton = PersistentLayerDesc::from(name);
let remote = Arc::new(LayerObject::new(skeleton.clone()));
let downloaded = Arc::new(LayerObject::new(skeleton));
let mut map = LayerMap::default();
let mut mapping = LayerFileManager::new();
// two disjoint Arcs in different lifecycle phases. even if it seems they must be the
// same layer, we use LayerMap::compare_arced_layers as the identity of layers.
assert_eq!(remote.layer_desc(), downloaded.layer_desc());
let expected_in_counts = (1, usize::from(expected_l0));
map.batch_update()
.insert_historic(remote.layer_desc().clone());
mapping.insert(remote.clone());
assert_eq!(
count_layer_in(&map, remote.layer_desc()),
expected_in_counts
);
mapping
.replace_and_verify(remote, downloaded.clone())
.expect("name derived attributes are the same");
assert_eq!(
count_layer_in(&map, downloaded.layer_desc()),
expected_in_counts
);
map.batch_update().remove_historic(downloaded.layer_desc());
assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
}
fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) {
let historic = map
.iter_historic_layers()
.filter(|x| x.key() == layer.key())
.count();
let l0s = map
.get_level0_deltas()
.expect("why does this return a result");
let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count();
(historic, l0)
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -57,7 +57,8 @@ pub fn par_fsync(paths: &[Utf8PathBuf]) -> io::Result<()> {
fsync_in_thread_pool(paths)
}
/// Parallel fsync asynchronously.
/// Parallel fsync asynchronously. If number of files are less than PARALLEL_PATH_THRESHOLD, fsync is done in the current
/// execution thread. Otherwise, we will spawn_blocking and run it in tokio.
pub async fn par_fsync_async(paths: &[Utf8PathBuf]) -> io::Result<()> {
const MAX_CONCURRENT_FSYNC: usize = 64;
let mut next = paths.iter().peekable();

View File

@@ -167,6 +167,8 @@
//! - download their remote [`IndexPart`]s
//! - create `Timeline` struct and a `RemoteTimelineClient`
//! - initialize the client's upload queue with its `IndexPart`
//! - create [`RemoteLayer`](super::storage_layer::RemoteLayer) instances
//! for layers that are referenced by `IndexPart` but not present locally
//! - schedule uploads for layers that are only present locally.
//! - if the remote `IndexPart`'s metadata was newer than the metadata in
//! the local filesystem, write the remote metadata to the local filesystem
@@ -202,14 +204,15 @@
//! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
//! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
pub(crate) mod download;
mod download;
pub mod index;
mod upload;
use anyhow::Context;
use camino::Utf8Path;
use chrono::{NaiveDateTime, Utc};
// re-export these
pub use download::{is_temp_download_file, list_remote_timelines};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
use utils::backoff::{
@@ -234,7 +237,7 @@ use crate::metrics::{
};
use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::storage_layer::AsLayerDesc;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::upload_queue::Delete;
use crate::tenant::TIMELINES_SEGMENT_NAME;
use crate::{
@@ -252,13 +255,10 @@ use utils::id::{TenantId, TimelineId};
use self::index::IndexPart;
use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
use super::storage_layer::LayerFileName;
use super::upload_queue::SetDeletedFlagProgress;
use super::Generation;
pub(crate) use download::{is_temp_download_file, list_remote_timelines};
pub(crate) use index::LayerFileMetadata;
// Occasional network issues and such can cause remote operations to fail, and
// that's expected. If a download fails, we log it at info-level, and retry.
// But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
@@ -629,12 +629,13 @@ impl RemoteTimelineClient {
///
pub(crate) fn schedule_layer_file_upload(
self: &Arc<Self>,
layer: ResidentLayer,
layer_file_name: &LayerFileName,
layer_metadata: &LayerFileMetadata,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_layer_file_upload0(upload_queue, layer);
self.schedule_layer_file_upload0(upload_queue, layer_file_name, layer_metadata);
self.launch_queued_tasks(upload_queue);
Ok(())
}
@@ -642,19 +643,18 @@ impl RemoteTimelineClient {
fn schedule_layer_file_upload0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
layer: ResidentLayer,
layer_file_name: &LayerFileName,
layer_metadata: &LayerFileMetadata,
) {
let metadata = layer.metadata();
upload_queue
.latest_files
.insert(layer.layer_desc().filename(), metadata.clone());
.insert(layer_file_name.clone(), layer_metadata.clone());
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
info!("scheduled layer file upload {layer}");
let op = UploadOp::UploadLayer(layer, metadata);
let op = UploadOp::UploadLayer(layer_file_name.clone(), layer_metadata.clone());
self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op);
info!("scheduled layer file upload {layer_file_name}");
}
/// Launch a delete operation in the background.
@@ -667,13 +667,13 @@ impl RemoteTimelineClient {
/// successfully.
pub fn schedule_layer_file_deletion(
self: &Arc<Self>,
names: &[LayerFileName],
names: Vec<LayerFileName>,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
let with_generations =
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, &names);
self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
@@ -687,17 +687,17 @@ impl RemoteTimelineClient {
///
/// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
/// is invoked on them.
pub(crate) fn schedule_gc_update(self: &Arc<Self>, gc_layers: &[Layer]) -> anyhow::Result<()> {
#[allow(unused)] // will be used by PR#4938
pub(crate) fn schedule_unlinking_of_layers_from_index_part(
self: &Arc<Self>,
names: Vec<LayerFileName>,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
// just forget the return value; after uploading the next index_part.json, we can consider
// the layer files as "dangling". this is fine, at worst case we create work for the
// scrubber.
let names = gc_layers.iter().map(|x| x.layer_desc().filename());
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
// the layer files as "dangling". this is fine however.
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, &names);
self.launch_queued_tasks(upload_queue);
@@ -706,28 +706,26 @@ impl RemoteTimelineClient {
/// Update the remote index file, removing the to-be-deleted files from the index,
/// allowing scheduling of actual deletions later.
fn schedule_unlinking_of_layers_from_index_part0<I>(
fn schedule_unlinking_of_layers_from_index_part0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
names: I,
) -> Vec<(LayerFileName, Generation)>
where
I: IntoIterator<Item = LayerFileName>,
{
names: &[LayerFileName],
) -> Vec<(LayerFileName, Generation)> {
// Deleting layers doesn't affect the values stored in TimelineMetadata,
// so we don't need update it. Just serialize it.
let metadata = upload_queue.latest_metadata.clone();
// Decorate our list of names with each name's generation, dropping
// names that are unexpectedly missing from our metadata.
// makes that are unexpectedly missing from our metadata.
let with_generations: Vec<_> = names
.into_iter()
.iter()
.filter_map(|name| {
let meta = upload_queue.latest_files.remove(&name);
// Remove from latest_files, learning the file's remote generation in the process
let meta = upload_queue.latest_files.remove(name);
if let Some(meta) = meta {
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
Some((name, meta.generation))
Some((name.to_owned(), meta.generation))
} else {
// This can only happen if we forgot to to schedule the file upload
// before scheduling the delete. Log it because it is a rare/strange
@@ -750,7 +748,8 @@ impl RemoteTimelineClient {
}
/// Schedules deletion for layer files which have previously been unlinked from the
/// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
/// `index_part.json` with [`Self::schedule_unlinking_of_layers_from_index_part`].
#[allow(unused)] // will be used by Layer::drop in PR#4938
pub(crate) fn schedule_deletion_of_unlinked(
self: &Arc<Self>,
layers: Vec<(LayerFileName, Generation)>,
@@ -785,20 +784,18 @@ impl RemoteTimelineClient {
/// `compacted_from` represent the L0 names which have been `compacted_to` L1 layers.
pub(crate) fn schedule_compaction_update(
self: &Arc<Self>,
compacted_from: &[Layer],
compacted_to: &[ResidentLayer],
compacted_from: &[LayerFileName],
compacted_to: &[(LayerFileName, LayerFileMetadata)],
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
for layer in compacted_to {
self.schedule_layer_file_upload0(upload_queue, layer.clone());
for (name, m) in compacted_to {
self.schedule_layer_file_upload0(upload_queue, name, m);
}
let names = compacted_from.iter().map(|x| x.layer_desc().filename());
let with_generations =
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, compacted_from);
self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
self.launch_queued_tasks(upload_queue);
@@ -1173,12 +1170,16 @@ impl RemoteTimelineClient {
}
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
let path = layer.local_path();
UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
let path = self
.conf
.timeline_path(&self.tenant_id, &self.timeline_id)
.join(layer_file_name.file_name());
upload::upload_timeline_layer(
self.conf,
&self.storage_impl,
path,
&path,
layer_metadata,
self.generation,
)
@@ -1496,21 +1497,11 @@ impl RemoteTimelineClient {
}
}
pub(crate) fn get_layers_metadata(
pub(crate) fn get_layer_metadata(
&self,
layers: Vec<LayerFileName>,
) -> anyhow::Result<Vec<Option<LayerFileMetadata>>> {
let q = self.upload_queue.lock().unwrap();
let q = match &*q {
UploadQueue::Stopped(_) | UploadQueue::Uninitialized => {
anyhow::bail!("queue is in state {}", q.as_str())
}
UploadQueue::Initialized(inner) => inner,
};
let decorated = layers.into_iter().map(|l| q.latest_files.get(&l).cloned());
Ok(decorated.collect())
name: &LayerFileName,
) -> anyhow::Result<Option<LayerFileMetadata>> {
self.upload_queue.lock().unwrap().get_layer_metadata(name)
}
}
@@ -1552,13 +1543,6 @@ pub fn remote_index_path(
.expect("Failed to construct path")
}
pub const HEATMAP_BASENAME: &str = "heatmap";
pub fn remote_heatmap_path(tenant_id: &TenantId) -> RemotePath {
RemotePath::from_string(&format!("tenants/{tenant_id}/{HEATMAP_BASENAME}-v01"))
.expect("Failed to construct path")
}
/// Given the key of an index, parse out the generation part of the name
pub(crate) fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
let file_name = match path.get_path().file_name() {
@@ -1606,7 +1590,6 @@ mod tests {
context::RequestContext,
tenant::{
harness::{TenantHarness, TIMELINE_ID},
storage_layer::Layer,
Generation, Tenant, Timeline,
},
DEFAULT_PG_VERSION,
@@ -1775,29 +1758,32 @@ mod tests {
let generation = harness.generation;
// Create a couple of dummy files, schedule upload for them
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap();
let layer_file_name_3: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap();
let content_1 = dummy_contents("foo");
let content_2 = dummy_contents("bar");
let content_3 = dummy_contents("baz");
let layers = [
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
]
.into_iter()
.map(|(name, contents): (LayerFileName, Vec<u8>)| {
std::fs::write(timeline_path.join(name.file_name()), &contents).unwrap();
Layer::for_resident(
harness.conf,
&timeline,
name,
LayerFileMetadata::new(contents.len() as u64, generation),
)
}).collect::<Vec<_>>();
for (filename, content) in [
(&layer_file_name_1, &content_1),
(&layer_file_name_2, &content_2),
(&layer_file_name_3, &content_3),
] {
std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
}
client
.schedule_layer_file_upload(layers[0].clone())
.schedule_layer_file_upload(
&layer_file_name_1,
&LayerFileMetadata::new(content_1.len() as u64, generation),
)
.unwrap();
client
.schedule_layer_file_upload(layers[1].clone())
.schedule_layer_file_upload(
&layer_file_name_2,
&LayerFileMetadata::new(content_2.len() as u64, generation),
)
.unwrap();
// Check that they are started immediately, not queued
@@ -1851,42 +1837,38 @@ mod tests {
.collect(),
&[
&initial_layer.file_name(),
&layers[0].layer_desc().filename().file_name(),
&layers[1].layer_desc().filename().file_name(),
&layer_file_name_1.file_name(),
&layer_file_name_2.file_name(),
],
);
assert_eq!(index_part.metadata, metadata);
// Schedule upload and then a deletion. Check that the deletion is queued
client
.schedule_layer_file_upload(layers[2].clone())
.schedule_layer_file_upload(
&layer_file_name_3,
&LayerFileMetadata::new(content_3.len() as u64, generation),
)
.unwrap();
// this is no longer consistent with how deletion works with Layer::drop, but in this test
// keep using schedule_layer_file_deletion because we don't have a way to wait for the
// spawn_blocking started by the drop.
client
.schedule_layer_file_deletion(&[layers[0].layer_desc().filename()])
.schedule_layer_file_deletion([layer_file_name_1.clone()].to_vec())
.unwrap();
{
let mut guard = client.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut().unwrap();
// Deletion schedules upload of the index file, and the file deletion itself
assert_eq!(upload_queue.queued_operations.len(), 2);
assert_eq!(upload_queue.inprogress_tasks.len(), 1);
assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
assert_eq!(upload_queue.num_inprogress_deletions, 0);
assert_eq!(
upload_queue.latest_files_changes_since_metadata_upload_scheduled,
0
);
assert!(upload_queue.queued_operations.len() == 2);
assert!(upload_queue.inprogress_tasks.len() == 1);
assert!(upload_queue.num_inprogress_layer_uploads == 1);
assert!(upload_queue.num_inprogress_deletions == 0);
assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
}
assert_remote_files(
&[
&initial_layer.file_name(),
&layers[0].layer_desc().filename().file_name(),
&layers[1].layer_desc().filename().file_name(),
&layer_file_name_1.file_name(),
&layer_file_name_2.file_name(),
"index_part.json",
],
&remote_timeline_dir,
@@ -1900,8 +1882,8 @@ mod tests {
assert_remote_files(
&[
&initial_layer.file_name(),
&layers[1].layer_desc().filename().file_name(),
&layers[2].layer_desc().filename().file_name(),
&layer_file_name_2.file_name(),
&layer_file_name_3.file_name(),
"index_part.json",
],
&remote_timeline_dir,
@@ -1930,13 +1912,6 @@ mod tests {
)
.unwrap();
let layer_file_1 = Layer::for_resident(
harness.conf,
&timeline,
layer_file_name_1.clone(),
LayerFileMetadata::new(content_1.len() as u64, harness.generation),
);
#[derive(Debug, PartialEq, Clone, Copy)]
struct BytesStartedFinished {
started: Option<usize>,
@@ -1972,7 +1947,10 @@ mod tests {
let actual_a = get_bytes_started_stopped();
client
.schedule_layer_file_upload(layer_file_1.clone())
.schedule_layer_file_upload(
&layer_file_name_1,
&LayerFileMetadata::new(content_1.len() as u64, harness.generation),
)
.unwrap();
let actual_b = get_bytes_started_stopped();

View File

@@ -72,8 +72,6 @@ pub(super) async fn upload_timeline_layer<'a>(
// upload. However, a nonexistent file can also be indicative of
// something worse, like when a file is scheduled for upload before
// it has been written to disk yet.
//
// This is tested against `test_compaction_delete_before_upload`
info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
return Ok(());
}

View File

@@ -1,268 +0,0 @@
pub mod downloader;
pub mod heatmap;
pub mod heatmap_writer;
use std::{sync::Arc, time::SystemTime};
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
};
use self::{
downloader::{downloader_task, SecondaryDetail},
heatmap_writer::heatmap_writer_task,
};
use super::{
mgr::TenantManager,
storage_layer::{AsLayerDesc, Layer},
timeline::DiskUsageEvictionInfo,
};
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use utils::{
completion::Barrier,
fs_ext,
id::{TenantId, TimelineId},
};
enum DownloadCommand {
Download(TenantId),
}
enum UploadCommand {
Upload(TenantId),
}
struct CommandRequest<T> {
payload: T,
response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
}
struct CommandResponse {
result: anyhow::Result<()>,
}
// Whereas [`Tenant`] represents an attached tenant, this type represents the work
// we do for secondary tenant locations: where we are not serving clients or
// ingesting WAL, but we are maintaining a warm cache of layer files.
//
// This type is all about the _download_ path for secondary mode. The upload path
// runs while a regular attached `Tenant` exists.
//
// This structure coordinates TenantManager and SecondaryDownloader,
// so that the downloader can indicate which tenants it is currently
// operating on, and the manager can indicate when a particular
// secondary tenant should cancel any work in flight.
#[derive(Debug)]
pub(crate) struct SecondaryTenant {
/// Cancellation token indicates to SecondaryDownloader that it should stop doing
/// any work for this tenant at the next opportunity.
pub(crate) cancel: CancellationToken,
/// Lock must be held by SecondaryDownloader at any time that it might be operating
/// on the local filesystem directory for this tenant ID.
// Ordering: the TenantManager must set the cancellation token _before_
// taking the lock. The SecondaryDownloader must always check the cancellation
// token immediately _after_ taking the lock (and at appropriate intervals
// while holding it).
pub(crate) busy: Arc<tokio::sync::Mutex<()>>,
detail: std::sync::Mutex<SecondaryDetail>,
// TODO: propagate the `warm` from LocationConf into here, and respect it when doing downloads
}
impl SecondaryTenant {
pub(crate) fn new() -> Arc<Self> {
// TODO; consider whether we really need to Arc this
Arc::new(Self {
busy: Arc::new(tokio::sync::Mutex::new(())),
// todo: shall we make this a descendent of the
// main cancellation token, or is it sufficient that
// on shutdown we walk the tenants and fire their
// individual cancellations?
cancel: CancellationToken::new(),
detail: std::sync::Mutex::default(),
})
}
pub(crate) async fn shutdown(&self) {
self.cancel.cancel();
// Wait for any secondary downloader work to complete: once we
// acquire this lock, we are guaranteed that the secondary downloader
// won't touch the local filesystem again for this instance: it is safe
// to e.g. construct a `Tenant` for the same TenantId
drop(self.busy.lock().await);
}
pub(crate) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
self.detail.lock().unwrap().get_layers_for_eviction()
}
pub(crate) async fn evict_layers(
&self,
_guard: tokio::sync::OwnedMutexGuard<()>,
conf: &PageServerConf,
tenant_id: &TenantId,
layers: Vec<(TimelineId, Layer)>,
) {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
if self.cancel.is_cancelled() {
// Eviction is a no-op if shutdown() was already called.
tracing::info!(
"Dropping {} layer evictions, secondary tenant shutting down",
layers.len()
);
return;
}
let now = SystemTime::now();
for (timeline_id, layer) in layers {
let layer_name = layer.layer_desc().filename();
let path = conf
.timeline_path(tenant_id, &timeline_id)
.join(&layer_name.file_name());
// We tolerate ENOENT, because between planning eviction and executing
// it, the secondary downloader could have seen an updated heatmap that
// resulted in a layer being deleted.
tokio::fs::remove_file(path)
.await
.or_else(fs_ext::ignore_not_found)
.expect("TODO: terminate process on local I/O errors");
// TODO: batch up updates instead of acquiring lock in inner loop
let mut detail = self.detail.lock().unwrap();
// If there is no timeline detail for what we just deleted, that indicates that
// the secondary downloader did some work (perhaps removing all)
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
timeline_detail.on_disk_layers.remove(&layer_name);
timeline_detail.evicted_at.insert(layer_name, now);
}
}
}
}
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
/// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests,
/// where we want to immediately upload/download for a particular tenant. In normal operation
/// uploads & downloads are autonomous and not driven by this interface.
pub struct SecondaryController {
upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
}
impl SecondaryController {
async fn dispatch<T>(
&self,
queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
payload: T,
) -> anyhow::Result<()> {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
queue
.send(CommandRequest {
payload,
response_tx,
})
.await
.map_err(|_| anyhow::anyhow!("Receiver shut down"))?;
let response = response_rx
.await
.map_err(|_| anyhow::anyhow!("Request dropped"))?;
response.result
}
pub async fn download_tenant(&self, tenant_id: TenantId) -> anyhow::Result<()> {
self.dispatch(&self.download_req_tx, DownloadCommand::Download(tenant_id))
.await
}
pub async fn upload_tenant(&self, tenant_id: TenantId) -> anyhow::Result<()> {
self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_id))
.await
}
}
pub fn spawn_tasks(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> SecondaryController {
let mgr_clone = tenant_manager.clone();
let storage_clone = remote_storage.clone();
let cancel_clone = cancel.clone();
let bg_jobs_clone = background_jobs_can_start.clone();
let (download_req_tx, download_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
let (upload_req_tx, upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryDownloads,
None,
None,
"secondary tenant downloads",
false,
async move {
downloader_task(
conf,
mgr_clone,
storage_clone,
download_req_rx,
bg_jobs_clone,
cancel_clone,
)
.await
},
);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryDownloads,
None,
None,
"heatmap uploads",
false,
async move {
heatmap_writer_task(
tenant_manager,
remote_storage,
upload_req_rx,
background_jobs_can_start,
cancel,
)
.await
},
);
SecondaryController {
download_req_tx,
upload_req_tx,
}
}
/// For running with remote storage disabled: a SecondaryController that is connected to nothing.
pub fn null_controller() -> SecondaryController {
let (download_req_tx, _download_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
let (upload_req_tx, _upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
SecondaryController {
upload_req_tx,
download_req_tx,
}
}

View File

@@ -1,579 +0,0 @@
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use crate::{
config::PageServerConf,
tenant::{
remote_timeline_client::index::LayerFileMetadata,
secondary::CommandResponse,
storage_layer::{Layer, LayerFileName},
timeline::{DiskUsageEvictionInfo, LocalLayerInfoForDiskUsageEviction},
},
METADATA_FILE_NAME,
};
use super::SecondaryTenant;
use crate::tenant::{
mgr::TenantManager,
remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
};
use anyhow::Context;
use chrono::format::{DelayedFormat, StrftimeItems};
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
completion::Barrier,
fs_ext,
id::{TenantId, TimelineId},
};
use super::{
heatmap::{HeatMapTenant, HeatMapTimeline},
CommandRequest, DownloadCommand,
};
/// Interval between checking if any Secondary tenants have download work to do:
/// note that this is _not_ the frequency with which we actually freshen the tenants,
/// just the frequency with which we wake up to decide whether anyone needs freshening.
///
/// Making this somewhat infrequent reduces the load on mutexes inside TenantManager
/// and SecondaryTenant for reads when checking for work to do.
const DOWNLOAD_CHECK_INTERVAL: Duration = Duration::from_millis(10000);
/// For each tenant, how long must have passed since the last freshen_tenant call before
/// calling it again. This is approximately the time by which local data is allowed
/// to fall behind remote data.
///
/// TODO: this should be an upper bound, and tenants that are uploading regularly
/// should adaptively freshen more often (e.g. a tenant writing 1 layer per second
/// should not wait a minute between freshens)
const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
#[derive(Debug, Clone)]
pub(super) struct OnDiskState {
layer: Layer,
access_time: SystemTime,
}
impl OnDiskState {
fn new(
conf: &'static PageServerConf,
tenant_id: &TenantId,
timeline_id: &TimelineId,
name: LayerFileName,
metadata: LayerFileMetadata,
access_time: SystemTime,
) -> Self {
Self {
layer: Layer::for_secondary(conf, tenant_id, timeline_id, name, metadata),
access_time,
}
}
}
#[derive(Debug, Clone, Default)]
pub(super) struct SecondaryDetailTimeline {
pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
/// We remember when layers were evicted, to prevent re-downloading them.
/// TODO: persist this, so that we don't try and re-download everything on restart.
pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
}
/// This state is written by the secondary downloader, it is opaque
/// to TenantManager
#[derive(Default, Debug)]
pub(super) struct SecondaryDetail {
freshened_at: Option<Instant>,
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}
/// Helper for logging SystemTime
fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
datetime.format("%d/%m/%Y %T")
}
impl SecondaryDetail {
pub(super) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
let mut result = Vec::new();
for (timeline_id, timeline_detail) in &self.timelines {
let layers: Vec<_> = timeline_detail
.on_disk_layers
.values()
.map(|ods| LocalLayerInfoForDiskUsageEviction {
layer: ods.layer.clone(),
last_activity_ts: ods.access_time,
})
.collect();
let max_layer_size = layers.iter().map(|l| l.layer.metadata().file_size()).max();
result.push((
*timeline_id,
DiskUsageEvictionInfo {
resident_layers: layers,
max_layer_size,
},
))
}
result
}
}
/// Keep trying to do downloads until the cancellation token is fired. Remote storage
/// errors are handled internally: any error returned by this function is an unexpected
/// internal error of some kind.
pub(super) async fn downloader_task(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let downloader = SecondaryDownloader {
conf,
tenant_manager,
remote_storage,
cancel: cancel.clone(),
};
tracing::info!("Waiting for background_jobs_can start...");
background_jobs_can_start.wait().await;
tracing::info!("background_jobs_can is ready, proceeding.");
while !cancel.is_cancelled() {
downloader.iteration().await?;
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("Heatmap writer terminating");
break;
},
_ = tokio::time::sleep(DOWNLOAD_CHECK_INTERVAL) => {},
cmd = command_queue.recv() => {
let cmd = match cmd {
Some(c) =>c,
None => {
// SecondaryController was destroyed, and this has raced with
// our CancellationToken
tracing::info!("Heatmap writer terminating");
break;
}
};
let CommandRequest{
response_tx,
payload
} = cmd;
let result = downloader.handle_command(payload).await;
if response_tx.send(CommandResponse{result}).is_err() {
// Caller went away, e.g. because an HTTP request timed out
tracing::info!("Dropping response to administrative command")
}
}
}
}
Ok(())
}
struct SecondaryDownloader {
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
}
struct TenantJob {
tenant_id: TenantId,
secondary_state: Arc<SecondaryTenant>,
// This mutex guard conveys the right to write to the tenant's local directory: it must
// be taken before doing downloads, and TenantManager must ensure it has been released
// before it considers shutdown complete for the secondary state -- [`SecondaryDownloader`]
// will thereby never be racing with [`Tenant`] for access to local files.
_guard: tokio::sync::OwnedMutexGuard<()>,
}
impl SecondaryDownloader {
async fn iteration(&self) -> anyhow::Result<()> {
// Step 1: identify some tenants that we may work on
let mut candidates: Vec<TenantJob> = Vec::new();
self.tenant_manager
.foreach_secondary_tenants(|tenant_id, secondary_state| {
let guard = match secondary_state.busy.clone().try_lock_owned() {
Ok(guard) => guard,
// If we can't lock, someone is in the process of shutting it down, or we are
// already working on it. We may ignore it when scanning for new work to do.
Err(_) => return,
};
candidates.push(TenantJob {
tenant_id: *tenant_id,
secondary_state: secondary_state.clone(),
_guard: guard,
});
});
// Step 2: prioritized selection of next batch of tenants to freshen
let now = Instant::now();
let candidates = candidates.into_iter().filter(|c| {
let detail = c.secondary_state.detail.lock().unwrap();
match detail.freshened_at {
None => true, // Not yet freshened, therefore elegible to run
Some(t) => {
let since = now.duration_since(t);
since > DOWNLOAD_FRESHEN_INTERVAL
}
}
});
// TODO: don't just cut down the list, prioritize it to freshen the stalest tenants first
// TODO: bounded parallelism
// Step 3: spawn freshen_tenant tasks
for job in candidates {
if job.secondary_state.cancel.is_cancelled() {
continue;
}
async {
if let Err(e) = self.freshen_tenant(&job).await {
tracing::info!("Failed to freshen secondary content: {e:#}")
};
// Update freshened_at even if there was an error: we don't want errored tenants to implicitly
// take priority to run again.
let mut detail = job.secondary_state.detail.lock().unwrap();
detail.freshened_at = Some(Instant::now());
}
.instrument(tracing::info_span!(
"freshen_tenant",
tenant_id = %job.tenant_id
))
.await;
}
Ok(())
}
async fn handle_command(&self, command: DownloadCommand) -> anyhow::Result<()> {
match command {
DownloadCommand::Download(req_tenant_id) => {
let mut candidates: Vec<TenantJob> = Vec::new();
self.tenant_manager
.foreach_secondary_tenants(|tenant_id, secondary_state| {
tracing::info!("foreach_secondary: {tenant_id} ({req_tenant_id})");
if tenant_id == &req_tenant_id {
let guard = match secondary_state.busy.clone().try_lock_owned() {
Ok(guard) => guard,
// If we can't lock, someone is in the process of shutting it down, or we are
// already working on it. We may ignore it when scanning for new work to do.
Err(_) => return,
};
candidates.push(TenantJob {
tenant_id: *tenant_id,
secondary_state: secondary_state.clone(),
_guard: guard,
});
}
});
let tenant_job = if candidates.len() != 1 {
anyhow::bail!("Tenant not found in secondary mode");
} else {
candidates.pop().unwrap()
};
self.freshen_tenant(&tenant_job).await
}
}
}
async fn download_heatmap(&self, tenant_id: &TenantId) -> anyhow::Result<HeatMapTenant> {
// TODO: make download conditional on ETag having changed since last download
let heatmap_path = remote_heatmap_path(tenant_id);
// TODO: wrap this download in a select! that checks self.cancel
let mut download = self.remote_storage.download(&heatmap_path).await?;
let mut heatmap_bytes = Vec::new();
let _size = tokio::io::copy(&mut download.download_stream, &mut heatmap_bytes)
.await
.with_context(|| format!("download heatmap {heatmap_path:?}"))?;
Ok(serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?)
}
async fn init_timeline_state(
&self,
tenant_id: &TenantId,
timeline_id: &TimelineId,
heatmap: &HeatMapTimeline,
) -> anyhow::Result<SecondaryDetailTimeline> {
let timeline_path = self.conf.timeline_path(tenant_id, timeline_id);
let mut detail = SecondaryDetailTimeline::default();
let mut dir = match tokio::fs::read_dir(&timeline_path).await {
Ok(d) => d,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
tracing::info!("Creating timeline directory {timeline_path}");
tokio::fs::create_dir(&timeline_path).await?;
// No entries to report: drop out.
return Ok(detail);
} else {
return Err(e.into());
}
}
};
let heatmap_metadata: HashMap<_, _> = heatmap.layers.iter().map(|l| (&l.name, l)).collect();
while let Some(dentry) = dir.next_entry().await? {
let dentry_file_name = dentry.file_name();
let file_name = dentry_file_name.to_string_lossy();
let local_meta = dentry.metadata().await?;
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
if file_name == METADATA_FILE_NAME {
continue;
}
match LayerFileName::from_str(&file_name) {
Ok(name) => {
let remote_meta = heatmap_metadata.get(&name);
match remote_meta {
Some(remote_meta) => {
// TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
if local_meta.len() != remote_meta.metadata.file_size {
// This should not happen, because we do crashsafe write-then-rename when downloading
// layers, and layers in remote storage are immutable. Remove the local file because
// we cannot trust it.
tracing::warn!("Removing local layer {name} with unexpected local size {} != {}",
local_meta.len(), remote_meta.metadata.file_size);
} else {
// We expect the access time to be initialized immediately afterwards, when
// the latest heatmap is applied to the state.
detail.on_disk_layers.insert(
name.clone(),
OnDiskState::new(
self.conf,
tenant_id,
timeline_id,
name,
LayerFileMetadata::from(&remote_meta.metadata),
remote_meta.access_time,
),
);
}
}
None => {
// FIXME: consider some optimization when transitioning from attached to secondary: maybe
// wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
// we will end up deleting any layers which were created+uploaded more recently than the heatmap.
tracing::info!(
"Removing secondary local layer {} because it's absent in heatmap",
name
);
tokio::fs::remove_file(dentry.path()).await?;
}
}
}
Err(_) => {
// Ignore it.
tracing::warn!("Unexpected file in timeline directory: {file_name}");
}
}
}
Ok(detail)
}
async fn freshen_timeline(
&self,
job: &TenantJob,
timeline: HeatMapTimeline,
) -> anyhow::Result<()> {
let timeline_path = self
.conf
.timeline_path(&job.tenant_id, &timeline.timeline_id);
// Accumulate updates to the state
let mut touched = Vec::new();
// Clone a view of what layers already exist on disk
let timeline_state = job
.secondary_state
.detail
.lock()
.unwrap()
.timelines
.get(&timeline.timeline_id)
.cloned();
let timeline_state = match timeline_state {
Some(t) => t,
None => {
// We have no existing state: need to scan local disk for layers first.
self.init_timeline_state(&job.tenant_id, &timeline.timeline_id, &timeline)
.await?
}
};
let layers_in_heatmap = timeline
.layers
.iter()
.map(|l| &l.name)
.collect::<HashSet<_>>();
let layers_on_disk = timeline_state
.on_disk_layers
.iter()
.map(|l| l.0)
.collect::<HashSet<_>>();
// Remove on-disk layers that are no longer present in heatmap
for layer in layers_on_disk.difference(&layers_in_heatmap) {
let local_path = timeline_path.join(layer.to_string());
tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
}
// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
if self.cancel.is_cancelled() {
return Ok(());
}
// Existing on-disk layers: just update their access time.
if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
if on_disk.layer.metadata() != LayerFileMetadata::from(&layer.metadata)
|| on_disk.access_time != layer.access_time
{
// We already have this layer on disk. Update its access time.
tracing::trace!(
"Access time updated for layer {}: {} -> {}",
layer.name,
strftime(&on_disk.access_time),
strftime(&layer.access_time)
);
touched.push(layer);
}
continue;
}
// Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
// recently than it was evicted.
if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
if &layer.access_time > evicted_at {
tracing::info!(
"Re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
} else {
tracing::trace!(
"Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
continue;
}
}
match download_layer_file(
self.conf,
&self.remote_storage,
job.tenant_id,
timeline.timeline_id,
&layer.name,
&LayerFileMetadata::from(&layer.metadata),
)
.await
{
Ok(downloaded_bytes) => {
if downloaded_bytes != layer.metadata.file_size {
let local_path = timeline_path.join(layer.name.to_string());
tracing::error!(
"Downloaded layer {} with unexpected size {} != {}",
layer.name,
downloaded_bytes,
layer.metadata.file_size
);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
}
touched.push(layer)
}
Err(e) => {
// No retries here: secondary downloads don't have to succeed: if they fail we just proceed and expect
// that on some future call to freshen the download will work.
// TODO: refine this behavior.
tracing::info!("Failed to download layer {}: {}", layer.name, e);
}
}
}
// Write updates to state to record layers we just downloaded or touched.
{
let mut detail = job.secondary_state.detail.lock().unwrap();
let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
for t in touched {
use std::collections::hash_map::Entry;
match timeline_detail.on_disk_layers.entry(t.name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().access_time = t.access_time;
}
Entry::Vacant(e) => {
e.insert(OnDiskState::new(
self.conf,
&job.tenant_id,
&timeline.timeline_id,
t.name,
LayerFileMetadata::from(&t.metadata),
t.access_time,
));
}
}
}
}
Ok(())
}
async fn freshen_tenant(&self, job: &TenantJob) -> anyhow::Result<()> {
// Download the tenant's heatmap
let heatmap = self.download_heatmap(&job.tenant_id).await?;
// Download the layers in the heatmap
for timeline in heatmap.timelines {
if self.cancel.is_cancelled() {
return Ok(());
}
self.freshen_timeline(job, timeline).await?;
}
Ok(())
}
}

View File

@@ -1,57 +0,0 @@
use std::time::SystemTime;
use crate::tenant::{
remote_timeline_client::index::IndexLayerMetadata, storage_layer::LayerFileName,
};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::TimelineId;
#[derive(Serialize, Deserialize)]
pub(super) struct HeatMapTenant {
pub(super) timelines: Vec<HeatMapTimeline>,
}
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapLayer {
pub(super) name: LayerFileName,
pub(super) metadata: IndexLayerMetadata,
pub(super) access_time: SystemTime,
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
}
impl HeatMapLayer {
pub(crate) fn new(
name: LayerFileName,
metadata: IndexLayerMetadata,
access_time: SystemTime,
) -> Self {
Self {
name,
metadata,
access_time,
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapTimeline {
#[serde_as(as = "DisplayFromStr")]
pub(super) timeline_id: TimelineId,
pub(super) layers: Vec<HeatMapLayer>,
}
impl HeatMapTimeline {
pub(crate) fn new(timeline_id: TimelineId, layers: Vec<HeatMapLayer>) -> Self {
Self {
timeline_id,
layers,
}
}
}

View File

@@ -1,207 +0,0 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use crate::tenant::{
mgr::TenantManager, remote_timeline_client::remote_heatmap_path, secondary::CommandResponse,
Tenant,
};
use pageserver_api::models::TenantState;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{backoff, completion::Barrier};
use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand};
const HEATMAP_UPLOAD_INTERVAL: Duration = Duration::from_millis(60000);
pub(super) async fn heatmap_writer_task(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let writer = HeatmapWriter {
tenant_manager,
remote_storage,
cancel: cancel.clone(),
};
tracing::info!("Waiting for background_jobs_can start...");
background_jobs_can_start.wait().await;
tracing::info!("background_jobs_can is ready, proceeding.");
while !cancel.is_cancelled() {
writer.iteration().await?;
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("Heatmap writer terminating");
break;
},
_ = tokio::time::sleep(HEATMAP_UPLOAD_INTERVAL) => {},
cmd = command_queue.recv() => {
let cmd = match cmd {
Some(c) =>c,
None => {
// SecondaryController was destroyed, and this has raced with
// our CancellationToken
tracing::info!("Heatmap writer terminating");
break;
}
};
let CommandRequest{
response_tx,
payload
} = cmd;
let result = writer.handle_command(payload).await;
if response_tx.send(CommandResponse{result}).is_err() {
// Caller went away, e.g. because an HTTP request timed out
tracing::info!("Dropping response to administrative command")
}
}
}
}
Ok(())
}
struct HeatmapWriter {
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
}
impl HeatmapWriter {
async fn iteration(&self) -> anyhow::Result<()> {
let tenants = self.tenant_manager.get_attached_tenants();
for tenant in tenants {
if self.cancel.is_cancelled() {
return Ok(());
}
if tenant.current_state() != TenantState::Active {
continue;
}
// TODO: add a mechanism to check whether the active layer set has
// changed since our last write
// TODO: add a minimum time between uploads
match self
.write_tenant(&tenant)
.instrument(tracing::info_span!(
"write_tenant",
tenant_id = %tenant.get_tenant_id()
))
.await
{
Ok(()) => {}
Err(e) => {
tracing::warn!(
"Failed to upload heatmap for tenant {}: {e:#}",
tenant.get_tenant_id(),
)
}
}
}
Ok(())
}
async fn handle_command(&self, command: UploadCommand) -> anyhow::Result<()> {
match command {
UploadCommand::Upload(tenant_id) => {
let tenants = self.tenant_manager.get_attached_tenants();
let map = tenants
.iter()
.map(|t| (t.get_tenant_id(), t))
.collect::<HashMap<_, _>>();
match map.get(&tenant_id) {
Some(tenant) => self.write_tenant(tenant).await,
None => {
anyhow::bail!("Tenant is not attached");
}
}
}
}
}
async fn write_tenant(&self, tenant: &Arc<Tenant>) -> anyhow::Result<()> {
let mut heatmap = HeatMapTenant {
timelines: Vec::new(),
};
let timelines = tenant.timelines.lock().unwrap().clone();
let tenant_cancel = tenant.cancel.clone();
// Ensure that Tenant::shutdown waits for any upload in flight
let _guard = {
let hook = tenant.heatmap_hook.lock().unwrap();
match hook.enter() {
Some(g) => g,
None => {
// Tenant is shutting down
tracing::info!("Skipping, tenant is shutting down");
return Ok(());
}
}
};
for (timeline_id, timeline) in timelines {
let heatmap_timeline = timeline.generate_heatmap().await;
match heatmap_timeline {
None => {
tracing::debug!(
"Skipping heatmap upload because timeline {timeline_id} is not ready"
);
return Ok(());
}
Some(heatmap_timeline) => {
heatmap.timelines.push(heatmap_timeline);
}
}
}
// Serialize the heatmap
let bytes = serde_json::to_vec(&heatmap)?;
let size = bytes.len();
let path = remote_heatmap_path(&tenant.get_tenant_id());
// Write the heatmap.
tracing::debug!("Uploading {size} byte heatmap to {path}");
if let Err(e) = backoff::retry(
|| async {
let bytes = tokio::io::BufReader::new(std::io::Cursor::new(bytes.clone()));
let bytes = Box::new(bytes);
self.remote_storage
.upload_storage_object(bytes, size, &path)
.await
},
|_| false,
3,
u32::MAX,
"Uploading heatmap",
backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")),
)
.await
{
if tenant_cancel.is_cancelled() {
return Ok(());
} else {
return Err(e);
}
}
tracing::info!("Successfully uploading {size} byte heatmap to {path}");
Ok(())
}
}

View File

@@ -4,21 +4,26 @@ pub mod delta_layer;
mod filename;
mod image_layer;
mod inmemory_layer;
mod layer;
mod layer_desc;
mod remote_layer;
use crate::config::PageServerConf;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Key;
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use bytes::Bytes;
use camino::Utf8PathBuf;
use enum_map::EnumMap;
use enumset::EnumSet;
use once_cell::sync::Lazy;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use std::ops::Range;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
use utils::history_buffer::HistoryBufferWithDropCounter;
@@ -34,8 +39,7 @@ pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
pub use remote_layer::RemoteLayer;
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
@@ -70,7 +74,7 @@ pub struct ValueReconstructState {
pub img: Option<(Lsn, Bytes)>,
}
/// Return value from [`Layer::get_value_reconstruct_data`]
/// Return value from Layer::get_page_reconstruct_data
#[derive(Clone, Copy, Debug)]
pub enum ValueReconstructResult {
/// Got all the data needed to reconstruct the requested page
@@ -175,6 +179,26 @@ impl LayerAccessStats {
new
}
/// Creates a clone of `self` and records `new_status` in the clone.
///
/// The `new_status` is not recorded in `self`.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
///
/// [`record_residence_event`]: Self::record_residence_event
pub(crate) fn clone_for_residence_change(
&self,
new_status: LayerResidenceStatus,
) -> LayerAccessStats {
let clone = {
let inner = self.0.lock().unwrap();
inner.clone()
};
let new = LayerAccessStats(Mutex::new(clone));
new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
new
}
/// Record a change in layer residency.
///
/// Recording the event must happen while holding the layer map lock to
@@ -297,12 +321,95 @@ impl LayerAccessStats {
}
}
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
/// required by [`LayerMap`](super::layer_map::LayerMap).
///
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
/// timeline names, because those are known in the context of which the layers
/// are used in (timeline).
#[async_trait::async_trait]
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {
///
/// Return data needed to reconstruct given page at LSN.
///
/// It is up to the caller to collect more data from previous layer and
/// perform WAL redo, if necessary.
///
/// See PageReconstructResult for possible return values. The collected data
/// is appended to reconstruct_data; the caller should pass an empty struct
/// on first call, or a struct with a cached older image of the page if one
/// is available. If this returns ValueReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;
}
/// Get a layer descriptor from a layer.
pub trait AsLayerDesc {
/// Get the layer descriptor.
fn layer_desc(&self) -> &PersistentLayerDesc;
}
/// A Layer contains all data in a "rectangle" consisting of a range of keys and
/// range of LSNs.
///
/// There are two kinds of layers, in-memory and on-disk layers. In-memory
/// layers are used to ingest incoming WAL, and provide fast access to the
/// recent page versions. On-disk layers are stored as files on disk, and are
/// immutable. This trait presents the common functionality of in-memory and
/// on-disk layers.
///
/// Furthermore, there are two kinds of on-disk layers: delta and image layers.
/// A delta layer contains all modifications within a range of LSNs and keys.
/// An image layer is a snapshot of all the data in a key-range, at a single
/// LSN.
pub trait PersistentLayer: Layer + AsLayerDesc {
/// File name used for this layer, both in the pageserver's local filesystem
/// state as well as in the remote storage.
fn filename(&self) -> LayerFileName {
self.layer_desc().filename()
}
// Path to the layer file in the local filesystem.
// `None` for `RemoteLayer`.
fn local_path(&self) -> Option<Utf8PathBuf>;
/// Permanently remove this layer from disk.
fn delete_resident_layer_file(&self) -> Result<()>;
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
None
}
fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
None
}
fn is_remote_layer(&self) -> bool {
false
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo;
fn access_stats(&self) -> &LayerAccessStats;
}
pub fn downcast_remote_layer(
layer: &Arc<dyn PersistentLayer>,
) -> Option<std::sync::Arc<RemoteLayer>> {
if layer.is_remote_layer() {
Arc::clone(layer).downcast_remote_layer()
} else {
None
}
}
pub mod tests {
use super::*;
@@ -340,6 +447,19 @@ pub mod tests {
}
}
/// Helper enum to hold a PageServerConf, or a path
///
/// This is used by DeltaLayer and ImageLayer. Normally, this holds a reference to the
/// global config, and paths to layer files are constructed using the tenant/timeline
/// path from the config. But in the 'pagectl' binary, we need to construct a Layer
/// struct for a file on disk, without having a page server running, so that we have no
/// config. In that case, we use the Path variant to hold the full path to the file on
/// disk.
enum PathOrConf {
Path(Utf8PathBuf),
Conf(&'static PageServerConf),
}
/// Range wrapping newtype, which uses display to render Debug.
///
/// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.

View File

@@ -34,17 +34,18 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::fs::{self, File};
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::FileExt;
@@ -58,7 +59,10 @@ use utils::{
lsn::Lsn,
};
use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
use super::{
AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, PathOrConf,
PersistentLayerDesc,
};
///
/// Header stored in the beginning of the file
@@ -178,12 +182,20 @@ impl DeltaKey {
}
}
/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
/// DeltaLayer is the in-memory data structure associated with an on-disk delta
/// file.
///
/// We keep a DeltaLayer in memory for each file, in the LayerMap. If a layer
/// is in "loaded" state, we have a copy of the index in memory, in 'inner'.
/// Otherwise the struct is just a placeholder for a file that exists on disk,
/// and it needs to be loaded before using it in queries.
pub struct DeltaLayer {
path: Utf8PathBuf,
path_or_conf: PathOrConf,
pub desc: PersistentLayerDesc,
access_stats: LayerAccessStats,
inner: OnceCell<Arc<DeltaLayerInner>>,
}
@@ -200,8 +212,6 @@ impl std::fmt::Debug for DeltaLayer {
}
}
/// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
/// file.
pub struct DeltaLayerInner {
// values copied from summary
index_start_blk: u32,
@@ -211,6 +221,12 @@ pub struct DeltaLayerInner {
file: FileBlockReader,
}
impl AsRef<DeltaLayerInner> for DeltaLayerInner {
fn as_ref(&self) -> &DeltaLayerInner {
self
}
}
impl std::fmt::Debug for DeltaLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DeltaLayerInner")
@@ -220,6 +236,19 @@ impl std::fmt::Debug for DeltaLayerInner {
}
}
#[async_trait::async_trait]
impl Layer for DeltaLayer {
async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
self.get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
.await
}
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
impl std::fmt::Display for DeltaLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -233,9 +262,40 @@ impl AsLayerDesc for DeltaLayer {
}
}
impl PersistentLayer for DeltaLayer {
fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
Some(self)
}
fn local_path(&self) -> Option<Utf8PathBuf> {
self.local_path()
}
fn delete_resident_layer_file(&self) -> Result<()> {
self.delete_resident_layer_file()
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
self.info(reset)
}
fn access_stats(&self) -> &LayerAccessStats {
self.access_stats()
}
}
impl DeltaLayer {
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
self.desc.dump();
println!(
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
self.desc.tenant_id,
self.desc.timeline_id,
self.desc.key_range.start,
self.desc.key_range.end,
self.desc.lsn_range.start,
self.desc.lsn_range.end,
self.desc.file_size,
);
if !verbose {
return Ok(());
@@ -243,7 +303,119 @@ impl DeltaLayer {
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
inner.dump(ctx).await
println!(
"index_start_blk: {}, root {}",
inner.index_start_blk, inner.index_root_blk
);
let file = &inner.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
tree_reader.dump().await?;
let keys = DeltaLayerInner::load_keys(&inner, ctx).await?;
// A subroutine to dump a single blob
async fn dump_blob(val: ValueRef<'_>, ctx: &RequestContext) -> Result<String> {
let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
let val = Value::des(&buf)?;
let desc = match val {
Value::Image(img) => {
format!(" img {} bytes", img.len())
}
Value::WalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
buf.len(),
rec.will_init(),
wal_desc
)
}
};
Ok(desc)
}
for entry in keys {
let DeltaEntry { key, lsn, val, .. } = entry;
let desc = match dump_blob(val, ctx).await {
Ok(desc) => desc,
Err(err) => {
let err: anyhow::Error = err;
format!("ERROR: {err}")
}
};
println!(" key {key} at {lsn}: {desc}");
}
Ok(())
}
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.desc.lsn_range.start);
ensure!(self.desc.key_range.contains(&key));
let inner = self
.load(LayerAccessKind::GetValueReconstructData, ctx)
.await?;
inner
.get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
.await
}
pub(crate) fn local_path(&self) -> Option<Utf8PathBuf> {
Some(self.path())
}
pub(crate) fn delete_resident_layer_file(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
Ok(())
}
pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.layer_desc().filename().file_name();
let lsn_range = self.layer_desc().lsn_range.clone();
let access_stats = self.access_stats.as_api_model(reset);
HistoricLayerInfo::Delta {
layer_file_name,
layer_file_size: self.desc.file_size,
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote: false,
access_stats,
}
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
fn path_for(
path_or_conf: &PathOrConf,
tenant_id: &TenantId,
timeline_id: &TimelineId,
fname: &DeltaFileName,
) -> Utf8PathBuf {
match path_or_conf {
PathOrConf::Path(path) => path.clone(),
PathOrConf::Conf(conf) => conf
.timeline_path(tenant_id, timeline_id)
.join(fname.to_string()),
}
}
fn temp_path_for(
@@ -289,21 +461,52 @@ impl DeltaLayer {
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
let path = self.path();
let loaded = DeltaLayerInner::load(&path, None, ctx).await?;
let summary = match &self.path_or_conf {
PathOrConf::Conf(_) => Some(Summary::from(self)),
PathOrConf::Path(_) => None,
};
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
let expected_filename = self.layer_desc().filename().file_name();
let loaded = DeltaLayerInner::load(&path, summary, ctx).await?;
if actual_filename != expected_filename {
println!("warning: filename does not match what is expected from in-file summary");
println!("actual: {:?}", actual_filename);
println!("expected: {:?}", expected_filename);
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
let expected_filename = self.filename().file_name();
if actual_filename != expected_filename {
println!("warning: filename does not match what is expected from in-file summary");
println!("actual: {:?}", actual_filename);
println!("expected: {:?}", expected_filename);
}
}
Ok(Arc::new(loaded))
}
/// Create a DeltaLayer struct representing an existing file on disk.
pub fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,
filename: &DeltaFileName,
file_size: u64,
access_stats: LayerAccessStats,
) -> DeltaLayer {
DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
desc: PersistentLayerDesc::new_delta(
tenant_id,
timeline_id,
filename.key_range.clone(),
filename.lsn_range.clone(),
file_size,
),
access_stats,
inner: OnceCell::new(),
}
}
/// Create a DeltaLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
@@ -317,7 +520,7 @@ impl DeltaLayer {
.context("get file metadata to determine size")?;
Ok(DeltaLayer {
path: path.to_path_buf(),
path_or_conf: PathOrConf::Path(path.to_path_buf()),
desc: PersistentLayerDesc::new_delta(
summary.tenant_id,
summary.timeline_id,
@@ -330,9 +533,29 @@ impl DeltaLayer {
})
}
fn layer_name(&self) -> DeltaFileName {
self.desc.delta_file_name()
}
/// Path to the layer file in pageserver workdir.
fn path(&self) -> Utf8PathBuf {
self.path.clone()
pub fn path(&self) -> Utf8PathBuf {
Self::path_for(
&self.path_or_conf,
&self.desc.tenant_id,
&self.desc.timeline_id,
&self.layer_name(),
)
}
/// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
///
/// The value can be obtained via the [`ValueRef::load`] function.
pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
let inner = self
.load(LayerAccessKind::KeyIter, ctx)
.await
.context("load delta layer keys")?;
DeltaLayerInner::load_keys(inner, ctx)
.await
.context("Layer index is corrupted")
}
}
@@ -437,7 +660,7 @@ impl DeltaLayerWriterInner {
///
/// Finish writing the delta layer.
///
async fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
async fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -494,21 +717,37 @@ impl DeltaLayerWriterInner {
// Note: Because we opened the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
// set inner.file here. The first read will have to re-open it.
let desc = PersistentLayerDesc::new_delta(
self.tenant_id,
self.timeline_id,
self.key_start..key_end,
self.lsn_range.clone(),
metadata.len(),
);
let layer = DeltaLayer {
path_or_conf: PathOrConf::Conf(self.conf),
desc: PersistentLayerDesc::new_delta(
self.tenant_id,
self.timeline_id,
self.key_start..key_end,
self.lsn_range.clone(),
metadata.len(),
),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: OnceCell::new(),
};
// fsync the file
file.sync_all().await?;
// Rename the file to its final name
//
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let final_path = DeltaLayer::path_for(
&PathOrConf::Conf(self.conf),
&self.tenant_id,
&self.timeline_id,
&DeltaFileName {
key_range: self.key_start..key_end,
lsn_range: self.lsn_range,
},
);
std::fs::rename(self.path, &final_path)?;
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
trace!("created delta layer {}", layer.local_path());
trace!("created delta layer {final_path}");
Ok(layer)
}
@@ -589,12 +828,8 @@ impl DeltaLayerWriter {
///
/// Finish writing the delta layer.
///
pub(crate) async fn finish(
mut self,
key_end: Key,
timeline: &Arc<Timeline>,
) -> anyhow::Result<ResidentLayer> {
self.inner.take().unwrap().finish(key_end, timeline).await
pub async fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
self.inner.take().unwrap().finish(key_end).await
}
}
@@ -609,6 +844,49 @@ impl Drop for DeltaLayerWriter {
}
}
impl DeltaLayer {
/// Assume the file at `path` is corrupt if this function returns with an error.
pub(crate) async fn rewrite_tenant_timeline(
path: &Utf8Path,
new_tenant: TenantId,
new_timeline: TimelineId,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let mut file = file.file;
if actual_summary.magic != DELTA_FILE_MAGIC {
bail!("File '{}' is not a delta layer", path);
}
let new_summary = Summary {
tenant_id: new_tenant,
timeline_id: new_timeline,
..actual_summary
};
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf)?;
if buf.spilled() {
// The code in ImageLayerWriterInner just warn!()s for this.
// It should probably error out as well.
anyhow::bail!(
"Used more than one page size for summary buffer: {}",
buf.len()
);
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}
impl DeltaLayerInner {
pub(super) async fn load(
path: &Utf8Path,
@@ -732,17 +1010,15 @@ impl DeltaLayerInner {
}
}
pub(super) async fn load_keys<'a>(
&'a self,
ctx: &RequestContext,
pub(super) async fn load_keys<'a, 'b, T: AsRef<DeltaLayerInner> + Clone>(
this: &'a T,
ctx: &'b RequestContext,
) -> Result<Vec<DeltaEntry<'a>>> {
let file = &self.file;
let dl = this.as_ref();
let file = &dl.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
file,
);
let tree_reader =
DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(dl.index_start_blk, dl.index_root_blk, file);
let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
@@ -755,7 +1031,7 @@ impl DeltaLayerInner {
let val_ref = ValueRef {
blob_ref: BlobRef(value),
reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
Adapter(self),
Adapter(dl),
)),
};
let pos = BlobRef(value).pos();
@@ -782,61 +1058,10 @@ impl DeltaLayerInner {
if let Some(last) = all_keys.last_mut() {
// Last key occupies all space till end of value storage,
// which corresponds to beginning of the index
last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
last.size = dl.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
}
Ok(all_keys)
}
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
println!(
"index_start_blk: {}, root {}",
self.index_start_blk, self.index_root_blk
);
let file = &self.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
file,
);
tree_reader.dump().await?;
let keys = self.load_keys(ctx).await?;
async fn dump_blob(val: ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
let val = Value::des(&buf)?;
let desc = match val {
Value::Image(img) => {
format!(" img {} bytes", img.len())
}
Value::WalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
buf.len(),
rec.will_init(),
wal_desc
)
}
};
Ok(desc)
}
for entry in keys {
let DeltaEntry { key, lsn, val, .. } = entry;
let desc = match dump_blob(val, ctx).await {
Ok(desc) => desc,
Err(err) => {
format!("ERROR: {err}")
}
};
println!(" key {key} at {lsn}: {desc}");
}
Ok(())
}
}
/// A set of data associated with a delta layer key and its value
@@ -876,9 +1101,3 @@ impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
self.0.as_ref().file.read_blk(blknum, ctx).await
}
}
impl AsRef<DeltaLayerInner> for DeltaLayerInner {
fn as_ref(&self) -> &DeltaLayerInner {
self
}
}

View File

@@ -31,23 +31,21 @@ use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::Timeline;
use crate::virtual_file::VirtualFile;
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::fs::{self, File};
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tracing::*;
@@ -58,7 +56,7 @@ use utils::{
};
use super::filename::ImageFileName;
use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
use super::{AsLayerDesc, Layer, LayerAccessStatsReset, PathOrConf, PersistentLayerDesc};
///
/// Header stored in the beginning of the file
@@ -116,14 +114,22 @@ impl Summary {
}
}
/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
/// ImageLayer is the in-memory data structure associated with an on-disk image
/// file.
///
/// We keep an ImageLayer in memory for each file, in the LayerMap. If a layer
/// is in "loaded" state, we have a copy of the index in memory, in 'inner'.
/// Otherwise the struct is just a placeholder for a file that exists on disk,
/// and it needs to be loaded before using it in queries.
pub struct ImageLayer {
path: Utf8PathBuf,
path_or_conf: PathOrConf,
pub desc: PersistentLayerDesc,
// This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
pub lsn: Lsn,
access_stats: LayerAccessStats,
inner: OnceCell<ImageLayerInner>,
}
@@ -140,8 +146,6 @@ impl std::fmt::Debug for ImageLayer {
}
}
/// ImageLayer is the in-memory data structure associated with an on-disk image
/// file.
pub struct ImageLayerInner {
// values copied from summary
index_start_blk: u32,
@@ -162,11 +166,73 @@ impl std::fmt::Debug for ImageLayerInner {
}
}
impl ImageLayerInner {
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
let file = &self.file;
#[async_trait::async_trait]
impl Layer for ImageLayer {
/// Look up given page in the file
async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
self.get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
.await
}
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
impl std::fmt::Display for ImageLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
}
}
impl AsLayerDesc for ImageLayer {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.desc
}
}
impl PersistentLayer for ImageLayer {
fn local_path(&self) -> Option<Utf8PathBuf> {
self.local_path()
}
fn delete_resident_layer_file(&self) -> Result<()> {
self.delete_resident_layer_file()
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
self.info(reset)
}
fn access_stats(&self) -> &LayerAccessStats {
self.access_stats()
}
}
impl ImageLayer {
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!(
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
self.desc.tenant_id,
self.desc.timeline_id,
self.desc.key_range.start,
self.desc.key_range.end,
self.lsn,
self.desc.is_incremental(),
self.desc.file_size
);
if !verbose {
return Ok(());
}
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
let file = &inner.file;
let tree_reader =
DiskBtreeReader::<_, KEY_SIZE>::new(self.index_start_blk, self.index_root_blk, file);
DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file);
tree_reader.dump().await?;
@@ -184,36 +250,69 @@ impl ImageLayerInner {
Ok(())
}
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
impl std::fmt::Display for ImageLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
assert!(self.desc.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
let inner = self
.load(LayerAccessKind::GetValueReconstructData, ctx)
.await?;
inner
.get_value_reconstruct_data(key, reconstruct_state, ctx)
.await
// FIXME: makes no sense to dump paths
.with_context(|| format!("read {}", self.path()))
}
}
impl AsLayerDesc for ImageLayer {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.desc
pub(crate) fn local_path(&self) -> Option<Utf8PathBuf> {
Some(self.path())
}
}
impl ImageLayer {
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
self.desc.dump();
if !verbose {
return Ok(());
}
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
inner.dump(ctx).await?;
pub(crate) fn delete_resident_layer_file(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
Ok(())
}
pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.layer_desc().filename().file_name();
let lsn_start = self.layer_desc().image_layer_lsn();
HistoricLayerInfo::Image {
layer_file_name,
layer_file_size: self.desc.file_size,
lsn_start,
remote: false,
access_stats: self.access_stats.as_api_model(reset),
}
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
fn path_for(
path_or_conf: &PathOrConf,
timeline_id: TimelineId,
tenant_id: TenantId,
fname: &ImageFileName,
) -> Utf8PathBuf {
match path_or_conf {
PathOrConf::Path(path) => path.to_path_buf(),
PathOrConf::Conf(conf) => conf
.timeline_path(&tenant_id, &timeline_id)
.join(fname.to_string()),
}
}
fn temp_path_for(
conf: &PageServerConf,
timeline_id: TimelineId,
@@ -249,21 +348,54 @@ impl ImageLayer {
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
let path = self.path();
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx).await?;
let expected_summary = match &self.path_or_conf {
PathOrConf::Conf(_) => Some(Summary::from(self)),
PathOrConf::Path(_) => None,
};
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
let expected_filename = self.layer_desc().filename().file_name();
let loaded =
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary, ctx)
.await?;
if actual_filename != expected_filename {
println!("warning: filename does not match what is expected from in-file summary");
println!("actual: {:?}", actual_filename);
println!("expected: {:?}", expected_filename);
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
let expected_filename = self.filename().file_name();
if actual_filename != expected_filename {
println!("warning: filename does not match what is expected from in-file summary");
println!("actual: {:?}", actual_filename);
println!("expected: {:?}", expected_filename);
}
}
Ok(loaded)
}
/// Create an ImageLayer struct representing an existing file on disk
pub fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,
filename: &ImageFileName,
file_size: u64,
access_stats: LayerAccessStats,
) -> ImageLayer {
ImageLayer {
path_or_conf: PathOrConf::Conf(conf),
desc: PersistentLayerDesc::new_img(
tenant_id,
timeline_id,
filename.key_range.clone(),
filename.lsn,
file_size,
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
lsn: filename.lsn,
access_stats,
inner: OnceCell::new(),
}
}
/// Create an ImageLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
@@ -275,7 +407,7 @@ impl ImageLayer {
.metadata()
.context("get file metadata to determine size")?;
Ok(ImageLayer {
path: path.to_path_buf(),
path_or_conf: PathOrConf::Path(path.to_path_buf()),
desc: PersistentLayerDesc::new_img(
summary.tenant_id,
summary.timeline_id,
@@ -289,8 +421,61 @@ impl ImageLayer {
})
}
fn path(&self) -> Utf8PathBuf {
self.path.clone()
fn layer_name(&self) -> ImageFileName {
self.desc.image_file_name()
}
/// Path to the layer file in pageserver workdir.
pub fn path(&self) -> Utf8PathBuf {
Self::path_for(
&self.path_or_conf,
self.desc.timeline_id,
self.desc.tenant_id,
&self.layer_name(),
)
}
}
impl ImageLayer {
/// Assume the file at `path` is corrupt if this function returns with an error.
pub(crate) async fn rewrite_tenant_timeline(
path: &Utf8Path,
new_tenant: TenantId,
new_timeline: TimelineId,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let mut file = file.file;
if actual_summary.magic != IMAGE_FILE_MAGIC {
bail!("File '{}' is not a delta layer", path);
}
let new_summary = Summary {
tenant_id: new_tenant,
timeline_id: new_timeline,
..actual_summary
};
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf)?;
if buf.spilled() {
// The code in ImageLayerWriterInner just warn!()s for this.
// It should probably error out as well.
anyhow::bail!(
"Used more than one page size for summary buffer: {}",
buf.len()
);
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}
@@ -462,7 +647,7 @@ impl ImageLayerWriterInner {
///
/// Finish writing the image layer.
///
async fn finish(self, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
async fn finish(self) -> anyhow::Result<ImageLayer> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -516,14 +701,33 @@ impl ImageLayerWriterInner {
// Note: Because we open the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
// set inner.file here. The first read will have to re-open it.
let layer = ImageLayer {
path_or_conf: PathOrConf::Conf(self.conf),
desc,
lsn: self.lsn,
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: OnceCell::new(),
};
// fsync the file
file.sync_all().await?;
// FIXME: why not carry the virtualfile here, it supports renaming?
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
// Rename the file to its final name
//
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let final_path = ImageLayer::path_for(
&PathOrConf::Conf(self.conf),
self.timeline_id,
self.tenant_id,
&ImageFileName {
key_range: self.key_range.clone(),
lsn: self.lsn,
},
);
std::fs::rename(self.path, final_path)?;
trace!("created image layer {}", layer.local_path());
trace!("created image layer {}", layer.path());
Ok(layer)
}
@@ -585,11 +789,8 @@ impl ImageLayerWriter {
///
/// Finish writing the image layer.
///
pub(crate) async fn finish(
mut self,
timeline: &Arc<Timeline>,
) -> anyhow::Result<super::ResidentLayer> {
self.inner.take().unwrap().finish(timeline).await
pub async fn finish(mut self) -> anyhow::Result<ImageLayer> {
self.inner.take().unwrap().finish().await
}
}

View File

@@ -10,12 +10,11 @@ use crate::repository::{Key, Value};
use crate::tenant::block_io::BlockReader;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::walrecord;
use anyhow::{ensure, Result};
use pageserver_api::models::InMemoryLayerInfo;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::sync::OnceLock;
use tracing::*;
use utils::{
bin_ser::BeSer,
@@ -29,7 +28,7 @@ use std::fmt::Write as _;
use std::ops::Range;
use tokio::sync::RwLock;
use super::{DeltaLayerWriter, ResidentLayer};
use super::{DeltaLayer, DeltaLayerWriter, Layer};
pub struct InMemoryLayer {
conf: &'static PageServerConf,
@@ -208,6 +207,20 @@ impl InMemoryLayer {
}
}
#[async_trait::async_trait]
impl Layer for InMemoryLayer {
async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
self.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
.await
}
}
impl std::fmt::Display for InMemoryLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let end_lsn = self.end_lsn_or_max();
@@ -216,13 +229,17 @@ impl std::fmt::Display for InMemoryLayer {
}
impl InMemoryLayer {
///
/// Get layer size.
///
pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await;
Ok(inner.file.len())
}
///
/// Create a new, empty, in-memory layer
///
pub async fn create(
conf: &'static PageServerConf,
timeline_id: TimelineId,
@@ -314,11 +331,7 @@ impl InMemoryLayer {
/// Write this frozen in-memory layer to disk.
///
/// Returns a new delta layer with all the same data as this in-memory layer
pub(crate) async fn write_to_disk(
&self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> Result<ResidentLayer> {
pub(crate) async fn write_to_disk(&self, ctx: &RequestContext) -> Result<DeltaLayer> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
@@ -363,8 +376,7 @@ impl InMemoryLayer {
}
}
// MAX is used here because we identify L0 layers by full key range
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
let delta_layer = delta_layer_writer.finish(Key::MAX).await?;
Ok(delta_layer)
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,3 +1,4 @@
use anyhow::Result;
use core::fmt::Display;
use std::ops::Range;
use utils::{
@@ -5,7 +6,7 @@ use utils::{
lsn::Lsn,
};
use crate::repository::Key;
use crate::{context::RequestContext, repository::Key};
use super::{DeltaFileName, ImageFileName, LayerFileName};
@@ -99,22 +100,6 @@ impl PersistentLayerDesc {
}
}
pub fn from_filename(
tenant_id: TenantId,
timeline_id: TimelineId,
filename: LayerFileName,
file_size: u64,
) -> Self {
match filename {
LayerFileName::Image(i) => {
Self::new_img(tenant_id, timeline_id, i.key_range, i.lsn, file_size)
}
LayerFileName::Delta(d) => {
Self::new_delta(tenant_id, timeline_id, d.key_range, d.lsn_range, file_size)
}
}
}
/// Get the LSN that the image layer covers.
pub fn image_layer_lsn(&self) -> Lsn {
assert!(!self.is_delta);
@@ -188,31 +173,21 @@ impl PersistentLayerDesc {
self.is_delta
}
pub fn dump(&self) {
if self.is_delta {
println!(
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} is_incremental {} size {} ----",
self.tenant_id,
self.timeline_id,
self.key_range.start,
self.key_range.end,
self.lsn_range.start,
self.lsn_range.end,
self.is_incremental(),
self.file_size,
);
} else {
println!(
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
self.tenant_id,
self.timeline_id,
self.key_range.start,
self.key_range.end,
self.image_layer_lsn(),
self.is_incremental(),
self.file_size
);
}
pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
println!(
"----- layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----",
self.tenant_id,
self.timeline_id,
self.key_range.start,
self.key_range.end,
self.lsn_range.start,
self.lsn_range.end,
self.is_delta,
self.is_incremental(),
self.file_size,
);
Ok(())
}
pub fn file_size(&self) -> u64 {

View File

@@ -0,0 +1,216 @@
//! A RemoteLayer is an in-memory placeholder for a layer file that exists
//! in remote storage.
//!
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::timeline::layer_manager::LayerManager;
use anyhow::{bail, Result};
use camino::Utf8PathBuf;
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
use std::sync::Arc;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use super::filename::{DeltaFileName, ImageFileName};
use super::{
AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
};
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
/// [`DeltaLayer`].
///
/// RemoteLayer might be downloaded on-demand during operations which are
/// allowed download remote layers and during which, it gets replaced with a
/// concrete `DeltaLayer` or `ImageLayer`.
///
/// See: [`crate::context::RequestContext`] for authorization to download
pub struct RemoteLayer {
pub desc: PersistentLayerDesc,
pub layer_metadata: LayerFileMetadata,
access_stats: LayerAccessStats,
pub(crate) ongoing_download: Arc<tokio::sync::Semaphore>,
/// Has `LayerMap::replace` failed for this (true) or not (false).
///
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
/// The field is used to mark a RemoteLayer permanently (until restart or ignore+load)
/// unprocessable, because a LayerMap::replace failed.
///
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
/// a possible fast loop between `Timeline::get_reconstruct_data` and
/// `Timeline::download_remote_layer`, which also logs.
///
/// [`ongoing_download`]: Self::ongoing_download
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
}
impl std::fmt::Debug for RemoteLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteLayer")
.field("file_name", &self.desc.filename())
.field("layer_metadata", &self.layer_metadata)
.field("is_incremental", &self.desc.is_incremental())
.finish()
}
}
#[async_trait::async_trait]
impl Layer for RemoteLayer {
async fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
Err(anyhow::anyhow!("layer {self} needs to be downloaded"))
}
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
impl std::fmt::Display for RemoteLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
}
}
impl AsLayerDesc for RemoteLayer {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.desc
}
}
impl PersistentLayer for RemoteLayer {
fn local_path(&self) -> Option<Utf8PathBuf> {
None
}
fn delete_resident_layer_file(&self) -> Result<()> {
bail!("remote layer has no layer file");
}
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
Some(self)
}
fn is_remote_layer(&self) -> bool {
true
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.layer_desc().filename().file_name();
let lsn_range = self.layer_desc().lsn_range.clone();
if self.desc.is_delta {
HistoricLayerInfo::Delta {
layer_file_name,
layer_file_size: self.layer_metadata.file_size(),
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote: true,
access_stats: self.access_stats.as_api_model(reset),
}
} else {
HistoricLayerInfo::Image {
layer_file_name,
layer_file_size: self.layer_metadata.file_size(),
lsn_start: lsn_range.start,
remote: true,
access_stats: self.access_stats.as_api_model(reset),
}
}
}
fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
}
impl RemoteLayer {
pub fn new_img(
tenantid: TenantId,
timelineid: TimelineId,
fname: &ImageFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayer {
RemoteLayer {
desc: PersistentLayerDesc::new_img(
tenantid,
timelineid,
fname.key_range.clone(),
fname.lsn,
layer_metadata.file_size(),
),
layer_metadata: layer_metadata.clone(),
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
download_replacement_failure: std::sync::atomic::AtomicBool::default(),
access_stats,
}
}
pub fn new_delta(
tenantid: TenantId,
timelineid: TimelineId,
fname: &DeltaFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayer {
RemoteLayer {
desc: PersistentLayerDesc::new_delta(
tenantid,
timelineid,
fname.key_range.clone(),
fname.lsn_range.clone(),
layer_metadata.file_size(),
),
layer_metadata: layer_metadata.clone(),
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
download_replacement_failure: std::sync::atomic::AtomicBool::default(),
access_stats,
}
}
/// Create a Layer struct representing this layer, after it has been downloaded.
pub(crate) fn create_downloaded_layer(
&self,
_layer_map_lock_held_witness: &LayerManager,
conf: &'static PageServerConf,
file_size: u64,
) -> Arc<dyn PersistentLayer> {
if self.desc.is_delta {
let fname = self.desc.delta_file_name();
Arc::new(DeltaLayer::new(
conf,
self.desc.timeline_id,
self.desc.tenant_id,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
))
} else {
let fname = self.desc.image_file_name();
Arc::new(ImageLayer::new(
conf,
self.desc.timeline_id,
self.desc.tenant_id,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
))
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -29,6 +29,7 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::PersistentLayer,
tasks::{BackgroundLoopKind, RateLimitError},
timeline::EvictionError,
LogicalSizeCalculationCause, Tenant,
@@ -209,26 +210,15 @@ impl Timeline {
// NB: all the checks can be invalidated as soon as we release the layer map lock.
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<_> = {
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
let hist_layer = guard.get_from_desc(&hist_layer);
// guard against eviction while we inspect it; it might be that eviction_task and
// disk_usage_eviction_task both select the same layers to be evicted, and
// seemingly free up double the space. both succeeding is of no consequence.
let guard = match hist_layer.keep_resident().await {
Ok(Some(l)) => l,
Ok(None) => continue,
Err(e) => {
// these should not happen, but we cannot make them statically impossible right
// now.
tracing::warn!(layer=%hist_layer, "failed to keep the layer resident: {e:#}");
continue;
}
};
if hist_layer.is_remote_layer() {
continue;
}
let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
@@ -259,7 +249,7 @@ impl Timeline {
}
};
if no_activity_for > p.threshold {
candidates.push(guard.drop_eviction_guard())
candidates.push(hist_layer)
}
}
candidates
@@ -278,7 +268,7 @@ impl Timeline {
};
let results = match self
.evict_layer_batch(remote_client, &candidates, cancel)
.evict_layer_batch(remote_client, &candidates[..], cancel.clone())
.await
{
Err(pre_err) => {
@@ -289,7 +279,7 @@ impl Timeline {
Ok(results) => results,
};
assert_eq!(results.len(), candidates.len());
for result in results {
for (l, result) in candidates.iter().zip(results) {
match result {
None => {
stats.skipped_for_shutdown += 1;
@@ -297,10 +287,24 @@ impl Timeline {
Some(Ok(())) => {
stats.evicted += 1;
}
Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
stats.not_evictable += 1;
}
Some(Err(EvictionError::FileNotFound)) => {
// compaction/gc removed the file while we were waiting on layer_removal_cs
stats.not_evictable += 1;
}
Some(Err(
e @ EvictionError::LayerNotFound(_) | e @ EvictionError::StatFailed(_),
)) => {
let e = utils::error::report_compact_sources(&e);
warn!(layer = %l, "failed to evict layer: {e}");
stats.not_evictable += 1;
}
Some(Err(EvictionError::MetadataInconsistency(detail))) => {
warn!(layer = %l, "failed to evict layer: {detail}");
stats.not_evictable += 1;
}
}
}
if stats.candidates == stats.not_evictable {
@@ -344,7 +348,20 @@ impl Timeline {
// Make one of the tenant's timelines draw the short straw and run the calculation.
// The others wait until the calculation is done so that they take into account the
// imitated accesses that the winner made.
let tenant = match crate::tenant::mgr::get_tenant(self.tenant_id, true) {
//
// It is critical we are responsive to cancellation here. Otherwise, we deadlock with
// tenant deletion (holds TENANTS in read mode) any other task that attempts to
// acquire TENANTS in write mode before we here call get_tenant.
// See https://github.com/neondatabase/neon/issues/5284.
let res = tokio::select! {
_ = cancel.cancelled() => {
return ControlFlow::Break(());
}
res = crate::tenant::mgr::get_tenant(self.tenant_id, true) => {
res
}
};
let tenant = match res {
Ok(t) => t,
Err(_) => {
return ControlFlow::Break(());

View File

@@ -12,16 +12,27 @@ use crate::{
tenant::{
layer_map::{BatchedUpdates, LayerMap},
storage_layer::{
AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
ResidentLayer,
AsLayerDesc, DeltaLayer, ImageLayer, InMemoryLayer, PersistentLayer,
PersistentLayerDesc, PersistentLayerKey,
},
timeline::compare_arced_layers,
},
};
/// Provides semantic APIs to manipulate the layer map.
pub(crate) struct LayerManager {
layer_map: LayerMap,
layer_fmgr: LayerFileManager<Layer>,
layer_fmgr: LayerFileManager,
}
/// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after
/// scheduling deletes in remote client.
pub(crate) struct ApplyGcResultGuard<'a>(BatchedUpdates<'a>);
impl ApplyGcResultGuard<'_> {
pub(crate) fn flush(self) {
self.0.flush();
}
}
impl LayerManager {
@@ -32,7 +43,7 @@ impl LayerManager {
}
}
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
self.layer_fmgr.get_from_desc(desc)
}
@@ -44,12 +55,21 @@ impl LayerManager {
&self.layer_map
}
/// Replace layers in the layer file manager, used in evictions and layer downloads.
pub(crate) fn replace_and_verify(
&mut self,
expected: Arc<dyn PersistentLayer>,
new: Arc<dyn PersistentLayer>,
) -> Result<()> {
self.layer_fmgr.replace_and_verify(expected, new)
}
/// Called from `load_layer_map`. Initialize the layer manager with:
/// 1. all on-disk layers
/// 2. next open layer (with disk disk_consistent_lsn LSN)
pub(crate) fn initialize_local_layers(
&mut self,
on_disk_layers: Vec<Layer>,
on_disk_layers: Vec<Arc<dyn PersistentLayer>>,
next_open_layer_at: Lsn,
) {
let mut updates = self.layer_map.batch_update();
@@ -144,19 +164,10 @@ impl LayerManager {
}
/// Add image layers to the layer map, called from `create_image_layers`.
pub(crate) fn track_new_image_layers(
&mut self,
image_layers: &[ResidentLayer],
metrics: &TimelineMetrics,
) {
pub(crate) fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) {
let mut updates = self.layer_map.batch_update();
for layer in image_layers {
Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
// record these here instead of Layer::finish_creating because otherwise partial
// failure with create_image_layers would balloon up the physical size gauge. downside
// is that all layers need to be created before metrics are updated.
metrics.record_new_file_metrics(layer.layer_desc().file_size);
Self::insert_historic_layer(Arc::new(layer), &mut updates, &mut self.layer_fmgr);
}
updates.flush();
}
@@ -164,71 +175,76 @@ impl LayerManager {
/// Flush a frozen layer and add the written delta layer to the layer map.
pub(crate) fn finish_flush_l0_layer(
&mut self,
delta_layer: Option<&ResidentLayer>,
delta_layer: Option<DeltaLayer>,
frozen_layer_for_check: &Arc<InMemoryLayer>,
metrics: &TimelineMetrics,
) {
let inmem = self
.layer_map
.frozen_layers
.pop_front()
.expect("there must be a inmem layer to flush");
let l = self.layer_map.frozen_layers.pop_front();
let mut updates = self.layer_map.batch_update();
// Only one task may call this function at a time (for this
// timeline). If two tasks tried to flush the same frozen
// Only one thread may call this function at a time (for this
// timeline). If two threads tried to flush the same frozen
// layer to disk at the same time, that would not work.
assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check));
if let Some(l) = delta_layer {
let mut updates = self.layer_map.batch_update();
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
metrics.record_new_file_metrics(l.layer_desc().file_size);
updates.flush();
if let Some(delta_layer) = delta_layer {
Self::insert_historic_layer(Arc::new(delta_layer), &mut updates, &mut self.layer_fmgr);
}
updates.flush();
}
/// Called when compaction is completed.
pub(crate) fn finish_compact_l0(
&mut self,
layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
compact_from: &[Layer],
compact_to: &[ResidentLayer],
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
compact_from: Vec<Arc<dyn PersistentLayer>>,
compact_to: Vec<Arc<dyn PersistentLayer>>,
metrics: &TimelineMetrics,
) {
) -> Result<()> {
let mut updates = self.layer_map.batch_update();
for l in compact_to {
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
metrics.record_new_file_metrics(l.layer_desc().file_size);
Self::insert_historic_layer(l, &mut updates, &mut self.layer_fmgr);
}
for l in compact_from {
Self::delete_historic_layer(layer_removal_cs, l, &mut updates, &mut self.layer_fmgr);
// NB: the layer file identified by descriptor `l` is guaranteed to be present
// in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
// time, even though we dropped `Timeline::layers` inbetween.
Self::delete_historic_layer(
layer_removal_cs.clone(),
l,
&mut updates,
metrics,
&mut self.layer_fmgr,
)?;
}
updates.flush();
Ok(())
}
/// Called when garbage collect the timeline. Returns a guard that will apply the updates to the layer map.
pub(crate) fn finish_gc_timeline(
&mut self,
layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
gc_layers: Vec<Layer>,
) {
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
gc_layers: Vec<Arc<dyn PersistentLayer>>,
metrics: &TimelineMetrics,
) -> Result<ApplyGcResultGuard> {
let mut updates = self.layer_map.batch_update();
for doomed_layer in gc_layers {
Self::delete_historic_layer(
layer_removal_cs,
&doomed_layer,
layer_removal_cs.clone(),
doomed_layer,
&mut updates,
metrics,
&mut self.layer_fmgr,
);
)?; // FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch?
}
updates.flush()
Ok(ApplyGcResultGuard(updates))
}
/// Helper function to insert a layer into the layer map and file manager.
fn insert_historic_layer(
layer: Layer,
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerFileManager<Layer>,
mapping: &mut LayerFileManager,
) {
updates.insert_historic(layer.layer_desc().clone());
mapping.insert(layer);
@@ -238,12 +254,17 @@ impl LayerManager {
/// Remote storage is not affected by this operation.
fn delete_historic_layer(
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: &Layer,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerFileManager<Layer>,
) {
metrics: &TimelineMetrics,
mapping: &mut LayerFileManager,
) -> anyhow::Result<()> {
let desc = layer.layer_desc();
if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?;
metrics.resident_physical_size_sub(desc.file_size);
}
// TODO Removing from the bottom of the layer map is expensive.
// Maybe instead discard all layer map historic versions that
@@ -252,18 +273,21 @@ impl LayerManager {
// map index without actually rebuilding the index.
updates.remove_historic(desc);
mapping.remove(layer);
layer.garbage_collect_on_drop();
Ok(())
}
pub(crate) fn contains(&self, layer: &Layer) -> bool {
pub(crate) fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool {
self.layer_fmgr.contains(layer)
}
}
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
pub(crate) struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
HashMap<PersistentLayerKey, Arc<T>>,
);
impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> {
// The assumption for the `expect()` is that all code maintains the following invariant:
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
self.0
@@ -273,14 +297,14 @@ impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
.clone()
}
pub(crate) fn insert(&mut self, layer: T) {
pub(crate) fn insert(&mut self, layer: Arc<T>) {
let present = self.0.insert(layer.layer_desc().key(), layer.clone());
if present.is_some() && cfg!(debug_assertions) {
panic!("overwriting a layer: {:?}", layer.layer_desc())
}
}
pub(crate) fn contains(&self, layer: &T) -> bool {
pub(crate) fn contains(&self, layer: &Arc<T>) -> bool {
self.0.contains_key(&layer.layer_desc().key())
}
@@ -288,7 +312,7 @@ impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
Self(HashMap::new())
}
pub(crate) fn remove(&mut self, layer: &T) {
pub(crate) fn remove(&mut self, layer: Arc<T>) {
let present = self.0.remove(&layer.layer_desc().key());
if present.is_none() && cfg!(debug_assertions) {
panic!(
@@ -297,4 +321,39 @@ impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
)
}
}
pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
let key = expected.layer_desc().key();
let other = new.layer_desc().key();
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
let new_l0 = LayerMap::is_l0(new.layer_desc());
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
"layermap-replace-notfound"
));
anyhow::ensure!(
key == other,
"expected and new layer have different keys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"one layer is l0 while the other is not: {expected_l0} != {new_l0}"
);
if let Some(layer) = self.0.get_mut(&key) {
anyhow::ensure!(
compare_arced_layers(&expected, layer),
"another layer was found instead of expected, expected={expected:?}, new={new:?}",
expected = Arc::as_ptr(&expected),
new = Arc::as_ptr(layer),
);
*layer = new;
Ok(())
} else {
anyhow::bail!("layer was not found");
}
}
}

View File

@@ -1,5 +1,4 @@
use super::storage_layer::LayerFileName;
use super::storage_layer::ResidentLayer;
use super::Generation;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
@@ -204,6 +203,18 @@ impl UploadQueue {
UploadQueue::Stopped(stopped) => Ok(stopped),
}
}
pub(crate) fn get_layer_metadata(
&self,
name: &LayerFileName,
) -> anyhow::Result<Option<LayerFileMetadata>> {
match self {
UploadQueue::Stopped(_) | UploadQueue::Uninitialized => {
anyhow::bail!("queue is in state {}", self.as_str())
}
UploadQueue::Initialized(inner) => Ok(inner.latest_files.get(name).cloned()),
}
}
}
/// An in-progress upload or delete task.
@@ -226,7 +237,7 @@ pub(crate) struct Delete {
#[derive(Debug)]
pub(crate) enum UploadOp {
/// Upload a layer file
UploadLayer(ResidentLayer, LayerFileMetadata),
UploadLayer(LayerFileName, LayerFileMetadata),
/// Upload the metadata file
UploadMetadata(IndexPart, Lsn),
@@ -241,13 +252,13 @@ pub(crate) enum UploadOp {
impl std::fmt::Display for UploadOp {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
UploadOp::UploadLayer(layer, metadata) => {
UploadOp::UploadLayer(path, metadata) => {
write!(
f,
"UploadLayer({}, size={:?}, gen={:?})",
layer,
path.file_name(),
metadata.file_size(),
metadata.generation
metadata.generation,
)
}
UploadOp::UploadMetadata(_, lsn) => {

View File

@@ -18,7 +18,8 @@ use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
@@ -110,7 +111,7 @@ impl OpenFiles {
///
/// On return, we hold a lock on the slot, and its 'tag' has been updated
/// recently_used has been set. It's all ready for reuse.
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
//
// Run the clock algorithm to find a slot to replace.
//
@@ -142,7 +143,7 @@ impl OpenFiles {
}
retries += 1;
} else {
slot_guard = slot.inner.write().unwrap();
slot_guard = slot.inner.write().await;
index = next;
break;
}
@@ -153,7 +154,7 @@ impl OpenFiles {
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
// the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to
// distinguish the two.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::CloseByReplace)
@@ -208,6 +209,29 @@ impl CrashsafeOverwriteError {
}
}
/// Observe duration for the given storage I/O operation
///
/// Unlike `observe_closure_duration`, this supports async,
/// where "support" means that we measure wall clock time.
macro_rules! observe_duration {
($op:expr, $($body:tt)*) => {{
let instant = Instant::now();
let result = $($body)*;
let elapsed = instant.elapsed().as_secs_f64();
STORAGE_IO_TIME_METRIC
.get($op)
.observe(elapsed);
result
}}
}
macro_rules! with_file {
($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
let $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
@@ -244,11 +268,9 @@ impl VirtualFile {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| open_options.open(path))?;
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
// Strip all options other than read and write.
//
@@ -331,22 +353,24 @@ impl VirtualFile {
/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
.await?
with_file!(self, StorageIoOperation::Fsync, |file| file
.as_ref()
.sync_all())
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
.await?
with_file!(self, StorageIoOperation::Metadata, |file| file
.as_ref()
.metadata())
}
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
async fn with_file<F, R>(&self, op: StorageIoOperation, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
///
/// We are doing it via a macro as Rust doesn't support async closures that
/// take on parameters with lifetimes.
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
let open_files = get_open_files();
let mut handle_guard = {
@@ -356,27 +380,23 @@ impl VirtualFile {
// We only need to hold the handle lock while we read the current handle. If
// another thread closes the file and recycles the slot for a different file,
// we will notice that the handle we read is no longer valid and retry.
let mut handle = *self.handle.read().unwrap();
let mut handle = *self.handle.read().await;
loop {
// Check if the slot contains our File
{
let slot = &open_files.slots[handle.index];
let slot_guard = slot.inner.read().unwrap();
if slot_guard.tag == handle.tag {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(file)));
}
let slot_guard = slot.inner.read().await;
if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(FileGuard { slot_guard });
}
}
// The slot didn't contain our File. We will have to open it ourselves,
// but before that, grab a write lock on handle in the VirtualFile, so
// that no other thread will try to concurrently open the same file.
let handle_guard = self.handle.write().unwrap();
let handle_guard = self.handle.write().await;
// If another thread changed the handle while we were not holding the lock,
// then the handle might now be valid again. Loop back to retry.
@@ -390,17 +410,10 @@ impl VirtualFile {
// We need to open the file ourselves. The handle in the VirtualFile is
// now locked in write-mode. Find a free slot to put it in.
let (handle, mut slot_guard) = open_files.find_victim_slot();
let (handle, mut slot_guard) = open_files.find_victim_slot().await;
// Open the physical file
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
let result = STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(&file));
let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?;
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
@@ -408,7 +421,9 @@ impl VirtualFile {
*handle_guard = handle;
Ok(result)
return Ok(FileGuard {
slot_guard: slot_guard.downgrade(),
});
}
pub fn remove(self) {
@@ -423,11 +438,9 @@ impl VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = self
.with_file(StorageIoOperation::Seek, |mut file| {
file.seek(SeekFrom::End(offset))
})
.await??
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
.as_ref()
.seek(SeekFrom::End(offset)))?
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
@@ -515,9 +528,9 @@ impl VirtualFile {
}
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset))
.await?;
let result = with_file!(self, StorageIoOperation::Read, |file| file
.as_ref()
.read_at(buf, offset));
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
@@ -527,9 +540,9 @@ impl VirtualFile {
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset))
.await?;
let result = with_file!(self, StorageIoOperation::Write, |file| file
.as_ref()
.write_at(buf, offset));
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
@@ -539,6 +552,18 @@ impl VirtualFile {
}
}
struct FileGuard<'a> {
slot_guard: RwLockReadGuard<'a, SlotInner>,
}
impl<'a> AsRef<File> for FileGuard<'a> {
fn as_ref(&self) -> &File {
// This unwrap is safe because we only create `FileGuard`s
// if we know that the file is Some.
self.slot_guard.file.as_ref().unwrap()
}
}
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
@@ -571,20 +596,39 @@ impl VirtualFile {
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
let handle = self.handle.get_mut();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(slot_guard.file.take()));
fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
if slot_guard.tag == tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also the `CloseByReplace` operation for closes done on eviction for
// comparison.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
// We don't have async drop so we cannot directly await the lock here.
// Instead, first do a best-effort attempt at closing the underlying
// file descriptor by using `try_write`, and if that fails, spawn
// a tokio task to do it asynchronously: we just want it to be
// cleaned up eventually.
// Most of the time, the `try_lock` should succeed though,
// as we have `&mut self` access. In other words, if the slot
// is still occupied by our file, there should be no access from
// other I/O operations; the only other possible place to lock
// the slot is the lock algorithm looking for free slots.
let slot = &get_open_files().slots[handle.index];
if let Ok(slot_guard) = slot.inner.try_write() {
clean_slot(slot, slot_guard, handle.tag);
} else {
let tag = handle.tag;
tokio::spawn(async move {
let slot_guard = slot.inner.write().await;
clean_slot(slot, slot_guard, tag);
});
};
}
}

View File

@@ -0,0 +1,43 @@
# Usage from top of repo:
# poetry run python3 test_runner/duplicate_tenant.py b97965931096047b2d54958756baee7b 10
from queue import Queue
import sys
import threading
import requests
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import TenantId
initial_tenant = sys.argv[1]
ncopies = int(sys.argv[2])
numthreads = int(sys.argv[3])
# class DuckTypedNeonEnv:
# pass
# cli = NeonCli(DuckTypedNeonEnv())
q = Queue()
for i in range(0, ncopies):
q.put(i)
for i in range(0, numthreads):
q.put(None)
def create():
while True:
if q.get() == None:
break
new_tenant = TenantId.generate()
res = requests.post(
f"http://localhost:9898/v1/tenant/{initial_tenant}/duplicate",
json={"new_tenant_id": str(new_tenant)},
)
res.raise_for_status()
for i in range(0, numthreads):
threading.Thread(target=create).start()

View File

@@ -22,11 +22,6 @@ https://docs.pytest.org/en/6.2.x/logging.html
# log format is specified in pytest.ini file
LOGGING = {
"version": 1,
"formatters": {
"standard": {
"datefmt": "%m/%d/%Y %I:%M:%SZ %p %Z",
}
},
"loggers": {
"root": {"level": "INFO"},
"root.safekeeper_async": {"level": "INFO"}, # a lot of logs on DEBUG level

View File

@@ -1616,7 +1616,7 @@ class NeonPageserver(PgProtocol):
".*wait for layer upload ops to complete.*", # .*Caused by:.*wait_completion aborted because upload queue was stopped
".*gc_loop.*Gc failed, retrying in.*timeline is Stopping", # When gc checks timeline state after acquiring layer_removal_cs
".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant", # Tenant::gc precondition
".*compaction_loop.*Compaction failed, retrying in.*timeline or pageserver is shutting down", # When compaction checks timeline state after acquiring layer_removal_cs
".*compaction_loop.*Compaction failed, retrying in.*timeline is Stopping", # When compaction checks timeline state after acquiring layer_removal_cs
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
".*task iteration took longer than the configured period.*",
@@ -1707,7 +1707,7 @@ class NeonPageserver(PgProtocol):
@property
def workdir(self) -> Path:
return self.env.repo_dir / f"pageserver_{self.id}"
return Path(os.path.join(self.env.repo_dir, f"pageserver_{self.id}"))
def assert_no_errors(self):
logfile = open(os.path.join(self.workdir, "pageserver.log"), "r")
@@ -1773,16 +1773,6 @@ class NeonPageserver(PgProtocol):
client = self.http_client()
return client.tenant_attach(tenant_id, config, config_null, generation=generation)
def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs):
# This API is only for use when generations are enabled
assert self.env.attachment_service is not None
if config["mode"].startswith("Attached") and "generation" not in config:
config["generation"] = self.env.attachment_service.attach_hook(tenant_id, self.id)
client = self.http_client()
return client.tenant_location_conf(tenant_id, config, **kwargs)
def append_pageserver_param_overrides(
params_to_update: List[str],
@@ -2639,7 +2629,6 @@ class EndpointFactory:
lsn: Optional[Lsn] = None,
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
pageserver_id: Optional[int] = None,
) -> Endpoint:
ep = Endpoint(
self.env,
@@ -2659,7 +2648,6 @@ class EndpointFactory:
lsn=lsn,
hot_standby=hot_standby,
config_lines=config_lines,
pageserver_id=pageserver_id,
)
def stop_all(self) -> "EndpointFactory":

View File

@@ -215,6 +215,25 @@ class PageserverHttpClient(requests.Session):
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_duplicate(
self, src_tenant_id: TenantId, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None
) -> TenantId:
if conf is not None:
assert "new_tenant_id" not in conf.keys()
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{src_tenant_id}/duplicate",
json={
"new_tenant_id": str(new_tenant_id),
**(conf or {}),
},
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create tenant: already exists for id {new_tenant_id}")
new_tenant_id = res.json()
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_attach(
self,
tenant_id: TenantId,
@@ -247,23 +266,6 @@ class PageserverHttpClient(requests.Session):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
self.verbose_error(res)
def tenant_location_conf(
self, tenant_id: TenantId, location_conf=dict[str, Any], flush_ms=None
):
body = location_conf.copy()
body["tenant_id"] = str(tenant_id)
params = {}
if flush_ms is not None:
params["flush_ms"] = str(flush_ms)
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/location_config",
json=body,
params=params,
)
self.verbose_error(res)
def tenant_delete(self, tenant_id: TenantId):
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
self.verbose_error(res)
@@ -667,14 +669,6 @@ class PageserverHttpClient(requests.Session):
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
self.verbose_error(res)
def secondary_tenant_upload(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/secondary/{tenant_id}/upload")
self.verbose_error(res)
def secondary_tenant_download(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/secondary/{tenant_id}/download")
self.verbose_error(res)
def post_tracing_event(self, level: str, message: str):
res = self.post(
f"http://localhost:{self.port}/v1/tracing/event",

View File

@@ -249,7 +249,7 @@ def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str
# this has been seen in the wild by tests with the below contradicting logging
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-5322/6207777020/index.html#suites/3556ed71f2d69272a7014df6dcb02317/53b5c368b5a68865
# this seems like a mock_s3 issue
log.warning(
log.warn(
f"contrading ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}, assuming this means KeyCount=0"
)
keys = 0
@@ -257,7 +257,7 @@ def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str
# this has been seen in one case with mock_s3:
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4938/6000769714/index.html#suites/3556ed71f2d69272a7014df6dcb02317/ca01e4f4d8d9a11f
# looking at moto impl, it might be there's a race with common prefix (sub directory) not going away with deletes
log.warning(
log.warn(
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}"
)

View File

@@ -1,133 +0,0 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import TenantId, TimelineId
class Workload:
"""
This is not a general purpose load generator: it exists for storage tests that need to inject some
high level types of storage work via the postgres interface:
- layer writes (`write_rows`)
- work for compaction (`churn_rows`)
- reads, checking we get the right data (`validate`)
"""
def __init__(self, env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId):
self.env = env
self.tenant_id = tenant_id
self.timeline_id = timeline_id
self.table = "foo"
self.expect_rows = 0
self.churn_cursor = 0
self.endpoints: dict[int, Endpoint] = {}
def endpoint(self, pageserver_id: int):
if pageserver_id not in self.endpoints:
self.endpoints[pageserver_id] = self.env.endpoints.create(
"main",
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id=f"ep-{pageserver_id}",
)
endpoint = self.endpoints[pageserver_id]
assert not endpoint.running
endpoint.start(pageserver_id=pageserver_id)
return endpoint
def init(self, pageserver_id: int):
with self.endpoint(pageserver_id) as endpoint:
endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);")
endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
last_flush_lsn_upload(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
def write_rows(self, n, pageserver_id):
with self.endpoint(pageserver_id) as endpoint:
start = self.expect_rows
end = start + n - 1
self.expect_rows += n
dummy_value = "blah"
endpoint.safe_psql(
f"""
INSERT INTO {self.table} (id, val)
SELECT g, '{dummy_value}'
FROM generate_series({start}, {end}) g
"""
)
return last_flush_lsn_upload(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
def churn_rows(self, n, pageserver_id, upload=True):
assert self.expect_rows >= n
max_iters = 10
with self.endpoint(pageserver_id) as endpoint:
todo = n
i = 0
while todo > 0:
i += 1
if i > max_iters:
raise RuntimeError("oops")
start = self.churn_cursor % self.expect_rows
n_iter = min((self.expect_rows - start), todo)
todo -= n_iter
end = start + n_iter - 1
log.info(
f"start,end = {start},{end}, cursor={self.churn_cursor}, expect_rows={self.expect_rows}"
)
assert end < self.expect_rows
self.churn_cursor += n_iter
dummy_value = "blah"
endpoint.safe_psql_many(
[
f"""
INSERT INTO {self.table} (id, val)
SELECT g, '{dummy_value}'
FROM generate_series({start}, {end}) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
f"VACUUM {self.table}",
]
)
last_flush_lsn = wait_for_last_flush_lsn(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
ps_http = self.env.get_pageserver(pageserver_id).http_client()
wait_for_last_record_lsn(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
if upload:
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(self.tenant_id, self.timeline_id)
wait_for_upload(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
def validate(self, pageserver_id):
with self.endpoint(pageserver_id) as endpoint:
result = endpoint.safe_psql_many(
[
"select clear_buffer_cache()",
f"""
SELECT COUNT(*) FROM {self.table}
""",
]
)
log.info(f"validate({self.expect_rows}): {result}")
assert result == [[("",)], [(self.expect_rows,)]]

View File

@@ -19,7 +19,7 @@ def positive_env(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
# eviction might be the first one after an attach to access the layers
env.pageserver.allowed_errors.append(
".*unexpectedly on-demand downloading remote layer .* for task kind Eviction"
".*unexpectedly on-demand downloading remote layer remote.* for task kind Eviction"
)
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
return env

View File

@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
env.pageserver.allowed_errors.extend(
[
".*layer loading failed:.*",
".*Failed to load delta layer.*",
".*could not find data for key.*",
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
@@ -87,7 +87,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
# Second timeline will fail during basebackup, because the local layer file is corrupt.
# It will fail when we try to read (and reconstruct) a page from it, ergo the error message.
# (We don't check layer file contents on startup, when loading the timeline)
with pytest.raises(Exception, match="layer loading failed:") as err:
with pytest.raises(Exception, match="Failed to load delta layer") as err:
pg2.start()
log.info(
f"As expected, compute startup failed for timeline {tenant2}/{timeline2} with corrupt layers: {err}"

View File

@@ -247,34 +247,34 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
ps_http.evict_all_layers(tenant_id, timeline_id)
def ensure_resident_and_remote_size_metrics():
log.info("ensure that all the layers are gone")
resident_layers = list(env.pageserver.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
# we have disabled all background loops, so, this should hold
assert len(resident_layers) == 0, "ensure that all the layers are gone"
assert len(resident_layers) == 0
info = ps_http.layer_map_info(tenant_id, timeline_id)
log.info("layer map dump: %s", info)
log.info("ensure that resident_physical_size metric is zero")
resident_physical_size_metric = ps_http.get_timeline_metric(
tenant_id, timeline_id, "pageserver_resident_physical_size"
)
assert (
resident_physical_size_metric == 0
), "ensure that resident_physical_size metric is zero"
assert resident_physical_size_metric == 0
log.info("ensure that resident_physical_size metric corresponds to layer map dump")
assert resident_physical_size_metric == sum(
layer.layer_file_size or 0 for layer in info.historic_layers if not layer.remote
), "ensure that resident_physical_size metric corresponds to layer map dump"
[layer.layer_file_size or 0 for layer in info.historic_layers if not layer.remote]
)
log.info("ensure that remote_physical_size metric matches layer map")
remote_physical_size_metric = ps_http.get_timeline_metric(
tenant_id, timeline_id, "pageserver_remote_physical_size"
)
log.info("ensure that remote_physical_size metric corresponds to layer map dump")
assert remote_physical_size_metric == sum(
layer.layer_file_size or 0 for layer in info.historic_layers if layer.remote
), "ensure that remote_physical_size metric corresponds to layer map dump"
)
log.info("before runnning GC, ensure that remote_physical size is zero")
# leaving index_part.json upload from successful compaction out will show
# up here as a mismatch between remove_physical_size and summed up layermap
# size
ensure_resident_and_remote_size_metrics()
log.info("run GC")

View File

@@ -12,12 +12,13 @@ from fixtures.neon_fixtures import (
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.utils import (
assert_tenant_state,
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
wait_until_tenant_state,
)
from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
from fixtures.types import Lsn
@@ -383,7 +384,7 @@ def test_download_remote_layers_api(
env.pageserver.start(extra_env_vars={"FAILPOINTS": "remote-storage-download-pre-rename=return"})
env.pageserver.allowed_errors.extend(
[
".*download failed: downloading evicted layer file failed.*",
f".*download_all_remote_layers.*{tenant_id}.*{timeline_id}.*layer download failed.*remote-storage-download-pre-rename failpoint",
f".*initial size calculation.*{tenant_id}.*{timeline_id}.*Failed to calculate logical size",
]
)
@@ -636,5 +637,56 @@ def test_compaction_downloads_on_demand_with_image_creation(neon_env_builder: Ne
assert dict(kinds_after) == {"Delta": 4, "Image": 1}
def test_ondemand_download_failure_to_replace(neon_env_builder: NeonEnvBuilder):
"""
Make sure that we fail on being unable to replace a RemoteLayer instead of for example livelocking.
See: https://github.com/neondatabase/neon/issues/3533
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
# disable gc and compaction via default tenant config because config is lost while detaching
# so that compaction will not be the one to download the layer but the http handler is
neon_env_builder.pageserver_config_override = (
"""tenant_config={gc_period = "0s", compaction_period = "0s"}"""
)
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
assert timeline_id is not None
pageserver_http = env.pageserver.http_client()
# remove layers so that they will be redownloaded
pageserver_http.tenant_detach(tenant_id)
pageserver_http.tenant_attach(tenant_id)
wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5)
pageserver_http.configure_failpoints(("layermap-replace-notfound", "return"))
# requesting details with non-incremental size should trigger a download of the only layer
# this will need to be adjusted if an index for logical sizes is ever implemented
with pytest.raises(PageserverApiException):
# PageserverApiException is expected because of the failpoint (timeline_detail building does something)
# ReadTimeout can happen on our busy CI, but it should not, because there is no more busylooping
# but should it be added back, we would wait for 15s here.
pageserver_http.timeline_detail(tenant_id, timeline_id, True, timeout=15)
actual_message = ".* ERROR .*layermap-replace-notfound"
assert env.pageserver.log_contains(actual_message) is not None
env.pageserver.allowed_errors.append(actual_message)
env.pageserver.allowed_errors.append(
".* ERROR .*Error processing HTTP request: InternalServerError\\(get local timeline info"
)
# this might get to run and attempt on-demand, but not always
env.pageserver.allowed_errors.append(".* ERROR .*Task 'initial size calculation'")
# if the above returned, then we didn't have a livelock, and all is well
def stringify(conf: Dict[str, Any]) -> Dict[str, str]:
return dict(map(lambda x: (x[0], str(x[1])), conf.items()))

View File

@@ -24,19 +24,12 @@ from fixtures.neon_fixtures import (
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
assert_tenant_state,
list_prefix,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pageserver.utils import list_prefix
from fixtures.remote_storage import (
RemoteStorageKind,
)
from fixtures.types import TenantId, TimelineId
from fixtures.utils import print_gc_result, wait_until
from fixtures.workload import Workload
# A tenant configuration that is convenient for generating uploads and deletions
# without a large amount of postgres traffic.
@@ -539,91 +532,3 @@ def test_eviction_across_generations(neon_env_builder: NeonEnvBuilder):
read_all(env, tenant_id, timeline_id)
evict_all_layers(env, tenant_id, timeline_id)
read_all(env, tenant_id, timeline_id)
def test_multi_attach(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 3
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
pageservers = env.pageservers
http_clients = list([p.http_client() for p in pageservers])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# We will intentionally create situations where stale deletions happen from non-latest-generation
# nodes when the tenant is multiply-attached
for ps in env.pageservers:
ps.allowed_errors.extend(
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
)
# Initially, the tenant will be attached to the pageserver a (first is default in our test harness)
wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[0], tenant_id, "Active"))
_detail = http_clients[0].timeline_detail(tenant_id, timeline_id)
with pytest.raises(PageserverApiException):
http_clients[1].timeline_detail(tenant_id, timeline_id)
with pytest.raises(PageserverApiException):
http_clients[2].timeline_detail(tenant_id, timeline_id)
workload = Workload(env, tenant_id, timeline_id)
workload.init(pageservers[0].id)
workload.write_rows(1000, pageservers[0].id)
# Attach the tenant to the other two pageservers
pageservers[1].tenant_attach(env.initial_tenant)
pageservers[2].tenant_attach(env.initial_tenant)
wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[1], tenant_id, "Active"))
wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[2], tenant_id, "Active"))
# Now they all have it attached
_detail = http_clients[0].timeline_detail(tenant_id, timeline_id)
_detail = http_clients[1].timeline_detail(tenant_id, timeline_id)
_detail = http_clients[2].timeline_detail(tenant_id, timeline_id)
# The endpoint can use any pageserver to service its reads
for pageserver in pageservers:
workload.validate(pageserver.id)
# If we write some more data, all the nodes can see it, including stale ones
wrote_lsn = workload.write_rows(1000, pageservers[0].id)
for ps_http in http_clients:
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, wrote_lsn)
# ...and indeed endpoints can see it via any of the pageservers
for pageserver in pageservers:
workload.validate(pageserver.id)
# Prompt all the pageservers, including stale ones, to upload ingested layers to remote storage
for ps_http in http_clients:
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, wrote_lsn)
# Now, the contents of remote storage will be a set of layers from each pageserver, but with unique
# generation numbers
# TODO: validate remote storage contents
# Stop all pageservers
for ps in pageservers:
ps.stop()
# Returning to a normal healthy state: all pageservers will start, but only the one most
# recently attached via the control plane will re-attach on startup
for ps in pageservers:
ps.start()
with pytest.raises(PageserverApiException):
_detail = http_clients[0].timeline_detail(tenant_id, timeline_id)
with pytest.raises(PageserverApiException):
_detail = http_clients[1].timeline_detail(tenant_id, timeline_id)
_detail = http_clients[2].timeline_detail(tenant_id, timeline_id)
# All data we wrote while multi-attached remains readable
workload.validate(pageservers[2].id)

View File

@@ -1,468 +0,0 @@
import random
from pathlib import Path
from typing import Any, Dict, Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
from fixtures.pageserver.utils import assert_prefix_empty, tenant_delete_wait_completed
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload
# A tenant configuration that is convenient for generating uploads and deletions
# without a large amount of postgres traffic.
TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_target_size": f"{128 * 1024}",
"compaction_threshold": "1",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
}
def evict_random_layers(
rng: random.Random, pageserver: NeonPageserver, tenant_id: TenantId, timeline_id: TimelineId
):
"""
Evict 50% of the layers on a pageserver
"""
timeline_path = pageserver.timeline_dir(tenant_id, timeline_id)
initial_local_layers = sorted(
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
)
client = pageserver.http_client()
for layer in initial_local_layers:
if "ephemeral" in layer.name:
continue
if rng.choice([True, False]):
log.info(f"Evicting layer {tenant_id}/{timeline_id} {layer.name}")
client.evict_layer(tenant_id=tenant_id, timeline_id=timeline_id, layer_name=layer.name)
@pytest.mark.parametrize("seed", [1, 2, 3])
def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
"""
Issue many location configuration changes, ensure that tenants
remain readable & we don't get any unexpected errors. We should
have no ERROR in the log, and no 500s in the API.
The location_config API is intentionally designed so that all destination
states are valid, so that we may test it in this way: the API should always
work as long as the tenant exists.
"""
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 3
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
pageservers = env.pageservers
list([p.http_client() for p in pageservers])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# We will make no effort to avoid stale attachments
for ps in env.pageservers:
ps.allowed_errors.extend(
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
)
# these can happen, if we shutdown at a good time. to be fixed as part of #5172.
message = ".*duplicated L1 layer layer=.*"
ps.allowed_errors.append(message)
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageservers[0].id)
workload.write_rows(256, env.pageservers[0].id)
# We use a fixed seed to make the test reproducible: we want a randomly
# chosen order, but not to change the order every time we run the test.
rng = random.Random(seed)
initial_generation = 1
last_state = {
env.pageservers[0].id: ("AttachedSingle", initial_generation),
env.pageservers[1].id: ("Detached", None),
env.pageservers[2].id: ("Detached", None),
}
latest_attached = env.pageservers[0].id
for _i in range(0, 64):
# Pick a pageserver
pageserver = rng.choice(env.pageservers)
# Pick a pseudorandom state
modes = [
"AttachedSingle",
"AttachedMulti",
"AttachedStale",
"Secondary",
"Detached",
"_Evictions",
"_Restart",
]
mode = rng.choice(modes)
last_state_ps = last_state[pageserver.id]
if mode == "_Evictions":
if last_state_ps[0].startswith("Attached"):
log.info(f"Action: evictions on pageserver {pageserver.id}")
evict_random_layers(rng, pageserver, tenant_id, timeline_id)
else:
log.info(
f"Action: skipping evictions on pageserver {pageserver.id}, is not attached"
)
elif mode == "_Restart":
log.info(f"Action: restarting pageserver {pageserver.id}")
pageserver.stop()
pageserver.start()
if last_state_ps[0].startswith("Attached") and latest_attached == pageserver.id:
log.info("Entering postgres...")
workload.churn_rows(rng.randint(128, 256), pageserver.id)
workload.validate(pageserver.id)
elif last_state_ps[0].startswith("Attached"):
# The `attachment_service` will only re-attach on startup when a pageserver was the
# holder of the latest generation: otherwise the pageserver will revert to detached
# state if it was running attached with a stale generation
last_state[pageserver.id] = ("Detached", None)
else:
secondary_conf: Optional[Dict[str, Any]] = None
if mode == "Secondary":
secondary_conf = {"warm": rng.choice([True, False])}
location_conf: Dict[str, Any] = {
"mode": mode,
"secondary_conf": secondary_conf,
"tenant_conf": {},
}
log.info(f"Action: Configuring pageserver {pageserver.id} to {location_conf}")
# Select a generation number
if mode.startswith("Attached"):
if last_state_ps[1] is not None:
if rng.choice([True, False]):
# Move between attached states, staying in the same generation
generation = last_state_ps[1]
else:
# Switch generations, while also jumping between attached states
generation = env.attachment_service.attach_hook(tenant_id, pageserver.id)
latest_attached = pageserver.id
else:
generation = env.attachment_service.attach_hook(tenant_id, pageserver.id)
latest_attached = pageserver.id
else:
generation = None
location_conf["generation"] = generation
pageserver.tenant_location_configure(tenant_id, location_conf)
last_state[pageserver.id] = (mode, generation)
if mode.startswith("Attached"):
# TODO: a variant of this test that runs background endpoint workloads, as well as
# the inter-step workloads.
workload.churn_rows(
rng.randint(128, 256), pageserver.id, upload=mode != "AttachedStale"
)
workload.validate(pageserver.id)
# Attach all pageservers
for ps in env.pageservers:
location_conf = {"mode": "AttachedMulti", "secondary_conf": None, "tenant_conf": {}}
ps.tenant_location_configure(tenant_id, location_conf)
# Confirm that all are readable
for ps in env.pageservers:
workload.validate(ps.id)
# Detach all pageservers
for ps in env.pageservers:
location_conf = {"mode": "Detached", "secondary_conf": None, "tenant_conf": {}}
ps.tenant_location_configure(tenant_id, location_conf)
# Confirm that all local disk state was removed on detach
# TODO
def test_live_migration(neon_env_builder: NeonEnvBuilder):
"""
Test the sequence of location states that are used in a live migration.
"""
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
pageserver_a = env.pageservers[0]
pageserver_b = env.pageservers[1]
initial_generation = 1
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageservers[0].id)
workload.write_rows(256, env.pageservers[0].id)
# Make the destination a secondary location
pageserver_b.tenant_location_configure(
tenant_id,
{
"mode": "Secondary",
"secondary_conf": {"warm": True},
"tenant_conf": {},
},
)
workload.churn_rows(64, pageserver_a.id, upload=False)
# Set origin attachment to stale
log.info("Setting origin to AttachedStale")
pageserver_a.tenant_location_configure(
tenant_id,
{
"mode": "AttachedStale",
"secondary_conf": None,
"tenant_conf": {},
"generation": initial_generation,
},
flush_ms=5000,
)
migrated_generation = env.attachment_service.attach_hook(tenant_id, pageserver_b.id)
log.info(f"Acquired generation {migrated_generation} for destination pageserver")
assert migrated_generation == initial_generation + 1
# Writes and reads still work in AttachedStale.
workload.validate(pageserver_a.id)
workload.validate(pageserver_a.id)
# Ensure that secondary location's timeline directory is populated: we will then
# do some more writes on top of that to ensure that the newly attached pageserver
# properly makes use of the downloaded layers as well as ingesting WAL to catch up.
pageserver_a.http_client().secondary_tenant_upload(tenant_id)
pageserver_b.http_client().secondary_tenant_download(tenant_id)
# Generate some more dirty writes
workload.churn_rows(64, pageserver_a.id)
# Attach the destination
log.info("Setting destination to AttachedMulti")
pageserver_b.tenant_location_configure(
tenant_id,
{
"mode": "AttachedMulti",
"secondary_conf": None,
"tenant_conf": {},
"generation": migrated_generation,
},
)
# Wait for destination LSN to catch up with origin
origin_lsn = pageserver_a.http_client().timeline_detail(tenant_id, timeline_id)[
"last_record_lsn"
]
def caught_up():
destination_lsn = pageserver_b.http_client().timeline_detail(tenant_id, timeline_id)[
"last_record_lsn"
]
log.info(
f"Waiting for LSN to catch up: origin {origin_lsn} vs destination {destination_lsn}"
)
assert destination_lsn >= origin_lsn
wait_until(100, 0.1, caught_up)
# The destination should accept writes
workload.churn_rows(64, pageserver_b.id)
# Dual attached: both are readable.
workload.validate(pageserver_a.id)
workload.validate(pageserver_b.id)
# Revert the origin to secondary
log.info("Setting origin to Secondary")
pageserver_a.tenant_location_configure(
tenant_id,
{
"mode": "Secondary",
"secondary_conf": {"warm": True},
"tenant_conf": {},
},
)
workload.churn_rows(64, pageserver_b.id)
# Put the destination into final state
pageserver_b.tenant_location_configure(
tenant_id,
{
"mode": "AttachedSingle",
"secondary_conf": None,
"tenant_conf": {},
"generation": migrated_generation,
},
)
workload.churn_rows(64, pageserver_b.id)
workload.validate(pageserver_b.id)
def list_layers(pageserver, tenant_id: TenantId, timeline_id: TimelineId) -> list[Path]:
"""
Inspect local storage on a pageserver to discover which layer files are present.
:return: list of relative paths to layers, from the timeline root.
"""
timeline_path = pageserver.timeline_dir(tenant_id, timeline_id)
def relative(p: Path) -> Path:
return p.relative_to(timeline_path)
return sorted(
list(
map(
relative,
filter(
lambda path: path.name != "metadata"
and "ephemeral" not in path.name
and "temp" not in path.name,
timeline_path.glob("*"),
),
)
)
)
def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
"""
Test the overall data flow in secondary mode:
- Heatmap uploads from the attached location
- Heatmap & layer downloads from the secondary location
- Eviction of layers on the attached location results in deletion
on the secondary location as well.
"""
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
ps_attached = env.pageservers[0]
ps_secondary = env.pageservers[1]
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageservers[0].id)
workload.write_rows(256, ps_attached.id)
# Configure a secondary location
log.info("Setting up secondary location...")
ps_secondary.tenant_location_configure(
tenant_id,
{
"mode": "Secondary",
"secondary_conf": {"warm": True},
"tenant_conf": {},
},
)
# Explicit upload/download cycle
# ==============================
log.info("Synchronizing after initial write...")
ps_attached.http_client().secondary_tenant_upload(tenant_id)
ps_secondary.http_client().secondary_tenant_download(tenant_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
ps_secondary, tenant_id, timeline_id
)
# Make changes on attached pageserver, check secondary downloads them
# ===================================================================
log.info("Synchronizing after subsequent write...")
workload.churn_rows(128, ps_attached.id)
ps_attached.http_client().secondary_tenant_upload(tenant_id)
ps_secondary.http_client().secondary_tenant_download(tenant_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
ps_secondary, tenant_id, timeline_id
)
# Do evictions on attached pageserver, check secondary follows along
# ==================================================================
log.info("Evicting a layer...")
layer_to_evict = list_layers(ps_attached, tenant_id, timeline_id)[0]
ps_attached.http_client().evict_layer(tenant_id, timeline_id, layer_name=layer_to_evict.name)
log.info("Synchronizing after eviction...")
ps_attached.http_client().secondary_tenant_upload(tenant_id)
ps_secondary.http_client().secondary_tenant_download(tenant_id)
assert layer_to_evict not in list_layers(ps_attached, tenant_id, timeline_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
ps_secondary, tenant_id, timeline_id
)
# Scrub the remote storage
# ========================
# This confirms that the scrubber isn't upset by the presence of the heatmap
# TODO: depends on `jcsp/scrubber-index-part` branch.
# Detach secondary and delete tenant
# ===================================
# This confirms that the heatmap gets cleaned up as well as other normal content.
log.info("Detaching secondary location...")
ps_secondary.tenant_location_configure(
tenant_id,
{
"mode": "Detached",
"secondary_conf": None,
"tenant_conf": {},
},
)
log.info("Deleting tenant...")
tenant_delete_wait_completed(ps_attached.http_client(), tenant_id, 10)
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
# def test_secondary_download_loop
# Configure some short check intervals, and validate that layers are downloaded by secondary
# without any explicit admin API calls.
# def test_secondary_eviction(neon_env_builder: NeonEnvBuilder):
#

View File

@@ -586,7 +586,7 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
log.info("sending delete request")
checkpoint_allowed_to_fail.set()
env.pageserver.allowed_errors.append(
".* ERROR .*Error processing HTTP request: InternalServerError\\(The timeline or pageserver is shutting down"
".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping"
".* ERROR .*[Cc]ould not flush frozen layer.*"
)

View File

@@ -0,0 +1,54 @@
import time
from fixtures.neon_fixtures import (
NeonEnvBuilder,
last_flush_lsn_upload,
)
from fixtures.remote_storage import (
RemoteStorageKind,
)
from fixtures.types import TenantId
from fixtures.log_helper import log
def test_tenant_duplicate(
neon_env_builder: NeonEnvBuilder,
):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep_main:
ep_main.safe_psql("CREATE TABLE foo (i int);")
ep_main.safe_psql("INSERT INTO foo VALUES (1), (2), (3);")
last_flush_lsn = last_flush_lsn_upload(
env, ep_main, env.initial_tenant, env.initial_timeline
)
new_tenant_id = TenantId.generate()
# timeline id remains unchanged with tenant_duplicate
# TODO: implement a remapping scheme so timeline ids remain globally unique
new_timeline_id = env.initial_timeline
log.info(f"Duplicate tenant/timeline will be: {new_tenant_id}/{new_timeline_id}")
ps_http = env.pageserver.http_client()
ps_http.tenant_duplicate(env.initial_tenant, new_tenant_id)
ps_http.tenant_delete(env.initial_tenant)
env.neon_cli.map_branch("duplicate", new_tenant_id, new_timeline_id)
# start read-only replicate and validate
with env.endpoints.create_start(
"duplicate", tenant_id=new_tenant_id, lsn=last_flush_lsn
) as ep_dup:
with ep_dup.connect() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM foo ORDER BY i;")
cur.fetchall() == [(1,), (2,), (3,)]
# ensure restarting PS works
env.pageserver.stop()
env.pageserver.start()

View File

@@ -227,7 +227,9 @@ def test_tenant_redownloads_truncated_file_on_startup(
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
env.pageserver.allowed_errors.append(".*removing local file .* because .*")
env.pageserver.allowed_errors.append(
".*removing local file .* because it has unexpected length.*"
)
# FIXME: Are these expected?
env.pageserver.allowed_errors.append(

View File

@@ -74,6 +74,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
prev = Some(req);
}
PagestreamFeMessage::DbSize(_) => {}
PagestreamFeMessage::NoOp => {},
};
}