Add existing_initdb_timeline_id param to timeline creation (#5912)

This PR adds an `existing_initdb_timeline_id` option to timeline
creation APIs, taking an optional timeline ID.

Follow-up of  #5390.

If the `existing_initdb_timeline_id` option is specified via the HTTP
API, the pageserver downloads the existing initdb archive from the given
timeline ID and extracts it, instead of running initdb itself.

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
Arpad Müller
2023-11-30 22:32:04 +01:00
committed by GitHub
parent 3842773546
commit b71b8ecfc2
17 changed files with 245 additions and 44 deletions

View File

@@ -415,6 +415,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
None,
None,
Some(pg_version),
None,
)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
@@ -495,6 +496,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
None,
None,
Some(pg_version),
None,
)?;
let new_timeline_id = timeline_info.timeline_id;
@@ -582,6 +584,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
start_lsn,
Some(ancestor_timeline_id),
None,
None,
)?;
let new_timeline_id = timeline_info.timeline_id;

View File

@@ -565,6 +565,7 @@ impl PageServerNode {
ancestor_start_lsn: Option<Lsn>,
ancestor_timeline_id: Option<TimelineId>,
pg_version: Option<u32>,
existing_initdb_timeline_id: Option<TimelineId>,
) -> anyhow::Result<TimelineInfo> {
// If timeline ID was not specified, generate one
let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate());
@@ -578,6 +579,7 @@ impl PageServerNode {
ancestor_start_lsn,
ancestor_timeline_id,
pg_version,
existing_initdb_timeline_id,
})
.send()?
.error_from_body()?

View File

@@ -179,6 +179,8 @@ pub struct TimelineCreateRequest {
#[serde(default)]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde(default)]
pub existing_initdb_timeline_id: Option<TimelineId>,
#[serde(default)]
pub ancestor_start_lsn: Option<Lsn>,
pub pg_version: Option<u32>,
}

View File

@@ -1028,6 +1028,9 @@ paths:
format: hex
pg_version:
type: integer
existing_initdb_timeline_id:
type: string
format: hex
responses:
"201":
description: TimelineInfo

View File

@@ -441,6 +441,7 @@ async fn timeline_create_handler(
request_data.ancestor_timeline_id.map(TimelineId::from),
request_data.ancestor_start_lsn,
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
request_data.existing_initdb_timeline_id,
state.broker_client.clone(),
&ctx,
)

View File

@@ -7,12 +7,13 @@ use std::pin::Pin;
use std::task::{self, Poll};
use anyhow::{bail, ensure, Context, Result};
use async_compression::tokio::bufread::ZstdDecoder;
use async_compression::{tokio::write::ZstdEncoder, zstd::CParameter, Level};
use bytes::Bytes;
use camino::Utf8Path;
use futures::StreamExt;
use nix::NixPath;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_tar::Archive;
use tokio_tar::Builder;
use tokio_tar::HeaderMode;
@@ -732,3 +733,13 @@ pub async fn create_tar_zst(pgdata_path: &Utf8Path) -> Result<Vec<u8>> {
}
Ok(compressed.buf)
}
pub async fn extract_tar_zst(
pgdata_path: &Utf8Path,
tar_zst: impl AsyncBufRead + Unpin,
) -> Result<()> {
let tar = Box::pin(ZstdDecoder::new(tar_zst));
let mut archive = Archive::new(tar);
archive.unpack(pgdata_path).await?;
Ok(())
}

View File

@@ -24,6 +24,7 @@ use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use std::fmt;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
use tokio::runtime::Handle;
use tokio::sync::watch;
use tokio::task::JoinSet;
@@ -1558,12 +1559,14 @@ impl Tenant {
///
/// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
/// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
#[allow(clippy::too_many_arguments)]
pub async fn create_timeline(
&self,
new_timeline_id: TimelineId,
ancestor_timeline_id: Option<TimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
pg_version: u32,
load_existing_initdb: Option<TimelineId>,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
@@ -1638,7 +1641,7 @@ impl Tenant {
.await?
}
None => {
self.bootstrap_timeline(new_timeline_id, pg_version, ctx)
self.bootstrap_timeline(new_timeline_id, pg_version, load_existing_initdb, ctx)
.await?
}
};
@@ -2951,6 +2954,7 @@ impl Tenant {
&self,
timeline_id: TimelineId,
pg_version: u32,
load_existing_initdb: Option<TimelineId>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let timeline_uninit_mark = {
@@ -2973,8 +2977,6 @@ impl Tenant {
format!("Failed to remove already existing initdb directory: {pgdata_path}")
})?;
}
// Init temporarily repo to get bootstrap data, this creates a directory in the `initdb_path` path
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
// this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
scopeguard::defer! {
if let Err(e) = fs::remove_dir_all(&pgdata_path) {
@@ -2982,31 +2984,58 @@ impl Tenant {
error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
}
}
let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
if let Some(existing_initdb_timeline_id) = load_existing_initdb {
let Some(storage) = &self.remote_storage else {
bail!("no storage configured but load_existing_initdb set to {existing_initdb_timeline_id}");
};
let (initdb_tar_zst_path, initdb_tar_zst) =
self::remote_timeline_client::download_initdb_tar_zst(
self.conf,
storage,
&self.tenant_shard_id,
&existing_initdb_timeline_id,
)
.await
.context("download initdb tar")?;
let buf_read = Box::pin(BufReader::new(initdb_tar_zst));
import_datadir::extract_tar_zst(&pgdata_path, buf_read)
.await
.context("extract initdb tar")?;
// Upload the created data dir to S3
if let Some(storage) = &self.remote_storage {
let pgdata_zstd = import_datadir::create_tar_zst(&pgdata_path).await?;
let pgdata_zstd = Bytes::from(pgdata_zstd);
backoff::retry(
|| async {
self::remote_timeline_client::upload_initdb_dir(
storage,
&self.tenant_shard_id.tenant_id,
&timeline_id,
pgdata_zstd.clone(),
)
if initdb_tar_zst_path.exists() {
tokio::fs::remove_file(&initdb_tar_zst_path)
.await
},
|_| false,
3,
u32::MAX,
"persist_initdb_tar_zst",
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await?;
.context("tempfile removal")?;
}
} else {
// Init temporarily repo to get bootstrap data, this creates a directory in the `initdb_path` path
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
// Upload the created data dir to S3
if let Some(storage) = &self.remote_storage {
let pgdata_zstd = import_datadir::create_tar_zst(&pgdata_path).await?;
let pgdata_zstd = Bytes::from(pgdata_zstd);
backoff::retry(
|| async {
self::remote_timeline_client::upload_initdb_dir(
storage,
&self.tenant_shard_id.tenant_id,
&timeline_id,
pgdata_zstd.clone(),
)
.await
},
|_| false,
3,
u32::MAX,
"persist_initdb_tar_zst",
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await?;
}
}
let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
// Import the contents of the data directory at the initial checkpoint
// LSN, and any WAL after that.

View File

@@ -188,6 +188,7 @@ use anyhow::Context;
use camino::Utf8Path;
use chrono::{NaiveDateTime, Utc};
pub(crate) use download::download_initdb_tar_zst;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
@@ -1077,7 +1078,17 @@ impl RemoteTimelineClient {
let remaining_layers: Vec<RemotePath> = remaining
.into_iter()
.filter(|p| p!= &latest_index)
.filter(|p| {
if p == &latest_index {
return false;
}
if let Some(name) = p.object_name() {
if name == INITDB_PATH {
return false;
}
}
true
})
.inspect(|path| {
if let Some(name) = path.object_name() {
info!(%name, "deleting a file not referenced from index_part.json");

View File

@@ -8,11 +8,12 @@ use std::future::Future;
use std::time::Duration;
use anyhow::{anyhow, Context};
use camino::Utf8Path;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::sync::CancellationToken;
use tracing::warn;
use utils::{backoff, crashsafe};
use crate::config::PageServerConf;
@@ -20,14 +21,15 @@ use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::Generation;
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::TimelineId;
use super::index::{IndexPart, LayerFileMetadata};
use super::{
parse_remote_index_path, remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
};
static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120);
@@ -374,6 +376,69 @@ pub(super) async fn download_index_part(
}
}
pub(crate) async fn download_initdb_tar_zst(
conf: &'static PageServerConf,
storage: &GenericRemoteStorage,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Result<(Utf8PathBuf, File), DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
let timeline_path = conf.timelines_path(tenant_shard_id);
if !timeline_path.exists() {
tokio::fs::create_dir_all(&timeline_path)
.await
.with_context(|| format!("timeline dir creation {timeline_path}"))
.map_err(DownloadError::Other)?;
}
let temp_path = timeline_path.join(format!("{INITDB_PATH}-{timeline_id}.{TEMP_FILE_SUFFIX}"));
let file = download_retry(
|| async {
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
.read(true)
.write(true)
.open(&temp_path)
.await
.with_context(|| format!("tempfile creation {temp_path}"))
.map_err(DownloadError::Other)?;
let mut download = storage.download(&remote_path).await?;
tokio::io::copy(&mut download.download_stream, &mut file)
.await
.with_context(|| format!("download initdb.tar.zst at {remote_path:?}"))
.map_err(DownloadError::Other)?;
file.seek(std::io::SeekFrom::Start(0))
.await
.with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
.map_err(DownloadError::Other)?;
Ok(file)
},
&format!("download {remote_path}"),
)
.await
.map_err(|e| {
if temp_path.exists() {
// Do a best-effort attempt at deleting the temporary file upon encountering an error.
// We don't have async here nor do we want to pile on any extra errors.
if let Err(e) = std::fs::remove_file(&temp_path) {
warn!("error deleting temporary file {temp_path}: {e}");
}
}
e
})?;
Ok((temp_path, file))
}
/// Helper function to handle retries for a download operation.
///
/// Remote operations can fail due to rate limits (IAM, S3), spurious network

View File

@@ -2115,7 +2115,7 @@ mod tests {
.load()
.await;
let tline = tenant
.bootstrap_timeline(TIMELINE_ID, pg_version, &ctx)
.bootstrap_timeline(TIMELINE_ID, pg_version, None, &ctx)
.await
.unwrap();

View File

@@ -362,12 +362,16 @@ class PageserverHttpClient(requests.Session):
new_timeline_id: TimelineId,
ancestor_timeline_id: Optional[TimelineId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
existing_initdb_timeline_id: Optional[TimelineId] = None,
**kwargs,
) -> Dict[Any, Any]:
body: Dict[str, Any] = {
"new_timeline_id": str(new_timeline_id),
"ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None,
"ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None,
"existing_initdb_timeline_id": str(existing_initdb_timeline_id)
if existing_initdb_timeline_id
else None,
}
if pg_version != PgVersion.NOT_SET:
body["pg_version"] = int(pg_version)

View File

@@ -1,7 +1,7 @@
import time
from typing import TYPE_CHECKING, Any, Dict, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef
from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef, ObjectTypeDef
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
@@ -235,10 +235,14 @@ if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder
def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):
def assert_prefix_empty(
neon_env_builder: "NeonEnvBuilder",
prefix: Optional[str] = None,
allowed_postfix: Optional[str] = None,
):
response = list_prefix(neon_env_builder, prefix)
keys = response["KeyCount"]
objects = response.get("Contents", [])
objects: List[ObjectTypeDef] = response.get("Contents", [])
common_prefixes = response.get("CommonPrefixes", [])
remote_storage = neon_env_builder.pageserver_remote_storage
@@ -261,7 +265,18 @@ def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}"
)
assert keys == 0, f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
filtered_count = 0
if allowed_postfix is None:
filtered_count = len(objects)
else:
for _obj in objects:
key: str = str(response.get("Key", []))
if not (allowed_postfix.endswith(key)):
filtered_count += 1
assert (
filtered_count == 0
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
def assert_prefix_not_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):

View File

@@ -603,7 +603,12 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
remote_timeline_path = env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id)
assert not list(remote_timeline_path.iterdir())
filtered = [
path
for path in remote_timeline_path.iterdir()
if not (path.name.endswith("initdb.tar.zst"))
]
assert len(filtered) == 0
# timeline deletion should kill ongoing uploads, so, the metric will be gone
assert get_queued_count(file_kind="index", op_kind="upload") is None

View File

@@ -197,6 +197,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
# So by ignoring these instead of waiting for empty upload queue
# we execute more distinct code paths.
'.*stopping left-over name="remote upload".*',
".*Failed to load index_part from remote storage, failed creation?.*",
]
)
@@ -285,6 +286,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
str(tenant_id),
)
),
allowed_postfix="initdb.tar.zst",
)

View File

@@ -290,10 +290,13 @@ def test_pageserver_with_empty_tenants(
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.append(
".*marking .* as locally complete, while it doesnt exist in remote index.*"
env.pageserver.allowed_errors.extend(
[
".*marking .* as locally complete, while it doesnt exist in remote index.*",
".*Failed to load index_part from remote storage, failed creation?.*",
".*load failed.*list timelines directory.*",
]
)
env.pageserver.allowed_errors.append(".*load failed.*list timelines directory.*")
client = env.pageserver.http_client()

View File

@@ -230,6 +230,9 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
env.pageserver.allowed_errors.append(".*Timeline dir entry become invalid.*")
# In one of the branches we poll for tenant to become active. Polls can generate this log message:
env.pageserver.allowed_errors.append(f".*Tenant {env.initial_tenant} is not active*")
env.pageserver.allowed_errors.append(
".*Failed to load index_part from remote storage, failed creation?.*"
)
ps_http.configure_failpoints((failpoint, "return"))
@@ -308,8 +311,9 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
)
timeline_dir = env.pageserver.timeline_dir(env.initial_tenant, timeline_id)
# Check local is empty
assert not timeline_dir.exists()
if failpoint != "timeline-delete-after-index-delete":
# Check local is empty
assert (not timeline_dir.exists()) or len(os.listdir(timeline_dir)) == 0
# Check no delete mark present
assert not (timeline_dir.parent / f"{timeline_id}.___deleted").exists()

View File

@@ -1,6 +1,7 @@
import sys
import tarfile
import tempfile
import time
from pathlib import Path
import pytest
@@ -125,3 +126,43 @@ def test_wal_restore_initdb(
)
log.info(f"original lsn: {original_lsn}, restored lsn: {restored_lsn}")
assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)]
def test_wal_restore_http(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("create table t as select generate_series(1,300000)")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
ps_client = env.pageserver.http_client()
# shut down the endpoint and delete the timeline from the pageserver
endpoint.stop()
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
test_output_dir / "initdb.tar.zst"
(env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id) / "initdb.tar.zst")
ps_client.timeline_delete(tenant_id, timeline_id)
time.sleep(2)
# verify that it is indeed deleted
# TODO
# issue the restoration command
ps_client.timeline_create(
tenant_id=tenant_id,
new_timeline_id=timeline_id,
existing_initdb_timeline_id=timeline_id,
pg_version=env.pg_version,
)
# the table is back now!
restored = env.endpoints.create_start("main")
assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)]