Compare commits

...

67 Commits

Author SHA1 Message Date
Christian Schwarz
c1d9cb88ae turn Timeline::layers into tokio::sync::RwLock 2023-05-26 19:35:10 +02:00
Christian Schwarz
20a9356729 (does not compile): make TimelineWriter Send by using tokio::sync Mutex internally
fails with

cs@devvm:[~/src/neon]: cargo check -p pageserver  --features testing
    Checking pageserver v0.1.0 (/home/cs/src/neon/pageserver)
error: future cannot be sent between threads safely
   --> pageserver/src/tenant/timeline/walreceiver/connection_manager.rs:426:33
    |
426 |         let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
    |                                 ^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `Instrumented<[async block@pageserver/src/tenant/timeline/walreceiver/connection_manager.rs:427:13: 439:14]>`, the trait `std::marker::Send` is not implemented for `std::sync::RwLockReadGuard<'_, LayerMap<dyn PersistentLayer>>`
note: future is not `Send` as this value is used across an await
   --> pageserver/src/tenant/timeline.rs:872:46
    |
850 |         let layers = self.layers.read().unwrap();
    |             ------ has type `std::sync::RwLockReadGuard<'_, LayerMap<dyn PersistentLayer>>` which is not `Send`
...
872 |                 self.freeze_inmem_layer(true).await;
    |                                              ^^^^^^ await occurs here, with `layers` maybe used later
...
881 |     }
    |     - `layers` is later dropped here
note: required by a bound in `TaskHandle::<E>::spawn`
   --> pageserver/src/tenant/timeline/walreceiver.rs:196:52
    |
192 |     fn spawn<Fut>(
    |        ----- required by a bound in this
...
196 |         Fut: Future<Output = anyhow::Result<()>> + Send,
    |                                                    ^^^^ required by this bound in `TaskHandle::<E>::spawn`

error: could not compile `pageserver` due to previous error
2023-05-26 19:35:10 +02:00
Christian Schwarz
163ec3ccd1 basebackup import: pre-lock the layer map for the flush() calls
The checkpointer loop isn't running anyway, so, there's no risk of
blocking it through the pre-lock.

(cherry picked from commit 1b2663350c)
2023-05-26 19:35:10 +02:00
Christian Schwarz
eba6c00de4 controversial but necessary: keep holding layer map lock inside compact_level0_phase1
Without this, the seocnd read().unwrap() becomes an await point,
which makes the future not-Send, but, we require it to be Send
because it runs inside task_mgr::spawn, which requires the Fut's to be Send
2023-05-26 19:35:10 +02:00
Christian Schwarz
9d3267e474 clippy continued 2023-05-26 19:33:11 +02:00
Christian Schwarz
6b25cb5030 clippy 2023-05-26 18:36:13 +02:00
Christian Schwarz
f91ad65fb3 Merge branch 'problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify' into problame/async-timeline-get/refactor-timeline-initialization-to-avoid-holding-tenants-timelines-lock 2023-05-26 18:24:23 +02:00
Christian Schwarz
9a4789ec73 demote warn line to info-level, as the log line in set_stopping() is also info!()
This should fix the faile regress tests that barked on allowed_errors
2023-05-26 18:22:41 +02:00
Christian Schwarz
72159ee686 Merge remote-tracking branch 'origin/main' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify 2023-05-26 18:03:35 +02:00
Christian Schwarz
be74662d05 re-introduce the check for 0 layers, based on cause 2023-05-26 17:52:26 +02:00
Christian Schwarz
e7c4ef9f4f don't hold TENANTS lock while waiting for set_stopping() 2023-05-26 17:46:09 +02:00
Christian Schwarz
13d3f4c29f set_stopping(): report in result if not transitioning to Stopping 2023-05-26 17:46:09 +02:00
Christian Schwarz
67258af8a2 Revert "test_broken_timelines: regex needs changing due to changes in this PR"
This reverts commit 17ba307004.
2023-05-26 17:40:37 +02:00
Christian Schwarz
17ba307004 test_broken_timelines: regex needs changing due to changes in this PR
The regex is different because tenant2 is not broken anymore with this
PR, because we allow empty timeline dirs to load
2023-05-26 17:40:34 +02:00
Christian Schwarz
e1486444d6 Revert "test_broken_timelines: wait for tenants to load"
This reverts commit c6f9b8f318.
2023-05-26 17:40:27 +02:00
Christian Schwarz
c6f9b8f318 test_broken_timelines: wait for tenants to load
Without this, we rely on the basebackup request to wait for the tenant to load.

It works, but, would be nice to rule it out, no?
2023-05-26 17:39:14 +02:00
Christian Schwarz
ba3e3bdddf clippy 2023-05-26 17:07:15 +02:00
Christian Schwarz
71f9bbef0d fix the test timeline creation functions 2023-05-26 16:01:45 +02:00
Christian Schwarz
4680f8c60b finish WIP: keep the real timeline from create_empty_timeline outside of timelines map until it has finished filling 2023-05-26 15:29:19 +02:00
Christian Schwarz
3c1fc2617c WIP 2023-05-26 14:24:23 +02:00
Christian Schwarz
60cc197ce3 fix test_timeline_create_break_after_uninit_mark (the refactoring added .context()) 2023-05-26 10:19:24 +02:00
Christian Schwarz
609a929968 instrument shutdown_all_tenants code path, include timeline_id in logs if failed to flush
This can be extracted into an independent commit.
2023-05-26 10:12:33 +02:00
Christian Schwarz
f2abc4c933 independent fix: test_pageserver_metrics_removed_after_detach didn't wait for uploads
This resulted in unexpectedly absent metrics `pageserver_remote_timeline_client_bytes_finished`
tripping the assert quoted below.

Not sure why this PR (#4350) exposed this problem though.
Are we detaching faster? If so, why?

AssertionError: assert {'pageserver_...s_count', ...} == {'pageserver_...s_count', ...}
  Extra items in the right set:
  'pageserver_remote_timeline_client_bytes_started_total'
  'pageserver_remote_timeline_client_bytes_finished_total'
  Full diff:
    {
     'pageserver_created_persistent_files_total',
     'pageserver_current_logical_size',
     'pageserver_evictions_total',
     'pageserver_evictions_with_low_residence_duration_total',
     'pageserver_getpage_reconstruct_seconds_bucket',
     'pageserver_getpage_reconstruct_seconds_count',
     'pageserver_getpage_reconstruct_seconds_sum',
     'pageserver_io_operations_bytes_total',
     'pageserver_io_operations_seconds_bucket',
     'pageserver_io_operations_seconds_count',
     'pageserver_io_operations_seconds_sum',
     'pageserver_last_record_lsn',
     'pageserver_materialized_cache_hits_total',
     'pageserver_remote_operation_seconds_bucket',
     'pageserver_remote_operation_seconds_count',
     'pageserver_remote_operation_seconds_sum',
     'pageserver_remote_physical_size',
  -  'pageserver_remote_timeline_client_bytes_finished_total',
  -  'pageserver_remote_timeline_client_bytes_started_total',
     'pageserver_remote_timeline_client_calls_started_bucket',
     'pageserver_remote_timeline_client_calls_started_count',
     'pageserver_remote_timeline_client_calls_started_sum',
     'pageserver_remote_timeline_client_calls_unfinished',
     'pageserver_resident_physical_size',
     'pageserver_smgr_query_seconds_bucket',
     'pageserver_smgr_query_seconds_count',
     'pageserver_smgr_query_seconds_sum',
     'pageserver_storage_operations_seconds_count_total',
     'pageserver_storage_operations_seconds_sum_total',
     'pageserver_tenant_states_count',
     'pageserver_wait_lsn_seconds_bucket',
     'pageserver_wait_lsn_seconds_count',
     'pageserver_wait_lsn_seconds_sum',
     'pageserver_written_persistent_bytes_total',
    }
2023-05-26 09:54:30 +02:00
Christian Schwarz
b09beaa4fe log while waiting for tenant to finish activation 2023-05-26 09:34:12 +02:00
Christian Schwarz
1367e2b0ee improve TenantState doc comments, repeating what's in the Mermaid diagram 2023-05-26 09:31:44 +02:00
Christian Schwarz
122e23071b fix the tests (commenting out too-conservative "Timeline has no ancestor and no layer files" assert) 2023-05-26 09:23:26 +02:00
Christian Schwarz
696c6ed6ff fix cfg(test) code to the extent that clippy passes 2023-05-26 08:49:42 +02:00
Christian Schwarz
0874e27023 refactor timeline initialization
High-level ideas:
- placeholder Timeline object in timelines map during a timeline creation
- the timeline creations (branch, bootstrap, import_from_basebackup)
  prepare durable state (on-disk & remote)state, if necessary using
  _another_ _temporary_ Timeline object
- once the timeline creations have prepared the durable state, they
  use the normal load routine (load_local_timeline) that is also used
  during pageserver startup
- Once the loading is done, we replace the placheolder timeline object
  with the real one
2023-05-25 23:01:40 +02:00
Christian Schwarz
6fe39ecbf7 add ability to have fake metrics (needed in next patch so we can have to Timeline objects with the same id in memory) 2023-05-25 23:01:40 +02:00
Christian Schwarz
a0c2a85505 timeline_init_and_sync: don't hold Tenant::timelines while load_layer_map
This patch inlines `initialize_with_lock` and then reorganizes the code
such that we can `load_layer_map` without holding the
`Tenant::timelines` lock.

As a nice aside, we can get rid of the dummy() uninit mark, which has
always been a terrible hack.
2023-05-25 23:01:40 +02:00
Christian Schwarz
dd0f5c4ef3 Merge remote-tracking branch 'origin/main' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify 2023-05-25 22:20:52 +02:00
Christian Schwarz
de780d2e0f make TenantState::{Loading,Attaching,Activating} owned by spawn_load / spawn_attach
See the Mermaid diagram in the doc comment for the now-possible state transitions.

The two core insights / changes are:
- spawn_load and spawn_attach own the tenant state until they're done
- once load()/attach() calls are done
    - if they failed, transition them to Broken directly (we know
      that there's no background activity because we didn't call activate yet)
    - if they succeed, call activate. We can make it infallible. How? Later.

- set_broken() and set_stopping() are changed to wait for spawn_load() /
  spawn_attach() to finish. This sounds scary because it might hinder
  detach or shutdown, but actually, concurrent attach+detach, or
  attach+shutdown, or load+shutdown, or attach+shutdown were just racy.
  With this change, they're not anymore.
  We can add a CancellationToken stored in Tenant for load/attach and cancel
  it from set_stopping() or set_broken() if necessary in the future.

So, why can activate() be infallible now: because we declare that
spawn_load and spawn_attach own the tenant state until they're done.
And we enforce that ownership using the wait_for at the start of
set_stopping and set_broken.
2023-05-25 15:02:43 +02:00
Christian Schwarz
f18d9f555b Revert "Revert "use tokio::sync::Receiver::wait_for""
This reverts commit eaf270c648.
2023-05-25 14:58:49 +02:00
Christian Schwarz
05a2fe08d1 Merge branch 'problame/infallible-timeline-activate/4-make-infallible' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify 2023-05-25 14:58:19 +02:00
Christian Schwarz
eaf270c648 Revert "use tokio::sync::Receiver::wait_for"
This reverts commit fe4ef121b6.
2023-05-25 14:57:41 +02:00
Christian Schwarz
ddad0928c5 Merge branch 'problame/infallible-timeline-activate/3-funnel-storage-broker-client' into problame/infallible-timeline-activate/4-make-infallible 2023-05-25 14:53:32 +02:00
Christian Schwarz
96c550222b apply heikki's comment suggestion 2023-05-25 14:53:20 +02:00
Christian Schwarz
cf8ff7edad explainer comment on storage_broker::connect async weirdness 2023-05-25 14:51:48 +02:00
Christian Schwarz
da6573f551 Merge branch 'problame/infallible-timeline-activate/3-funnel-storage-broker-client' into problame/infallible-timeline-activate/4-make-infallible 2023-05-25 10:54:30 +02:00
Christian Schwarz
2fee8c884f Merge remote-tracking branch 'origin/main' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-25 10:54:03 +02:00
Christian Schwarz
fe4ef121b6 use tokio::sync::Receiver::wait_for 2023-05-25 10:44:26 +02:00
Christian Schwarz
641ca994dc assert_eq suggestion 2023-05-25 09:55:32 +02:00
Christian Schwarz
413598b19b fix merge fallout (?) 2023-05-24 17:42:51 +02:00
Christian Schwarz
b345f32e3f Merge branch 'problame/infallible-timeline-activate/4-make-infallible' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify 2023-05-24 17:25:35 +02:00
Christian Schwarz
69cfa9fe61 launch_wal_receiver: apply joonas's review suggestion (visibility + doc comment) 2023-05-24 17:20:03 +02:00
Christian Schwarz
2c424c8f4e Revert "activate_timelines counter is now == not_broken_timelines.len()"
not_broken_timelines is an iterator, doesn't have `len()`.

This reverts commit 4001f441c0.
2023-05-24 17:19:22 +02:00
Christian Schwarz
4001f441c0 activate_timelines counter is now == not_broken_timelines.len() 2023-05-24 17:14:49 +02:00
Christian Schwarz
ef956c47fc make it clear that walreceiver_status is always used in the branch where it's produced 2023-05-24 17:12:35 +02:00
Christian Schwarz
8606b6abe5 Merge remote-tracking branch 'origin/problame/infallible-timeline-activate/3-funnel-storage-broker-client' into problame/infallible-timeline-activate/4-make-infallible 2023-05-24 17:02:18 +02:00
Christian Schwarz
732f60317b Merge remote-tracking branch 'origin/main' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-24 16:58:25 +02:00
Christian Schwarz
b54431bbd3 pass the BrokerClientChannel by value & clone it as necessary
It's a wrapper around an inner Arc anyways

Also, this gets rid of the OnceCell
2023-05-24 12:29:05 +02:00
Christian Schwarz
def5eb8542 Merge branch 'problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-24 11:57:37 +02:00
Christian Schwarz
07da786ed3 apply joonas's suggestion to use parent: None + follows_from 2023-05-24 11:56:26 +02:00
Christian Schwarz
75c3c43b2e don't unwrap() the activate() result in spawn_load / spawn_attach 2023-05-24 11:36:07 +02:00
Christian Schwarz
bdf03eab58 Merge branch 'problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-24 11:32:38 +02:00
Christian Schwarz
32c85fa87a Merge remote-tracking branch 'origin/main' into problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation 2023-05-24 11:31:00 +02:00
Christian Schwarz
b2e0c58a8c Merge branch 'problame/infallible-timeline-activate/4-make-infallible' into problame/async-timeline-get/dont-hold-timelines-lock-inside-tenant-state-send-modify 2023-05-23 20:44:34 +02:00
Christian Schwarz
94f30f0660 Merge branch 'problame/infallible-timeline-activate/3-funnel-storage-broker-client' into problame/infallible-timeline-activate/4-make-infallible 2023-05-23 20:44:12 +02:00
Christian Schwarz
a55d224923 tests would fail because broker client needs to be launched on a tokio runtime thread 2023-05-23 20:43:10 +02:00
Christian Schwarz
4f586ac101 Merge branch 'problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation' into problame/infallible-timeline-activate/3-funnel-storage-broker-client 2023-05-23 20:42:54 +02:00
Christian Schwarz
feb2e80b83 tests were failing because activate() was outside of a span with tenant_id 2023-05-23 20:36:32 +02:00
Christian Schwarz
ee22e81583 don't hold timelines lock inside set_stopping() 2023-05-23 20:11:15 +02:00
Christian Schwarz
3e604eaa39 refactor: introduce TenantState::Activating to avoid holding timelines lock inside Tenant::activate 2023-05-23 20:03:12 +02:00
Christian Schwarz
8bcb542a3b refactor: make timeline activation infallible
Timeline::activate() was only fallible because `launch_wal_receiver` was.

`launch_wal_receiver` was fallible only because of some preliminary
checks in `WalReceiver::start`.

Turns out these checks can be shifted to the type system by delaying
creatinon of the `WalReceiver` struct to the point where we activate the timeline.

The changes in this PR were enabled by my previous refactoring that funneled
the broker_client from pageserver startup to the activate() call sites.
2023-05-23 19:27:06 +02:00
Christian Schwarz
17b081d294 refactor: eliminate global storage_broker client state
(This is prep work to make `Timeline::activate` infallible.)

This patch removes the global storage_broker client instance from the
pageserver codebase.

Instead, pageserver startup instantiates it and passes it down to
the `Timeline::activate` function, which in turn passes it to
the WalReceiver, which is the entity that actually uses it.
2023-05-23 19:27:06 +02:00
Christian Schwarz
d5337e6a65 refactor responsibility for tenant/timeline activation
(This is prep work to make `Timeline::activate()` infallible.)

The current possibility for failure in `Timeline::activate()`
is the broker client's presence / absence. It should be an assert, but
we're careful with these. So, I'm planning to pass in the broker
client to activate(), thereby eliminating the possiblity of its absence.

In the unit tests, we don't have a broker client. So, I thought I'd be
in trouble because the unit tests also called `activate()` before this
PR.

However, closer inspection reveals a long-standing FIXME about this,
which is addressed by this patch.

It turns out that the unit tests don't actually need the background
loops to be running. They just need the state value to be `Active`.
So, for the tests, we just set it to that value but don't spawn
the background loops.

We'll need to revisit this if we ever do more Rust unit tests in the
future. But right now, this refactoring improves the code, so, let's
revisit when we get there.
2023-05-23 19:26:36 +02:00
Christian Schwarz
cc96a5186d tenant_map_insert: don't expose the vacant entry to the closure
This tightens up the API a little.
Byproduct of some refactoring work that I'm doing right now.
2023-05-23 19:25:47 +02:00
24 changed files with 1929 additions and 1111 deletions

View File

@@ -18,7 +18,29 @@ use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
/// A state of a tenant in pageserver's memory.
/// The state of a tenant in this pageserver.
///
/// ```mermaid
/// stateDiagram-v2
///
/// [*] --> Loading: spawn_load()
/// [*] --> Attaching: spawn_attach()
///
/// Loading --> Activating: activate()
/// Attaching --> Activating: activate()
/// Activating --> Active: infallible
///
/// Loading --> Broken: load() failure
/// Attaching --> Broken: attach() failure
///
/// Active --> Stopping: set_stopping(), part of shutdown & detach
/// Stopping --> Broken: late error in remove_tenant_from_memory
///
/// Broken --> [*]: ignore / detach / shutdown
/// Stopping --> [*]: remove_from_memory complete
///
/// Active --> Broken: cfg(testing)-only tenant break point
/// ```
#[derive(
Clone,
PartialEq,
@@ -33,17 +55,38 @@ use bytes::{BufMut, Bytes, BytesMut};
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
/// This tenant is being loaded from local disk
/// This tenant is being loaded from local disk.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Loading,
/// This tenant is being downloaded from cloud storage.
/// This tenant is being attached to the pageserver.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Attaching,
/// Tenant is fully operational
/// The tenant is transitioning from Loading/Attaching to Active.
///
/// While in this state, the individual timelines are being activated.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Activating,
/// The tenant has finished activating and is open for business.
///
/// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
Active,
/// A tenant is recognized by pageserver, but it is being detached or the
/// The tenant is recognized by pageserver, but it is being detached or the
/// system is being shut down.
///
/// Transitions out of this state are possible through `set_broken()`.
Stopping,
/// A tenant is recognized by the pageserver, but can no longer be used for
/// any operations, because it failed to be activated.
/// The tenant is recognized by the pageserver, but can no longer be used for
/// any operations.
///
/// If the tenant fails to load or attach, it will transition to this state
/// and it is guaranteed that no background tasks are running in its name.
///
/// The other way to transition into this state is from `Stopping` state
/// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
/// if the cleanup future executed by `remove_tenant_from_memory()` fails.
Broken { reason: String, backtrace: String },
}
@@ -60,6 +103,7 @@ impl TenantState {
// tenant mgr startup distinguishes attaching from loading via marker file.
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
Self::Loading => Attached,
Self::Activating => todo!(),
// We only reach Active after successful load / attach.
// So, call atttachment status Attached.
Self::Active => Attached,
@@ -101,6 +145,7 @@ impl std::fmt::Debug for TenantState {
/// A state of a timeline in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TimelineState {
Creating,
/// The timeline is recognized by the pageserver but is not yet operational.
/// In particular, the walreceiver connection loop is not running for this timeline.
/// It will eventually transition to state Active or Broken.

View File

@@ -512,7 +512,7 @@ async fn collect_eviction_candidates(
if !tl.is_active() {
continue;
}
let info = tl.get_local_layers_for_disk_usage_eviction();
let info = tl.get_local_layers_for_disk_usage_eviction().await;
debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
tenant_candidates.extend(
info.resident_layers

View File

@@ -212,7 +212,7 @@ async fn build_timeline_info(
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let mut info = build_timeline_info_common(timeline, ctx)?;
let mut info = build_timeline_info_common(timeline, ctx).await?;
if include_non_incremental_logical_size {
// XXX we should be using spawn_ondemand_logical_size_calculation here.
// Otherwise, if someone deletes the timeline / detaches the tenant while
@@ -230,7 +230,7 @@ async fn build_timeline_info(
Ok(info)
}
fn build_timeline_info_common(
async fn build_timeline_info_common(
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
@@ -261,7 +261,7 @@ fn build_timeline_info_common(
None
}
};
let current_physical_size = Some(timeline.layer_size_sum());
let current_physical_size = Some(timeline.layer_size_sum().await);
let state = timeline.current_state();
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
@@ -321,6 +321,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
}
@@ -551,7 +552,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
for timeline in tenant.list_timelines().iter() {
current_physical_size += timeline.layer_size_sum();
current_physical_size += timeline.layer_size_sum().await;
}
let state = tenant.current_state();
@@ -655,7 +656,7 @@ async fn layer_map_info_handler(request: Request<Body>) -> Result<Response<Body>
check_permission(&request, Some(tenant_id))?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let layer_map_info = timeline.layer_map_info(reset);
let layer_map_info = timeline.layer_map_info(reset).await;
json_response(StatusCode::OK, layer_map_info)
}
@@ -859,7 +860,7 @@ async fn handle_tenant_break(r: Request<Body>) -> Result<Response<Body>, ApiErro
.await
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
tenant.set_broken("broken from test".to_owned());
tenant.set_broken("broken from test".to_owned()).await;
json_response(StatusCode::OK, ())
}

View File

@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
{
pg_control = Some(control_file);
}
modification.flush()?;
modification.flush().await?;
}
}
// We're done importing all the data files.
modification.commit()?;
modification.commit().await?;
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
// We found the pg_control file.
pg_control = Some(res);
}
modification.flush()?;
modification.flush().await?;
}
tokio_tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit()?;
modification.commit().await?;
Ok(())
}
@@ -594,7 +594,7 @@ async fn import_file(
// zenith.signal is not necessarily the last file, that we handle
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.writer();
let writer = modification.tline.writer().await;
writer.finish_write(prev_lsn);
debug!("imported zenith signal {}", prev_lsn);

View File

@@ -24,7 +24,7 @@ pub mod walredo;
use std::path::Path;
use crate::task_mgr::TaskKind;
use tracing::info;
use tracing::{info, instrument};
/// Current storage format version
///
@@ -45,6 +45,7 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;
#[instrument(skip_all)]
pub async fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.

View File

@@ -720,6 +720,7 @@ impl StorageTimeMetrics {
#[derive(Debug)]
pub struct TimelineMetrics {
fake: bool,
tenant_id: String,
timeline_id: String,
pub reconstruct_time_histo: Histogram,
@@ -744,6 +745,7 @@ pub struct TimelineMetrics {
impl TimelineMetrics {
pub fn new(
fake: bool,
tenant_id: &TenantId,
timeline_id: &TimelineId,
evictions_with_low_residence_duration_builder: EvictionsWithLowResidenceDurationBuilder,
@@ -797,7 +799,8 @@ impl TimelineMetrics {
let evictions_with_low_residence_duration =
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
TimelineMetrics {
let m = TimelineMetrics {
fake,
tenant_id,
timeline_id,
reconstruct_time_histo,
@@ -819,12 +822,16 @@ impl TimelineMetrics {
evictions_with_low_residence_duration: std::sync::RwLock::new(
evictions_with_low_residence_duration,
),
}
}
}
};
impl Drop for TimelineMetrics {
fn drop(&mut self) {
if fake {
m.remove_metrics();
}
m
}
fn remove_metrics(&self) {
let tenant_id = &self.tenant_id;
let timeline_id = &self.timeline_id;
let _ = RECONSTRUCT_TIME.remove_label_values(&[tenant_id, timeline_id]);
@@ -860,6 +867,14 @@ impl Drop for TimelineMetrics {
}
}
impl Drop for TimelineMetrics {
fn drop(&mut self) {
if !self.fake {
self.remove_metrics();
}
}
}
pub fn remove_tenant_metrics(tenant_id: &TenantId) {
let tid = tenant_id.to_string();
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);

View File

@@ -14,6 +14,7 @@ use bytes::Buf;
use bytes::Bytes;
use futures::Stream;
use pageserver_api::models::TenantState;
use pageserver_api::models::TimelineState;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -24,6 +25,7 @@ use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, Qu
use pq_proto::framed::ConnectionError;
use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::collections::hash_map::Entry;
use std::io;
use std::net::TcpListener;
use std::pin::pin;
@@ -51,6 +53,8 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant;
use crate::tenant::compare_arced_timeline;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::mgr;
use crate::tenant::mgr::GetTenantError;
use crate::tenant::{Tenant, Timeline};
@@ -489,7 +493,14 @@ impl PageServerHandler {
// Create empty timeline
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
let (guard, real_timeline_not_in_tenants_map) = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
// TODO spawn flush loop of timeline early (before activation),
// but then we need to take care of shutting it down in case we fail
// (bootstrap_timeline probably also needs it?)
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
@@ -503,21 +514,49 @@ impl PageServerHandler {
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let doit = async {
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
timeline
.import_basebackup_from_tar(
&mut copyin_reader,
base_lsn,
self.broker_client.clone(),
&ctx,
)
.await?;
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
real_timeline_not_in_tenants_map
.import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx)
.await?;
// Read the end of the tar archive.
read_tar_eof(copyin_reader).await?;
// Read the end of the tar archive.
read_tar_eof(copyin_reader).await?;
anyhow::Ok(())
};
let placeholder_timeline = match doit.await {
Ok(()) => {
match guard.creation_complete_remove_uninit_marker_and_get_placeholder_timeline() {
Ok(placeholder_timeline) => placeholder_timeline,
Err(err) => {
error!(
"failed to remove uninit marker for new_timeline_id={timeline_id}: {err:#}"
);
return Err(QueryError::Other(err.context("remove uninit marker file")));
}
}
}
Err(e) => {
debug_assert_current_span_has_tenant_and_timeline_id();
guard.creation_failed();
return Err(QueryError::Other(e));
}
};
// todo share with Tenant::create_timeline
match tenant.timelines.lock().unwrap().entry(timeline_id) {
Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"),
Entry::Occupied(mut o) => {
info!("replacing placeholder timeline with the real one");
assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating);
assert!(compare_arced_timeline(&placeholder_timeline, o.get()));
let replaced_placeholder = o.insert(Arc::clone(&real_timeline_not_in_tenants_map));
assert!(compare_arced_timeline(&replaced_placeholder, &placeholder_timeline));
},
}
// TODO check checksum
// Meanwhile you can verify client-side by taking fullbackup
@@ -525,7 +564,9 @@ impl PageServerHandler {
// It wouldn't work if base came from vanilla postgres though,
// since we discard some log files.
info!("done");
info!("done, activating timeline");
real_timeline_not_in_tenants_map.activate(self.broker_client.clone(), &ctx).await;
Ok(())
}

View File

@@ -1108,7 +1108,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub fn flush(&mut self) -> anyhow::Result<()> {
pub async fn flush(&mut self) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -1116,13 +1116,15 @@ impl<'a> DatadirModification<'a> {
return Ok(());
}
let writer = self.tline.writer();
let writer = self.tline.writer().await;
let mut layer_map = self.tline.layers.write().await;
// Flush relation and SLRU data blocks, keep metadata.
let mut result: anyhow::Result<()> = Ok(());
self.pending_updates.retain(|&key, value| {
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
result = writer.put(key, self.lsn, value);
result = writer.put_locked(key, self.lsn, value, &mut layer_map);
false
} else {
true
@@ -1143,17 +1145,17 @@ impl<'a> DatadirModification<'a> {
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer();
pub async fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer().await;
let lsn = self.lsn;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, &value)?;
writer.put(key, lsn, &value).await?;
}
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn)?;
writer.delete(key_range, lsn).await?;
}
writer.finish_write(lsn);
@@ -1594,16 +1596,18 @@ fn is_slru_block_key(key: Key) -> bool {
}
#[cfg(test)]
pub fn create_test_timeline(
pub async fn create_test_timeline(
tenant: &crate::tenant::Tenant,
timeline_id: utils::id::TimelineId,
pg_version: u32,
ctx: &RequestContext,
) -> anyhow::Result<std::sync::Arc<Timeline>> {
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
let tline = tenant
.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)
.await?;
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit()?;
m.commit().await?;
Ok(tline)
}

View File

@@ -270,6 +270,8 @@ pub enum TaskKind {
DebugTool,
CreateTimeline,
#[cfg(test)]
UnitTest,
}

File diff suppressed because it is too large Load Diff

View File

@@ -10,6 +10,7 @@ use tokio::fs;
use anyhow::Context;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tracing::*;
use remote_storage::GenericRemoteStorage;
@@ -19,7 +20,10 @@ use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::tenant::{
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
TimelineLoadCause,
};
use crate::IGNORED_TENANT_FILE_NAME;
use utils::fs_ext::PathExt;
@@ -119,6 +123,7 @@ pub async fn init_tenant_mgr(
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
TimelineLoadCause::Startup,
&ctx,
) {
Ok(tenant) => {
@@ -154,6 +159,7 @@ pub fn schedule_local_tenant_processing(
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
cause: TimelineLoadCause,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!(
@@ -170,6 +176,7 @@ pub fn schedule_local_tenant_processing(
})?,
"Cannot load tenant from empty directory {tenant_path:?}"
);
// TODO ensure there's no uninit mark / handle it correctly during ignore and load
let tenant_id = tenant_path
.file_name()
@@ -207,7 +214,7 @@ pub fn schedule_local_tenant_processing(
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx)
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, cause, ctx)
};
Ok(tenant)
}
@@ -222,6 +229,7 @@ pub fn schedule_local_tenant_processing(
/// That could be easily misinterpreted by control plane, the consumer of the
/// management API. For example, it could attach the tenant on a different pageserver.
/// We would then be in split-brain once this pageserver restarts.
#[instrument(skip_all)]
pub async fn shutdown_all_tenants() {
// Prevent new tenants from being created.
let tenants_to_shut_down = {
@@ -244,12 +252,55 @@ pub async fn shutdown_all_tenants() {
}
};
// Set tenant (and its timlines) to Stoppping state.
// Since we can only transition into Stopping state after activation is complete,
// run it in a JoinSet so all tenants have a chance to stop before we git SIGKILLed.
//
// Transitioning tenants to Stopping state has a couple of non-obvious side effects:
// 1. Lock out any new requests to the tenants.
// 2. Signal cancellation to WAL receivers (we wait on it below).
// 3. Signal cancellation for othher tenant background loops.
// 4. ???
//
// The waiting for the cancellation is not done uniformly.
// We certainly wait for WAL receivers to shut down.
// That is necessary so that no new data comes in before the freeze_and_flush.
// But the tenant background loops are joined-on in our caller.
// It's mesed up.
let mut join_set = JoinSet::new();
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
for (_, tenant) in tenants_to_shut_down {
if tenant.is_active() {
// updates tenant state, forbidding new GC and compaction iterations from starting
tenant.set_stopping();
tenants_to_freeze_and_flush.push(tenant);
join_set.spawn(async move {
match tenant.set_stopping().await {
Ok(()) => Ok(tenant),
Err(e) => Err((tenant, e)),
}
});
}
while let Some(res) = join_set.join_next().await {
match res {
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the futures");
}
Err(join_error) => {
// cannot really do anything, as this panic is likely a bug
error!("task that calls set_stopping() panicked, don't know which tenant this is, and probably freeze_and_flush won't work anyways: {join_error:#}");
}
Ok(retval) => match retval {
Ok(tenant) => {
// success
debug!("tenant successfully stopped: {}", tenant.tenant_id);
tenants_to_freeze_and_flush.push(tenant);
}
// our task_mgr::shutdown_tasks are going to coalesce on that just fine
Err((tenant, SetStoppingError::AlreadyStopping)) => {
tenants_to_freeze_and_flush.push(tenant);
}
Err((tenant, SetStoppingError::Broken)) => {
info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well: {}", tenant.tenant_id);
tenants_to_freeze_and_flush.push(tenant);
}
},
}
}
@@ -266,8 +317,9 @@ pub async fn shutdown_all_tenants() {
// On error, log it but continue with the shutdown for other tenants.
for tenant in tenants_to_freeze_and_flush {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
debug!("freeze_and_flush tenant {tenant_id}");
// TODO this could probably run in a JoinSet as well?
if let Err(err) = tenant.freeze_and_flush().await {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
@@ -291,7 +343,7 @@ pub async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, TimelineLoadCause::TenantCreate, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -437,7 +489,7 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, TimelineLoadCause::TenantLoad, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -510,7 +562,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), TimelineLoadCause::Attach ,ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -589,13 +641,23 @@ where
{
let tenants_accessor = TENANTS.write().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => match tenant.current_state() {
TenantState::Attaching
| TenantState::Loading
| TenantState::Broken { .. }
| TenantState::Active => tenant.set_stopping(),
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
},
Some(tenant) => {
let tenant = Arc::clone(tenant);
// don't hold TENANTS lock while set_stopping waits for activation to finish
drop(tenants_accessor);
match tenant.set_stopping().await {
Ok(()) => {
// we won, continue stopping procedure
}
Err(SetStoppingError::Broken) => {
// continue the procedure, let's hope the closure can deal with broken tenants
}
Err(SetStoppingError::AlreadyStopping) => {
// the tenant is already stopping or broken, don't do anything
return Err(TenantStateError::IsStopping(tenant_id));
}
}
}
None => return Err(TenantStateError::NotFound(tenant_id)),
}
}
@@ -620,7 +682,7 @@ where
let tenants_accessor = TENANTS.read().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
tenant.set_broken(e.to_string());
tenant.set_broken(e.to_string()).await;
}
None => {
warn!("Tenant {tenant_id} got removed from memory");

View File

@@ -1264,7 +1264,12 @@ mod tests {
let harness = TenantHarness::create(test_name)?;
let (tenant, ctx) = runtime.block_on(harness.load());
// create an empty timeline directory
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let _ = runtime.block_on(tenant.create_test_timeline(
TIMELINE_ID,
Lsn(0),
DEFAULT_PG_VERSION,
&ctx,
))?;
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;

View File

@@ -304,7 +304,7 @@ impl InMemoryLayer {
Ok(())
}
pub fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
// TODO: Currently, we just leak the storage for any deleted keys
Ok(())

View File

@@ -28,7 +28,7 @@ use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::context::{DownloadBehavior, RequestContext};
@@ -63,13 +63,13 @@ use utils::{
simple_rcu::{Rcu, RcuReadGuard},
};
use crate::page_cache;
use crate::repository::GcResult;
use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walredo::WalRedoManager;
use crate::METADATA_FILE_NAME;
use crate::ZERO_PAGE;
use crate::{import_datadir, page_cache};
use crate::{is_temporary, task_mgr};
pub(super) use self::eviction_task::EvictionTaskTenantState;
@@ -119,7 +119,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -179,7 +179,7 @@ pub struct Timeline {
/// Locked automatically by [`TimelineWriter`] and checkpointer.
/// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock.
write_lock: Mutex<()>,
write_lock: tokio::sync::Mutex<()>,
/// Used to avoid multiple `flush_loop` tasks running
flush_loop_state: Mutex<FlushLoopState>,
@@ -238,6 +238,8 @@ pub struct Timeline {
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
}
type LayerMapWriteLockGuard<'t> = tokio::sync::RwLockWriteGuard<'t, LayerMap<dyn PersistentLayer>>;
/// Internal structure to hold all data needed for logical size calculation.
///
/// Calculation consists of two stages:
@@ -572,8 +574,8 @@ impl Timeline {
/// The sum of the file size of all historic layers in the layer map.
/// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**.
pub fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().unwrap();
pub async fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().await;
let mut size = 0;
for l in layer_map.iter_historic_layers() {
size += l.file_size();
@@ -664,12 +666,25 @@ impl Timeline {
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
self.freeze_inmem_layer(false);
if self.current_state() == TimelineState::Creating {
debug!("timelines in Creating state are never written to");
assert!(
self.layers.read().await.open_layer.is_none(),
"would have nothing to flush anyways"
);
return Ok(());
}
self.freeze_inmem_layer(false).await;
self.flush_frozen_layers_and_wait().await
}
/// Outermost timeline compaction operation; downloads needed layers.
pub async fn compact(&self, ctx: &RequestContext) -> anyhow::Result<()> {
if self.current_state() == TimelineState::Creating {
debug!("timelines is in Creating state");
return Ok(());
}
const ROUNDS: usize = 2;
let last_record_lsn = self.get_last_record_lsn();
@@ -844,10 +859,10 @@ impl Timeline {
}
/// Mutate the timeline with a [`TimelineWriter`].
pub fn writer(&self) -> TimelineWriter<'_> {
pub async fn writer(&self) -> TimelineWriter<'_> {
TimelineWriter {
tl: self,
_write_guard: self.write_lock.lock().unwrap(),
_write_guard: self.write_lock.lock().await,
}
}
@@ -881,9 +896,9 @@ impl Timeline {
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
@@ -905,7 +920,7 @@ impl Timeline {
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.freeze_inmem_layer(true).await;
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
@@ -916,13 +931,31 @@ impl Timeline {
Ok(())
}
pub fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
pub async fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
if self.current_state() == TimelineState::Creating {
panic!("timelines in Creating state are never activated");
}
self.maybe_spawn_flush_loop();
self.launch_wal_receiver(ctx, broker_client);
self.set_state(TimelineState::Active);
self.set_state(TimelineState::Active).await;
self.launch_eviction_task();
}
pub fn set_state(&self, new_state: TimelineState) {
pub async fn set_state(&self, new_state: TimelineState) {
if self.current_state() == TimelineState::Creating {
info!("timelines in Creating state are never activated, nothing to stop");
assert_eq!(
*self.flush_loop_state.lock().unwrap(),
FlushLoopState::NotStarted
);
assert!(
self.layers.read().await.open_layer.is_none(),
"would have nothing to flush anyways"
);
assert!(self.walreceiver.lock().unwrap().is_none());
// TODO: assert other tasks launched in activate are not running
return;
}
match (self.current_state(), new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
warn!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
@@ -962,6 +995,14 @@ impl Timeline {
loop {
let current_state = *receiver.borrow_and_update();
match current_state {
TimelineState::Creating => {
// A timeline _object_ in state Creating never transitions out of it.
// It gets replaced by another object in Loading state once creation is done.
// So, `self` is not the right object to subscribe to.
// Luckily, there's no code path that calls this function.
// But let's error out instead of an unreachable, just to be on the safe side.
return Err(current_state);
}
TimelineState::Loading => {
receiver
.changed()
@@ -979,8 +1020,8 @@ impl Timeline {
}
}
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().unwrap();
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().await;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info());
@@ -1002,7 +1043,7 @@ impl Timeline {
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
return Ok(Some(false));
@@ -1015,7 +1056,7 @@ impl Timeline {
/// Like [`evict_layer_batch`], but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
let remote_client = self
.remote_client
.as_ref()
@@ -1100,7 +1141,7 @@ impl Timeline {
}
// start the batch update
let mut layer_map = self.layers.write().unwrap();
let mut layer_map = self.layers.write().await;
let mut batch_updates = layer_map.batch_update();
let mut results = Vec::with_capacity(layers_to_evict.len());
@@ -1305,7 +1346,6 @@ impl Timeline {
.change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
}
}
/// Open a Timeline handle.
///
/// Loads the metadata for the timeline into memory, but not the layer map.
@@ -1318,11 +1358,16 @@ impl Timeline {
timeline_id: TimelineId,
tenant_id: TenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
remote_client: Option<RemoteTimelineClient>,
remote_client: Option<Arc<RemoteTimelineClient>>,
pg_version: u32,
is_create_placeholder: bool,
) -> Arc<Self> {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let (state, _) = watch::channel(TimelineState::Loading);
let (state, _) = watch::channel(if is_create_placeholder {
TimelineState::Creating
} else {
TimelineState::Loading
});
let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
@@ -1344,13 +1389,13 @@ impl Timeline {
timeline_id,
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
layers: tokio::sync::RwLock::new(LayerMap::default()),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
walreceiver: Mutex::new(None),
remote_client: remote_client.map(Arc::new),
remote_client,
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
@@ -1366,6 +1411,7 @@ impl Timeline {
ancestor_lsn: metadata.ancestor_lsn(),
metrics: TimelineMetrics::new(
is_create_placeholder,
&tenant_id,
&timeline_id,
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
@@ -1379,7 +1425,7 @@ impl Timeline {
layer_flush_start_tx,
layer_flush_done_tx,
write_lock: Mutex::new(()),
write_lock: tokio::sync::Mutex::new(()),
layer_removal_cs: Default::default(),
gc_info: std::sync::RwLock::new(GcInfo {
@@ -1513,12 +1559,39 @@ impl Timeline {
));
}
/// Prepares timeline data by loading it from the basebackup archive.
pub(crate) async fn import_basebackup_from_tar(
self: &Arc<Self>,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
import_datadir::import_basebackup_from_tar(self, copyin_read, base_lsn, ctx)
.await
.context("Failed to import basebackup")?;
// Flush loop needs to be spawned in order to be able to flush.
// We want to run proper checkpoint before we mark timeline as available to outside world
// Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock
self.maybe_spawn_flush_loop();
fail::fail_point!("before-checkpoint-new-timeline", |_| {
bail!("failpoint before-checkpoint-new-timeline");
});
self.freeze_and_flush()
.await
.context("Failed to flush after basebackup import")?;
Ok(())
}
///
/// Scan the timeline directory to populate the layer map.
/// Returns all timeline-related files that were found and loaded.
///
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().unwrap();
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let mut num_layers = 0;
@@ -1647,7 +1720,7 @@ impl Timeline {
// We're holding a layer map lock for a while but this
// method is only called during init so it's fine.
let mut layer_map = self.layers.write().unwrap();
let mut layer_map = self.layers.write().await;
let mut updates = layer_map.batch_update();
for remote_layer_name in &index_part.timeline_layers {
let local_layer = local_only_layers.remove(remote_layer_name);
@@ -1800,7 +1873,7 @@ impl Timeline {
let local_layers = self
.layers
.read()
.unwrap()
.await
.iter_historic_layers()
.map(|l| (l.filename(), l))
.collect::<HashMap<_, _>>();
@@ -2015,6 +2088,12 @@ impl Timeline {
) -> Result<u64, CalculateLogicalSizeError> {
debug_assert_current_span_has_tenant_and_timeline_id();
if self.current_state() == TimelineState::Creating {
return Err(CalculateLogicalSizeError::Other(anyhow!(
"cannot calculate logical size for timeline in Creating state"
)));
}
let mut timeline_state_updates = self.subscribe_for_state_updates();
let self_calculation = Arc::clone(self);
@@ -2035,7 +2114,8 @@ impl Timeline {
TimelineState::Active => continue,
TimelineState::Broken
| TimelineState::Stopping
| TimelineState::Loading => {
| TimelineState::Loading
| TimelineState::Creating => {
break format!("aborted because timeline became inactive (new state: {new_state:?})")
}
}
@@ -2152,8 +2232,8 @@ impl Timeline {
}
}
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().await.iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
return Some(historic_layer);
@@ -2360,7 +2440,7 @@ impl Timeline {
#[allow(clippy::never_loop)] // see comment at bottom of this loop
'layer_map_search: loop {
let remote_layer = {
let layers = timeline.layers.read().unwrap();
let layers = timeline.layers.read().await;
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
@@ -2539,9 +2619,16 @@ impl Timeline {
///
/// Get a handle to the latest layer for appending.
///
fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().unwrap();
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().await;
self.get_layer_for_write_locked(lsn, &mut layers)
}
fn get_layer_for_write_locked(
&self,
lsn: Lsn,
layers: &mut LayerMapWriteLockGuard,
) -> anyhow::Result<Arc<InMemoryLayer>> {
ensure!(lsn.is_aligned());
let last_record_lsn = self.get_last_record_lsn();
@@ -2584,16 +2671,29 @@ impl Timeline {
Ok(layer)
}
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write(lsn)?;
let layer = self.get_layer_for_write(lsn).await?;
layer.put_value(key, lsn, val)?;
Ok(())
}
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
let layer = self.get_layer_for_write(lsn)?;
layer.put_tombstone(key_range, lsn)?;
fn put_value_locked(
&self,
key: Key,
lsn: Lsn,
val: &Value,
pre_locked_layer_map: &mut LayerMapWriteLockGuard,
) -> anyhow::Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write_locked(lsn, pre_locked_layer_map)?;
layer.put_value(key, lsn, val)?;
Ok(())
}
async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
let layer = self.get_layer_for_write(lsn).await?;
layer.put_tombstone(key_range, lsn).await?;
Ok(())
}
@@ -2605,15 +2705,15 @@ impl Timeline {
self.last_record_lsn.advance(new_lsn);
}
fn freeze_inmem_layer(&self, write_lock_held: bool) {
async fn freeze_inmem_layer(&self, write_lock_held: bool) {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let _write_guard = if write_lock_held {
None
} else {
Some(self.write_lock.lock().unwrap())
Some(self.write_lock.lock().await)
};
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
if let Some(open_layer) = &layers.open_layer {
let open_layer_rc = Arc::clone(open_layer);
// Does this layer need freezing?
@@ -2651,7 +2751,7 @@ impl Timeline {
let flush_counter = *layer_flush_start_rx.borrow();
let result = loop {
let layer_to_flush = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
layers.frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes
};
@@ -2743,7 +2843,7 @@ impl Timeline {
.await?
} else {
// normal case, write out a L0 delta layer file.
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?;
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?;
HashMap::from([(delta_path, metadata)])
};
@@ -2752,7 +2852,7 @@ impl Timeline {
// The new on-disk layers are now in the layer map. We can remove the
// in-memory layer from the map now.
{
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let l = layers.frozen_layers.pop_front();
// Only one thread may call this function at a time (for this
@@ -2846,7 +2946,7 @@ impl Timeline {
}
// Write out the given frozen in-memory layer as a new L0 delta file
fn create_delta_layer(
async fn create_delta_layer(
&self,
frozen_layer: &InMemoryLayer,
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
@@ -2870,7 +2970,7 @@ impl Timeline {
// Add it to the layer map
let l = Arc::new(new_delta);
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut batch_updates = layers.batch_update();
l.access_stats().record_residence_event(
&batch_updates,
@@ -2922,10 +3022,14 @@ impl Timeline {
}
// Is it time to create a new image layer for the given partition?
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
async fn time_for_new_image_layer(
&self,
partition: &KeySpace,
lsn: Lsn,
) -> anyhow::Result<bool> {
let threshold = self.get_image_creation_threshold();
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut max_deltas = 0;
{
@@ -3020,7 +3124,7 @@ impl Timeline {
for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;
start = img_range.end;
if force || self.time_for_new_image_layer(partition, lsn)? {
if force || self.time_for_new_image_layer(partition, lsn).await? {
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
@@ -3098,7 +3202,7 @@ impl Timeline {
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for l in image_layers {
@@ -3165,9 +3269,8 @@ impl Timeline {
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
// Only compact if enough layers have accumulated.
let threshold = self.get_compaction_threshold();
@@ -3288,7 +3391,6 @@ impl Timeline {
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here?
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
@@ -3525,7 +3627,7 @@ impl Timeline {
.context("wait for layer upload ops to complete")?;
}
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
@@ -3733,6 +3835,11 @@ impl Timeline {
let now = SystemTime::now();
let mut result: GcResult = GcResult::default();
if self.current_state() == TimelineState::Creating {
debug!("timeline creating placeholder does not need GC");
return Ok(GcResult::default());
}
// Nothing to GC. Return early.
let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
if latest_gc_cutoff >= new_gc_cutoff {
@@ -3785,7 +3892,7 @@ impl Timeline {
// 4. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too agressive and avoidable
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -4081,7 +4188,7 @@ impl Timeline {
// Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().unwrap();
let mut layers = self_clone.layers.write().await;
let mut updates = layers.batch_update();
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
@@ -4239,7 +4346,7 @@ impl Timeline {
) {
let mut downloads = Vec::new();
{
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
layers
.iter_historic_layers()
.filter_map(|l| l.downcast_remote_layer())
@@ -4341,8 +4448,8 @@ impl LocalLayerInfoForDiskUsageEviction {
}
impl Timeline {
pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().unwrap();
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().await;
let mut max_layer_size: Option<u64> = None;
let mut resident_layers = Vec::new();
@@ -4414,7 +4521,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
// but will cause large code changes.
pub struct TimelineWriter<'a> {
tl: &'a Timeline,
_write_guard: MutexGuard<'a, ()>,
_write_guard: tokio::sync::MutexGuard<'a, ()>,
}
impl Deref for TimelineWriter<'_> {
@@ -4430,12 +4537,23 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value)
pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value).await
}
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn)
pub fn put_locked(
&self,
key: Key,
lsn: Lsn,
value: &Value,
pre_locked_layer_map: &mut LayerMapWriteLockGuard,
) -> anyhow::Result<()> {
self.tl
.put_value_locked(key, lsn, value, pre_locked_layer_map)
}
pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn).await
}
/// Track the end of the latest digested WAL record.

View File

@@ -185,7 +185,7 @@ impl Timeline {
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
if hist_layer.is_remote_layer() {

View File

@@ -148,6 +148,7 @@ pub(super) async fn connection_manager_loop_step(
Ok(()) => {
let new_state = connection_manager_state.timeline.current_state();
match new_state {
TimelineState::Creating => unreachable!("walreceiver should never be launched on a timeline in Creating state"),
// we're already active as walreceiver, no need to reactivate
TimelineState::Active => continue,
TimelineState::Broken | TimelineState::Stopping => {
@@ -1309,6 +1310,7 @@ mod tests {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.await
.expect("Failed to create an empty timeline for dummy wal connection manager");
ConnectionManagerState {

View File

@@ -313,12 +313,15 @@ pub(super) async fn handle_walreceiver_connection(
}
}
timeline.check_checkpoint_distance().with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
timeline
.check_checkpoint_distance()
.await
.with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
if let Some(last_lsn) = status_update {
let timeline_remote_consistent_lsn =

View File

@@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> {
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit()?;
modification.commit().await?;
Ok(())
}
@@ -1200,7 +1200,7 @@ mod tests {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
m.commit()?;
m.commit().await?;
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
Ok(walingest)
@@ -1209,7 +1209,7 @@ mod tests {
#[tokio::test]
async fn test_relsize() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut m = tline.begin_modification(Lsn(0x20));
@@ -1217,22 +1217,22 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x30));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x40));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x50));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1318,7 +1318,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
@@ -1360,7 +1360,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
@@ -1373,7 +1373,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
@@ -1398,7 +1398,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
@@ -1428,14 +1428,14 @@ mod tests {
#[tokio::test]
async fn test_drop_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut m = tline.begin_modification(Lsn(0x20));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1454,7 +1454,7 @@ mod tests {
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
m.commit()?;
m.commit().await?;
// Check that rel is not visible anymore
assert_eq!(
@@ -1472,7 +1472,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1497,7 +1497,7 @@ mod tests {
#[tokio::test]
async fn test_truncate_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
// Create a 20 MB relation (the size is arbitrary)
@@ -1509,7 +1509,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit()?;
m.commit().await?;
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
@@ -1554,7 +1554,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check reported size and contents after truncation
assert_eq!(
@@ -1603,7 +1603,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit()?;
m.commit().await?;
assert_eq!(
tline
@@ -1637,7 +1637,7 @@ mod tests {
#[tokio::test]
async fn test_large_rel() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut lsn = 0x10;
@@ -1648,7 +1648,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
.await?;
m.commit()?;
m.commit().await?;
}
assert_current_logical_size(&tline, Lsn(lsn));
@@ -1664,7 +1664,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE
@@ -1677,7 +1677,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE - 1
@@ -1693,7 +1693,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
size as BlockNumber

View File

@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*could not load tenant.*load local timeline.*",
".*load failed.*load local timeline.*",
]
)
@@ -172,8 +172,10 @@ def test_timeline_create_break_after_uninit_mark(neon_simple_env: NeonEnv):
# Introduce failpoint when creating a new timeline uninit mark, before any other files were created
pageserver_http.configure_failpoints(("after-timeline-uninit-mark-creation", "return"))
with pytest.raises(Exception, match="after-timeline-uninit-mark-creation"):
with pytest.raises(Exception, match="create timeline files"):
_ = env.neon_cli.create_timeline("test_timeline_create_break_after_uninit_mark", tenant_id)
env.pageserver.allowed_errors.append(".*InternalServerError.*create timeline files")
env.pageserver.allowed_errors.append(".*hitting failpoint after-timeline-uninit-mark-creation")
# Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally.
# "New" timeline is not present in the list, allowing pageserver to retry the same request

View File

@@ -128,17 +128,24 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
)
# Importing empty file fails
log.info("importing empty_file")
empty_file = os.path.join(test_output_dir, "empty_file")
with open(empty_file, "w") as _:
with pytest.raises(Exception):
import_tar(empty_file, empty_file)
assert timeline not in {TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)}
# Importing corrupt backup fails
log.info("importing corrupt_base_tar")
with pytest.raises(Exception):
import_tar(corrupt_base_tar, wal_tar)
assert timeline not in {TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)}
# A tar with trailing garbage is currently accepted. It prints a warnings
# to the pageserver log, however. Check that.
log.info("importing base_plus_garbage_tar")
import_tar(base_plus_garbage_tar, wal_tar)
assert env.pageserver.log_contains(
".*WARN.*ignored .* unexpected bytes after the tar archive.*"

View File

@@ -2,12 +2,11 @@
# env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
import os
import queue
import shutil
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Tuple
import pytest
from fixtures.log_helper import log
@@ -140,7 +139,7 @@ def test_remote_storage_backup_and_restore(
# This is before the failures injected by test_remote_failures, so it's a permanent error.
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
env.pageserver.allowed_errors.append(
".*error attaching tenant: storage-sync-list-remote-timelines",
".*attach failed.*: storage-sync-list-remote-timelines",
)
# Attach it. This HTTP request will succeed and launch a
# background task to load the tenant. In that background task,
@@ -656,21 +655,26 @@ def test_empty_branch_remote_storage_upload(
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_empty_branch_remote_storage_upload_on_restart(
def test_empty_branch_remote_storage_upload_failure(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
"""
Branches off a root branch, but does not write anything to the new branch, so
it has a metadata file only.
Branching is not acknowledged until the index_part.json is uploaded.
Ensures the branch is not on the remote storage and restarts the pageserver
— the upload should be scheduled by load, and create_timeline should await
for it even though it gets 409 Conflict.
Fails the index_part.json upload with a failpoint.
Ensures that timeline creation fails because of that.
Stops the pageserver.
Restarts it, still with failpoint enabled.
Waits for tenant to finish loading.
Ensures the timeline does not exist locally nor remotely.
Disables the failpoint.
Ensures that timeline can be created.
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_empty_branch_remote_storage_upload_on_restart",
test_name="test_empty_branch_remote_storage_upload_failures",
)
env = neon_env_builder.init_start()
@@ -696,12 +700,21 @@ def test_empty_branch_remote_storage_upload_on_restart(
# index upload is now hitting the failpoint, should not block the shutdown
env.pageserver.stop()
env.pageserver.allowed_errors.append(
f".*failed to create on-disk state for new_timeline_id={new_branch_timeline_id}.*wait for initial uploads to complete.*upload queue was stopped"
)
timeline_path = (
Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
)
uninit_marker_path = env.repo_dir / timeline_path.with_suffix(".___uninit")
local_metadata = env.repo_dir / timeline_path / "metadata"
assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload"
assert (
not uninit_marker_path.exists()
), "uninit marker should be deleted during orderly shutdown"
assert not (
env.repo_dir / timeline_path
).exists(), "unfinished timeline dir should be deleted during orderly shutdown"
assert isinstance(env.remote_storage, LocalFsStorage)
new_branch_on_remote_storage = env.remote_storage.root / timeline_path
@@ -709,54 +722,26 @@ def test_empty_branch_remote_storage_upload_on_restart(
not new_branch_on_remote_storage.exists()
), "failpoint should had prohibited index_part.json upload"
# during reconciliation we should had scheduled the uploads and on the
# retried create_timeline, we will await for those to complete on next
# client.timeline_create
env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-upload-index=return"})
# restart without failpoint
env.pageserver.start()
# sleep a bit to force the upload task go into exponential backoff
time.sleep(1)
wait_until_tenant_state(client, env.initial_tenant, "Active", 5)
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
barrier = threading.Barrier(2)
# retry creation
client.timeline_create(
tenant_id=env.initial_tenant,
ancestor_timeline_id=env.initial_timeline,
new_timeline_id=new_branch_timeline_id,
pg_version=env.pg_version,
)
def create_in_background():
barrier.wait()
try:
client.timeline_create(
tenant_id=env.initial_tenant,
ancestor_timeline_id=env.initial_timeline,
new_timeline_id=new_branch_timeline_id,
pg_version=env.pg_version,
)
q.put(None)
except PageserverApiException as e:
q.put(e)
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
create_thread = threading.Thread(target=create_in_background)
create_thread.start()
try:
# maximize chances of actually waiting for the uploads by create_timeline
barrier.wait()
assert not new_branch_on_remote_storage.exists(), "failpoint should had stopped uploading"
client.configure_failpoints(("before-upload-index", "off"))
conflict = q.get()
assert conflict, "create_timeline should not have succeeded"
assert (
conflict.status_code == 409
), "timeline was created before restart, and uploads scheduled during initial load, so we expect 409 conflict"
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
assert (
new_branch_on_remote_storage / "index_part.json"
).is_file(), "uploads scheduled during initial load should had been awaited for"
finally:
create_thread.join()
assert (env.repo_dir / timeline_path).exists()
assert not uninit_marker_path.exists()
assert (
new_branch_on_remote_storage / "index_part.json"
).is_file(), "uploads scheduled during initial load should had been awaited for"
def wait_upload_queue_empty(

View File

@@ -647,7 +647,9 @@ def test_ignored_tenant_stays_broken_without_metadata(
metadata_removed = True
assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
env.pageserver.allowed_errors.append(
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
)
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
pageserver_http.tenant_load(tenant_id=tenant_id)

View File

@@ -20,8 +20,10 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
last_flush_lsn_upload,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
@@ -249,8 +251,12 @@ def test_pageserver_metrics_removed_after_detach(
tenant_1, _ = env.neon_cli.create_tenant()
tenant_2, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_metrics_removed_after_detach", tenant_id=tenant_1)
env.neon_cli.create_timeline("test_metrics_removed_after_detach", tenant_id=tenant_2)
tenant_1_timeline = env.neon_cli.create_timeline(
"test_metrics_removed_after_detach", tenant_id=tenant_1
)
tenant_2_timeline = env.neon_cli.create_timeline(
"test_metrics_removed_after_detach", tenant_id=tenant_2
)
endpoint_tenant1 = env.endpoints.create_start(
"test_metrics_removed_after_detach", tenant_id=tenant_1
@@ -259,13 +265,17 @@ def test_pageserver_metrics_removed_after_detach(
"test_metrics_removed_after_detach", tenant_id=tenant_2
)
for endpoint in [endpoint_tenant1, endpoint_tenant2]:
for endpoint, timeline_id in [
(endpoint_tenant1, tenant_1_timeline),
(endpoint_tenant2, tenant_2_timeline),
]:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (5000050000,)
last_flush_lsn_upload(env, endpoint, endpoint.tenant_id, timeline_id)
def get_ps_metric_samples_for_tenant(tenant_id: TenantId) -> List[Sample]:
ps_metrics = env.pageserver.http_client().get_metrics()
@@ -308,9 +318,7 @@ def test_pageserver_with_empty_tenants(
env.pageserver.allowed_errors.append(
".*marking .* as locally complete, while it doesnt exist in remote index.*"
)
env.pageserver.allowed_errors.append(
".*could not load tenant.*Failed to list timelines directory.*"
)
env.pageserver.allowed_errors.append(".*load failed.*Failed to list timelines directory.*")
client = env.pageserver.http_client()
@@ -341,9 +349,15 @@ def test_pageserver_with_empty_tenants(
env.pageserver.start()
client = env.pageserver.http_client()
tenants = client.tenant_list()
assert len(tenants) == 2
def not_loading():
tenants = client.tenant_list()
assert len(tenants) == 2
assert all(t["state"]["slug"] != "Loading" for t in tenants)
wait_until(10, 0.2, not_loading)
tenants = client.tenant_list()
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
assert (
@@ -355,7 +369,7 @@ def test_pageserver_with_empty_tenants(
broken_tenant_status["state"]["slug"] == "Broken"
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
assert (

View File

@@ -272,7 +272,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
".*Ignoring new state, equal to the existing one: Stopping"
)
env.pageserver.allowed_errors.append(
".*during shutdown: cannot flush frozen layers when flush_loop is not running, state is Exited"
".*shutdown_pageserver:.*freeze_and_flush.*cannot flush frozen layers when flush_loop is not running, state is Exited"
)
ps_http = env.pageserver.http_client()