mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 13:40:37 +00:00
Compare commits
77 Commits
test_repli
...
problame/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1a18d44013 | ||
|
|
8f8b7ad4fd | ||
|
|
70fdaa47f1 | ||
|
|
b5cf92b948 | ||
|
|
a109a246f3 | ||
|
|
a0b6f0c052 | ||
|
|
9ce6e7c86b | ||
|
|
e839c97188 | ||
|
|
0c0cd1857d | ||
|
|
b725cc879a | ||
|
|
4f4073a124 | ||
|
|
64efcbf8da | ||
|
|
abb4112a28 | ||
|
|
a97b21ab13 | ||
|
|
9d3267e474 | ||
|
|
6b25cb5030 | ||
|
|
f91ad65fb3 | ||
|
|
9a4789ec73 | ||
|
|
72159ee686 | ||
|
|
be74662d05 | ||
|
|
e7c4ef9f4f | ||
|
|
13d3f4c29f | ||
|
|
67258af8a2 | ||
|
|
17ba307004 | ||
|
|
e1486444d6 | ||
|
|
c6f9b8f318 | ||
|
|
ba3e3bdddf | ||
|
|
71f9bbef0d | ||
|
|
4680f8c60b | ||
|
|
3c1fc2617c | ||
|
|
60cc197ce3 | ||
|
|
609a929968 | ||
|
|
f2abc4c933 | ||
|
|
b09beaa4fe | ||
|
|
1367e2b0ee | ||
|
|
122e23071b | ||
|
|
696c6ed6ff | ||
|
|
0874e27023 | ||
|
|
6fe39ecbf7 | ||
|
|
a0c2a85505 | ||
|
|
dd0f5c4ef3 | ||
|
|
de780d2e0f | ||
|
|
f18d9f555b | ||
|
|
05a2fe08d1 | ||
|
|
eaf270c648 | ||
|
|
ddad0928c5 | ||
|
|
96c550222b | ||
|
|
cf8ff7edad | ||
|
|
da6573f551 | ||
|
|
2fee8c884f | ||
|
|
fe4ef121b6 | ||
|
|
641ca994dc | ||
|
|
413598b19b | ||
|
|
b345f32e3f | ||
|
|
69cfa9fe61 | ||
|
|
2c424c8f4e | ||
|
|
4001f441c0 | ||
|
|
ef956c47fc | ||
|
|
8606b6abe5 | ||
|
|
732f60317b | ||
|
|
b54431bbd3 | ||
|
|
def5eb8542 | ||
|
|
07da786ed3 | ||
|
|
75c3c43b2e | ||
|
|
bdf03eab58 | ||
|
|
32c85fa87a | ||
|
|
b2e0c58a8c | ||
|
|
94f30f0660 | ||
|
|
a55d224923 | ||
|
|
4f586ac101 | ||
|
|
feb2e80b83 | ||
|
|
ee22e81583 | ||
|
|
3e604eaa39 | ||
|
|
8bcb542a3b | ||
|
|
17b081d294 | ||
|
|
d5337e6a65 | ||
|
|
cc96a5186d |
@@ -154,6 +154,7 @@ pub enum ActivatingFrom {
|
||||
/// A state of a timeline in pageserver's memory.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum TimelineState {
|
||||
Creating,
|
||||
/// The timeline is recognized by the pageserver but is not yet operational.
|
||||
/// In particular, the walreceiver connection loop is not running for this timeline.
|
||||
/// It will eventually transition to state Active or Broken.
|
||||
@@ -165,7 +166,10 @@ pub enum TimelineState {
|
||||
/// It cannot transition back into any other state.
|
||||
Stopping,
|
||||
/// The timeline is broken and not operational (previous states: Loading or Active).
|
||||
Broken { reason: String, backtrace: String },
|
||||
Broken {
|
||||
reason: String,
|
||||
backtrace: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
|
||||
@@ -746,6 +746,7 @@ impl StorageTimeMetrics {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TimelineMetrics {
|
||||
fake: bool,
|
||||
tenant_id: String,
|
||||
timeline_id: String,
|
||||
pub get_reconstruct_data_time_histo: Histogram,
|
||||
@@ -770,6 +771,7 @@ pub struct TimelineMetrics {
|
||||
|
||||
impl TimelineMetrics {
|
||||
pub fn new(
|
||||
fake: bool,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
evictions_with_low_residence_duration_builder: EvictionsWithLowResidenceDurationBuilder,
|
||||
@@ -828,7 +830,8 @@ impl TimelineMetrics {
|
||||
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get();
|
||||
MATERIALIZED_PAGE_CACHE_HIT.get();
|
||||
|
||||
TimelineMetrics {
|
||||
let m = TimelineMetrics {
|
||||
fake,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
get_reconstruct_data_time_histo,
|
||||
@@ -850,12 +853,16 @@ impl TimelineMetrics {
|
||||
evictions_with_low_residence_duration,
|
||||
),
|
||||
read_num_fs_layers,
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
impl Drop for TimelineMetrics {
|
||||
fn drop(&mut self) {
|
||||
if fake {
|
||||
m.remove_metrics();
|
||||
}
|
||||
|
||||
m
|
||||
}
|
||||
|
||||
fn remove_metrics(&self) {
|
||||
let tenant_id = &self.tenant_id;
|
||||
let timeline_id = &self.timeline_id;
|
||||
let _ = GET_RECONSTRUCT_DATA_TIME.remove_label_values(&[tenant_id, timeline_id]);
|
||||
@@ -892,6 +899,14 @@ impl Drop for TimelineMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimelineMetrics {
|
||||
fn drop(&mut self) {
|
||||
if !self.fake {
|
||||
self.remove_metrics();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_tenant_metrics(tenant_id: &TenantId) {
|
||||
let tid = tenant_id.to_string();
|
||||
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
|
||||
|
||||
@@ -14,6 +14,7 @@ use bytes::Buf;
|
||||
use bytes::Bytes;
|
||||
use futures::Stream;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
|
||||
@@ -24,6 +25,7 @@ use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, Qu
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::FeStartupPacket;
|
||||
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::io;
|
||||
use std::net::TcpListener;
|
||||
use std::pin::pin;
|
||||
@@ -51,6 +53,8 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant;
|
||||
use crate::tenant::compare_arced_timeline;
|
||||
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::GetTenantError;
|
||||
use crate::tenant::{Tenant, Timeline};
|
||||
@@ -487,11 +491,20 @@ impl PageServerHandler {
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
|
||||
|
||||
let (guard, real_timeline_not_in_tenants_map) = tenant
|
||||
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
|
||||
.await?;
|
||||
|
||||
// TODO spawn flush loop of timeline early (before activation),
|
||||
// but then we need to take care of shutting it down in case we fail
|
||||
// (bootstrap_timeline probably also needs it?)
|
||||
|
||||
// TODO mark timeline as not ready until it reaches end_lsn.
|
||||
// We might have some wal to import as well, and we should prevent compute
|
||||
@@ -505,21 +518,49 @@ impl PageServerHandler {
|
||||
|
||||
// Import basebackup provided via CopyData
|
||||
info!("importing basebackup");
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
pgb.flush().await?;
|
||||
let doit = async {
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
|
||||
timeline
|
||||
.import_basebackup_from_tar(
|
||||
&mut copyin_reader,
|
||||
base_lsn,
|
||||
self.broker_client.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
|
||||
real_timeline_not_in_tenants_map
|
||||
.import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx)
|
||||
.await?;
|
||||
|
||||
// Read the end of the tar archive.
|
||||
read_tar_eof(copyin_reader).await?;
|
||||
// Read the end of the tar archive.
|
||||
read_tar_eof(copyin_reader).await?;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
let placeholder_timeline = match doit.await {
|
||||
Ok(()) => {
|
||||
match guard.creation_complete_remove_uninit_marker_and_get_placeholder_timeline() {
|
||||
Ok(placeholder_timeline) => placeholder_timeline,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"failed to remove uninit marker for new_timeline_id={timeline_id}: {err:#}"
|
||||
);
|
||||
return Err(QueryError::Other(err.context("remove uninit marker file")));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
guard.creation_failed();
|
||||
return Err(QueryError::Other(e));
|
||||
}
|
||||
};
|
||||
|
||||
// todo share with Tenant::create_timeline
|
||||
match tenant.timelines.lock().unwrap().entry(timeline_id) {
|
||||
Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"),
|
||||
Entry::Occupied(mut o) => {
|
||||
info!("replacing placeholder timeline with the real one");
|
||||
assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating);
|
||||
assert!(compare_arced_timeline(&placeholder_timeline, o.get()));
|
||||
let replaced_placeholder = o.insert(Arc::clone(&real_timeline_not_in_tenants_map));
|
||||
assert!(compare_arced_timeline(&replaced_placeholder, &placeholder_timeline));
|
||||
},
|
||||
}
|
||||
|
||||
// TODO check checksum
|
||||
// Meanwhile you can verify client-side by taking fullbackup
|
||||
@@ -527,7 +568,9 @@ impl PageServerHandler {
|
||||
// It wouldn't work if base came from vanilla postgres though,
|
||||
// since we discard some log files.
|
||||
|
||||
info!("done");
|
||||
info!("done, activating timeline");
|
||||
real_timeline_not_in_tenants_map.activate(self.broker_client.clone(), None, &ctx);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -273,6 +273,8 @@ pub enum TaskKind {
|
||||
|
||||
DebugTool,
|
||||
|
||||
CreateTimeline,
|
||||
|
||||
#[cfg(test)]
|
||||
UnitTest,
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -20,7 +20,9 @@ use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
|
||||
use crate::tenant::{
|
||||
create_tenant_files, CreateTenantFilesMode, Tenant, TenantState, TimelineLoadCause,
|
||||
};
|
||||
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
|
||||
|
||||
use utils::fs_ext::PathExt;
|
||||
@@ -121,6 +123,7 @@ pub async fn init_tenant_mgr(
|
||||
&tenant_dir_path,
|
||||
broker_client.clone(),
|
||||
remote_storage.clone(),
|
||||
TimelineLoadCause::Startup,
|
||||
Some(init_order.clone()),
|
||||
&ctx,
|
||||
) {
|
||||
@@ -157,6 +160,7 @@ pub fn schedule_local_tenant_processing(
|
||||
tenant_path: &Path,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
cause: TimelineLoadCause,
|
||||
init_order: Option<InitializationOrder>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
@@ -174,6 +178,7 @@ pub fn schedule_local_tenant_processing(
|
||||
})?,
|
||||
"Cannot load tenant from empty directory {tenant_path:?}"
|
||||
);
|
||||
// TODO ensure there's no uninit mark / handle it correctly during ignore and load
|
||||
|
||||
let tenant_id = tenant_path
|
||||
.file_name()
|
||||
@@ -216,6 +221,7 @@ pub fn schedule_local_tenant_processing(
|
||||
tenant_id,
|
||||
broker_client,
|
||||
remote_storage,
|
||||
cause,
|
||||
init_order,
|
||||
ctx,
|
||||
)
|
||||
@@ -315,7 +321,7 @@ pub async fn create_tenant(
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
let created_tenant =
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?;
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, TimelineLoadCause::TenantCreate, None, ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
@@ -463,7 +469,7 @@ pub async fn load_tenant(
|
||||
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
|
||||
}
|
||||
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx)
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, TimelineLoadCause::TenantLoad, None, ctx)
|
||||
.with_context(|| {
|
||||
format!("Failed to schedule tenant processing in path {tenant_path:?}")
|
||||
})?;
|
||||
@@ -536,7 +542,7 @@ pub async fn attach_tenant(
|
||||
.context("check for attach marker file existence")?;
|
||||
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
|
||||
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), TimelineLoadCause::Attach, None, ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
|
||||
@@ -68,13 +68,13 @@ use utils::{
|
||||
simple_rcu::{Rcu, RcuReadGuard},
|
||||
};
|
||||
|
||||
use crate::page_cache;
|
||||
use crate::repository::GcResult;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::walredo::WalRedoManager;
|
||||
use crate::METADATA_FILE_NAME;
|
||||
use crate::ZERO_PAGE;
|
||||
use crate::{import_datadir, page_cache};
|
||||
use crate::{is_temporary, task_mgr};
|
||||
|
||||
pub(super) use self::eviction_task::EvictionTaskTenantState;
|
||||
@@ -86,6 +86,7 @@ use super::layer_map::BatchedUpdates;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset};
|
||||
use super::TimelineLoadCause;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub(super) enum FlushLoopState {
|
||||
@@ -690,12 +691,22 @@ impl Timeline {
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
|
||||
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
|
||||
if self.current_state() == TimelineState::Creating {
|
||||
// make a few additional sanity checks before panicking
|
||||
assert!(self.layers.read().await.open_layer.is_none());
|
||||
panic!("caller must prevent calls for timelines in Creating state")
|
||||
}
|
||||
self.freeze_inmem_layer(false).await;
|
||||
self.flush_frozen_layers_and_wait().await
|
||||
}
|
||||
|
||||
/// Outermost timeline compaction operation; downloads needed layers.
|
||||
pub async fn compact(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
assert!(
|
||||
!matches!(self.current_state(), TimelineState::Creating),
|
||||
"caller must prevent calls for timelines in Creating state"
|
||||
);
|
||||
|
||||
const ROUNDS: usize = 2;
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
@@ -949,12 +960,33 @@ impl Timeline {
|
||||
background_jobs_can_start: Option<&completion::Barrier>,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
if self.current_state() == TimelineState::Creating {
|
||||
panic!("timelines in Creating state are never activated");
|
||||
}
|
||||
self.maybe_spawn_flush_loop();
|
||||
self.launch_wal_receiver(ctx, broker_client);
|
||||
self.set_state(TimelineState::Active);
|
||||
self.launch_eviction_task(background_jobs_can_start);
|
||||
}
|
||||
|
||||
pub fn set_state(&self, new_state: TimelineState) {
|
||||
if self.current_state() == TimelineState::Creating {
|
||||
// Do a few assertions before panicking to detect other code that is lacking checks for `Creating` state.
|
||||
assert_eq!(
|
||||
*self.flush_loop_state.lock().unwrap(),
|
||||
FlushLoopState::NotStarted
|
||||
);
|
||||
assert!(
|
||||
self.layers
|
||||
.try_read()
|
||||
.expect("we would never be modifying Timeline::layers in a Creating timeline")
|
||||
.open_layer
|
||||
.is_none(),
|
||||
"would have nothing to flush anyways"
|
||||
);
|
||||
assert!(self.walreceiver.lock().unwrap().is_none());
|
||||
panic!("timelines in Creating state never change state");
|
||||
}
|
||||
match (self.current_state(), new_state) {
|
||||
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
|
||||
warn!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
|
||||
@@ -1022,6 +1054,12 @@ impl Timeline {
|
||||
loop {
|
||||
let current_state = receiver.borrow().clone();
|
||||
match current_state {
|
||||
TimelineState::Creating => {
|
||||
// A timeline _object_ in state Creating never transitions out of it.
|
||||
// It gets replaced by another object in Loading state once creation is done.
|
||||
// So, `self` is not the right object to subscribe to.
|
||||
panic!("timelines in Creating state never change state, hence can't wait for it to become active");
|
||||
}
|
||||
TimelineState::Loading => {
|
||||
receiver
|
||||
.changed()
|
||||
@@ -1390,13 +1428,18 @@ impl Timeline {
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
remote_client: Option<Arc<RemoteTimelineClient>>,
|
||||
pg_version: u32,
|
||||
is_create_placeholder: bool,
|
||||
initial_logical_size_can_start: Option<completion::Barrier>,
|
||||
initial_logical_size_attempt: Option<completion::Completion>,
|
||||
) -> Arc<Self> {
|
||||
let disk_consistent_lsn = metadata.disk_consistent_lsn();
|
||||
let (state, _) = watch::channel(TimelineState::Loading);
|
||||
let (state, _) = watch::channel(if is_create_placeholder {
|
||||
TimelineState::Creating
|
||||
} else {
|
||||
TimelineState::Loading
|
||||
});
|
||||
|
||||
let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
|
||||
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
|
||||
@@ -1424,7 +1467,7 @@ impl Timeline {
|
||||
walredo_mgr,
|
||||
walreceiver: Mutex::new(None),
|
||||
|
||||
remote_client: remote_client.map(Arc::new),
|
||||
remote_client,
|
||||
|
||||
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
|
||||
last_record_lsn: SeqWait::new(RecordLsn {
|
||||
@@ -1440,6 +1483,7 @@ impl Timeline {
|
||||
ancestor_lsn: metadata.ancestor_lsn(),
|
||||
|
||||
metrics: TimelineMetrics::new(
|
||||
is_create_placeholder,
|
||||
&tenant_id,
|
||||
&timeline_id,
|
||||
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
|
||||
@@ -1595,20 +1639,41 @@ impl Timeline {
|
||||
));
|
||||
}
|
||||
|
||||
///
|
||||
/// Initialize with an empty layer map. Used when creating a new timeline.
|
||||
///
|
||||
pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
|
||||
let mut layers = self.layers.try_write().expect(
|
||||
"in the context where we call this function, no other task has access to the object",
|
||||
);
|
||||
layers.next_open_layer_at = Some(Lsn(start_lsn.0));
|
||||
/// Prepares timeline data by loading it from the basebackup archive.
|
||||
pub(crate) async fn import_basebackup_from_tar(
|
||||
self: &Arc<Self>,
|
||||
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
|
||||
base_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
import_datadir::import_basebackup_from_tar(self, copyin_read, base_lsn, ctx)
|
||||
.await
|
||||
.context("Failed to import basebackup")?;
|
||||
|
||||
// Flush loop needs to be spawned in order to be able to flush.
|
||||
// We want to run proper checkpoint before we mark timeline as available to outside world
|
||||
// Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock
|
||||
self.maybe_spawn_flush_loop();
|
||||
|
||||
fail::fail_point!("before-checkpoint-new-timeline", |_| {
|
||||
bail!("failpoint before-checkpoint-new-timeline");
|
||||
});
|
||||
|
||||
self.freeze_and_flush()
|
||||
.await
|
||||
.context("Failed to flush after basebackup import")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Scan the timeline directory to populate the layer map.
|
||||
///
|
||||
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
pub(super) async fn load_layer_map(
|
||||
&self,
|
||||
cause: &TimelineLoadCause,
|
||||
disk_consistent_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let mut num_layers = 0;
|
||||
@@ -1711,7 +1776,19 @@ impl Timeline {
|
||||
}
|
||||
|
||||
updates.flush();
|
||||
layers.next_open_layer_at = Some(Lsn(disk_consistent_lsn.0) + 1);
|
||||
|
||||
if disk_consistent_lsn == Lsn(0) {
|
||||
// If disk_consistent_lsn is 0, then we're still in bootstrap/basebackup_import/create_test_timeline.
|
||||
// Set next_open_layer_at to initdb_lsn to enable the put@initdb_lsn optimization in flush_frozen_layer.
|
||||
assert!(matches!(cause, TimelineLoadCause::TimelineCreate { .. }));
|
||||
assert_eq!(
|
||||
num_layers, 0,
|
||||
"if we crash, creating timelines get removed from disk"
|
||||
);
|
||||
layers.next_open_layer_at = Some(self.initdb_lsn);
|
||||
} else {
|
||||
layers.next_open_layer_at = Some(Lsn(disk_consistent_lsn.0) + 1);
|
||||
}
|
||||
|
||||
info!(
|
||||
"loaded layer map with {} layers at {}, total physical size: {}",
|
||||
@@ -2127,6 +2204,10 @@ impl Timeline {
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
if self.current_state() == TimelineState::Creating {
|
||||
panic!("cannot calculate logical size for timeline in Creating state");
|
||||
}
|
||||
|
||||
let mut timeline_state_updates = self.subscribe_for_state_updates();
|
||||
let self_calculation = Arc::clone(self);
|
||||
|
||||
@@ -2147,7 +2228,8 @@ impl Timeline {
|
||||
TimelineState::Active => continue,
|
||||
TimelineState::Broken { .. }
|
||||
| TimelineState::Stopping
|
||||
| TimelineState::Loading => {
|
||||
| TimelineState::Loading
|
||||
| TimelineState::Creating => {
|
||||
break format!("aborted because timeline became inactive (new state: {new_state:?})")
|
||||
}
|
||||
}
|
||||
@@ -4074,6 +4156,11 @@ impl Timeline {
|
||||
let now = SystemTime::now();
|
||||
let mut result: GcResult = GcResult::default();
|
||||
|
||||
if self.current_state() == TimelineState::Creating {
|
||||
debug!("timeline creating placeholder does not need GC");
|
||||
return Ok(GcResult::default());
|
||||
}
|
||||
|
||||
// Nothing to GC. Return early.
|
||||
let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
|
||||
if latest_gc_cutoff >= new_gc_cutoff {
|
||||
|
||||
@@ -151,6 +151,7 @@ pub(super) async fn connection_manager_loop_step(
|
||||
Ok(()) => {
|
||||
let new_state = connection_manager_state.timeline.current_state();
|
||||
match new_state {
|
||||
TimelineState::Creating => unreachable!("walreceiver should never be launched on a timeline in Creating state"),
|
||||
// we're already active as walreceiver, no need to reactivate
|
||||
TimelineState::Active => continue,
|
||||
TimelineState::Broken { .. } | TimelineState::Stopping => {
|
||||
|
||||
@@ -3079,6 +3079,21 @@ def fork_at_current_lsn(
|
||||
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)
|
||||
|
||||
|
||||
def last_flush_lsn_checkpoint(
|
||||
env: NeonEnv, endpoint: Endpoint, tenant_id: TenantId, timeline_id: TimelineId
|
||||
) -> Lsn:
|
||||
"""
|
||||
Wait for pageserver to catch to the latest flush LSN of given endpoint, then
|
||||
checkpoint pageserver.
|
||||
"""
|
||||
last_flush_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
ps_http = env.pageserver.http_client()
|
||||
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_flush_lsn)
|
||||
# force a checkpoint to trigger upload
|
||||
ps_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
return last_flush_lsn
|
||||
|
||||
|
||||
def last_flush_lsn_upload(
|
||||
env: NeonEnv, endpoint: Endpoint, tenant_id: TenantId, timeline_id: TimelineId
|
||||
) -> Lsn:
|
||||
@@ -3087,10 +3102,7 @@ def last_flush_lsn_upload(
|
||||
checkpoint pageserver, and wait for it to be uploaded (remote_consistent_lsn
|
||||
reaching flush LSN).
|
||||
"""
|
||||
last_flush_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
last_flush_lsn = last_flush_lsn_checkpoint(env, endpoint, tenant_id, timeline_id)
|
||||
ps_http = env.pageserver.http_client()
|
||||
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_flush_lsn)
|
||||
# force a checkpoint to trigger upload
|
||||
ps_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
|
||||
return last_flush_lsn
|
||||
|
||||
@@ -172,8 +172,10 @@ def test_timeline_create_break_after_uninit_mark(neon_simple_env: NeonEnv):
|
||||
|
||||
# Introduce failpoint when creating a new timeline uninit mark, before any other files were created
|
||||
pageserver_http.configure_failpoints(("after-timeline-uninit-mark-creation", "return"))
|
||||
with pytest.raises(Exception, match="after-timeline-uninit-mark-creation"):
|
||||
with pytest.raises(Exception, match="create timeline files"):
|
||||
_ = env.neon_cli.create_timeline("test_timeline_create_break_after_uninit_mark", tenant_id)
|
||||
env.pageserver.allowed_errors.append(".*InternalServerError.*create timeline files")
|
||||
env.pageserver.allowed_errors.append(".*hitting failpoint after-timeline-uninit-mark-creation")
|
||||
|
||||
# Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally.
|
||||
# "New" timeline is not present in the list, allowing pageserver to retry the same request
|
||||
|
||||
@@ -133,17 +133,24 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
)
|
||||
|
||||
# Importing empty file fails
|
||||
log.info("importing empty_file")
|
||||
empty_file = os.path.join(test_output_dir, "empty_file")
|
||||
with open(empty_file, "w") as _:
|
||||
with pytest.raises(Exception):
|
||||
import_tar(empty_file, empty_file)
|
||||
|
||||
assert timeline not in {TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)}
|
||||
|
||||
# Importing corrupt backup fails
|
||||
log.info("importing corrupt_base_tar")
|
||||
with pytest.raises(Exception):
|
||||
import_tar(corrupt_base_tar, wal_tar)
|
||||
|
||||
assert timeline not in {TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)}
|
||||
|
||||
# A tar with trailing garbage is currently accepted. It prints a warnings
|
||||
# to the pageserver log, however. Check that.
|
||||
log.info("importing base_plus_garbage_tar")
|
||||
import_tar(base_plus_garbage_tar, wal_tar)
|
||||
assert env.pageserver.log_contains(
|
||||
".*WARN.*ignored .* unexpected bytes after the tar archive.*"
|
||||
|
||||
@@ -2,12 +2,11 @@
|
||||
# env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
|
||||
|
||||
import os
|
||||
import queue
|
||||
import shutil
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
@@ -674,21 +673,26 @@ def test_empty_branch_remote_storage_upload(
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_empty_branch_remote_storage_upload_on_restart(
|
||||
def test_empty_branch_remote_storage_upload_failure(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
):
|
||||
"""
|
||||
Branches off a root branch, but does not write anything to the new branch, so
|
||||
it has a metadata file only.
|
||||
Branching is not acknowledged until the index_part.json is uploaded.
|
||||
|
||||
Ensures the branch is not on the remote storage and restarts the pageserver
|
||||
— the upload should be scheduled by load, and create_timeline should await
|
||||
for it even though it gets 409 Conflict.
|
||||
Fails the index_part.json upload with a failpoint.
|
||||
Ensures that timeline creation fails because of that.
|
||||
Stops the pageserver.
|
||||
Restarts it, still with failpoint enabled.
|
||||
Waits for tenant to finish loading.
|
||||
Ensures the timeline does not exist locally nor remotely.
|
||||
|
||||
Disables the failpoint.
|
||||
Ensures that timeline can be created.
|
||||
"""
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_empty_branch_remote_storage_upload_on_restart",
|
||||
test_name="test_empty_branch_remote_storage_upload_failures",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -714,9 +718,14 @@ def test_empty_branch_remote_storage_upload_on_restart(
|
||||
# index upload is now hitting the failpoint, it should block the shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*failed to create on-disk state for new_timeline_id={new_branch_timeline_id}.*wait for initial uploads to complete.*upload queue was stopped"
|
||||
)
|
||||
|
||||
timeline_path = (
|
||||
Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
|
||||
)
|
||||
uninit_marker_path = env.repo_dir / timeline_path.with_suffix(".___uninit")
|
||||
|
||||
local_metadata = env.repo_dir / timeline_path / "metadata"
|
||||
assert local_metadata.is_file()
|
||||
@@ -727,54 +736,26 @@ def test_empty_branch_remote_storage_upload_on_restart(
|
||||
not new_branch_on_remote_storage.exists()
|
||||
), "failpoint should had prohibited index_part.json upload"
|
||||
|
||||
# during reconciliation we should had scheduled the uploads and on the
|
||||
# retried create_timeline, we will await for those to complete on next
|
||||
# client.timeline_create
|
||||
env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-upload-index=return"})
|
||||
# restart without failpoint
|
||||
env.pageserver.start()
|
||||
|
||||
# sleep a bit to force the upload task go into exponential backoff
|
||||
time.sleep(1)
|
||||
wait_until_tenant_state(client, env.initial_tenant, "Active", 5)
|
||||
|
||||
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
|
||||
barrier = threading.Barrier(2)
|
||||
# retry creation
|
||||
client.timeline_create(
|
||||
tenant_id=env.initial_tenant,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
new_timeline_id=new_branch_timeline_id,
|
||||
pg_version=env.pg_version,
|
||||
)
|
||||
|
||||
def create_in_background():
|
||||
barrier.wait()
|
||||
try:
|
||||
client.timeline_create(
|
||||
tenant_id=env.initial_tenant,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
new_timeline_id=new_branch_timeline_id,
|
||||
pg_version=env.pg_version,
|
||||
)
|
||||
q.put(None)
|
||||
except PageserverApiException as e:
|
||||
q.put(e)
|
||||
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
|
||||
|
||||
create_thread = threading.Thread(target=create_in_background)
|
||||
create_thread.start()
|
||||
|
||||
try:
|
||||
# maximize chances of actually waiting for the uploads by create_timeline
|
||||
barrier.wait()
|
||||
|
||||
assert not new_branch_on_remote_storage.exists(), "failpoint should had stopped uploading"
|
||||
|
||||
client.configure_failpoints(("before-upload-index", "off"))
|
||||
conflict = q.get()
|
||||
|
||||
assert conflict, "create_timeline should not have succeeded"
|
||||
assert (
|
||||
conflict.status_code == 409
|
||||
), "timeline was created before restart, and uploads scheduled during initial load, so we expect 409 conflict"
|
||||
|
||||
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
|
||||
|
||||
assert (
|
||||
new_branch_on_remote_storage / "index_part.json"
|
||||
).is_file(), "uploads scheduled during initial load should had been awaited for"
|
||||
finally:
|
||||
create_thread.join()
|
||||
assert (env.repo_dir / timeline_path).exists()
|
||||
assert not uninit_marker_path.exists()
|
||||
assert (
|
||||
new_branch_on_remote_storage / "index_part.json"
|
||||
).is_file(), "uploads scheduled during initial load should had been awaited for"
|
||||
|
||||
|
||||
def wait_upload_queue_empty(
|
||||
|
||||
@@ -20,6 +20,8 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
RemoteStorageKind,
|
||||
available_remote_storages,
|
||||
last_flush_lsn_checkpoint,
|
||||
last_flush_lsn_upload,
|
||||
)
|
||||
from fixtures.pageserver.utils import timeline_delete_wait_completed
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
@@ -251,8 +253,12 @@ def test_pageserver_metrics_removed_after_detach(
|
||||
tenant_1, _ = env.neon_cli.create_tenant()
|
||||
tenant_2, _ = env.neon_cli.create_tenant()
|
||||
|
||||
env.neon_cli.create_timeline("test_metrics_removed_after_detach", tenant_id=tenant_1)
|
||||
env.neon_cli.create_timeline("test_metrics_removed_after_detach", tenant_id=tenant_2)
|
||||
tenant_1_timeline = env.neon_cli.create_timeline(
|
||||
"test_metrics_removed_after_detach", tenant_id=tenant_1
|
||||
)
|
||||
tenant_2_timeline = env.neon_cli.create_timeline(
|
||||
"test_metrics_removed_after_detach", tenant_id=tenant_2
|
||||
)
|
||||
|
||||
endpoint_tenant1 = env.endpoints.create_start(
|
||||
"test_metrics_removed_after_detach", tenant_id=tenant_1
|
||||
@@ -261,13 +267,20 @@ def test_pageserver_metrics_removed_after_detach(
|
||||
"test_metrics_removed_after_detach", tenant_id=tenant_2
|
||||
)
|
||||
|
||||
for endpoint in [endpoint_tenant1, endpoint_tenant2]:
|
||||
for endpoint, timeline_id in [
|
||||
(endpoint_tenant1, tenant_1_timeline),
|
||||
(endpoint_tenant2, tenant_2_timeline),
|
||||
]:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t(key int primary key, value text)")
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
|
||||
cur.execute("SELECT sum(key) FROM t")
|
||||
assert cur.fetchone() == (5000050000,)
|
||||
if remote_storage_kind != RemoteStorageKind.NOOP:
|
||||
last_flush_lsn_upload(env, endpoint, endpoint.tenant_id, timeline_id)
|
||||
else:
|
||||
last_flush_lsn_checkpoint(env, endpoint, endpoint.tenant_id, timeline_id)
|
||||
endpoint.stop()
|
||||
|
||||
def get_ps_metric_samples_for_tenant(tenant_id: TenantId) -> List[Sample]:
|
||||
|
||||
@@ -305,7 +305,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
)
|
||||
# this happens, because the stuck timeline is visible to shutdown
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*freeze_and_flush_on_shutdown.+: failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited"
|
||||
".*shutdown_pageserver.*freeze_and_flush timeline failed timeline_id=.* err=cannot flush frozen layers when flush_loop is not running, state is Exited"
|
||||
)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
Reference in New Issue
Block a user