Compare commits

...

49 Commits

Author SHA1 Message Date
Christian Schwarz
218af17c29 turn Timeline::layers into tokio::sync::RwLock
(cherry picked from commit 2001c31a14)
2023-05-26 20:11:05 +02:00
Christian Schwarz
4d03689d9a (does not compile): make TimelineWriter Send by using tokio::sync Mutex internally
fails with

cs@devvm:[~/src/neon]: cargo check -p pageserver  --features testing
    Checking pageserver v0.1.0 (/home/cs/src/neon/pageserver)
error: future cannot be sent between threads safely
   --> pageserver/src/tenant/timeline/walreceiver/connection_manager.rs:426:33
    |
426 |         let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
    |                                 ^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `Instrumented<[async block@pageserver/src/tenant/timeline/walreceiver/connection_manager.rs:427:13: 439:14]>`, the trait `std::marker::Send` is not implemented for `std::sync::RwLockReadGuard<'_, LayerMap<dyn PersistentLayer>>`
note: future is not `Send` as this value is used across an await
   --> pageserver/src/tenant/timeline.rs:872:46
    |
850 |         let layers = self.layers.read().unwrap();
    |             ------ has type `std::sync::RwLockReadGuard<'_, LayerMap<dyn PersistentLayer>>` which is not `Send`
...
872 |                 self.freeze_inmem_layer(true).await;
    |                                              ^^^^^^ await occurs here, with `layers` maybe used later
...
881 |     }
    |     - `layers` is later dropped here
note: required by a bound in `TaskHandle::<E>::spawn`
   --> pageserver/src/tenant/timeline/walreceiver.rs:196:52
    |
192 |     fn spawn<Fut>(
    |        ----- required by a bound in this
...
196 |         Fut: Future<Output = anyhow::Result<()>> + Send,
    |                                                    ^^^^ required by this bound in `TaskHandle::<E>::spawn`

error: could not compile `pageserver` due to previous error
(cherry picked from commit 7de3799e66)
2023-05-26 20:03:13 +02:00
Christian Schwarz
47647433c3 basebackup import: pre-lock the layer map for the flush() calls
The checkpointer loop isn't running anyway, so, there's no risk of
blocking it through the pre-lock.

(cherry picked from commit 1b2663350c)
(cherry picked from commit a1680b185f)
2023-05-26 20:00:40 +02:00
Christian Schwarz
24df184a4e controversial but necessary: keep holding layer map lock inside compact_level0_phase1
Without this, the seocnd read().unwrap() becomes an await point,
which makes the future not-Send, but, we require it to be Send
because it runs inside task_mgr::spawn, which requires the Fut's to be Send

(cherry picked from commit a1ae23b827)
2023-05-26 20:00:34 +02:00
Christian Schwarz
223aba4c09 move load_layer_map to before initialize_with_lock outside of Tenant::timelines held code 2023-05-26 19:59:55 +02:00
Christian Schwarz
aa9240af3f timeline_init_and_sync: don't hold Tenant::timelines while load_layer_map
This patch inlines `initialize_with_lock` and then reorganizes the code
such that we can `load_layer_map` without holding the
`Tenant::timelines` lock.

As a nice aside, we can get rid of the dummy() uninit mark, which has
always been a terrible hack.
2023-05-26 19:55:16 +02:00
Christian Schwarz
9a4789ec73 demote warn line to info-level, as the log line in set_stopping() is also info!()
This should fix the faile regress tests that barked on allowed_errors
2023-05-26 18:22:41 +02:00
Christian Schwarz
72159ee686 Merge remote-tracking branch 'origin/main' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify 2023-05-26 18:03:35 +02:00
Christian Schwarz
e7c4ef9f4f don't hold TENANTS lock while waiting for set_stopping() 2023-05-26 17:46:09 +02:00
Christian Schwarz
13d3f4c29f set_stopping(): report in result if not transitioning to Stopping 2023-05-26 17:46:09 +02:00
Christian Schwarz
b09beaa4fe log while waiting for tenant to finish activation 2023-05-26 09:34:12 +02:00
Christian Schwarz
1367e2b0ee improve TenantState doc comments, repeating what's in the Mermaid diagram 2023-05-26 09:31:44 +02:00
Christian Schwarz
dd0f5c4ef3 Merge remote-tracking branch 'origin/main' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify 2023-05-25 22:20:52 +02:00
Christian Schwarz
de780d2e0f make TenantState::{Loading,Attaching,Activating} owned by spawn_load / spawn_attach
See the Mermaid diagram in the doc comment for the now-possible state transitions.

The two core insights / changes are:
- spawn_load and spawn_attach own the tenant state until they're done
- once load()/attach() calls are done
    - if they failed, transition them to Broken directly (we know
      that there's no background activity because we didn't call activate yet)
    - if they succeed, call activate. We can make it infallible. How? Later.

- set_broken() and set_stopping() are changed to wait for spawn_load() /
  spawn_attach() to finish. This sounds scary because it might hinder
  detach or shutdown, but actually, concurrent attach+detach, or
  attach+shutdown, or load+shutdown, or attach+shutdown were just racy.
  With this change, they're not anymore.
  We can add a CancellationToken stored in Tenant for load/attach and cancel
  it from set_stopping() or set_broken() if necessary in the future.

So, why can activate() be infallible now: because we declare that
spawn_load and spawn_attach own the tenant state until they're done.
And we enforce that ownership using the wait_for at the start of
set_stopping and set_broken.
2023-05-25 15:02:43 +02:00
Christian Schwarz
f18d9f555b Revert "Revert "use tokio::sync::Receiver::wait_for""
This reverts commit eaf270c648.
2023-05-25 14:58:49 +02:00
Christian Schwarz
05a2fe08d1 Merge branch 'problame/infallible-timeline-activate/4-make-infallible' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify 2023-05-25 14:58:19 +02:00
Christian Schwarz
eaf270c648 Revert "use tokio::sync::Receiver::wait_for"
This reverts commit fe4ef121b6.
2023-05-25 14:57:41 +02:00
Christian Schwarz
ddad0928c5 Merge branch 'problame/infallible-timeline-activate/3-funnel-storage-broker-client' into problame/infallible-timeline-activate/4-make-infallible 2023-05-25 14:53:32 +02:00
Christian Schwarz
96c550222b apply heikki's comment suggestion 2023-05-25 14:53:20 +02:00
Christian Schwarz
cf8ff7edad explainer comment on storage_broker::connect async weirdness 2023-05-25 14:51:48 +02:00
Christian Schwarz
da6573f551 Merge branch 'problame/infallible-timeline-activate/3-funnel-storage-broker-client' into problame/infallible-timeline-activate/4-make-infallible 2023-05-25 10:54:30 +02:00
Christian Schwarz
2fee8c884f Merge remote-tracking branch 'origin/main' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-25 10:54:03 +02:00
Christian Schwarz
fe4ef121b6 use tokio::sync::Receiver::wait_for 2023-05-25 10:44:26 +02:00
Christian Schwarz
641ca994dc assert_eq suggestion 2023-05-25 09:55:32 +02:00
Christian Schwarz
413598b19b fix merge fallout (?) 2023-05-24 17:42:51 +02:00
Christian Schwarz
b345f32e3f Merge branch 'problame/infallible-timeline-activate/4-make-infallible' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify 2023-05-24 17:25:35 +02:00
Christian Schwarz
69cfa9fe61 launch_wal_receiver: apply joonas's review suggestion (visibility + doc comment) 2023-05-24 17:20:03 +02:00
Christian Schwarz
2c424c8f4e Revert "activate_timelines counter is now == not_broken_timelines.len()"
not_broken_timelines is an iterator, doesn't have `len()`.

This reverts commit 4001f441c0.
2023-05-24 17:19:22 +02:00
Christian Schwarz
4001f441c0 activate_timelines counter is now == not_broken_timelines.len() 2023-05-24 17:14:49 +02:00
Christian Schwarz
ef956c47fc make it clear that walreceiver_status is always used in the branch where it's produced 2023-05-24 17:12:35 +02:00
Christian Schwarz
8606b6abe5 Merge remote-tracking branch 'origin/problame/infallible-timeline-activate/3-funnel-storage-broker-client' into problame/infallible-timeline-activate/4-make-infallible 2023-05-24 17:02:18 +02:00
Christian Schwarz
732f60317b Merge remote-tracking branch 'origin/main' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-24 16:58:25 +02:00
Christian Schwarz
b54431bbd3 pass the BrokerClientChannel by value & clone it as necessary
It's a wrapper around an inner Arc anyways

Also, this gets rid of the OnceCell
2023-05-24 12:29:05 +02:00
Christian Schwarz
def5eb8542 Merge branch 'problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-24 11:57:37 +02:00
Christian Schwarz
07da786ed3 apply joonas's suggestion to use parent: None + follows_from 2023-05-24 11:56:26 +02:00
Christian Schwarz
75c3c43b2e don't unwrap() the activate() result in spawn_load / spawn_attach 2023-05-24 11:36:07 +02:00
Christian Schwarz
bdf03eab58 Merge branch 'problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-24 11:32:38 +02:00
Christian Schwarz
32c85fa87a Merge remote-tracking branch 'origin/main' into problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation 2023-05-24 11:31:00 +02:00
Christian Schwarz
b2e0c58a8c Merge branch 'problame/infallible-timeline-activate/4-make-infallible' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify 2023-05-23 20:44:34 +02:00
Christian Schwarz
94f30f0660 Merge branch 'problame/infallible-timeline-activate/3-funnel-storage-broker-client' into problame/infallible-timeline-activate/4-make-infallible 2023-05-23 20:44:12 +02:00
Christian Schwarz
a55d224923 tests would fail because broker client needs to be launched on a tokio runtime thread 2023-05-23 20:43:10 +02:00
Christian Schwarz
4f586ac101 Merge branch 'problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-23 20:42:54 +02:00
Christian Schwarz
feb2e80b83 tests were failing because activate() was outside of a span with tenant_id 2023-05-23 20:36:32 +02:00
Christian Schwarz
ee22e81583 don't hold timelines lock inside set_stopping() 2023-05-23 20:11:15 +02:00
Christian Schwarz
3e604eaa39 refactor: introduce TenantState::Activating to avoid holding timelines lock inside Tenant::activate 2023-05-23 20:03:12 +02:00
Christian Schwarz
8bcb542a3b refactor: make timeline activation infallible
Timeline::activate() was only fallible because `launch_wal_receiver` was.

`launch_wal_receiver` was fallible only because of some preliminary
checks in `WalReceiver::start`.

Turns out these checks can be shifted to the type system by delaying
creatinon of the `WalReceiver` struct to the point where we activate the timeline.

The changes in this PR were enabled by my previous refactoring that funneled
the broker_client from pageserver startup to the activate() call sites.
2023-05-23 19:27:06 +02:00
Christian Schwarz
17b081d294 refactor: eliminate global storage_broker client state
(This is prep work to make `Timeline::activate` infallible.)

This patch removes the global storage_broker client instance from the
pageserver codebase.

Instead, pageserver startup instantiates it and passes it down to
the `Timeline::activate` function, which in turn passes it to
the WalReceiver, which is the entity that actually uses it.
2023-05-23 19:27:06 +02:00
Christian Schwarz
d5337e6a65 refactor responsibility for tenant/timeline activation
(This is prep work to make `Timeline::activate()` infallible.)

The current possibility for failure in `Timeline::activate()`
is the broker client's presence / absence. It should be an assert, but
we're careful with these. So, I'm planning to pass in the broker
client to activate(), thereby eliminating the possiblity of its absence.

In the unit tests, we don't have a broker client. So, I thought I'd be
in trouble because the unit tests also called `activate()` before this
PR.

However, closer inspection reveals a long-standing FIXME about this,
which is addressed by this patch.

It turns out that the unit tests don't actually need the background
loops to be running. They just need the state value to be `Active`.
So, for the tests, we just set it to that value but don't spawn
the background loops.

We'll need to revisit this if we ever do more Rust unit tests in the
future. But right now, this refactoring improves the code, so, let's
revisit when we get there.
2023-05-23 19:26:36 +02:00
Christian Schwarz
cc96a5186d tenant_map_insert: don't expose the vacant entry to the closure
This tightens up the API a little.
Byproduct of some refactoring work that I'm doing right now.
2023-05-23 19:25:47 +02:00
19 changed files with 732 additions and 441 deletions

View File

@@ -18,7 +18,29 @@ use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
/// A state of a tenant in pageserver's memory.
/// The state of a tenant in this pageserver.
///
/// ```mermaid
/// stateDiagram-v2
///
/// [*] --> Loading: spawn_load()
/// [*] --> Attaching: spawn_attach()
///
/// Loading --> Activating: activate()
/// Attaching --> Activating: activate()
/// Activating --> Active: infallible
///
/// Loading --> Broken: load() failure
/// Attaching --> Broken: attach() failure
///
/// Active --> Stopping: set_stopping(), part of shutdown & detach
/// Stopping --> Broken: late error in remove_tenant_from_memory
///
/// Broken --> [*]: ignore / detach / shutdown
/// Stopping --> [*]: remove_from_memory complete
///
/// Active --> Broken: cfg(testing)-only tenant break point
/// ```
#[derive(
Clone,
PartialEq,
@@ -33,17 +55,38 @@ use bytes::{BufMut, Bytes, BytesMut};
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
/// This tenant is being loaded from local disk
/// This tenant is being loaded from local disk.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Loading,
/// This tenant is being downloaded from cloud storage.
/// This tenant is being attached to the pageserver.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Attaching,
/// Tenant is fully operational
/// The tenant is transitioning from Loading/Attaching to Active.
///
/// While in this state, the individual timelines are being activated.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Activating,
/// The tenant has finished activating and is open for business.
///
/// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
Active,
/// A tenant is recognized by pageserver, but it is being detached or the
/// The tenant is recognized by pageserver, but it is being detached or the
/// system is being shut down.
///
/// Transitions out of this state are possible through `set_broken()`.
Stopping,
/// A tenant is recognized by the pageserver, but can no longer be used for
/// any operations, because it failed to be activated.
/// The tenant is recognized by the pageserver, but can no longer be used for
/// any operations.
///
/// If the tenant fails to load or attach, it will transition to this state
/// and it is guaranteed that no background tasks are running in its name.
///
/// The other way to transition into this state is from `Stopping` state
/// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
/// if the cleanup future executed by `remove_tenant_from_memory()` fails.
Broken { reason: String, backtrace: String },
}
@@ -60,6 +103,7 @@ impl TenantState {
// tenant mgr startup distinguishes attaching from loading via marker file.
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
Self::Loading => Attached,
Self::Activating => todo!(),
// We only reach Active after successful load / attach.
// So, call atttachment status Attached.
Self::Active => Attached,

View File

@@ -512,7 +512,7 @@ async fn collect_eviction_candidates(
if !tl.is_active() {
continue;
}
let info = tl.get_local_layers_for_disk_usage_eviction();
let info = tl.get_local_layers_for_disk_usage_eviction().await;
debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
tenant_candidates.extend(
info.resident_layers

View File

@@ -212,7 +212,7 @@ async fn build_timeline_info(
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let mut info = build_timeline_info_common(timeline, ctx)?;
let mut info = build_timeline_info_common(timeline, ctx).await?;
if include_non_incremental_logical_size {
// XXX we should be using spawn_ondemand_logical_size_calculation here.
// Otherwise, if someone deletes the timeline / detaches the tenant while
@@ -230,7 +230,7 @@ async fn build_timeline_info(
Ok(info)
}
fn build_timeline_info_common(
async fn build_timeline_info_common(
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
@@ -261,7 +261,7 @@ fn build_timeline_info_common(
None
}
};
let current_physical_size = Some(timeline.layer_size_sum());
let current_physical_size = Some(timeline.layer_size_sum().await);
let state = timeline.current_state();
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
@@ -321,6 +321,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
}
@@ -551,7 +552,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
for timeline in tenant.list_timelines().iter() {
current_physical_size += timeline.layer_size_sum();
current_physical_size += timeline.layer_size_sum().await;
}
let state = tenant.current_state();
@@ -655,7 +656,7 @@ async fn layer_map_info_handler(request: Request<Body>) -> Result<Response<Body>
check_permission(&request, Some(tenant_id))?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let layer_map_info = timeline.layer_map_info(reset);
let layer_map_info = timeline.layer_map_info(reset).await;
json_response(StatusCode::OK, layer_map_info)
}
@@ -859,7 +860,7 @@ async fn handle_tenant_break(r: Request<Body>) -> Result<Response<Body>, ApiErro
.await
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
tenant.set_broken("broken from test".to_owned());
tenant.set_broken("broken from test".to_owned()).await;
json_response(StatusCode::OK, ())
}

View File

@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
{
pg_control = Some(control_file);
}
modification.flush()?;
modification.flush().await?;
}
}
// We're done importing all the data files.
modification.commit()?;
modification.commit().await?;
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
// We found the pg_control file.
pg_control = Some(res);
}
modification.flush()?;
modification.flush().await?;
}
tokio_tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit()?;
modification.commit().await?;
Ok(())
}
@@ -594,7 +594,7 @@ async fn import_file(
// zenith.signal is not necessarily the last file, that we handle
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.writer();
let writer = modification.tline.writer().await;
writer.finish_write(prev_lsn);
debug!("imported zenith signal {}", prev_lsn);

View File

@@ -489,7 +489,9 @@ impl PageServerHandler {
// Create empty timeline
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute

View File

@@ -1108,7 +1108,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub fn flush(&mut self) -> anyhow::Result<()> {
pub async fn flush(&mut self) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -1116,13 +1116,15 @@ impl<'a> DatadirModification<'a> {
return Ok(());
}
let writer = self.tline.writer();
let writer = self.tline.writer().await;
let mut layer_map = self.tline.layers.write().await;
// Flush relation and SLRU data blocks, keep metadata.
let mut result: anyhow::Result<()> = Ok(());
self.pending_updates.retain(|&key, value| {
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
result = writer.put(key, self.lsn, value);
result = writer.put_locked(key, self.lsn, value, &mut layer_map);
false
} else {
true
@@ -1143,17 +1145,17 @@ impl<'a> DatadirModification<'a> {
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer();
pub async fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer().await;
let lsn = self.lsn;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, &value)?;
writer.put(key, lsn, &value).await?;
}
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn)?;
writer.delete(key_range, lsn).await?;
}
writer.finish_write(lsn);
@@ -1594,16 +1596,18 @@ fn is_slru_block_key(key: Key) -> bool {
}
#[cfg(test)]
pub fn create_test_timeline(
pub async fn create_test_timeline(
tenant: &crate::tenant::Tenant,
timeline_id: utils::id::TimelineId,
pg_version: u32,
ctx: &RequestContext,
) -> anyhow::Result<std::sync::Arc<Timeline>> {
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
let tline = tenant
.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)
.await?;
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit()?;
m.commit().await?;
Ok(tline)
}

File diff suppressed because it is too large Load Diff

View File

@@ -10,6 +10,7 @@ use tokio::fs;
use anyhow::Context;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tracing::*;
use remote_storage::GenericRemoteStorage;
@@ -19,7 +20,9 @@ use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::tenant::{
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
};
use crate::IGNORED_TENANT_FILE_NAME;
use utils::fs_ext::PathExt;
@@ -244,12 +247,55 @@ pub async fn shutdown_all_tenants() {
}
};
// Set tenant (and its timlines) to Stoppping state.
// Since we can only transition into Stopping state after activation is complete,
// run it in a JoinSet so all tenants have a chance to stop before we git SIGKILLed.
//
// Transitioning tenants to Stopping state has a couple of non-obvious side effects:
// 1. Lock out any new requests to the tenants.
// 2. Signal cancellation to WAL receivers (we wait on it below).
// 3. Signal cancellation for othher tenant background loops.
// 4. ???
//
// The waiting for the cancellation is not done uniformly.
// We certainly wait for WAL receivers to shut down.
// That is necessary so that no new data comes in before the freeze_and_flush.
// But the tenant background loops are joined-on in our caller.
// It's mesed up.
let mut join_set = JoinSet::new();
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
for (_, tenant) in tenants_to_shut_down {
if tenant.is_active() {
// updates tenant state, forbidding new GC and compaction iterations from starting
tenant.set_stopping();
tenants_to_freeze_and_flush.push(tenant);
join_set.spawn(async move {
match tenant.set_stopping().await {
Ok(()) => Ok(tenant),
Err(e) => Err((tenant, e)),
}
});
}
while let Some(res) = join_set.join_next().await {
match res {
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the futures");
}
Err(join_error) => {
// cannot really do anything, as this panic is likely a bug
error!("task that calls set_stopping() panicked, don't know which tenant this is, and probably freeze_and_flush won't work anyways: {join_error:#}");
}
Ok(retval) => match retval {
Ok(tenant) => {
// success
debug!("tenant successfully stopped: {}", tenant.tenant_id);
tenants_to_freeze_and_flush.push(tenant);
}
// our task_mgr::shutdown_tasks are going to coalesce on that just fine
Err((tenant, SetStoppingError::AlreadyStopping)) => {
tenants_to_freeze_and_flush.push(tenant);
}
Err((tenant, SetStoppingError::Broken)) => {
info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well: {}", tenant.tenant_id);
tenants_to_freeze_and_flush.push(tenant);
}
},
}
}
@@ -266,8 +312,9 @@ pub async fn shutdown_all_tenants() {
// On error, log it but continue with the shutdown for other tenants.
for tenant in tenants_to_freeze_and_flush {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
debug!("freeze_and_flush tenant {tenant_id}");
// TODO this could probably run in a JoinSet as well?
if let Err(err) = tenant.freeze_and_flush().await {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
@@ -589,13 +636,23 @@ where
{
let tenants_accessor = TENANTS.write().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => match tenant.current_state() {
TenantState::Attaching
| TenantState::Loading
| TenantState::Broken { .. }
| TenantState::Active => tenant.set_stopping(),
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
},
Some(tenant) => {
let tenant = Arc::clone(tenant);
// don't hold TENANTS lock while set_stopping waits for activation to finish
drop(tenants_accessor);
match tenant.set_stopping().await {
Ok(()) => {
// we won, continue stopping procedure
}
Err(SetStoppingError::Broken) => {
// continue the procedure, let's hope the closure can deal with broken tenants
}
Err(SetStoppingError::AlreadyStopping) => {
// the tenant is already stopping or broken, don't do anything
return Err(TenantStateError::IsStopping(tenant_id));
}
}
}
None => return Err(TenantStateError::NotFound(tenant_id)),
}
}
@@ -620,7 +677,7 @@ where
let tenants_accessor = TENANTS.read().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
tenant.set_broken(e.to_string());
tenant.set_broken(e.to_string()).await;
}
None => {
warn!("Tenant {tenant_id} got removed from memory");

View File

@@ -1264,7 +1264,12 @@ mod tests {
let harness = TenantHarness::create(test_name)?;
let (tenant, ctx) = runtime.block_on(harness.load());
// create an empty timeline directory
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let _ = runtime.block_on(tenant.create_test_timeline(
TIMELINE_ID,
Lsn(0),
DEFAULT_PG_VERSION,
&ctx,
))?;
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;

View File

@@ -304,7 +304,7 @@ impl InMemoryLayer {
Ok(())
}
pub fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
// TODO: Currently, we just leak the storage for any deleted keys
Ok(())

View File

@@ -28,7 +28,7 @@ use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::context::{DownloadBehavior, RequestContext};
@@ -119,7 +119,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -179,7 +179,7 @@ pub struct Timeline {
/// Locked automatically by [`TimelineWriter`] and checkpointer.
/// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock.
write_lock: Mutex<()>,
write_lock: tokio::sync::Mutex<()>,
/// Used to avoid multiple `flush_loop` tasks running
flush_loop_state: Mutex<FlushLoopState>,
@@ -238,6 +238,8 @@ pub struct Timeline {
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
}
type LayerMapWriteLockGuard<'t> = tokio::sync::RwLockWriteGuard<'t, LayerMap<dyn PersistentLayer>>;
/// Internal structure to hold all data needed for logical size calculation.
///
/// Calculation consists of two stages:
@@ -572,8 +574,8 @@ impl Timeline {
/// The sum of the file size of all historic layers in the layer map.
/// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**.
pub fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().unwrap();
pub async fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().await;
let mut size = 0;
for l in layer_map.iter_historic_layers() {
size += l.file_size();
@@ -664,7 +666,7 @@ impl Timeline {
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
self.freeze_inmem_layer(false);
self.freeze_inmem_layer(false).await;
self.flush_frozen_layers_and_wait().await
}
@@ -844,10 +846,10 @@ impl Timeline {
}
/// Mutate the timeline with a [`TimelineWriter`].
pub fn writer(&self) -> TimelineWriter<'_> {
pub async fn writer(&self) -> TimelineWriter<'_> {
TimelineWriter {
tl: self,
_write_guard: self.write_lock.lock().unwrap(),
_write_guard: self.write_lock.lock().await,
}
}
@@ -881,9 +883,9 @@ impl Timeline {
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
@@ -905,7 +907,7 @@ impl Timeline {
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.freeze_inmem_layer(true).await;
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
@@ -979,8 +981,8 @@ impl Timeline {
}
}
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().unwrap();
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().await;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info());
@@ -1002,7 +1004,7 @@ impl Timeline {
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
return Ok(Some(false));
@@ -1015,7 +1017,7 @@ impl Timeline {
/// Like [`evict_layer_batch`], but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
let remote_client = self
.remote_client
.as_ref()
@@ -1100,7 +1102,7 @@ impl Timeline {
}
// start the batch update
let mut layer_map = self.layers.write().unwrap();
let mut layer_map = self.layers.write().await;
let mut batch_updates = layer_map.batch_update();
let mut results = Vec::with_capacity(layers_to_evict.len());
@@ -1344,7 +1346,7 @@ impl Timeline {
timeline_id,
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
layers: tokio::sync::RwLock::new(LayerMap::default()),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
@@ -1379,7 +1381,7 @@ impl Timeline {
layer_flush_start_tx,
layer_flush_done_tx,
write_lock: Mutex::new(()),
write_lock: tokio::sync::Mutex::new(()),
layer_removal_cs: Default::default(),
gc_info: std::sync::RwLock::new(GcInfo {
@@ -1517,8 +1519,8 @@ impl Timeline {
/// Scan the timeline directory to populate the layer map.
/// Returns all timeline-related files that were found and loaded.
///
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().unwrap();
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let mut num_layers = 0;
@@ -1647,7 +1649,7 @@ impl Timeline {
// We're holding a layer map lock for a while but this
// method is only called during init so it's fine.
let mut layer_map = self.layers.write().unwrap();
let mut layer_map = self.layers.write().await;
let mut updates = layer_map.batch_update();
for remote_layer_name in &index_part.timeline_layers {
let local_layer = local_only_layers.remove(remote_layer_name);
@@ -1800,7 +1802,7 @@ impl Timeline {
let local_layers = self
.layers
.read()
.unwrap()
.await
.iter_historic_layers()
.map(|l| (l.filename(), l))
.collect::<HashMap<_, _>>();
@@ -2152,8 +2154,8 @@ impl Timeline {
}
}
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().await.iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
return Some(historic_layer);
@@ -2360,7 +2362,7 @@ impl Timeline {
#[allow(clippy::never_loop)] // see comment at bottom of this loop
'layer_map_search: loop {
let remote_layer = {
let layers = timeline.layers.read().unwrap();
let layers = timeline.layers.read().await;
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
@@ -2539,9 +2541,16 @@ impl Timeline {
///
/// Get a handle to the latest layer for appending.
///
fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().unwrap();
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().await;
self.get_layer_for_write_locked(lsn, &mut layers)
}
fn get_layer_for_write_locked(
&self,
lsn: Lsn,
layers: &mut LayerMapWriteLockGuard,
) -> anyhow::Result<Arc<InMemoryLayer>> {
ensure!(lsn.is_aligned());
let last_record_lsn = self.get_last_record_lsn();
@@ -2584,16 +2593,29 @@ impl Timeline {
Ok(layer)
}
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write(lsn)?;
let layer = self.get_layer_for_write(lsn).await?;
layer.put_value(key, lsn, val)?;
Ok(())
}
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
let layer = self.get_layer_for_write(lsn)?;
layer.put_tombstone(key_range, lsn)?;
fn put_value_locked(
&self,
key: Key,
lsn: Lsn,
val: &Value,
pre_locked_layer_map: &mut LayerMapWriteLockGuard,
) -> anyhow::Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write_locked(lsn, pre_locked_layer_map)?;
layer.put_value(key, lsn, val)?;
Ok(())
}
async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
let layer = self.get_layer_for_write(lsn).await?;
layer.put_tombstone(key_range, lsn).await?;
Ok(())
}
@@ -2605,15 +2627,15 @@ impl Timeline {
self.last_record_lsn.advance(new_lsn);
}
fn freeze_inmem_layer(&self, write_lock_held: bool) {
async fn freeze_inmem_layer(&self, write_lock_held: bool) {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let _write_guard = if write_lock_held {
None
} else {
Some(self.write_lock.lock().unwrap())
Some(self.write_lock.lock().await)
};
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
if let Some(open_layer) = &layers.open_layer {
let open_layer_rc = Arc::clone(open_layer);
// Does this layer need freezing?
@@ -2651,7 +2673,7 @@ impl Timeline {
let flush_counter = *layer_flush_start_rx.borrow();
let result = loop {
let layer_to_flush = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
layers.frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes
};
@@ -2743,7 +2765,7 @@ impl Timeline {
.await?
} else {
// normal case, write out a L0 delta layer file.
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?;
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?;
HashMap::from([(delta_path, metadata)])
};
@@ -2752,7 +2774,7 @@ impl Timeline {
// The new on-disk layers are now in the layer map. We can remove the
// in-memory layer from the map now.
{
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let l = layers.frozen_layers.pop_front();
// Only one thread may call this function at a time (for this
@@ -2846,7 +2868,7 @@ impl Timeline {
}
// Write out the given frozen in-memory layer as a new L0 delta file
fn create_delta_layer(
async fn create_delta_layer(
&self,
frozen_layer: &InMemoryLayer,
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
@@ -2870,7 +2892,7 @@ impl Timeline {
// Add it to the layer map
let l = Arc::new(new_delta);
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut batch_updates = layers.batch_update();
l.access_stats().record_residence_event(
&batch_updates,
@@ -2922,10 +2944,14 @@ impl Timeline {
}
// Is it time to create a new image layer for the given partition?
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
async fn time_for_new_image_layer(
&self,
partition: &KeySpace,
lsn: Lsn,
) -> anyhow::Result<bool> {
let threshold = self.get_image_creation_threshold();
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut max_deltas = 0;
{
@@ -3020,7 +3046,7 @@ impl Timeline {
for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;
start = img_range.end;
if force || self.time_for_new_image_layer(partition, lsn)? {
if force || self.time_for_new_image_layer(partition, lsn).await? {
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
@@ -3098,7 +3124,7 @@ impl Timeline {
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for l in image_layers {
@@ -3165,9 +3191,8 @@ impl Timeline {
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
// Only compact if enough layers have accumulated.
let threshold = self.get_compaction_threshold();
@@ -3288,7 +3313,6 @@ impl Timeline {
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here?
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
@@ -3525,7 +3549,7 @@ impl Timeline {
.context("wait for layer upload ops to complete")?;
}
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
@@ -3785,7 +3809,7 @@ impl Timeline {
// 4. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too agressive and avoidable
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -4081,7 +4105,7 @@ impl Timeline {
// Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().unwrap();
let mut layers = self_clone.layers.write().await;
let mut updates = layers.batch_update();
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
@@ -4239,7 +4263,7 @@ impl Timeline {
) {
let mut downloads = Vec::new();
{
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
layers
.iter_historic_layers()
.filter_map(|l| l.downcast_remote_layer())
@@ -4341,8 +4365,8 @@ impl LocalLayerInfoForDiskUsageEviction {
}
impl Timeline {
pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().unwrap();
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().await;
let mut max_layer_size: Option<u64> = None;
let mut resident_layers = Vec::new();
@@ -4414,7 +4438,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
// but will cause large code changes.
pub struct TimelineWriter<'a> {
tl: &'a Timeline,
_write_guard: MutexGuard<'a, ()>,
_write_guard: tokio::sync::MutexGuard<'a, ()>,
}
impl Deref for TimelineWriter<'_> {
@@ -4430,12 +4454,23 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value)
pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value).await
}
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn)
pub fn put_locked(
&self,
key: Key,
lsn: Lsn,
value: &Value,
pre_locked_layer_map: &mut LayerMapWriteLockGuard,
) -> anyhow::Result<()> {
self.tl
.put_value_locked(key, lsn, value, pre_locked_layer_map)
}
pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn).await
}
/// Track the end of the latest digested WAL record.

View File

@@ -185,7 +185,7 @@ impl Timeline {
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
if hist_layer.is_remote_layer() {

View File

@@ -1309,6 +1309,7 @@ mod tests {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.await
.expect("Failed to create an empty timeline for dummy wal connection manager");
ConnectionManagerState {

View File

@@ -313,12 +313,15 @@ pub(super) async fn handle_walreceiver_connection(
}
}
timeline.check_checkpoint_distance().with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
timeline
.check_checkpoint_distance()
.await
.with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
if let Some(last_lsn) = status_update {
let timeline_remote_consistent_lsn =

View File

@@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> {
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit()?;
modification.commit().await?;
Ok(())
}
@@ -1200,7 +1200,7 @@ mod tests {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
m.commit()?;
m.commit().await?;
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
Ok(walingest)
@@ -1209,7 +1209,7 @@ mod tests {
#[tokio::test]
async fn test_relsize() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut m = tline.begin_modification(Lsn(0x20));
@@ -1217,22 +1217,22 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x30));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x40));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x50));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1318,7 +1318,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
@@ -1360,7 +1360,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
@@ -1373,7 +1373,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
@@ -1398,7 +1398,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
@@ -1428,14 +1428,14 @@ mod tests {
#[tokio::test]
async fn test_drop_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut m = tline.begin_modification(Lsn(0x20));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1454,7 +1454,7 @@ mod tests {
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
m.commit()?;
m.commit().await?;
// Check that rel is not visible anymore
assert_eq!(
@@ -1472,7 +1472,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1497,7 +1497,7 @@ mod tests {
#[tokio::test]
async fn test_truncate_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
// Create a 20 MB relation (the size is arbitrary)
@@ -1509,7 +1509,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit()?;
m.commit().await?;
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
@@ -1554,7 +1554,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check reported size and contents after truncation
assert_eq!(
@@ -1603,7 +1603,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit()?;
m.commit().await?;
assert_eq!(
tline
@@ -1637,7 +1637,7 @@ mod tests {
#[tokio::test]
async fn test_large_rel() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut lsn = 0x10;
@@ -1648,7 +1648,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
.await?;
m.commit()?;
m.commit().await?;
}
assert_current_logical_size(&tline, Lsn(lsn));
@@ -1664,7 +1664,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE
@@ -1677,7 +1677,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE - 1
@@ -1693,7 +1693,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
size as BlockNumber

View File

@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*could not load tenant.*load local timeline.*",
".*load failed.*load local timeline.*",
]
)

View File

@@ -140,7 +140,7 @@ def test_remote_storage_backup_and_restore(
# This is before the failures injected by test_remote_failures, so it's a permanent error.
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
env.pageserver.allowed_errors.append(
".*error attaching tenant: storage-sync-list-remote-timelines",
".*attach failed.*: storage-sync-list-remote-timelines",
)
# Attach it. This HTTP request will succeed and launch a
# background task to load the tenant. In that background task,

View File

@@ -647,7 +647,9 @@ def test_ignored_tenant_stays_broken_without_metadata(
metadata_removed = True
assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
env.pageserver.allowed_errors.append(
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
)
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
pageserver_http.tenant_load(tenant_id=tenant_id)

View File

@@ -22,6 +22,7 @@ from fixtures.neon_fixtures import (
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
@@ -308,9 +309,7 @@ def test_pageserver_with_empty_tenants(
env.pageserver.allowed_errors.append(
".*marking .* as locally complete, while it doesnt exist in remote index.*"
)
env.pageserver.allowed_errors.append(
".*could not load tenant.*Failed to list timelines directory.*"
)
env.pageserver.allowed_errors.append(".*load failed.*Failed to list timelines directory.*")
client = env.pageserver.http_client()
@@ -341,9 +340,15 @@ def test_pageserver_with_empty_tenants(
env.pageserver.start()
client = env.pageserver.http_client()
tenants = client.tenant_list()
assert len(tenants) == 2
def not_loading():
tenants = client.tenant_list()
assert len(tenants) == 2
assert all(t["state"]["slug"] != "Loading" for t in tenants)
wait_until(10, 0.2, not_loading)
tenants = client.tenant_list()
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
assert (
@@ -355,7 +360,7 @@ def test_pageserver_with_empty_tenants(
broken_tenant_status["state"]["slug"] == "Broken"
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
assert (