refactor timeline initialization

High-level ideas (sketched in code below):
- put a placeholder Timeline object into the timelines map during timeline creation
- the timeline creation paths (branch, bootstrap, import_from_basebackup)
  prepare the durable state (on-disk & remote), if necessary using
  _another_ _temporary_ Timeline object
- once a creation path has prepared the durable state, it uses
  the normal load routine (load_local_timeline) that is also used
  during pageserver startup
- once the loading is done, we replace the placeholder timeline object
  with the real one
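
A minimal sketch of that flow, with simplified stand-in types and names — only load_local_timeline is named in this commit message; the map layout, locking, and function names below are illustrative, not the real pageserver API:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimelineState {
    Creating,
    Loading,
}

struct Timeline {
    state: TimelineState,
}

fn create_timeline(
    timelines: &Mutex<HashMap<u64, Arc<Timeline>>>,
    timeline_id: u64,
) -> Arc<Timeline> {
    // 1. Publish a placeholder so the id is taken while creation runs.
    let placeholder = Arc::new(Timeline { state: TimelineState::Creating });
    timelines.lock().unwrap().insert(timeline_id, placeholder);

    // 2. Prepare the durable state (on-disk & remote); branch, bootstrap and
    //    import_from_basebackup may use another, temporary Timeline object
    //    here that never enters the map.

    // 3. Load through the same routine pageserver startup uses
    //    (load_local_timeline in the real code).
    let loaded = Arc::new(Timeline { state: TimelineState::Loading });

    // 4. Swap the placeholder for the loaded timeline.
    timelines.lock().unwrap().insert(timeline_id, Arc::clone(&loaded));
    loaded
}

fn main() {
    let timelines = Mutex::new(HashMap::new());
    let t = create_timeline(&timelines, 1);
    assert_eq!(t.state, TimelineState::Loading);
}
```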
Christian Schwarz
2023-05-25 22:27:53 +02:00
parent 6fe39ecbf7
commit 0874e27023
8 changed files with 717 additions and 507 deletions

View File

@@ -126,6 +126,7 @@ impl std::fmt::Debug for TenantState {
 /// A state of a timeline in pageserver's memory.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 pub enum TimelineState {
+    Creating,
     /// The timeline is recognized by the pageserver but is not yet operational.
     /// In particular, the walreceiver connection loop is not running for this timeline.
     /// It will eventually transition to state Active or Broken.

View File

@@ -51,6 +51,7 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
 use crate::task_mgr;
 use crate::task_mgr::TaskKind;
 use crate::tenant;
+use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::tenant::mgr;
 use crate::tenant::mgr::GetTenantError;
 use crate::tenant::{Tenant, Timeline};
@@ -489,7 +490,10 @@ impl PageServerHandler {
         // Create empty timeline
         info!("creating new timeline");
         let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
-        let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
+        let (uninit_mark, timeline) = tenant
+            .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
+            .await?;
 
         // TODO mark timeline as not ready until it reaches end_lsn.
         // We might have some wal to import as well, and we should prevent compute
@@ -503,21 +507,32 @@ impl PageServerHandler {
         // Import basebackup provided via CopyData
         info!("importing basebackup");
-        pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
-        pgb.flush().await?;
+        let doit = async {
+            pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
+            pgb.flush().await?;
 
-        let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
-        timeline
-            .import_basebackup_from_tar(
-                &mut copyin_reader,
-                base_lsn,
-                self.broker_client.clone(),
-                &ctx,
-            )
-            .await?;
+            let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
+            timeline
+                .import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx)
+                .await?;
 
-        // Read the end of the tar archive.
-        read_tar_eof(copyin_reader).await?;
+            // Read the end of the tar archive.
+            read_tar_eof(copyin_reader).await?;
+            anyhow::Ok(())
+        };
+        match doit.await {
+            Ok(()) => {
+                // TODO if we fail anywhere above, then we won't clean up the remote index part which create_empty_timeline already uploaded.
+                uninit_mark
+                    .remove_uninit_mark()
+                    .context("remove uninit mark")?;
+            }
+            Err(e) => {
+                debug_assert_current_span_has_tenant_and_timeline_id();
+                error!("error importing basebackup: {:?}", e);
+                crate::tenant::cleanup_timeline_directory(uninit_mark);
+            }
+        }
 
         // TODO check checksum
         // Meanwhile you can verify client-side by taking fullbackup
@@ -525,7 +540,9 @@ impl PageServerHandler {
         // It wouldn't work if base came from vanilla postgres though,
         // since we discard some log files.
-        info!("done");
+        info!("done, activating timeline");
+        timeline.activate(self.broker_client.clone(), &ctx);
         Ok(())
     }
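
The `doit` block above is a cleanup pattern: every fallible step is bundled into one async block so a single `match` decides between committing (removing the uninit mark) and cleaning up the partial timeline directory. A self-contained sketch of the pattern, with hypothetical step names:

```rust
use anyhow::Context;

async fn receive_basebackup() -> anyhow::Result<()> { Ok(()) }
async fn flush_to_disk() -> anyhow::Result<()> { Ok(()) }
fn cleanup_partial_state() { /* e.g. delete the half-created directory */ }

async fn import_with_cleanup() -> anyhow::Result<()> {
    // Bundle the fallible steps; anyhow::Ok(()) pins the block's error type.
    let doit = async {
        receive_basebackup().await?;
        flush_to_disk().await?;
        anyhow::Ok(())
    };
    match doit.await {
        Ok(()) => Ok(()), // success: commit, e.g. remove the uninit mark
        Err(e) => {
            cleanup_partial_state();
            Err(e).context("importing basebackup")
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    import_with_cleanup().await
}
```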

View File

@@ -270,6 +270,8 @@ pub enum TaskKind {
     DebugTool,
+    CreateTimeline,
     #[cfg(test)]
     UnitTest,
 }

File diff suppressed because it is too large

View File

@@ -19,7 +19,9 @@ use crate::config::PageServerConf;
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::task_mgr::{self, TaskKind};
 use crate::tenant::config::TenantConfOpt;
-use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
+use crate::tenant::{
+    create_tenant_files, CreateTenantFilesMode, Tenant, TenantState, TimelineLoadCause,
+};
 use crate::IGNORED_TENANT_FILE_NAME;
 use utils::fs_ext::PathExt;
@@ -119,6 +121,7 @@ pub async fn init_tenant_mgr(
             &tenant_dir_path,
             broker_client.clone(),
             remote_storage.clone(),
+            TimelineLoadCause::Startup,
             &ctx,
         ) {
             Ok(tenant) => {
@@ -154,6 +157,7 @@ pub fn schedule_local_tenant_processing(
     tenant_path: &Path,
     broker_client: storage_broker::BrokerClientChannel,
     remote_storage: Option<GenericRemoteStorage>,
+    cause: TimelineLoadCause,
     ctx: &RequestContext,
 ) -> anyhow::Result<Arc<Tenant>> {
     anyhow::ensure!(
@@ -207,7 +211,7 @@ pub fn schedule_local_tenant_processing(
     } else {
         info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
         // Start loading the tenant into memory. It will initially be in Loading state.
-        Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx)
+        Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, cause, ctx)
     };
     Ok(tenant)
 }
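
`TimelineLoadCause`, threaded through these call sites, is defined in the suppressed large diff above. Judging only from the variants that appear in this file, a minimal sketch of its shape would be:

```rust
// Inferred from the call sites in this diff (Startup, TenantCreate,
// TenantLoad, Attach); the real definition — presumably in the suppressed
// pageserver/src/tenant.rs diff — may carry more variants or payload data.
#[derive(Debug, Clone, Copy)]
pub enum TimelineLoadCause {
    Startup,
    TenantCreate,
    TenantLoad,
    Attach,
}
```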
@@ -289,7 +293,7 @@ pub async fn create_tenant(
     // See https://github.com/neondatabase/neon/issues/4233
     let created_tenant =
-        schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?;
+        schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, TimelineLoadCause::TenantCreate, ctx)?;
     // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
     // See https://github.com/neondatabase/neon/issues/4233
@@ -435,7 +439,7 @@ pub async fn load_tenant(
             .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
     }
-    let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
+    let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, TimelineLoadCause::TenantLoad, ctx)
         .with_context(|| {
             format!("Failed to schedule tenant processing in path {tenant_path:?}")
         })?;
@@ -508,7 +512,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), TimelineLoadCause::Attach ,ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233

View File

@@ -63,13 +63,13 @@ use utils::{
     simple_rcu::{Rcu, RcuReadGuard},
 };
-use crate::page_cache;
 use crate::repository::GcResult;
 use crate::repository::{Key, Value};
 use crate::task_mgr::TaskKind;
 use crate::walredo::WalRedoManager;
 use crate::METADATA_FILE_NAME;
 use crate::ZERO_PAGE;
+use crate::{import_datadir, page_cache};
 use crate::{is_temporary, task_mgr};
 pub(super) use self::eviction_task::EvictionTaskTenantState;
@@ -664,12 +664,25 @@ impl Timeline {
     /// Flush to disk all data that was written with the put_* functions
     #[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
     pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
+        if self.current_state() == TimelineState::Creating {
+            debug!("timelines in Creating state are never written to");
+            assert!(
+                self.layers.read().unwrap().open_layer.is_none(),
+                "would have nothing to flush anyways"
+            );
+            return Ok(());
+        }
         self.freeze_inmem_layer(false);
         self.flush_frozen_layers_and_wait().await
     }
     /// Outermost timeline compaction operation; downloads needed layers.
     pub async fn compact(&self, ctx: &RequestContext) -> anyhow::Result<()> {
+        if self.current_state() == TimelineState::Creating {
+            debug!("timeline is in Creating state, nothing to compact");
+            return Ok(());
+        }
         const ROUNDS: usize = 2;
 
         let last_record_lsn = self.get_last_record_lsn();
@@ -917,12 +930,30 @@ impl Timeline {
     }
 
     pub fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
+        if self.current_state() == TimelineState::Creating {
+            panic!("timelines in Creating state are never activated");
+        }
         self.maybe_spawn_flush_loop();
         self.launch_wal_receiver(ctx, broker_client);
         self.set_state(TimelineState::Active);
         self.launch_eviction_task();
     }
 
     pub fn set_state(&self, new_state: TimelineState) {
+        if self.current_state() == TimelineState::Creating {
+            info!("timelines in Creating state are never activated, nothing to stop");
+            assert_eq!(
+                *self.flush_loop_state.lock().unwrap(),
+                FlushLoopState::NotStarted
+            );
+            assert!(
+                self.layers.read().unwrap().open_layer.is_none(),
+                "would have nothing to flush anyways"
+            );
+            assert!(self.walreceiver.lock().unwrap().is_none());
+            // TODO: assert other tasks launched in activate are not running
+            return;
+        }
         match (self.current_state(), new_state) {
             (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
                 warn!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
@@ -962,6 +993,14 @@ impl Timeline {
         loop {
             let current_state = *receiver.borrow_and_update();
             match current_state {
+                TimelineState::Creating => {
+                    // A timeline _object_ in state Creating never transitions out of it.
+                    // It gets replaced by another object in Loading state once creation is done.
+                    // So, `self` is not the right object to subscribe to.
+                    // Luckily, there's no code path that calls this function.
+                    // But let's error out instead of an unreachable, just to be on the safe side.
+                    return Err(current_state);
+                }
                 TimelineState::Loading => {
                     receiver
                         .changed()
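
These state arms sit on top of a `tokio::sync::watch` channel — see the `watch::channel(...)` call in `Timeline::new` below. A minimal stand-alone sketch of the subscribe-and-wait mechanism, assuming a simplified `TimelineState`:

```rust
use tokio::sync::watch;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimelineState {
    Loading,
    Active,
}

#[tokio::main]
async fn main() -> Result<(), watch::error::RecvError> {
    let (tx, mut rx) = watch::channel(TimelineState::Loading);

    // Writer side: roughly what set_state() does.
    tx.send(TimelineState::Active).unwrap();

    // Reader side: like the loop above — inspect the current value and,
    // if it is not the one we wait for, await the next change.
    loop {
        let current = *rx.borrow_and_update();
        match current {
            TimelineState::Active => break,
            TimelineState::Loading => rx.changed().await?,
        }
    }
    Ok(())
}
```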
@@ -1305,7 +1344,6 @@ impl Timeline {
             .change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
         }
     }
-
     /// Open a Timeline handle.
     ///
     /// Loads the metadata for the timeline into memory, but not the layer map.
@@ -1318,11 +1356,16 @@ impl Timeline {
         timeline_id: TimelineId,
         tenant_id: TenantId,
         walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
-        remote_client: Option<RemoteTimelineClient>,
+        remote_client: Option<Arc<RemoteTimelineClient>>,
         pg_version: u32,
+        is_create_placeholder: bool,
     ) -> Arc<Self> {
         let disk_consistent_lsn = metadata.disk_consistent_lsn();
-        let (state, _) = watch::channel(TimelineState::Loading);
+        let (state, _) = watch::channel(if is_create_placeholder {
+            TimelineState::Creating
+        } else {
+            TimelineState::Loading
+        });
 
         let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
         let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
@@ -1350,7 +1393,7 @@ impl Timeline {
             walredo_mgr,
             walreceiver: Mutex::new(None),
-            remote_client: remote_client.map(Arc::new),
+            remote_client,
 
             // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
             last_record_lsn: SeqWait::new(RecordLsn {
@@ -1366,7 +1409,7 @@ impl Timeline {
             ancestor_lsn: metadata.ancestor_lsn(),
             metrics: TimelineMetrics::new(
-                false,
+                is_create_placeholder,
                 &tenant_id,
                 &timeline_id,
                 crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
@@ -1514,6 +1557,33 @@ impl Timeline {
         ));
     }
 
+    /// Prepares timeline data by loading it from the basebackup archive.
+    pub(crate) async fn import_basebackup_from_tar(
+        self: &Arc<Self>,
+        copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
+        base_lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        import_datadir::import_basebackup_from_tar(self, copyin_read, base_lsn, ctx)
+            .await
+            .context("Failed to import basebackup")?;
+
+        // Flush loop needs to be spawned in order to be able to flush.
+        // We want to run proper checkpoint before we mark timeline as available to outside world
+        // Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock
+        self.maybe_spawn_flush_loop();
+
+        fail::fail_point!("before-checkpoint-new-timeline", |_| {
+            bail!("failpoint before-checkpoint-new-timeline");
+        });
+
+        self.freeze_and_flush()
+            .await
+            .context("Failed to flush after basebackup import")?;
+
+        Ok(())
+    }
+
     ///
     /// Scan the timeline directory to populate the layer map.
     /// Returns all timeline-related files that were found and loaded.
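
The `fail::fail_point!` call above comes from the `fail` crate; it is the same mechanism the regression test below drives via the `FAILPOINTS` environment variable and the `configure_failpoints` HTTP API. A small sketch of the behavior (requires the crate's `failpoints` feature):

```rust
use anyhow::bail;

fn checkpoint_new_timeline() -> anyhow::Result<()> {
    // A no-op unless the failpoint is enabled, e.g. via fail::cfg() below, or
    // FAILPOINTS=before-checkpoint-new-timeline=return in the environment
    // (the env var is read by fail::FailScenario::setup()).
    fail::fail_point!("before-checkpoint-new-timeline", |_| {
        bail!("failpoint before-checkpoint-new-timeline")
    });
    Ok(())
}

fn main() -> anyhow::Result<()> {
    checkpoint_new_timeline()?; // passes: failpoint disabled
    fail::cfg("before-checkpoint-new-timeline", "return").unwrap();
    assert!(checkpoint_new_timeline().is_err()); // now returns the error
    Ok(())
}
```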
@@ -2016,6 +2086,12 @@ impl Timeline {
     ) -> Result<u64, CalculateLogicalSizeError> {
         debug_assert_current_span_has_tenant_and_timeline_id();
+        if self.current_state() == TimelineState::Creating {
+            return Err(CalculateLogicalSizeError::Other(anyhow!(
+                "cannot calculate logical size for timeline in Creating state"
+            )));
+        }
+
         let mut timeline_state_updates = self.subscribe_for_state_updates();
 
         let self_calculation = Arc::clone(self);
let self_calculation = Arc::clone(self);
@@ -2036,7 +2112,8 @@ impl Timeline {
                         TimelineState::Active => continue,
                         TimelineState::Broken
                         | TimelineState::Stopping
-                        | TimelineState::Loading => {
+                        | TimelineState::Loading
+                        | TimelineState::Creating => {
                             break format!("aborted because timeline became inactive (new state: {new_state:?})")
                         }
                     }
@@ -3734,6 +3811,11 @@ impl Timeline {
         let now = SystemTime::now();
         let mut result: GcResult = GcResult::default();
 
+        if self.current_state() == TimelineState::Creating {
+            debug!("timeline creating placeholder does not need GC");
+            return Ok(GcResult::default());
+        }
+
         // Nothing to GC. Return early.
         let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
         if latest_gc_cutoff >= new_gc_cutoff {

View File

@@ -148,6 +148,7 @@ pub(super) async fn connection_manager_loop_step(
                 Ok(()) => {
                     let new_state = connection_manager_state.timeline.current_state();
                     match new_state {
+                        TimelineState::Creating => unreachable!("walreceiver should never be launched on a timeline in Creating state"),
                         // we're already active as walreceiver, no need to reactivate
                         TimelineState::Active => continue,
                         TimelineState::Broken | TimelineState::Stopping => {

View File

@@ -2,12 +2,11 @@
 # env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
 import os
-import queue
 import shutil
 import threading
 import time
 from pathlib import Path
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List, Tuple
 
 import pytest
 
 from fixtures.log_helper import log
@@ -656,21 +655,26 @@ def test_empty_branch_remote_storage_upload(
 @pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
-def test_empty_branch_remote_storage_upload_on_restart(
+def test_empty_branch_remote_storage_upload_failure(
     neon_env_builder: NeonEnvBuilder,
     remote_storage_kind: RemoteStorageKind,
 ):
     """
     Branches off a root branch, but does not write anything to the new branch, so
     it has a metadata file only.
     Branching is not acknowledged until the index_part.json is uploaded.
-    Ensures the branch is not on the remote storage and restarts the pageserver
-    — the upload should be scheduled by load, and create_timeline should await
-    for it even though it gets 409 Conflict.
+    Fails the index_part.json upload with a failpoint.
+    Ensures that timeline creation fails because of that.
+    Stops the pageserver.
+    Restarts it, still with failpoint enabled.
+    Waits for tenant to finish loading.
+    Ensures the timeline does not exist locally nor remotely.
+    Disables the failpoint.
+    Ensures that timeline can be created.
     """
     neon_env_builder.enable_remote_storage(
         remote_storage_kind=remote_storage_kind,
-        test_name="test_empty_branch_remote_storage_upload_on_restart",
+        test_name="test_empty_branch_remote_storage_upload_failures",
     )
 
     env = neon_env_builder.init_start()
@@ -696,12 +700,21 @@ def test_empty_branch_remote_storage_upload_on_restart(
     # index upload is now hitting the failpoint, should not block the shutdown
     env.pageserver.stop()
 
+    env.pageserver.allowed_errors.append(
+        f".*failed to create on-disk state for new_timeline_id={new_branch_timeline_id}.*wait for initial uploads to complete.*upload queue was stopped"
+    )
+
     timeline_path = (
         Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
     )
-    local_metadata = env.repo_dir / timeline_path / "metadata"
-    assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload"
+    uninit_marker_path = env.repo_dir / timeline_path.with_suffix(".___uninit")
+    assert (
+        not uninit_marker_path.exists()
+    ), "uninit marker should be deleted during orderly shutdown"
+    assert not (
+        env.repo_dir / timeline_path
+    ).exists(), "unfinished timeline dir should be deleted during orderly shutdown"
 
     assert isinstance(env.remote_storage, LocalFsStorage)
     new_branch_on_remote_storage = env.remote_storage.root / timeline_path
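
The `.___uninit` marker asserted on here is the crash-safety device behind timeline creation: it is written before the timeline directory and removed only once the durable state is complete, so a crash or failure leaves the marker behind and loading can discard the half-created directory. A simplified sketch of the idea (the real logic lives in pageserver's tenant code):

```rust
use std::fs;
use std::path::{Path, PathBuf};

fn uninit_mark_for(timeline_dir: &Path) -> PathBuf {
    // e.g. tenants/<tenant>/timelines/<timeline>.___uninit
    timeline_dir.with_extension("___uninit")
}

fn create_timeline_dir(timeline_dir: &Path) -> std::io::Result<()> {
    fs::write(uninit_mark_for(timeline_dir), b"")?; // mark: under construction
    fs::create_dir_all(timeline_dir)?;
    // ... write metadata, schedule remote uploads, wait for them ...
    fs::remove_file(uninit_mark_for(timeline_dir)) // only now is the dir valid
}

fn cleanup_if_uninit(timeline_dir: &Path) -> std::io::Result<()> {
    let mark = uninit_mark_for(timeline_dir);
    if mark.exists() {
        if timeline_dir.exists() {
            fs::remove_dir_all(timeline_dir)?;
        }
        fs::remove_file(mark)?;
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let dir = Path::new("/tmp/demo-timeline");
    create_timeline_dir(dir)?;
    cleanup_if_uninit(dir) // no-op here: the uninit mark is already gone
}
```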
@@ -709,54 +722,26 @@ def test_empty_branch_remote_storage_upload_on_restart(
         not new_branch_on_remote_storage.exists()
     ), "failpoint should had prohibited index_part.json upload"
 
-    # during reconciliation we should had scheduled the uploads and on the
-    # retried create_timeline, we will await for those to complete on next
-    # client.timeline_create
-    env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-upload-index=return"})
+    # restart without failpoint
+    env.pageserver.start()
 
-    # sleep a bit to force the upload task go into exponential backoff
-    time.sleep(1)
+    wait_until_tenant_state(client, env.initial_tenant, "Active", 5)
 
-    q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
-    barrier = threading.Barrier(2)
+    # retry creation
+    client.timeline_create(
+        tenant_id=env.initial_tenant,
+        ancestor_timeline_id=env.initial_timeline,
+        new_timeline_id=new_branch_timeline_id,
+        pg_version=env.pg_version,
+    )
 
-    def create_in_background():
-        barrier.wait()
-        try:
-            client.timeline_create(
-                tenant_id=env.initial_tenant,
-                ancestor_timeline_id=env.initial_timeline,
-                new_timeline_id=new_branch_timeline_id,
-                pg_version=env.pg_version,
-            )
-            q.put(None)
-        except PageserverApiException as e:
-            q.put(e)
+    assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
 
-    create_thread = threading.Thread(target=create_in_background)
-    create_thread.start()
-
-    try:
-        # maximize chances of actually waiting for the uploads by create_timeline
-        barrier.wait()
-        assert not new_branch_on_remote_storage.exists(), "failpoint should had stopped uploading"
-
-        client.configure_failpoints(("before-upload-index", "off"))
-
-        conflict = q.get()
-        assert conflict, "create_timeline should not have succeeded"
-        assert (
-            conflict.status_code == 409
-        ), "timeline was created before restart, and uploads scheduled during initial load, so we expect 409 conflict"
-
-        assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
-
-        assert (
-            new_branch_on_remote_storage / "index_part.json"
-        ).is_file(), "uploads scheduled during initial load should had been awaited for"
-    finally:
-        create_thread.join()
+    assert (env.repo_dir / timeline_path).exists()
+    assert not uninit_marker_path.exists()
+    assert (
+        new_branch_on_remote_storage / "index_part.json"
+    ).is_file(), "uploads scheduled during initial load should had been awaited for"
def wait_upload_queue_empty(