mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 21:20:37 +00:00
Compare commits
67 Commits
vk/compile
...
problame/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c1d9cb88ae | ||
|
|
20a9356729 | ||
|
|
163ec3ccd1 | ||
|
|
eba6c00de4 | ||
|
|
9d3267e474 | ||
|
|
6b25cb5030 | ||
|
|
f91ad65fb3 | ||
|
|
9a4789ec73 | ||
|
|
72159ee686 | ||
|
|
be74662d05 | ||
|
|
e7c4ef9f4f | ||
|
|
13d3f4c29f | ||
|
|
67258af8a2 | ||
|
|
17ba307004 | ||
|
|
e1486444d6 | ||
|
|
c6f9b8f318 | ||
|
|
ba3e3bdddf | ||
|
|
71f9bbef0d | ||
|
|
4680f8c60b | ||
|
|
3c1fc2617c | ||
|
|
60cc197ce3 | ||
|
|
609a929968 | ||
|
|
f2abc4c933 | ||
|
|
b09beaa4fe | ||
|
|
1367e2b0ee | ||
|
|
122e23071b | ||
|
|
696c6ed6ff | ||
|
|
0874e27023 | ||
|
|
6fe39ecbf7 | ||
|
|
a0c2a85505 | ||
|
|
dd0f5c4ef3 | ||
|
|
de780d2e0f | ||
|
|
f18d9f555b | ||
|
|
05a2fe08d1 | ||
|
|
eaf270c648 | ||
|
|
ddad0928c5 | ||
|
|
96c550222b | ||
|
|
cf8ff7edad | ||
|
|
da6573f551 | ||
|
|
2fee8c884f | ||
|
|
fe4ef121b6 | ||
|
|
641ca994dc | ||
|
|
413598b19b | ||
|
|
b345f32e3f | ||
|
|
69cfa9fe61 | ||
|
|
2c424c8f4e | ||
|
|
4001f441c0 | ||
|
|
ef956c47fc | ||
|
|
8606b6abe5 | ||
|
|
732f60317b | ||
|
|
b54431bbd3 | ||
|
|
def5eb8542 | ||
|
|
07da786ed3 | ||
|
|
75c3c43b2e | ||
|
|
bdf03eab58 | ||
|
|
32c85fa87a | ||
|
|
b2e0c58a8c | ||
|
|
94f30f0660 | ||
|
|
a55d224923 | ||
|
|
4f586ac101 | ||
|
|
feb2e80b83 | ||
|
|
ee22e81583 | ||
|
|
3e604eaa39 | ||
|
|
8bcb542a3b | ||
|
|
17b081d294 | ||
|
|
d5337e6a65 | ||
|
|
cc96a5186d |
@@ -18,7 +18,29 @@ use crate::reltag::RelTag;
|
||||
use anyhow::bail;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
|
||||
/// A state of a tenant in pageserver's memory.
|
||||
/// The state of a tenant in this pageserver.
|
||||
///
|
||||
/// ```mermaid
|
||||
/// stateDiagram-v2
|
||||
///
|
||||
/// [*] --> Loading: spawn_load()
|
||||
/// [*] --> Attaching: spawn_attach()
|
||||
///
|
||||
/// Loading --> Activating: activate()
|
||||
/// Attaching --> Activating: activate()
|
||||
/// Activating --> Active: infallible
|
||||
///
|
||||
/// Loading --> Broken: load() failure
|
||||
/// Attaching --> Broken: attach() failure
|
||||
///
|
||||
/// Active --> Stopping: set_stopping(), part of shutdown & detach
|
||||
/// Stopping --> Broken: late error in remove_tenant_from_memory
|
||||
///
|
||||
/// Broken --> [*]: ignore / detach / shutdown
|
||||
/// Stopping --> [*]: remove_from_memory complete
|
||||
///
|
||||
/// Active --> Broken: cfg(testing)-only tenant break point
|
||||
/// ```
|
||||
#[derive(
|
||||
Clone,
|
||||
PartialEq,
|
||||
@@ -33,17 +55,38 @@ use bytes::{BufMut, Bytes, BytesMut};
|
||||
)]
|
||||
#[serde(tag = "slug", content = "data")]
|
||||
pub enum TenantState {
|
||||
/// This tenant is being loaded from local disk
|
||||
/// This tenant is being loaded from local disk.
|
||||
///
|
||||
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
|
||||
Loading,
|
||||
/// This tenant is being downloaded from cloud storage.
|
||||
/// This tenant is being attached to the pageserver.
|
||||
///
|
||||
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
|
||||
Attaching,
|
||||
/// Tenant is fully operational
|
||||
/// The tenant is transitioning from Loading/Attaching to Active.
|
||||
///
|
||||
/// While in this state, the individual timelines are being activated.
|
||||
///
|
||||
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
|
||||
Activating,
|
||||
/// The tenant has finished activating and is open for business.
|
||||
///
|
||||
/// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
|
||||
Active,
|
||||
/// A tenant is recognized by pageserver, but it is being detached or the
|
||||
/// The tenant is recognized by pageserver, but it is being detached or the
|
||||
/// system is being shut down.
|
||||
///
|
||||
/// Transitions out of this state are possible through `set_broken()`.
|
||||
Stopping,
|
||||
/// A tenant is recognized by the pageserver, but can no longer be used for
|
||||
/// any operations, because it failed to be activated.
|
||||
/// The tenant is recognized by the pageserver, but can no longer be used for
|
||||
/// any operations.
|
||||
///
|
||||
/// If the tenant fails to load or attach, it will transition to this state
|
||||
/// and it is guaranteed that no background tasks are running in its name.
|
||||
///
|
||||
/// The other way to transition into this state is from `Stopping` state
|
||||
/// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
|
||||
/// if the cleanup future executed by `remove_tenant_from_memory()` fails.
|
||||
Broken { reason: String, backtrace: String },
|
||||
}
|
||||
|
||||
@@ -60,6 +103,7 @@ impl TenantState {
|
||||
// tenant mgr startup distinguishes attaching from loading via marker file.
|
||||
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
|
||||
Self::Loading => Attached,
|
||||
Self::Activating => todo!(),
|
||||
// We only reach Active after successful load / attach.
|
||||
// So, call atttachment status Attached.
|
||||
Self::Active => Attached,
|
||||
@@ -101,6 +145,7 @@ impl std::fmt::Debug for TenantState {
|
||||
/// A state of a timeline in pageserver's memory.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum TimelineState {
|
||||
Creating,
|
||||
/// The timeline is recognized by the pageserver but is not yet operational.
|
||||
/// In particular, the walreceiver connection loop is not running for this timeline.
|
||||
/// It will eventually transition to state Active or Broken.
|
||||
|
||||
@@ -512,7 +512,7 @@ async fn collect_eviction_candidates(
|
||||
if !tl.is_active() {
|
||||
continue;
|
||||
}
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction();
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction().await;
|
||||
debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
|
||||
tenant_candidates.extend(
|
||||
info.resident_layers
|
||||
|
||||
@@ -212,7 +212,7 @@ async fn build_timeline_info(
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let mut info = build_timeline_info_common(timeline, ctx)?;
|
||||
let mut info = build_timeline_info_common(timeline, ctx).await?;
|
||||
if include_non_incremental_logical_size {
|
||||
// XXX we should be using spawn_ondemand_logical_size_calculation here.
|
||||
// Otherwise, if someone deletes the timeline / detaches the tenant while
|
||||
@@ -230,7 +230,7 @@ async fn build_timeline_info(
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
fn build_timeline_info_common(
|
||||
async fn build_timeline_info_common(
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
@@ -261,7 +261,7 @@ fn build_timeline_info_common(
|
||||
None
|
||||
}
|
||||
};
|
||||
let current_physical_size = Some(timeline.layer_size_sum());
|
||||
let current_physical_size = Some(timeline.layer_size_sum().await);
|
||||
let state = timeline.current_state();
|
||||
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
|
||||
|
||||
@@ -321,6 +321,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
Ok(Some(new_timeline)) => {
|
||||
// Created. Construct a TimelineInfo for it.
|
||||
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::CREATED, timeline_info)
|
||||
}
|
||||
@@ -551,7 +552,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
// Calculate total physical size of all timelines
|
||||
let mut current_physical_size = 0;
|
||||
for timeline in tenant.list_timelines().iter() {
|
||||
current_physical_size += timeline.layer_size_sum();
|
||||
current_physical_size += timeline.layer_size_sum().await;
|
||||
}
|
||||
|
||||
let state = tenant.current_state();
|
||||
@@ -655,7 +656,7 @@ async fn layer_map_info_handler(request: Request<Body>) -> Result<Response<Body>
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
let layer_map_info = timeline.layer_map_info(reset);
|
||||
let layer_map_info = timeline.layer_map_info(reset).await;
|
||||
|
||||
json_response(StatusCode::OK, layer_map_info)
|
||||
}
|
||||
@@ -859,7 +860,7 @@ async fn handle_tenant_break(r: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
.await
|
||||
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
|
||||
|
||||
tenant.set_broken("broken from test".to_owned());
|
||||
tenant.set_broken("broken from test".to_owned()).await;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
|
||||
{
|
||||
pg_control = Some(control_file);
|
||||
}
|
||||
modification.flush()?;
|
||||
modification.flush().await?;
|
||||
}
|
||||
}
|
||||
|
||||
// We're done importing all the data files.
|
||||
modification.commit()?;
|
||||
modification.commit().await?;
|
||||
|
||||
// We expect the Postgres server to be shut down cleanly.
|
||||
let pg_control = pg_control.context("pg_control file not found")?;
|
||||
@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
|
||||
// We found the pg_control file.
|
||||
pg_control = Some(res);
|
||||
}
|
||||
modification.flush()?;
|
||||
modification.flush().await?;
|
||||
}
|
||||
tokio_tar::EntryType::Directory => {
|
||||
debug!("directory {:?}", file_path);
|
||||
@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
|
||||
// sanity check: ensure that pg_control is loaded
|
||||
let _pg_control = pg_control.context("pg_control file not found")?;
|
||||
|
||||
modification.commit()?;
|
||||
modification.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -594,7 +594,7 @@ async fn import_file(
|
||||
// zenith.signal is not necessarily the last file, that we handle
|
||||
// but it is ok to call `finish_write()`, because final `modification.commit()`
|
||||
// will update lsn once more to the final one.
|
||||
let writer = modification.tline.writer();
|
||||
let writer = modification.tline.writer().await;
|
||||
writer.finish_write(prev_lsn);
|
||||
|
||||
debug!("imported zenith signal {}", prev_lsn);
|
||||
|
||||
@@ -24,7 +24,7 @@ pub mod walredo;
|
||||
use std::path::Path;
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
use tracing::info;
|
||||
use tracing::{info, instrument};
|
||||
|
||||
/// Current storage format version
|
||||
///
|
||||
@@ -45,6 +45,7 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
|
||||
|
||||
pub use crate::metrics::preinitialize_metrics;
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn shutdown_pageserver(exit_code: i32) {
|
||||
// Shut down the libpq endpoint task. This prevents new connections from
|
||||
// being accepted.
|
||||
|
||||
@@ -720,6 +720,7 @@ impl StorageTimeMetrics {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TimelineMetrics {
|
||||
fake: bool,
|
||||
tenant_id: String,
|
||||
timeline_id: String,
|
||||
pub reconstruct_time_histo: Histogram,
|
||||
@@ -744,6 +745,7 @@ pub struct TimelineMetrics {
|
||||
|
||||
impl TimelineMetrics {
|
||||
pub fn new(
|
||||
fake: bool,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
evictions_with_low_residence_duration_builder: EvictionsWithLowResidenceDurationBuilder,
|
||||
@@ -797,7 +799,8 @@ impl TimelineMetrics {
|
||||
let evictions_with_low_residence_duration =
|
||||
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
|
||||
|
||||
TimelineMetrics {
|
||||
let m = TimelineMetrics {
|
||||
fake,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
reconstruct_time_histo,
|
||||
@@ -819,12 +822,16 @@ impl TimelineMetrics {
|
||||
evictions_with_low_residence_duration: std::sync::RwLock::new(
|
||||
evictions_with_low_residence_duration,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
impl Drop for TimelineMetrics {
|
||||
fn drop(&mut self) {
|
||||
if fake {
|
||||
m.remove_metrics();
|
||||
}
|
||||
|
||||
m
|
||||
}
|
||||
|
||||
fn remove_metrics(&self) {
|
||||
let tenant_id = &self.tenant_id;
|
||||
let timeline_id = &self.timeline_id;
|
||||
let _ = RECONSTRUCT_TIME.remove_label_values(&[tenant_id, timeline_id]);
|
||||
@@ -860,6 +867,14 @@ impl Drop for TimelineMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimelineMetrics {
|
||||
fn drop(&mut self) {
|
||||
if !self.fake {
|
||||
self.remove_metrics();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_tenant_metrics(tenant_id: &TenantId) {
|
||||
let tid = tenant_id.to_string();
|
||||
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
|
||||
|
||||
@@ -14,6 +14,7 @@ use bytes::Buf;
|
||||
use bytes::Bytes;
|
||||
use futures::Stream;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
|
||||
@@ -24,6 +25,7 @@ use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, Qu
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::FeStartupPacket;
|
||||
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::io;
|
||||
use std::net::TcpListener;
|
||||
use std::pin::pin;
|
||||
@@ -51,6 +53,8 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant;
|
||||
use crate::tenant::compare_arced_timeline;
|
||||
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::GetTenantError;
|
||||
use crate::tenant::{Tenant, Timeline};
|
||||
@@ -489,7 +493,14 @@ impl PageServerHandler {
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
|
||||
|
||||
let (guard, real_timeline_not_in_tenants_map) = tenant
|
||||
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
|
||||
.await?;
|
||||
|
||||
// TODO spawn flush loop of timeline early (before activation),
|
||||
// but then we need to take care of shutting it down in case we fail
|
||||
// (bootstrap_timeline probably also needs it?)
|
||||
|
||||
// TODO mark timeline as not ready until it reaches end_lsn.
|
||||
// We might have some wal to import as well, and we should prevent compute
|
||||
@@ -503,21 +514,49 @@ impl PageServerHandler {
|
||||
|
||||
// Import basebackup provided via CopyData
|
||||
info!("importing basebackup");
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
pgb.flush().await?;
|
||||
let doit = async {
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
|
||||
timeline
|
||||
.import_basebackup_from_tar(
|
||||
&mut copyin_reader,
|
||||
base_lsn,
|
||||
self.broker_client.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
|
||||
real_timeline_not_in_tenants_map
|
||||
.import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx)
|
||||
.await?;
|
||||
|
||||
// Read the end of the tar archive.
|
||||
read_tar_eof(copyin_reader).await?;
|
||||
// Read the end of the tar archive.
|
||||
read_tar_eof(copyin_reader).await?;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
let placeholder_timeline = match doit.await {
|
||||
Ok(()) => {
|
||||
match guard.creation_complete_remove_uninit_marker_and_get_placeholder_timeline() {
|
||||
Ok(placeholder_timeline) => placeholder_timeline,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"failed to remove uninit marker for new_timeline_id={timeline_id}: {err:#}"
|
||||
);
|
||||
return Err(QueryError::Other(err.context("remove uninit marker file")));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
guard.creation_failed();
|
||||
return Err(QueryError::Other(e));
|
||||
}
|
||||
};
|
||||
|
||||
// todo share with Tenant::create_timeline
|
||||
match tenant.timelines.lock().unwrap().entry(timeline_id) {
|
||||
Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"),
|
||||
Entry::Occupied(mut o) => {
|
||||
info!("replacing placeholder timeline with the real one");
|
||||
assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating);
|
||||
assert!(compare_arced_timeline(&placeholder_timeline, o.get()));
|
||||
let replaced_placeholder = o.insert(Arc::clone(&real_timeline_not_in_tenants_map));
|
||||
assert!(compare_arced_timeline(&replaced_placeholder, &placeholder_timeline));
|
||||
},
|
||||
}
|
||||
|
||||
// TODO check checksum
|
||||
// Meanwhile you can verify client-side by taking fullbackup
|
||||
@@ -525,7 +564,9 @@ impl PageServerHandler {
|
||||
// It wouldn't work if base came from vanilla postgres though,
|
||||
// since we discard some log files.
|
||||
|
||||
info!("done");
|
||||
info!("done, activating timeline");
|
||||
real_timeline_not_in_tenants_map.activate(self.broker_client.clone(), &ctx).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -1108,7 +1108,7 @@ impl<'a> DatadirModification<'a> {
|
||||
/// retains all the metadata, but data pages are flushed. That's again OK
|
||||
/// for bulk import, where you are just loading data pages and won't try to
|
||||
/// modify the same pages twice.
|
||||
pub fn flush(&mut self) -> anyhow::Result<()> {
|
||||
pub async fn flush(&mut self) -> anyhow::Result<()> {
|
||||
// Unless we have accumulated a decent amount of changes, it's not worth it
|
||||
// to scan through the pending_updates list.
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
@@ -1116,13 +1116,15 @@ impl<'a> DatadirModification<'a> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let writer = self.tline.writer();
|
||||
let writer = self.tline.writer().await;
|
||||
|
||||
let mut layer_map = self.tline.layers.write().await;
|
||||
|
||||
// Flush relation and SLRU data blocks, keep metadata.
|
||||
let mut result: anyhow::Result<()> = Ok(());
|
||||
self.pending_updates.retain(|&key, value| {
|
||||
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
|
||||
result = writer.put(key, self.lsn, value);
|
||||
result = writer.put_locked(key, self.lsn, value, &mut layer_map);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
@@ -1143,17 +1145,17 @@ impl<'a> DatadirModification<'a> {
|
||||
/// underlying timeline.
|
||||
/// All the modifications in this atomic update are stamped by the specified LSN.
|
||||
///
|
||||
pub fn commit(&mut self) -> anyhow::Result<()> {
|
||||
let writer = self.tline.writer();
|
||||
pub async fn commit(&mut self) -> anyhow::Result<()> {
|
||||
let writer = self.tline.writer().await;
|
||||
let lsn = self.lsn;
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
self.pending_nblocks = 0;
|
||||
|
||||
for (key, value) in self.pending_updates.drain() {
|
||||
writer.put(key, lsn, &value)?;
|
||||
writer.put(key, lsn, &value).await?;
|
||||
}
|
||||
for key_range in self.pending_deletions.drain(..) {
|
||||
writer.delete(key_range, lsn)?;
|
||||
writer.delete(key_range, lsn).await?;
|
||||
}
|
||||
|
||||
writer.finish_write(lsn);
|
||||
@@ -1594,16 +1596,18 @@ fn is_slru_block_key(key: Key) -> bool {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn create_test_timeline(
|
||||
pub async fn create_test_timeline(
|
||||
tenant: &crate::tenant::Tenant,
|
||||
timeline_id: utils::id::TimelineId,
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<std::sync::Arc<Timeline>> {
|
||||
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)
|
||||
.await?;
|
||||
let mut m = tline.begin_modification(Lsn(8));
|
||||
m.init_empty()?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
Ok(tline)
|
||||
}
|
||||
|
||||
|
||||
@@ -270,6 +270,8 @@ pub enum TaskKind {
|
||||
|
||||
DebugTool,
|
||||
|
||||
CreateTimeline,
|
||||
|
||||
#[cfg(test)]
|
||||
UnitTest,
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -10,6 +10,7 @@ use tokio::fs;
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::*;
|
||||
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
@@ -19,7 +20,10 @@ use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
|
||||
use crate::tenant::{
|
||||
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
|
||||
TimelineLoadCause,
|
||||
};
|
||||
use crate::IGNORED_TENANT_FILE_NAME;
|
||||
|
||||
use utils::fs_ext::PathExt;
|
||||
@@ -119,6 +123,7 @@ pub async fn init_tenant_mgr(
|
||||
&tenant_dir_path,
|
||||
broker_client.clone(),
|
||||
remote_storage.clone(),
|
||||
TimelineLoadCause::Startup,
|
||||
&ctx,
|
||||
) {
|
||||
Ok(tenant) => {
|
||||
@@ -154,6 +159,7 @@ pub fn schedule_local_tenant_processing(
|
||||
tenant_path: &Path,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
cause: TimelineLoadCause,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
anyhow::ensure!(
|
||||
@@ -170,6 +176,7 @@ pub fn schedule_local_tenant_processing(
|
||||
})?,
|
||||
"Cannot load tenant from empty directory {tenant_path:?}"
|
||||
);
|
||||
// TODO ensure there's no uninit mark / handle it correctly during ignore and load
|
||||
|
||||
let tenant_id = tenant_path
|
||||
.file_name()
|
||||
@@ -207,7 +214,7 @@ pub fn schedule_local_tenant_processing(
|
||||
} else {
|
||||
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
|
||||
// Start loading the tenant into memory. It will initially be in Loading state.
|
||||
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx)
|
||||
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, cause, ctx)
|
||||
};
|
||||
Ok(tenant)
|
||||
}
|
||||
@@ -222,6 +229,7 @@ pub fn schedule_local_tenant_processing(
|
||||
/// That could be easily misinterpreted by control plane, the consumer of the
|
||||
/// management API. For example, it could attach the tenant on a different pageserver.
|
||||
/// We would then be in split-brain once this pageserver restarts.
|
||||
#[instrument(skip_all)]
|
||||
pub async fn shutdown_all_tenants() {
|
||||
// Prevent new tenants from being created.
|
||||
let tenants_to_shut_down = {
|
||||
@@ -244,12 +252,55 @@ pub async fn shutdown_all_tenants() {
|
||||
}
|
||||
};
|
||||
|
||||
// Set tenant (and its timlines) to Stoppping state.
|
||||
// Since we can only transition into Stopping state after activation is complete,
|
||||
// run it in a JoinSet so all tenants have a chance to stop before we git SIGKILLed.
|
||||
//
|
||||
// Transitioning tenants to Stopping state has a couple of non-obvious side effects:
|
||||
// 1. Lock out any new requests to the tenants.
|
||||
// 2. Signal cancellation to WAL receivers (we wait on it below).
|
||||
// 3. Signal cancellation for othher tenant background loops.
|
||||
// 4. ???
|
||||
//
|
||||
// The waiting for the cancellation is not done uniformly.
|
||||
// We certainly wait for WAL receivers to shut down.
|
||||
// That is necessary so that no new data comes in before the freeze_and_flush.
|
||||
// But the tenant background loops are joined-on in our caller.
|
||||
// It's mesed up.
|
||||
let mut join_set = JoinSet::new();
|
||||
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
|
||||
for (_, tenant) in tenants_to_shut_down {
|
||||
if tenant.is_active() {
|
||||
// updates tenant state, forbidding new GC and compaction iterations from starting
|
||||
tenant.set_stopping();
|
||||
tenants_to_freeze_and_flush.push(tenant);
|
||||
join_set.spawn(async move {
|
||||
match tenant.set_stopping().await {
|
||||
Ok(()) => Ok(tenant),
|
||||
Err(e) => Err((tenant, e)),
|
||||
}
|
||||
});
|
||||
}
|
||||
while let Some(res) = join_set.join_next().await {
|
||||
match res {
|
||||
Err(join_error) if join_error.is_cancelled() => {
|
||||
unreachable!("we are not cancelling any of the futures");
|
||||
}
|
||||
Err(join_error) => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
error!("task that calls set_stopping() panicked, don't know which tenant this is, and probably freeze_and_flush won't work anyways: {join_error:#}");
|
||||
}
|
||||
Ok(retval) => match retval {
|
||||
Ok(tenant) => {
|
||||
// success
|
||||
debug!("tenant successfully stopped: {}", tenant.tenant_id);
|
||||
tenants_to_freeze_and_flush.push(tenant);
|
||||
}
|
||||
// our task_mgr::shutdown_tasks are going to coalesce on that just fine
|
||||
Err((tenant, SetStoppingError::AlreadyStopping)) => {
|
||||
tenants_to_freeze_and_flush.push(tenant);
|
||||
}
|
||||
Err((tenant, SetStoppingError::Broken)) => {
|
||||
info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well: {}", tenant.tenant_id);
|
||||
tenants_to_freeze_and_flush.push(tenant);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,8 +317,9 @@ pub async fn shutdown_all_tenants() {
|
||||
// On error, log it but continue with the shutdown for other tenants.
|
||||
for tenant in tenants_to_freeze_and_flush {
|
||||
let tenant_id = tenant.tenant_id();
|
||||
debug!("shutdown tenant {tenant_id}");
|
||||
debug!("freeze_and_flush tenant {tenant_id}");
|
||||
|
||||
// TODO this could probably run in a JoinSet as well?
|
||||
if let Err(err) = tenant.freeze_and_flush().await {
|
||||
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
|
||||
}
|
||||
@@ -291,7 +343,7 @@ pub async fn create_tenant(
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
let created_tenant =
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?;
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, TimelineLoadCause::TenantCreate, ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
@@ -437,7 +489,7 @@ pub async fn load_tenant(
|
||||
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
|
||||
}
|
||||
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, TimelineLoadCause::TenantLoad, ctx)
|
||||
.with_context(|| {
|
||||
format!("Failed to schedule tenant processing in path {tenant_path:?}")
|
||||
})?;
|
||||
@@ -510,7 +562,7 @@ pub async fn attach_tenant(
|
||||
.context("check for attach marker file existence")?;
|
||||
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
|
||||
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), TimelineLoadCause::Attach ,ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
@@ -589,13 +641,23 @@ where
|
||||
{
|
||||
let tenants_accessor = TENANTS.write().await;
|
||||
match tenants_accessor.get(&tenant_id) {
|
||||
Some(tenant) => match tenant.current_state() {
|
||||
TenantState::Attaching
|
||||
| TenantState::Loading
|
||||
| TenantState::Broken { .. }
|
||||
| TenantState::Active => tenant.set_stopping(),
|
||||
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
|
||||
},
|
||||
Some(tenant) => {
|
||||
let tenant = Arc::clone(tenant);
|
||||
// don't hold TENANTS lock while set_stopping waits for activation to finish
|
||||
drop(tenants_accessor);
|
||||
match tenant.set_stopping().await {
|
||||
Ok(()) => {
|
||||
// we won, continue stopping procedure
|
||||
}
|
||||
Err(SetStoppingError::Broken) => {
|
||||
// continue the procedure, let's hope the closure can deal with broken tenants
|
||||
}
|
||||
Err(SetStoppingError::AlreadyStopping) => {
|
||||
// the tenant is already stopping or broken, don't do anything
|
||||
return Err(TenantStateError::IsStopping(tenant_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
None => return Err(TenantStateError::NotFound(tenant_id)),
|
||||
}
|
||||
}
|
||||
@@ -620,7 +682,7 @@ where
|
||||
let tenants_accessor = TENANTS.read().await;
|
||||
match tenants_accessor.get(&tenant_id) {
|
||||
Some(tenant) => {
|
||||
tenant.set_broken(e.to_string());
|
||||
tenant.set_broken(e.to_string()).await;
|
||||
}
|
||||
None => {
|
||||
warn!("Tenant {tenant_id} got removed from memory");
|
||||
|
||||
@@ -1264,7 +1264,12 @@ mod tests {
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
// create an empty timeline directory
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = runtime.block_on(tenant.create_test_timeline(
|
||||
TIMELINE_ID,
|
||||
Lsn(0),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
))?;
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
|
||||
@@ -304,7 +304,7 @@ impl InMemoryLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
|
||||
pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
|
||||
// TODO: Currently, we just leak the storage for any deleted keys
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -28,7 +28,7 @@ use std::ops::{Deref, Range};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
@@ -63,13 +63,13 @@ use utils::{
|
||||
simple_rcu::{Rcu, RcuReadGuard},
|
||||
};
|
||||
|
||||
use crate::page_cache;
|
||||
use crate::repository::GcResult;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::walredo::WalRedoManager;
|
||||
use crate::METADATA_FILE_NAME;
|
||||
use crate::ZERO_PAGE;
|
||||
use crate::{import_datadir, page_cache};
|
||||
use crate::{is_temporary, task_mgr};
|
||||
|
||||
pub(super) use self::eviction_task::EvictionTaskTenantState;
|
||||
@@ -119,7 +119,7 @@ pub struct Timeline {
|
||||
|
||||
pub pg_version: u32,
|
||||
|
||||
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
|
||||
/// Set of key ranges which should be covered by image layers to
|
||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||
@@ -179,7 +179,7 @@ pub struct Timeline {
|
||||
/// Locked automatically by [`TimelineWriter`] and checkpointer.
|
||||
/// Must always be acquired before the layer map/individual layer lock
|
||||
/// to avoid deadlock.
|
||||
write_lock: Mutex<()>,
|
||||
write_lock: tokio::sync::Mutex<()>,
|
||||
|
||||
/// Used to avoid multiple `flush_loop` tasks running
|
||||
flush_loop_state: Mutex<FlushLoopState>,
|
||||
@@ -238,6 +238,8 @@ pub struct Timeline {
|
||||
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
|
||||
}
|
||||
|
||||
type LayerMapWriteLockGuard<'t> = tokio::sync::RwLockWriteGuard<'t, LayerMap<dyn PersistentLayer>>;
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
///
|
||||
/// Calculation consists of two stages:
|
||||
@@ -572,8 +574,8 @@ impl Timeline {
|
||||
/// The sum of the file size of all historic layers in the layer map.
|
||||
/// This method makes no distinction between local and remote layers.
|
||||
/// Hence, the result **does not represent local filesystem usage**.
|
||||
pub fn layer_size_sum(&self) -> u64 {
|
||||
let layer_map = self.layers.read().unwrap();
|
||||
pub async fn layer_size_sum(&self) -> u64 {
|
||||
let layer_map = self.layers.read().await;
|
||||
let mut size = 0;
|
||||
for l in layer_map.iter_historic_layers() {
|
||||
size += l.file_size();
|
||||
@@ -664,12 +666,25 @@ impl Timeline {
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
|
||||
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
|
||||
self.freeze_inmem_layer(false);
|
||||
if self.current_state() == TimelineState::Creating {
|
||||
debug!("timelines in Creating state are never written to");
|
||||
assert!(
|
||||
self.layers.read().await.open_layer.is_none(),
|
||||
"would have nothing to flush anyways"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
self.freeze_inmem_layer(false).await;
|
||||
self.flush_frozen_layers_and_wait().await
|
||||
}
|
||||
|
||||
/// Outermost timeline compaction operation; downloads needed layers.
|
||||
pub async fn compact(&self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
if self.current_state() == TimelineState::Creating {
|
||||
debug!("timelines is in Creating state");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
const ROUNDS: usize = 2;
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
@@ -844,10 +859,10 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Mutate the timeline with a [`TimelineWriter`].
|
||||
pub fn writer(&self) -> TimelineWriter<'_> {
|
||||
pub async fn writer(&self) -> TimelineWriter<'_> {
|
||||
TimelineWriter {
|
||||
tl: self,
|
||||
_write_guard: self.write_lock.lock().unwrap(),
|
||||
_write_guard: self.write_lock.lock().await,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -881,9 +896,9 @@ impl Timeline {
|
||||
///
|
||||
/// Also flush after a period of time without new data -- it helps
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_size = open_layer.size()?;
|
||||
drop(layers);
|
||||
@@ -905,7 +920,7 @@ impl Timeline {
|
||||
last_freeze_ts.elapsed()
|
||||
);
|
||||
|
||||
self.freeze_inmem_layer(true);
|
||||
self.freeze_inmem_layer(true).await;
|
||||
self.last_freeze_at.store(last_lsn);
|
||||
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
|
||||
|
||||
@@ -916,13 +931,31 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
|
||||
pub async fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
|
||||
if self.current_state() == TimelineState::Creating {
|
||||
panic!("timelines in Creating state are never activated");
|
||||
}
|
||||
self.maybe_spawn_flush_loop();
|
||||
self.launch_wal_receiver(ctx, broker_client);
|
||||
self.set_state(TimelineState::Active);
|
||||
self.set_state(TimelineState::Active).await;
|
||||
self.launch_eviction_task();
|
||||
}
|
||||
|
||||
pub fn set_state(&self, new_state: TimelineState) {
|
||||
pub async fn set_state(&self, new_state: TimelineState) {
|
||||
if self.current_state() == TimelineState::Creating {
|
||||
info!("timelines in Creating state are never activated, nothing to stop");
|
||||
assert_eq!(
|
||||
*self.flush_loop_state.lock().unwrap(),
|
||||
FlushLoopState::NotStarted
|
||||
);
|
||||
assert!(
|
||||
self.layers.read().await.open_layer.is_none(),
|
||||
"would have nothing to flush anyways"
|
||||
);
|
||||
assert!(self.walreceiver.lock().unwrap().is_none());
|
||||
// TODO: assert other tasks launched in activate are not running
|
||||
return;
|
||||
}
|
||||
match (self.current_state(), new_state) {
|
||||
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
|
||||
warn!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
|
||||
@@ -962,6 +995,14 @@ impl Timeline {
|
||||
loop {
|
||||
let current_state = *receiver.borrow_and_update();
|
||||
match current_state {
|
||||
TimelineState::Creating => {
|
||||
// A timeline _object_ in state Creating never transitions out of it.
|
||||
// It gets replaced by another object in Loading state once creation is done.
|
||||
// So, `self` is not the right object to subscribe to.
|
||||
// Luckily, there's no code path that calls this function.
|
||||
// But let's error out instead of an unreachable, just to be on the safe side.
|
||||
return Err(current_state);
|
||||
}
|
||||
TimelineState::Loading => {
|
||||
receiver
|
||||
.changed()
|
||||
@@ -979,8 +1020,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let layer_map = self.layers.read().unwrap();
|
||||
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let layer_map = self.layers.read().await;
|
||||
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
|
||||
if let Some(open_layer) = &layer_map.open_layer {
|
||||
in_memory_layers.push(open_layer.info());
|
||||
@@ -1002,7 +1043,7 @@ impl Timeline {
|
||||
|
||||
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
|
||||
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
|
||||
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
|
||||
if self.remote_client.is_none() {
|
||||
return Ok(Some(false));
|
||||
@@ -1015,7 +1056,7 @@ impl Timeline {
|
||||
/// Like [`evict_layer_batch`], but for just one layer.
|
||||
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
|
||||
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
|
||||
let remote_client = self
|
||||
.remote_client
|
||||
.as_ref()
|
||||
@@ -1100,7 +1141,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// start the batch update
|
||||
let mut layer_map = self.layers.write().unwrap();
|
||||
let mut layer_map = self.layers.write().await;
|
||||
let mut batch_updates = layer_map.batch_update();
|
||||
|
||||
let mut results = Vec::with_capacity(layers_to_evict.len());
|
||||
@@ -1305,7 +1346,6 @@ impl Timeline {
|
||||
.change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
|
||||
}
|
||||
}
|
||||
|
||||
/// Open a Timeline handle.
|
||||
///
|
||||
/// Loads the metadata for the timeline into memory, but not the layer map.
|
||||
@@ -1318,11 +1358,16 @@ impl Timeline {
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
remote_client: Option<Arc<RemoteTimelineClient>>,
|
||||
pg_version: u32,
|
||||
is_create_placeholder: bool,
|
||||
) -> Arc<Self> {
|
||||
let disk_consistent_lsn = metadata.disk_consistent_lsn();
|
||||
let (state, _) = watch::channel(TimelineState::Loading);
|
||||
let (state, _) = watch::channel(if is_create_placeholder {
|
||||
TimelineState::Creating
|
||||
} else {
|
||||
TimelineState::Loading
|
||||
});
|
||||
|
||||
let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
|
||||
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
|
||||
@@ -1344,13 +1389,13 @@ impl Timeline {
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
pg_version,
|
||||
layers: RwLock::new(LayerMap::default()),
|
||||
layers: tokio::sync::RwLock::new(LayerMap::default()),
|
||||
wanted_image_layers: Mutex::new(None),
|
||||
|
||||
walredo_mgr,
|
||||
walreceiver: Mutex::new(None),
|
||||
|
||||
remote_client: remote_client.map(Arc::new),
|
||||
remote_client,
|
||||
|
||||
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
|
||||
last_record_lsn: SeqWait::new(RecordLsn {
|
||||
@@ -1366,6 +1411,7 @@ impl Timeline {
|
||||
ancestor_lsn: metadata.ancestor_lsn(),
|
||||
|
||||
metrics: TimelineMetrics::new(
|
||||
is_create_placeholder,
|
||||
&tenant_id,
|
||||
&timeline_id,
|
||||
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
|
||||
@@ -1379,7 +1425,7 @@ impl Timeline {
|
||||
layer_flush_start_tx,
|
||||
layer_flush_done_tx,
|
||||
|
||||
write_lock: Mutex::new(()),
|
||||
write_lock: tokio::sync::Mutex::new(()),
|
||||
layer_removal_cs: Default::default(),
|
||||
|
||||
gc_info: std::sync::RwLock::new(GcInfo {
|
||||
@@ -1513,12 +1559,39 @@ impl Timeline {
|
||||
));
|
||||
}
|
||||
|
||||
/// Prepares timeline data by loading it from the basebackup archive.
|
||||
pub(crate) async fn import_basebackup_from_tar(
|
||||
self: &Arc<Self>,
|
||||
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
|
||||
base_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
import_datadir::import_basebackup_from_tar(self, copyin_read, base_lsn, ctx)
|
||||
.await
|
||||
.context("Failed to import basebackup")?;
|
||||
|
||||
// Flush loop needs to be spawned in order to be able to flush.
|
||||
// We want to run proper checkpoint before we mark timeline as available to outside world
|
||||
// Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock
|
||||
self.maybe_spawn_flush_loop();
|
||||
|
||||
fail::fail_point!("before-checkpoint-new-timeline", |_| {
|
||||
bail!("failpoint before-checkpoint-new-timeline");
|
||||
});
|
||||
|
||||
self.freeze_and_flush()
|
||||
.await
|
||||
.context("Failed to flush after basebackup import")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Scan the timeline directory to populate the layer map.
|
||||
/// Returns all timeline-related files that were found and loaded.
|
||||
///
|
||||
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let mut num_layers = 0;
|
||||
|
||||
@@ -1647,7 +1720,7 @@ impl Timeline {
|
||||
|
||||
// We're holding a layer map lock for a while but this
|
||||
// method is only called during init so it's fine.
|
||||
let mut layer_map = self.layers.write().unwrap();
|
||||
let mut layer_map = self.layers.write().await;
|
||||
let mut updates = layer_map.batch_update();
|
||||
for remote_layer_name in &index_part.timeline_layers {
|
||||
let local_layer = local_only_layers.remove(remote_layer_name);
|
||||
@@ -1800,7 +1873,7 @@ impl Timeline {
|
||||
let local_layers = self
|
||||
.layers
|
||||
.read()
|
||||
.unwrap()
|
||||
.await
|
||||
.iter_historic_layers()
|
||||
.map(|l| (l.filename(), l))
|
||||
.collect::<HashMap<_, _>>();
|
||||
@@ -2015,6 +2088,12 @@ impl Timeline {
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
if self.current_state() == TimelineState::Creating {
|
||||
return Err(CalculateLogicalSizeError::Other(anyhow!(
|
||||
"cannot calculate logical size for timeline in Creating state"
|
||||
)));
|
||||
}
|
||||
|
||||
let mut timeline_state_updates = self.subscribe_for_state_updates();
|
||||
let self_calculation = Arc::clone(self);
|
||||
|
||||
@@ -2035,7 +2114,8 @@ impl Timeline {
|
||||
TimelineState::Active => continue,
|
||||
TimelineState::Broken
|
||||
| TimelineState::Stopping
|
||||
| TimelineState::Loading => {
|
||||
| TimelineState::Loading
|
||||
| TimelineState::Creating => {
|
||||
break format!("aborted because timeline became inactive (new state: {new_state:?})")
|
||||
}
|
||||
}
|
||||
@@ -2152,8 +2232,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
|
||||
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
for historic_layer in self.layers.read().await.iter_historic_layers() {
|
||||
let historic_layer_name = historic_layer.filename().file_name();
|
||||
if layer_file_name == historic_layer_name {
|
||||
return Some(historic_layer);
|
||||
@@ -2360,7 +2440,7 @@ impl Timeline {
|
||||
#[allow(clippy::never_loop)] // see comment at bottom of this loop
|
||||
'layer_map_search: loop {
|
||||
let remote_layer = {
|
||||
let layers = timeline.layers.read().unwrap();
|
||||
let layers = timeline.layers.read().await;
|
||||
|
||||
// Check the open and frozen in-memory layers first, in order from newest
|
||||
// to oldest.
|
||||
@@ -2539,9 +2619,16 @@ impl Timeline {
|
||||
///
|
||||
/// Get a handle to the latest layer for appending.
|
||||
///
|
||||
fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut layers = self.layers.write().await;
|
||||
self.get_layer_for_write_locked(lsn, &mut layers)
|
||||
}
|
||||
|
||||
fn get_layer_for_write_locked(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
layers: &mut LayerMapWriteLockGuard,
|
||||
) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
ensure!(lsn.is_aligned());
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
@@ -2584,16 +2671,29 @@ impl Timeline {
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
|
||||
async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
let layer = self.get_layer_for_write(lsn).await?;
|
||||
layer.put_value(key, lsn, val)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
layer.put_tombstone(key_range, lsn)?;
|
||||
fn put_value_locked(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: &Value,
|
||||
pre_locked_layer_map: &mut LayerMapWriteLockGuard,
|
||||
) -> anyhow::Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write_locked(lsn, pre_locked_layer_map)?;
|
||||
layer.put_value(key, lsn, val)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
let layer = self.get_layer_for_write(lsn).await?;
|
||||
layer.put_tombstone(key_range, lsn).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2605,15 +2705,15 @@ impl Timeline {
|
||||
self.last_record_lsn.advance(new_lsn);
|
||||
}
|
||||
|
||||
fn freeze_inmem_layer(&self, write_lock_held: bool) {
|
||||
async fn freeze_inmem_layer(&self, write_lock_held: bool) {
|
||||
// Freeze the current open in-memory layer. It will be written to disk on next
|
||||
// iteration.
|
||||
let _write_guard = if write_lock_held {
|
||||
None
|
||||
} else {
|
||||
Some(self.write_lock.lock().unwrap())
|
||||
Some(self.write_lock.lock().await)
|
||||
};
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_rc = Arc::clone(open_layer);
|
||||
// Does this layer need freezing?
|
||||
@@ -2651,7 +2751,7 @@ impl Timeline {
|
||||
let flush_counter = *layer_flush_start_rx.borrow();
|
||||
let result = loop {
|
||||
let layer_to_flush = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
layers.frozen_layers.front().cloned()
|
||||
// drop 'layers' lock to allow concurrent reads and writes
|
||||
};
|
||||
@@ -2743,7 +2843,7 @@ impl Timeline {
|
||||
.await?
|
||||
} else {
|
||||
// normal case, write out a L0 delta layer file.
|
||||
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?;
|
||||
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?;
|
||||
HashMap::from([(delta_path, metadata)])
|
||||
};
|
||||
|
||||
@@ -2752,7 +2852,7 @@ impl Timeline {
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
// in-memory layer from the map now.
|
||||
{
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let l = layers.frozen_layers.pop_front();
|
||||
|
||||
// Only one thread may call this function at a time (for this
|
||||
@@ -2846,7 +2946,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Write out the given frozen in-memory layer as a new L0 delta file
|
||||
fn create_delta_layer(
|
||||
async fn create_delta_layer(
|
||||
&self,
|
||||
frozen_layer: &InMemoryLayer,
|
||||
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
|
||||
@@ -2870,7 +2970,7 @@ impl Timeline {
|
||||
|
||||
// Add it to the layer map
|
||||
let l = Arc::new(new_delta);
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut batch_updates = layers.batch_update();
|
||||
l.access_stats().record_residence_event(
|
||||
&batch_updates,
|
||||
@@ -2922,10 +3022,14 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
|
||||
async fn time_for_new_image_layer(
|
||||
&self,
|
||||
partition: &KeySpace,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<bool> {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
|
||||
let mut max_deltas = 0;
|
||||
{
|
||||
@@ -3020,7 +3124,7 @@ impl Timeline {
|
||||
for partition in partitioning.parts.iter() {
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
start = img_range.end;
|
||||
if force || self.time_for_new_image_layer(partition, lsn)? {
|
||||
if force || self.time_for_new_image_layer(partition, lsn).await? {
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
@@ -3098,7 +3202,7 @@ impl Timeline {
|
||||
|
||||
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
|
||||
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
|
||||
for l in image_layers {
|
||||
@@ -3165,9 +3269,8 @@ impl Timeline {
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
let mut level0_deltas = layers.get_level0_deltas()?;
|
||||
drop(layers);
|
||||
|
||||
// Only compact if enough layers have accumulated.
|
||||
let threshold = self.get_compaction_threshold();
|
||||
@@ -3288,7 +3391,6 @@ impl Timeline {
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here?
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
@@ -3525,7 +3627,7 @@ impl Timeline {
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
}
|
||||
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
|
||||
for l in new_layers {
|
||||
@@ -3733,6 +3835,11 @@ impl Timeline {
|
||||
let now = SystemTime::now();
|
||||
let mut result: GcResult = GcResult::default();
|
||||
|
||||
if self.current_state() == TimelineState::Creating {
|
||||
debug!("timeline creating placeholder does not need GC");
|
||||
return Ok(GcResult::default());
|
||||
}
|
||||
|
||||
// Nothing to GC. Return early.
|
||||
let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
|
||||
if latest_gc_cutoff >= new_gc_cutoff {
|
||||
@@ -3785,7 +3892,7 @@ impl Timeline {
|
||||
// 4. newer on-disk image layers cover the layer's whole key range
|
||||
//
|
||||
// TODO holding a write lock is too agressive and avoidable
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
result.layers_total += 1;
|
||||
|
||||
@@ -4081,7 +4188,7 @@ impl Timeline {
|
||||
|
||||
// Download complete. Replace the RemoteLayer with the corresponding
|
||||
// Delta- or ImageLayer in the layer map.
|
||||
let mut layers = self_clone.layers.write().unwrap();
|
||||
let mut layers = self_clone.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||
{
|
||||
@@ -4239,7 +4346,7 @@ impl Timeline {
|
||||
) {
|
||||
let mut downloads = Vec::new();
|
||||
{
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
layers
|
||||
.iter_historic_layers()
|
||||
.filter_map(|l| l.downcast_remote_layer())
|
||||
@@ -4341,8 +4448,8 @@ impl LocalLayerInfoForDiskUsageEviction {
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
|
||||
let layers = self.layers.read().unwrap();
|
||||
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
|
||||
let layers = self.layers.read().await;
|
||||
|
||||
let mut max_layer_size: Option<u64> = None;
|
||||
let mut resident_layers = Vec::new();
|
||||
@@ -4414,7 +4521,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
|
||||
// but will cause large code changes.
|
||||
pub struct TimelineWriter<'a> {
|
||||
tl: &'a Timeline,
|
||||
_write_guard: MutexGuard<'a, ()>,
|
||||
_write_guard: tokio::sync::MutexGuard<'a, ()>,
|
||||
}
|
||||
|
||||
impl Deref for TimelineWriter<'_> {
|
||||
@@ -4430,12 +4537,23 @@ impl<'a> TimelineWriter<'a> {
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
/// current end-of-file.
|
||||
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
|
||||
self.tl.put_value(key, lsn, value)
|
||||
pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
|
||||
self.tl.put_value(key, lsn, value).await
|
||||
}
|
||||
|
||||
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
self.tl.put_tombstone(key_range, lsn)
|
||||
pub fn put_locked(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
value: &Value,
|
||||
pre_locked_layer_map: &mut LayerMapWriteLockGuard,
|
||||
) -> anyhow::Result<()> {
|
||||
self.tl
|
||||
.put_value_locked(key, lsn, value, pre_locked_layer_map)
|
||||
}
|
||||
|
||||
pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
self.tl.put_tombstone(key_range, lsn).await
|
||||
}
|
||||
|
||||
/// Track the end of the latest digested WAL record.
|
||||
|
||||
@@ -185,7 +185,7 @@ impl Timeline {
|
||||
// We don't want to hold the layer map lock during eviction.
|
||||
// So, we just need to deal with this.
|
||||
let candidates: Vec<Arc<dyn PersistentLayer>> = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
let mut candidates = Vec::new();
|
||||
for hist_layer in layers.iter_historic_layers() {
|
||||
if hist_layer.is_remote_layer() {
|
||||
|
||||
@@ -148,6 +148,7 @@ pub(super) async fn connection_manager_loop_step(
|
||||
Ok(()) => {
|
||||
let new_state = connection_manager_state.timeline.current_state();
|
||||
match new_state {
|
||||
TimelineState::Creating => unreachable!("walreceiver should never be launched on a timeline in Creating state"),
|
||||
// we're already active as walreceiver, no need to reactivate
|
||||
TimelineState::Active => continue,
|
||||
TimelineState::Broken | TimelineState::Stopping => {
|
||||
@@ -1309,6 +1310,7 @@ mod tests {
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.expect("Failed to create an empty timeline for dummy wal connection manager");
|
||||
|
||||
ConnectionManagerState {
|
||||
|
||||
@@ -313,12 +313,15 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
}
|
||||
}
|
||||
|
||||
timeline.check_checkpoint_distance().with_context(|| {
|
||||
format!(
|
||||
"Failed to check checkpoint distance for timeline {}",
|
||||
timeline.timeline_id
|
||||
)
|
||||
})?;
|
||||
timeline
|
||||
.check_checkpoint_distance()
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to check checkpoint distance for timeline {}",
|
||||
timeline.timeline_id
|
||||
)
|
||||
})?;
|
||||
|
||||
if let Some(last_lsn) = status_update {
|
||||
let timeline_remote_consistent_lsn =
|
||||
|
||||
@@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> {
|
||||
|
||||
// Now that this record has been fully handled, including updating the
|
||||
// checkpoint data, let the repository know that it is up-to-date to this LSN
|
||||
modification.commit()?;
|
||||
modification.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1200,7 +1200,7 @@ mod tests {
|
||||
let mut m = tline.begin_modification(Lsn(0x10));
|
||||
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
|
||||
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
|
||||
|
||||
Ok(walingest)
|
||||
@@ -1209,7 +1209,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_relsize() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
@@ -1217,22 +1217,22 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x40));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x50));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(0x50));
|
||||
|
||||
@@ -1318,7 +1318,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_current_logical_size(&tline, Lsn(0x60));
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
@@ -1360,7 +1360,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
|
||||
@@ -1373,7 +1373,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
|
||||
@@ -1398,7 +1398,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
|
||||
@@ -1428,14 +1428,14 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_drop_extend() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
@@ -1454,7 +1454,7 @@ mod tests {
|
||||
// Drop rel
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check that rel is not visible anymore
|
||||
assert_eq!(
|
||||
@@ -1472,7 +1472,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
@@ -1497,7 +1497,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_truncate_extend() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
// Create a 20 MB relation (the size is arbitrary)
|
||||
@@ -1509,7 +1509,7 @@ mod tests {
|
||||
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
|
||||
.await?;
|
||||
}
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// The relation was created at LSN 20, not visible at LSN 1 yet.
|
||||
assert_eq!(
|
||||
@@ -1554,7 +1554,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(
|
||||
@@ -1603,7 +1603,7 @@ mod tests {
|
||||
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
|
||||
.await?;
|
||||
}
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
@@ -1637,7 +1637,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_large_rel() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut lsn = 0x10;
|
||||
@@ -1648,7 +1648,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
}
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(lsn));
|
||||
@@ -1664,7 +1664,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
RELSEG_SIZE
|
||||
@@ -1677,7 +1677,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
RELSEG_SIZE - 1
|
||||
@@ -1693,7 +1693,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
size as BlockNumber
|
||||
|
||||
@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
".*is not active. Current state: Broken.*",
|
||||
".*will not become active. Current state: Broken.*",
|
||||
".*failed to load metadata.*",
|
||||
".*could not load tenant.*load local timeline.*",
|
||||
".*load failed.*load local timeline.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -172,8 +172,10 @@ def test_timeline_create_break_after_uninit_mark(neon_simple_env: NeonEnv):
|
||||
|
||||
# Introduce failpoint when creating a new timeline uninit mark, before any other files were created
|
||||
pageserver_http.configure_failpoints(("after-timeline-uninit-mark-creation", "return"))
|
||||
with pytest.raises(Exception, match="after-timeline-uninit-mark-creation"):
|
||||
with pytest.raises(Exception, match="create timeline files"):
|
||||
_ = env.neon_cli.create_timeline("test_timeline_create_break_after_uninit_mark", tenant_id)
|
||||
env.pageserver.allowed_errors.append(".*InternalServerError.*create timeline files")
|
||||
env.pageserver.allowed_errors.append(".*hitting failpoint after-timeline-uninit-mark-creation")
|
||||
|
||||
# Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally.
|
||||
# "New" timeline is not present in the list, allowing pageserver to retry the same request
|
||||
|
||||
@@ -128,17 +128,24 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
)
|
||||
|
||||
# Importing empty file fails
|
||||
log.info("importing empty_file")
|
||||
empty_file = os.path.join(test_output_dir, "empty_file")
|
||||
with open(empty_file, "w") as _:
|
||||
with pytest.raises(Exception):
|
||||
import_tar(empty_file, empty_file)
|
||||
|
||||
assert timeline not in {TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)}
|
||||
|
||||
# Importing corrupt backup fails
|
||||
log.info("importing corrupt_base_tar")
|
||||
with pytest.raises(Exception):
|
||||
import_tar(corrupt_base_tar, wal_tar)
|
||||
|
||||
assert timeline not in {TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)}
|
||||
|
||||
# A tar with trailing garbage is currently accepted. It prints a warnings
|
||||
# to the pageserver log, however. Check that.
|
||||
log.info("importing base_plus_garbage_tar")
|
||||
import_tar(base_plus_garbage_tar, wal_tar)
|
||||
assert env.pageserver.log_contains(
|
||||
".*WARN.*ignored .* unexpected bytes after the tar archive.*"
|
||||
|
||||
@@ -2,12 +2,11 @@
|
||||
# env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
|
||||
|
||||
import os
|
||||
import queue
|
||||
import shutil
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
@@ -140,7 +139,7 @@ def test_remote_storage_backup_and_restore(
|
||||
# This is before the failures injected by test_remote_failures, so it's a permanent error.
|
||||
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*error attaching tenant: storage-sync-list-remote-timelines",
|
||||
".*attach failed.*: storage-sync-list-remote-timelines",
|
||||
)
|
||||
# Attach it. This HTTP request will succeed and launch a
|
||||
# background task to load the tenant. In that background task,
|
||||
@@ -656,21 +655,26 @@ def test_empty_branch_remote_storage_upload(
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_empty_branch_remote_storage_upload_on_restart(
|
||||
def test_empty_branch_remote_storage_upload_failure(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
):
|
||||
"""
|
||||
Branches off a root branch, but does not write anything to the new branch, so
|
||||
it has a metadata file only.
|
||||
Branching is not acknowledged until the index_part.json is uploaded.
|
||||
|
||||
Ensures the branch is not on the remote storage and restarts the pageserver
|
||||
— the upload should be scheduled by load, and create_timeline should await
|
||||
for it even though it gets 409 Conflict.
|
||||
Fails the index_part.json upload with a failpoint.
|
||||
Ensures that timeline creation fails because of that.
|
||||
Stops the pageserver.
|
||||
Restarts it, still with failpoint enabled.
|
||||
Waits for tenant to finish loading.
|
||||
Ensures the timeline does not exist locally nor remotely.
|
||||
|
||||
Disables the failpoint.
|
||||
Ensures that timeline can be created.
|
||||
"""
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_empty_branch_remote_storage_upload_on_restart",
|
||||
test_name="test_empty_branch_remote_storage_upload_failures",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -696,12 +700,21 @@ def test_empty_branch_remote_storage_upload_on_restart(
|
||||
# index upload is now hitting the failpoint, should not block the shutdown
|
||||
env.pageserver.stop()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*failed to create on-disk state for new_timeline_id={new_branch_timeline_id}.*wait for initial uploads to complete.*upload queue was stopped"
|
||||
)
|
||||
|
||||
timeline_path = (
|
||||
Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
|
||||
)
|
||||
uninit_marker_path = env.repo_dir / timeline_path.with_suffix(".___uninit")
|
||||
|
||||
local_metadata = env.repo_dir / timeline_path / "metadata"
|
||||
assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload"
|
||||
assert (
|
||||
not uninit_marker_path.exists()
|
||||
), "uninit marker should be deleted during orderly shutdown"
|
||||
assert not (
|
||||
env.repo_dir / timeline_path
|
||||
).exists(), "unfinished timeline dir should be deleted during orderly shutdown"
|
||||
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
new_branch_on_remote_storage = env.remote_storage.root / timeline_path
|
||||
@@ -709,54 +722,26 @@ def test_empty_branch_remote_storage_upload_on_restart(
|
||||
not new_branch_on_remote_storage.exists()
|
||||
), "failpoint should had prohibited index_part.json upload"
|
||||
|
||||
# during reconciliation we should had scheduled the uploads and on the
|
||||
# retried create_timeline, we will await for those to complete on next
|
||||
# client.timeline_create
|
||||
env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-upload-index=return"})
|
||||
# restart without failpoint
|
||||
env.pageserver.start()
|
||||
|
||||
# sleep a bit to force the upload task go into exponential backoff
|
||||
time.sleep(1)
|
||||
wait_until_tenant_state(client, env.initial_tenant, "Active", 5)
|
||||
|
||||
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
|
||||
barrier = threading.Barrier(2)
|
||||
# retry creation
|
||||
client.timeline_create(
|
||||
tenant_id=env.initial_tenant,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
new_timeline_id=new_branch_timeline_id,
|
||||
pg_version=env.pg_version,
|
||||
)
|
||||
|
||||
def create_in_background():
|
||||
barrier.wait()
|
||||
try:
|
||||
client.timeline_create(
|
||||
tenant_id=env.initial_tenant,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
new_timeline_id=new_branch_timeline_id,
|
||||
pg_version=env.pg_version,
|
||||
)
|
||||
q.put(None)
|
||||
except PageserverApiException as e:
|
||||
q.put(e)
|
||||
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
|
||||
|
||||
create_thread = threading.Thread(target=create_in_background)
|
||||
create_thread.start()
|
||||
|
||||
try:
|
||||
# maximize chances of actually waiting for the uploads by create_timeline
|
||||
barrier.wait()
|
||||
|
||||
assert not new_branch_on_remote_storage.exists(), "failpoint should had stopped uploading"
|
||||
|
||||
client.configure_failpoints(("before-upload-index", "off"))
|
||||
conflict = q.get()
|
||||
|
||||
assert conflict, "create_timeline should not have succeeded"
|
||||
assert (
|
||||
conflict.status_code == 409
|
||||
), "timeline was created before restart, and uploads scheduled during initial load, so we expect 409 conflict"
|
||||
|
||||
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
|
||||
|
||||
assert (
|
||||
new_branch_on_remote_storage / "index_part.json"
|
||||
).is_file(), "uploads scheduled during initial load should had been awaited for"
|
||||
finally:
|
||||
create_thread.join()
|
||||
assert (env.repo_dir / timeline_path).exists()
|
||||
assert not uninit_marker_path.exists()
|
||||
assert (
|
||||
new_branch_on_remote_storage / "index_part.json"
|
||||
).is_file(), "uploads scheduled during initial load should had been awaited for"
|
||||
|
||||
|
||||
def wait_upload_queue_empty(
|
||||
|
||||
@@ -647,7 +647,9 @@ def test_ignored_tenant_stays_broken_without_metadata(
|
||||
metadata_removed = True
|
||||
assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
|
||||
|
||||
env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
|
||||
)
|
||||
|
||||
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
|
||||
pageserver_http.tenant_load(tenant_id=tenant_id)
|
||||
|
||||
@@ -20,8 +20,10 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
RemoteStorageKind,
|
||||
available_remote_storages,
|
||||
last_flush_lsn_upload,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
from prometheus_client.samples import Sample
|
||||
|
||||
|
||||
@@ -249,8 +251,12 @@ def test_pageserver_metrics_removed_after_detach(
|
||||
tenant_1, _ = env.neon_cli.create_tenant()
|
||||
tenant_2, _ = env.neon_cli.create_tenant()
|
||||
|
||||
env.neon_cli.create_timeline("test_metrics_removed_after_detach", tenant_id=tenant_1)
|
||||
env.neon_cli.create_timeline("test_metrics_removed_after_detach", tenant_id=tenant_2)
|
||||
tenant_1_timeline = env.neon_cli.create_timeline(
|
||||
"test_metrics_removed_after_detach", tenant_id=tenant_1
|
||||
)
|
||||
tenant_2_timeline = env.neon_cli.create_timeline(
|
||||
"test_metrics_removed_after_detach", tenant_id=tenant_2
|
||||
)
|
||||
|
||||
endpoint_tenant1 = env.endpoints.create_start(
|
||||
"test_metrics_removed_after_detach", tenant_id=tenant_1
|
||||
@@ -259,13 +265,17 @@ def test_pageserver_metrics_removed_after_detach(
|
||||
"test_metrics_removed_after_detach", tenant_id=tenant_2
|
||||
)
|
||||
|
||||
for endpoint in [endpoint_tenant1, endpoint_tenant2]:
|
||||
for endpoint, timeline_id in [
|
||||
(endpoint_tenant1, tenant_1_timeline),
|
||||
(endpoint_tenant2, tenant_2_timeline),
|
||||
]:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t(key int primary key, value text)")
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
|
||||
cur.execute("SELECT sum(key) FROM t")
|
||||
assert cur.fetchone() == (5000050000,)
|
||||
last_flush_lsn_upload(env, endpoint, endpoint.tenant_id, timeline_id)
|
||||
|
||||
def get_ps_metric_samples_for_tenant(tenant_id: TenantId) -> List[Sample]:
|
||||
ps_metrics = env.pageserver.http_client().get_metrics()
|
||||
@@ -308,9 +318,7 @@ def test_pageserver_with_empty_tenants(
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*could not load tenant.*Failed to list timelines directory.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(".*load failed.*Failed to list timelines directory.*")
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
@@ -341,9 +349,15 @@ def test_pageserver_with_empty_tenants(
|
||||
env.pageserver.start()
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
tenants = client.tenant_list()
|
||||
|
||||
assert len(tenants) == 2
|
||||
def not_loading():
|
||||
tenants = client.tenant_list()
|
||||
assert len(tenants) == 2
|
||||
assert all(t["state"]["slug"] != "Loading" for t in tenants)
|
||||
|
||||
wait_until(10, 0.2, not_loading)
|
||||
|
||||
tenants = client.tenant_list()
|
||||
|
||||
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
|
||||
assert (
|
||||
@@ -355,7 +369,7 @@ def test_pageserver_with_empty_tenants(
|
||||
broken_tenant_status["state"]["slug"] == "Broken"
|
||||
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
|
||||
|
||||
assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
|
||||
assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
|
||||
|
||||
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
|
||||
assert (
|
||||
|
||||
@@ -272,7 +272,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
".*Ignoring new state, equal to the existing one: Stopping"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*during shutdown: cannot flush frozen layers when flush_loop is not running, state is Exited"
|
||||
".*shutdown_pageserver:.*freeze_and_flush.*cannot flush frozen layers when flush_loop is not running, state is Exited"
|
||||
)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
Reference in New Issue
Block a user