From 450be26bbbf2e39dd2184faed8e6601072006fb1 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Fri, 22 Nov 2024 23:47:06 +0100
Subject: [PATCH] fast imports: initial Importer and Storage changes (#9218)

Co-authored-by: Heikki Linnakangas
Co-authored-by: Stas Kelvich

# Context

This PR contains PoC-level changes for a product feature that allows onboarding
large databases into Neon without going through the regular data path.

# Changes

This internal RFC provides all the context:

* https://github.com/neondatabase/cloud/pull/19799

In the language of the RFC, this PR covers

* the Importer code (`fast_import`)
* all the Pageserver changes (mgmt API changes, flow implementation, etc.)
* a basic test for the Pageserver changes

# Reviewing

As acknowledged in the RFC, the code added in this PR is not ready for general
availability. Also, the **architecture is not to be discussed in this PR**, but
in the RFC and the associated Slack channel instead. Reviewers of this PR should
take that into consideration.

The quality bar to apply during review depends on which area of the code is
being reviewed:

* Importer code (`fast_import`): practically anything goes.
* Core flow (`flow.rs`):
  * Malicious input data must be expected, and the existing threat models apply.
  * The code need only be safe to execute on *dedicated* Pageserver instances:
    * This means in particular that tenants *on other* Pageserver instances must
      not be affected negatively wrt data confidentiality, integrity, or
      availability.
* Other code: the usual quality bar.
  * Pay special attention to correct use of gate guards, timeline cancellation
    in all places during shutdown & migration, etc.
  * Consider the broader system impact; if you find potentially problematic
    interactions with Storage features that were not covered in the RFC, bring
    that up during the review.

I recommend submitting three separate reviews, one for each of the three
high-level areas with their different quality bars.
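# Example: the new `import_pgdata` creation mode (non-normative)

For reviewers of the mgmt API changes, here is a minimal sketch of how a client
might construct the new `import_pgdata` timeline-creation mode added in this PR.
It only uses the `pageserver_api::models` types introduced below
(`TimelineCreateRequestMode::ImportPgdata`, `ImportPgdataLocation`,
`ImportPgdataIdempotencyKey`); the HTTP envelope around it (endpoint path and the
other `TimelineCreateRequest` fields) is assumed to stay as it is today, and the
region/bucket/key values are placeholders.

```rust
// Sketch, not a definitive client implementation.
use pageserver_api::models::{
    ImportPgdataIdempotencyKey, ImportPgdataLocation, TimelineCreateRequestMode,
    TimelineCreateRequestModeImportPgdata,
};

fn main() -> anyhow::Result<()> {
    // The mode enum is what this PR extends; the mgmt API handler matches on it
    // in timeline_create_handler and translates it into CreateTimelineParams::ImportPgdata.
    let mode = TimelineCreateRequestMode::ImportPgdata {
        import_pgdata: TimelineCreateRequestModeImportPgdata {
            location: ImportPgdataLocation::AwsS3 {
                region: "us-east-1".to_string(),        // placeholder
                bucket: "my-import-bucket".to_string(), // placeholder
                // Named `key` on the wire, but semantically a prefix; see the
                // comment on ImportPgdataLocation in models.rs.
                key: "imports/some-import/".to_string(), // placeholder
            },
            // The caller is expected to reuse a stable key per import so that
            // retries are idempotent; `random()` is a convenience for tests.
            idempotency_key: ImportPgdataIdempotencyKey::random(),
        },
    };

    // Show what this serializes to, for comparison with the OpenAPI spec below.
    println!("{}", serde_json::to_string_pretty(&mode)?);
    Ok(())
}
```

The corresponding request-body shape is described by the
`TimelineCreateRequestImportPgdata` schema added to `openapi_spec.yml` in this
patch.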
# References (Internal-only) * refs https://github.com/neondatabase/cloud/issues/17507 * refs https://github.com/neondatabase/company_projects/issues/293 * refs https://github.com/neondatabase/company_projects/issues/309 * refs https://github.com/neondatabase/cloud/issues/20646 --------- Co-authored-by: Stas Kelvich Co-authored-by: Heikki Linnakangas Co-authored-by: John Spray --- Cargo.lock | 47 +- Cargo.toml | 8 +- compute/compute-node.Dockerfile | 23 +- compute_tools/Cargo.toml | 7 + compute_tools/src/bin/fast_import.rs | 338 ++++++++ .../src/bin/fast_import/child_stdio_to_log.rs | 35 + compute_tools/src/bin/fast_import/s3_uri.rs | 75 ++ compute_tools/src/bin/fast_import/s5cmd.rs | 27 + control_plane/src/bin/neon_local.rs | 1 + libs/pageserver_api/Cargo.toml | 1 + libs/pageserver_api/src/config.rs | 13 + libs/pageserver_api/src/keyspace.rs | 4 +- libs/pageserver_api/src/models.rs | 41 + libs/postgres_initdb/Cargo.toml | 12 + libs/postgres_initdb/src/lib.rs | 103 +++ pageserver/Cargo.toml | 2 + pageserver/src/config.rs | 10 + pageserver/src/http/openapi_spec.yml | 30 + pageserver/src/http/routes.rs | 31 + pageserver/src/pgdatadir_mapping.rs | 16 +- pageserver/src/task_mgr.rs | 2 + pageserver/src/tenant.rs | 577 ++++++++++--- .../src/tenant/remote_timeline_client.rs | 13 + .../tenant/remote_timeline_client/download.rs | 2 +- .../tenant/remote_timeline_client/index.rs | 100 ++- pageserver/src/tenant/timeline.rs | 28 +- .../src/tenant/timeline/import_pgdata.rs | 218 +++++ .../src/tenant/timeline/import_pgdata/flow.rs | 798 ++++++++++++++++++ .../import_pgdata/importbucket_client.rs | 315 +++++++ .../import_pgdata/importbucket_format.rs | 20 + .../import_pgdata/index_part_format.rs | 68 ++ .../timeline/import_pgdata/upcall_api.rs | 119 +++ pageserver/src/tenant/timeline/uninit.rs | 36 +- test_runner/fixtures/common_types.py | 23 + test_runner/fixtures/neon_fixtures.py | 14 + test_runner/fixtures/pageserver/http.py | 76 +- test_runner/fixtures/utils.py | 7 + test_runner/regress/test_import_pgdata.py | 307 +++++++ workspace_hack/Cargo.toml | 3 +- 39 files changed, 3368 insertions(+), 182 deletions(-) create mode 100644 compute_tools/src/bin/fast_import.rs create mode 100644 compute_tools/src/bin/fast_import/child_stdio_to_log.rs create mode 100644 compute_tools/src/bin/fast_import/s3_uri.rs create mode 100644 compute_tools/src/bin/fast_import/s5cmd.rs create mode 100644 libs/postgres_initdb/Cargo.toml create mode 100644 libs/postgres_initdb/src/lib.rs create mode 100644 pageserver/src/tenant/timeline/import_pgdata.rs create mode 100644 pageserver/src/tenant/timeline/import_pgdata/flow.rs create mode 100644 pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs create mode 100644 pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs create mode 100644 pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs create mode 100644 pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs create mode 100644 test_runner/regress/test_import_pgdata.py diff --git a/Cargo.lock b/Cargo.lock index a25fa89c77..665aa4aecc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -374,6 +374,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-sdk-kms" +version = "1.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "564a597a3c71a957d60a2e4c62c93d78ee5a0d636531e15b760acad983a5c18e" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + 
"aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.9", + "once_cell", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-s3" version = "1.52.0" @@ -590,9 +612,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.1" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1ce695746394772e7000b39fe073095db6d45a862d0767dd5ad0ac0d7f8eb87" +checksum = "a065c0fe6fdbdf9f11817eb68582b2ab4aff9e9c39e986ae48f7ec576c6322db" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1235,6 +1257,10 @@ name = "compute_tools" version = "0.1.0" dependencies = [ "anyhow", + "aws-config", + "aws-sdk-kms", + "aws-sdk-s3", + "base64 0.13.1", "bytes", "camino", "cfg-if", @@ -1252,13 +1278,16 @@ dependencies = [ "opentelemetry", "opentelemetry_sdk", "postgres", + "postgres_initdb", "prometheus", "regex", "remote_storage", "reqwest 0.12.4", "rlimit", "rust-ini", + "serde", "serde_json", + "serde_with", "signal-hook", "tar", "thiserror", @@ -3712,6 +3741,7 @@ dependencies = [ "num_cpus", "once_cell", "pageserver_api", + "pageserver_client", "pageserver_compaction", "pin-project-lite", "postgres", @@ -3720,6 +3750,7 @@ dependencies = [ "postgres_backend", "postgres_connection", "postgres_ffi", + "postgres_initdb", "pq_proto", "procfs", "rand 0.8.5", @@ -4195,6 +4226,17 @@ dependencies = [ "utils", ] +[[package]] +name = "postgres_initdb" +version = "0.1.0" +dependencies = [ + "anyhow", + "camino", + "thiserror", + "tokio", + "workspace_hack", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -7504,6 +7546,7 @@ dependencies = [ "anyhow", "axum", "axum-core", + "base64 0.13.1", "base64 0.21.1", "base64ct", "bytes", diff --git a/Cargo.toml b/Cargo.toml index aac19a4122..e3dc5b97f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "libs/vm_monitor", "libs/walproposer", "libs/wal_decoder", + "libs/postgres_initdb", ] [workspace.package] @@ -57,6 +58,7 @@ async-trait = "0.1" aws-config = { version = "1.5", default-features = false, features=["rustls", "sso"] } aws-sdk-s3 = "1.52" aws-sdk-iam = "1.46.0" +aws-sdk-kms = "1.47.0" aws-smithy-async = { version = "1.2.1", default-features = false, features=["rt-tokio"] } aws-smithy-types = "1.2" aws-credential-types = "1.2.0" @@ -73,7 +75,7 @@ bytes = "1.0" camino = "1.1.6" cfg-if = "1.0.0" chrono = { version = "0.4", default-features = false, features = ["clock"] } -clap = { version = "4.0", features = ["derive"] } +clap = { version = "4.0", features = ["derive", "env"] } comfy-table = "7.1" const_format = "0.2" crc32c = "0.6" @@ -154,7 +156,7 @@ sentry = { version = "0.32", default-features = false, features = ["backtrace", serde = { version = "1.0", features = ["derive"] } serde_json = "1" serde_path_to_error = "0.1" -serde_with = "2.0" +serde_with = { version = "2.0", features = [ "base64" ] } serde_assert = "0.5.0" sha2 = "0.10.2" signal-hook = "0.3" @@ -213,12 +215,14 @@ tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", br compute_api = { version = "0.1", path = "./libs/compute_api/" } consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" } metrics = { version = "0.1", path = "./libs/metrics/" } +pageserver = { path = "./pageserver" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } pageserver_client = { path = "./pageserver/client" } pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" } postgres_backend = { version = "0.1", path = 
"./libs/postgres_backend/" } postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" } postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" } +postgres_initdb = { path = "./libs/postgres_initdb" } pq_proto = { version = "0.1", path = "./libs/pq_proto/" } remote_storage = { version = "0.1", path = "./libs/remote_storage/" } safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" } diff --git a/compute/compute-node.Dockerfile b/compute/compute-node.Dockerfile index 32405ece86..7c21c67a0a 100644 --- a/compute/compute-node.Dockerfile +++ b/compute/compute-node.Dockerfile @@ -1243,7 +1243,7 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \ ######################################################################################### # -# Compile and run the Neon-specific `compute_ctl` binary +# Compile and run the Neon-specific `compute_ctl` and `fast_import` binaries # ######################################################################################### FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools @@ -1264,6 +1264,7 @@ RUN cd compute_tools && mold -run cargo build --locked --profile release-line-de FROM debian:$DEBIAN_FLAVOR AS compute-tools-image COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl +COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/fast_import /usr/local/bin/fast_import ######################################################################################### # @@ -1458,6 +1459,7 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \ COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl +COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/fast_import /usr/local/bin/fast_import # pgbouncer and its config COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer @@ -1533,6 +1535,25 @@ RUN apt update && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8 +# s5cmd 2.2.2 from https://github.com/peak/s5cmd/releases/tag/v2.2.2 +# used by fast_import +ARG TARGETARCH +ADD https://github.com/peak/s5cmd/releases/download/v2.2.2/s5cmd_2.2.2_linux_$TARGETARCH.deb /tmp/s5cmd.deb +RUN set -ex; \ + \ + # Determine the expected checksum based on TARGETARCH + if [ "${TARGETARCH}" = "amd64" ]; then \ + CHECKSUM="392c385320cd5ffa435759a95af77c215553d967e4b1c0fffe52e4f14c29cf85"; \ + elif [ "${TARGETARCH}" = "arm64" ]; then \ + CHECKSUM="939bee3cf4b5604ddb00e67f8c157b91d7c7a5b553d1fbb6890fad32894b7b46"; \ + else \ + echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \ + fi; \ + \ + # Compute and validate the checksum + echo "${CHECKSUM} /tmp/s5cmd.deb" | sha256sum -c - +RUN dpkg -i /tmp/s5cmd.deb && rm /tmp/s5cmd.deb + ENV LANG=en_US.utf8 USER postgres ENTRYPOINT ["/usr/local/bin/compute_ctl"] diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 0bf4ed53d6..c0c390caef 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -10,6 +10,10 @@ default = [] testing = [] [dependencies] +base64.workspace = true +aws-config.workspace = true +aws-sdk-s3.workspace = true +aws-sdk-kms.workspace = true anyhow.workspace = true camino.workspace = true chrono.workspace = true @@ -27,6 +31,8 @@ opentelemetry.workspace = true 
opentelemetry_sdk.workspace = true postgres.workspace = true regex.workspace = true +serde.workspace = true +serde_with.workspace = true serde_json.workspace = true signal-hook.workspace = true tar.workspace = true @@ -43,6 +49,7 @@ thiserror.workspace = true url.workspace = true prometheus.workspace = true +postgres_initdb.workspace = true compute_api.workspace = true utils.workspace = true workspace_hack.workspace = true diff --git a/compute_tools/src/bin/fast_import.rs b/compute_tools/src/bin/fast_import.rs new file mode 100644 index 0000000000..3b0b990df2 --- /dev/null +++ b/compute_tools/src/bin/fast_import.rs @@ -0,0 +1,338 @@ +//! This program dumps a remote Postgres database into a local Postgres database +//! and uploads the resulting PGDATA into object storage for import into a Timeline. +//! +//! # Context, Architecture, Design +//! +//! See cloud.git Fast Imports RFC () +//! for the full picture. +//! The RFC describing the storage pieces of importing the PGDATA dump into a Timeline +//! is publicly accessible at . +//! +//! # This is a Prototype! +//! +//! This program is part of a prototype feature and not yet used in production. +//! +//! The cloud.git RFC contains lots of suggestions for improving e2e throughput +//! of this step of the timeline import process. +//! +//! # Local Testing +//! +//! - Comment out most of the pgxns in The Dockerfile.compute-tools to speed up the build. +//! - Build the image with the following command: +//! +//! ```bash +//! docker buildx build --build-arg DEBIAN_FLAVOR=bullseye-slim --build-arg GIT_VERSION=local --build-arg PG_VERSION=v14 --build-arg BUILD_TAG="$(date --iso-8601=s -u)" -t localhost:3030/localregistry/compute-node-v14:latest -f compute/Dockerfile.com +//! docker push localhost:3030/localregistry/compute-node-v14:latest +//! ``` + +use anyhow::Context; +use aws_config::BehaviorVersion; +use camino::{Utf8Path, Utf8PathBuf}; +use clap::Parser; +use nix::unistd::Pid; +use tracing::{info, info_span, warn, Instrument}; +use utils::fs_ext::is_directory_empty; + +#[path = "fast_import/child_stdio_to_log.rs"] +mod child_stdio_to_log; +#[path = "fast_import/s3_uri.rs"] +mod s3_uri; +#[path = "fast_import/s5cmd.rs"] +mod s5cmd; + +#[derive(clap::Parser)] +struct Args { + #[clap(long)] + working_directory: Utf8PathBuf, + #[clap(long, env = "NEON_IMPORTER_S3_PREFIX")] + s3_prefix: s3_uri::S3Uri, + #[clap(long)] + pg_bin_dir: Utf8PathBuf, + #[clap(long)] + pg_lib_dir: Utf8PathBuf, +} + +#[serde_with::serde_as] +#[derive(serde::Deserialize)] +struct Spec { + encryption_secret: EncryptionSecret, + #[serde_as(as = "serde_with::base64::Base64")] + source_connstring_ciphertext_base64: Vec, +} + +#[derive(serde::Deserialize)] +enum EncryptionSecret { + #[allow(clippy::upper_case_acronyms)] + KMS { key_id: String }, +} + +#[tokio::main] +pub(crate) async fn main() -> anyhow::Result<()> { + utils::logging::init( + utils::logging::LogFormat::Plain, + utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter, + utils::logging::Output::Stdout, + )?; + + info!("starting"); + + let Args { + working_directory, + s3_prefix, + pg_bin_dir, + pg_lib_dir, + } = Args::parse(); + + let aws_config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await; + + let spec: Spec = { + let spec_key = s3_prefix.append("/spec.json"); + let s3_client = aws_sdk_s3::Client::new(&aws_config); + let object = s3_client + .get_object() + .bucket(&spec_key.bucket) + .key(spec_key.key) + .send() + .await + .context("get spec from s3")? 
+ .body + .collect() + .await + .context("download spec body")?; + serde_json::from_slice(&object.into_bytes()).context("parse spec as json")? + }; + + match tokio::fs::create_dir(&working_directory).await { + Ok(()) => {} + Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { + if !is_directory_empty(&working_directory) + .await + .context("check if working directory is empty")? + { + anyhow::bail!("working directory is not empty"); + } else { + // ok + } + } + Err(e) => return Err(anyhow::Error::new(e).context("create working directory")), + } + + let pgdata_dir = working_directory.join("pgdata"); + tokio::fs::create_dir(&pgdata_dir) + .await + .context("create pgdata directory")?; + + // + // Setup clients + // + let aws_config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await; + let kms_client = aws_sdk_kms::Client::new(&aws_config); + + // + // Initialize pgdata + // + let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded + postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs { + superuser, + locale: "en_US.UTF-8", // XXX: this shouldn't be hard-coded, + pg_version: 140000, // XXX: this shouldn't be hard-coded but derived from which compute image we're running in + initdb_bin: pg_bin_dir.join("initdb").as_ref(), + library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local. + pgdata: &pgdata_dir, + }) + .await + .context("initdb")?; + + let nproc = num_cpus::get(); + + // + // Launch postgres process + // + let mut postgres_proc = tokio::process::Command::new(pg_bin_dir.join("postgres")) + .arg("-D") + .arg(&pgdata_dir) + .args(["-c", "wal_level=minimal"]) + .args(["-c", "shared_buffers=10GB"]) + .args(["-c", "max_wal_senders=0"]) + .args(["-c", "fsync=off"]) + .args(["-c", "full_page_writes=off"]) + .args(["-c", "synchronous_commit=off"]) + .args(["-c", "maintenance_work_mem=8388608"]) + .args(["-c", &format!("max_parallel_maintenance_workers={nproc}")]) + .args(["-c", &format!("max_parallel_workers={nproc}")]) + .args(["-c", &format!("max_parallel_workers_per_gather={nproc}")]) + .args(["-c", &format!("max_worker_processes={nproc}")]) + .args(["-c", "effective_io_concurrency=100"]) + .env_clear() + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .context("spawn postgres")?; + + info!("spawned postgres, waiting for it to become ready"); + tokio::spawn( + child_stdio_to_log::relay_process_output( + postgres_proc.stdout.take(), + postgres_proc.stderr.take(), + ) + .instrument(info_span!("postgres")), + ); + let restore_pg_connstring = + format!("host=localhost port=5432 user={superuser} dbname=postgres"); + loop { + let res = tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await; + if res.is_ok() { + info!("postgres is ready, could connect to it"); + break; + } + } + + // + // Decrypt connection string + // + let source_connection_string = { + match spec.encryption_secret { + EncryptionSecret::KMS { key_id } => { + let mut output = kms_client + .decrypt() + .key_id(key_id) + .ciphertext_blob(aws_sdk_s3::primitives::Blob::new( + spec.source_connstring_ciphertext_base64, + )) + .send() + .await + .context("decrypt source connection string")?; + let plaintext = output + .plaintext + .take() + .context("get plaintext source connection string")?; + String::from_utf8(plaintext.into_inner()) + .context("parse source connection string as utf8")? 
+ } + } + }; + + // + // Start the work + // + + let dumpdir = working_directory.join("dumpdir"); + + let common_args = [ + // schema mapping (prob suffices to specify them on one side) + "--no-owner".to_string(), + "--no-privileges".to_string(), + "--no-publications".to_string(), + "--no-security-labels".to_string(), + "--no-subscriptions".to_string(), + "--no-tablespaces".to_string(), + // format + "--format".to_string(), + "directory".to_string(), + // concurrency + "--jobs".to_string(), + num_cpus::get().to_string(), + // progress updates + "--verbose".to_string(), + ]; + + info!("dump into the working directory"); + { + let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump")) + .args(&common_args) + .arg("-f") + .arg(&dumpdir) + .arg("--no-sync") + // POSITIONAL args + // source db (db name included in connection string) + .arg(&source_connection_string) + // how we run it + .env_clear() + .kill_on_drop(true) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .context("spawn pg_dump")?; + + info!(pid=%pg_dump.id().unwrap(), "spawned pg_dump"); + + tokio::spawn( + child_stdio_to_log::relay_process_output(pg_dump.stdout.take(), pg_dump.stderr.take()) + .instrument(info_span!("pg_dump")), + ); + + let st = pg_dump.wait().await.context("wait for pg_dump")?; + info!(status=?st, "pg_dump exited"); + if !st.success() { + warn!(status=%st, "pg_dump failed, restore will likely fail as well"); + } + } + + // TODO: do it in a streaming way, plenty of internal research done on this already + // TODO: do the unlogged table trick + + info!("restore from working directory into vanilla postgres"); + { + let mut pg_restore = tokio::process::Command::new(pg_bin_dir.join("pg_restore")) + .args(&common_args) + .arg("-d") + .arg(&restore_pg_connstring) + // POSITIONAL args + .arg(&dumpdir) + // how we run it + .env_clear() + .kill_on_drop(true) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .context("spawn pg_restore")?; + + info!(pid=%pg_restore.id().unwrap(), "spawned pg_restore"); + tokio::spawn( + child_stdio_to_log::relay_process_output( + pg_restore.stdout.take(), + pg_restore.stderr.take(), + ) + .instrument(info_span!("pg_restore")), + ); + let st = pg_restore.wait().await.context("wait for pg_restore")?; + info!(status=?st, "pg_restore exited"); + if !st.success() { + warn!(status=%st, "pg_restore failed, restore will likely fail as well"); + } + } + + info!("shutdown postgres"); + { + nix::sys::signal::kill( + Pid::from_raw( + i32::try_from(postgres_proc.id().unwrap()).expect("convert child pid to i32"), + ), + nix::sys::signal::SIGTERM, + ) + .context("signal postgres to shut down")?; + postgres_proc + .wait() + .await + .context("wait for postgres to shut down")?; + } + + info!("upload pgdata"); + s5cmd::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/")) + .await + .context("sync dump directory to destination")?; + + info!("write status"); + { + let status_dir = working_directory.join("status"); + std::fs::create_dir(&status_dir).context("create status directory")?; + let status_file = status_dir.join("status"); + std::fs::write(&status_file, serde_json::json!({"done": true}).to_string()) + .context("write status file")?; + s5cmd::sync(&status_file, &s3_prefix.append("/status/pgdata")) + .await + .context("sync status directory to destination")?; + } + + Ok(()) +} diff --git a/compute_tools/src/bin/fast_import/child_stdio_to_log.rs 
b/compute_tools/src/bin/fast_import/child_stdio_to_log.rs new file mode 100644 index 0000000000..6724ef9bed --- /dev/null +++ b/compute_tools/src/bin/fast_import/child_stdio_to_log.rs @@ -0,0 +1,35 @@ +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::{ChildStderr, ChildStdout}; +use tracing::info; + +/// Asynchronously relays the output from a child process's `stdout` and `stderr` to the tracing log. +/// Each line is read and logged individually, with lossy UTF-8 conversion. +/// +/// # Arguments +/// +/// * `stdout`: An `Option` from the child process. +/// * `stderr`: An `Option` from the child process. +/// +pub(crate) async fn relay_process_output(stdout: Option, stderr: Option) { + let stdout_fut = async { + if let Some(stdout) = stdout { + let reader = BufReader::new(stdout); + let mut lines = reader.lines(); + while let Ok(Some(line)) = lines.next_line().await { + info!(fd = "stdout", "{}", line); + } + } + }; + + let stderr_fut = async { + if let Some(stderr) = stderr { + let reader = BufReader::new(stderr); + let mut lines = reader.lines(); + while let Ok(Some(line)) = lines.next_line().await { + info!(fd = "stderr", "{}", line); + } + } + }; + + tokio::join!(stdout_fut, stderr_fut); +} diff --git a/compute_tools/src/bin/fast_import/s3_uri.rs b/compute_tools/src/bin/fast_import/s3_uri.rs new file mode 100644 index 0000000000..52bbef420f --- /dev/null +++ b/compute_tools/src/bin/fast_import/s3_uri.rs @@ -0,0 +1,75 @@ +use anyhow::Result; +use std::str::FromStr; + +/// Struct to hold parsed S3 components +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct S3Uri { + pub bucket: String, + pub key: String, +} + +impl FromStr for S3Uri { + type Err = anyhow::Error; + + /// Parse an S3 URI into a bucket and key + fn from_str(uri: &str) -> Result { + // Ensure the URI starts with "s3://" + if !uri.starts_with("s3://") { + return Err(anyhow::anyhow!("Invalid S3 URI scheme")); + } + + // Remove the "s3://" prefix + let stripped_uri = &uri[5..]; + + // Split the remaining string into bucket and key parts + if let Some((bucket, key)) = stripped_uri.split_once('/') { + Ok(S3Uri { + bucket: bucket.to_string(), + key: key.to_string(), + }) + } else { + Err(anyhow::anyhow!( + "Invalid S3 URI format, missing bucket or key" + )) + } + } +} + +impl S3Uri { + pub fn append(&self, suffix: &str) -> Self { + Self { + bucket: self.bucket.clone(), + key: format!("{}{}", self.key, suffix), + } + } +} + +impl std::fmt::Display for S3Uri { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "s3://{}/{}", self.bucket, self.key) + } +} + +impl clap::builder::TypedValueParser for S3Uri { + type Value = Self; + + fn parse_ref( + &self, + _cmd: &clap::Command, + _arg: Option<&clap::Arg>, + value: &std::ffi::OsStr, + ) -> Result { + let value_str = value.to_str().ok_or_else(|| { + clap::Error::raw( + clap::error::ErrorKind::InvalidUtf8, + "Invalid UTF-8 sequence", + ) + })?; + S3Uri::from_str(value_str).map_err(|e| { + clap::Error::raw( + clap::error::ErrorKind::InvalidValue, + format!("Failed to parse S3 URI: {}", e), + ) + }) + } +} diff --git a/compute_tools/src/bin/fast_import/s5cmd.rs b/compute_tools/src/bin/fast_import/s5cmd.rs new file mode 100644 index 0000000000..d2d9a79736 --- /dev/null +++ b/compute_tools/src/bin/fast_import/s5cmd.rs @@ -0,0 +1,27 @@ +use anyhow::Context; +use camino::Utf8Path; + +use super::s3_uri::S3Uri; + +pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> { + let mut builder = 
tokio::process::Command::new("s5cmd"); + // s5cmd uses aws-sdk-go v1, hence doesn't support AWS_ENDPOINT_URL + if let Some(val) = std::env::var_os("AWS_ENDPOINT_URL") { + builder.arg("--endpoint-url").arg(val); + } + builder + .arg("sync") + .arg(local.as_str()) + .arg(remote.to_string()); + let st = builder + .spawn() + .context("spawn s5cmd")? + .wait() + .await + .context("wait for s5cmd")?; + if st.success() { + Ok(()) + } else { + Err(anyhow::anyhow!("s5cmd failed")) + } +} diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index c4063bbd1a..1ea443b026 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -1153,6 +1153,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re timeline_info.timeline_id ); } + // TODO: rename to import-basebackup-plus-wal TimelineCmd::Import(args) => { let tenant_id = get_tenant_id(args.tenant_id, env)?; let timeline_id = args.timeline_id; diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index 8710904cec..79da05da6c 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -33,6 +33,7 @@ remote_storage.workspace = true postgres_backend.workspace = true nix = {workspace = true, optional = true} reqwest.workspace = true +rand.workspace = true [dev-dependencies] bincode.workspace = true diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index ee20613d6d..7666728427 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -97,6 +97,15 @@ pub struct ConfigToml { pub control_plane_api: Option, pub control_plane_api_token: Option, pub control_plane_emergency_mode: bool, + /// Unstable feature: subject to change or removal without notice. + /// See . + pub import_pgdata_upcall_api: Option, + /// Unstable feature: subject to change or removal without notice. + /// See . + pub import_pgdata_upcall_api_token: Option, + /// Unstable feature: subject to change or removal without notice. + /// See . + pub import_pgdata_aws_endpoint_url: Option, pub heatmap_upload_concurrency: usize, pub secondary_download_concurrency: usize, pub virtual_file_io_engine: Option, @@ -386,6 +395,10 @@ impl Default for ConfigToml { control_plane_api_token: (None), control_plane_emergency_mode: (false), + import_pgdata_upcall_api: (None), + import_pgdata_upcall_api_token: (None), + import_pgdata_aws_endpoint_url: (None), + heatmap_upload_concurrency: (DEFAULT_HEATMAP_UPLOAD_CONCURRENCY), secondary_download_concurrency: (DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY), diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs index 401887d362..c55b9e9484 100644 --- a/libs/pageserver_api/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -48,7 +48,7 @@ pub struct ShardedRange<'a> { // Calculate the size of a range within the blocks of the same relation, or spanning only the // top page in the previous relation's space. -fn contiguous_range_len(range: &Range) -> u32 { +pub fn contiguous_range_len(range: &Range) -> u32 { debug_assert!(is_contiguous_range(range)); if range.start.field6 == 0xffffffff { range.end.field6 + 1 @@ -67,7 +67,7 @@ fn contiguous_range_len(range: &Range) -> u32 { /// This matters, because: /// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse. /// - Within such ranges, we may calculate distances using simple subtraction of field6. 
-fn is_contiguous_range(range: &Range) -> bool { +pub fn is_contiguous_range(range: &Range) -> bool { range.start.field1 == range.end.field1 && range.start.field2 == range.end.field2 && range.start.field3 == range.end.field3 diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 0dfa1ba817..1b86bfd91a 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -2,6 +2,8 @@ pub mod detach_ancestor; pub mod partitioning; pub mod utilization; +#[cfg(feature = "testing")] +use camino::Utf8PathBuf; pub use utilization::PageserverUtilization; use std::{ @@ -227,6 +229,9 @@ pub enum TimelineCreateRequestMode { // we continue to accept it by having it here. pg_version: Option, }, + ImportPgdata { + import_pgdata: TimelineCreateRequestModeImportPgdata, + }, // NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap. // (serde picks the first matching enum variant, in declaration order). Bootstrap { @@ -236,6 +241,42 @@ pub enum TimelineCreateRequestMode { }, } +#[derive(Serialize, Deserialize, Clone)] +pub struct TimelineCreateRequestModeImportPgdata { + pub location: ImportPgdataLocation, + pub idempotency_key: ImportPgdataIdempotencyKey, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum ImportPgdataLocation { + #[cfg(feature = "testing")] + LocalFs { path: Utf8PathBuf }, + AwsS3 { + region: String, + bucket: String, + /// A better name for this would be `prefix`; changing requires coordination with cplane. + /// See . + key: String, + }, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(transparent)] +pub struct ImportPgdataIdempotencyKey(pub String); + +impl ImportPgdataIdempotencyKey { + pub fn random() -> Self { + use rand::{distributions::Alphanumeric, Rng}; + Self( + rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(20) + .map(char::from) + .collect(), + ) + } +} + #[derive(Serialize, Deserialize, Clone)] pub struct LsnLeaseRequest { pub lsn: Lsn, diff --git a/libs/postgres_initdb/Cargo.toml b/libs/postgres_initdb/Cargo.toml new file mode 100644 index 0000000000..1605279bce --- /dev/null +++ b/libs/postgres_initdb/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "postgres_initdb" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +anyhow.workspace = true +tokio.workspace = true +camino.workspace = true +thiserror.workspace = true +workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/libs/postgres_initdb/src/lib.rs b/libs/postgres_initdb/src/lib.rs new file mode 100644 index 0000000000..2f072354fb --- /dev/null +++ b/libs/postgres_initdb/src/lib.rs @@ -0,0 +1,103 @@ +//! The canonical way we run `initdb` in Neon. +//! +//! initdb has implicit defaults that are dependent on the environment, e.g., locales & collations. +//! +//! This module's job is to eliminate the environment-dependence as much as possible. 
+ +use std::fmt; + +use camino::Utf8Path; + +pub struct RunInitdbArgs<'a> { + pub superuser: &'a str, + pub locale: &'a str, + pub initdb_bin: &'a Utf8Path, + pub pg_version: u32, + pub library_search_path: &'a Utf8Path, + pub pgdata: &'a Utf8Path, +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + Spawn(std::io::Error), + Failed { + status: std::process::ExitStatus, + stderr: Vec, + }, + WaitOutput(std::io::Error), + Other(anyhow::Error), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Spawn(e) => write!(f, "Error spawning command: {:?}", e), + Error::Failed { status, stderr } => write!( + f, + "Command failed with status {:?}: {}", + status, + String::from_utf8_lossy(stderr) + ), + Error::WaitOutput(e) => write!(f, "Error waiting for command output: {:?}", e), + Error::Other(e) => write!(f, "Error: {:?}", e), + } + } +} + +pub async fn do_run_initdb(args: RunInitdbArgs<'_>) -> Result<(), Error> { + let RunInitdbArgs { + superuser, + locale, + initdb_bin: initdb_bin_path, + pg_version, + library_search_path, + pgdata, + } = args; + let mut initdb_command = tokio::process::Command::new(initdb_bin_path); + initdb_command + .args(["--pgdata", pgdata.as_ref()]) + .args(["--username", superuser]) + .args(["--encoding", "utf8"]) + .args(["--locale", locale]) + .arg("--no-instructions") + .arg("--no-sync") + .env_clear() + .env("LD_LIBRARY_PATH", library_search_path) + .env("DYLD_LIBRARY_PATH", library_search_path) + .stdin(std::process::Stdio::null()) + // stdout invocation produces the same output every time, we don't need it + .stdout(std::process::Stdio::null()) + // we would be interested in the stderr output, if there was any + .stderr(std::process::Stdio::piped()); + + // Before version 14, only the libc provide was available. + if pg_version > 14 { + // Version 17 brought with it a builtin locale provider which only provides + // C and C.UTF-8. While being safer for collation purposes since it is + // guaranteed to be consistent throughout a major release, it is also more + // performant. + let locale_provider = if pg_version >= 17 { "builtin" } else { "libc" }; + + initdb_command.args(["--locale-provider", locale_provider]); + } + + let initdb_proc = initdb_command.spawn().map_err(Error::Spawn)?; + + // Ideally we'd select here with the cancellation token, but the problem is that + // we can't safely terminate initdb: it launches processes of its own, and killing + // initdb doesn't kill them. After we return from this function, we want the target + // directory to be able to be cleaned up. 
+ // See https://github.com/neondatabase/neon/issues/6385 + let initdb_output = initdb_proc + .wait_with_output() + .await + .map_err(Error::WaitOutput)?; + if !initdb_output.status.success() { + return Err(Error::Failed { + status: initdb_output.status, + stderr: initdb_output.stderr, + }); + } + + Ok(()) +} diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 143d8236df..140b287ccc 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -43,6 +43,7 @@ postgres.workspace = true postgres_backend.workspace = true postgres-protocol.workspace = true postgres-types.workspace = true +postgres_initdb.workspace = true rand.workspace = true range-set-blaze = { version = "0.1.16", features = ["alloc"] } regex.workspace = true @@ -68,6 +69,7 @@ url.workspace = true walkdir.workspace = true metrics.workspace = true pageserver_api.workspace = true +pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that pageserver_compaction.workspace = true postgres_connection.workspace = true postgres_ffi.workspace = true diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index f7be6ecaab..59ea6fb941 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -144,6 +144,10 @@ pub struct PageServerConf { /// JWT token for use with the control plane API. pub control_plane_api_token: Option, + pub import_pgdata_upcall_api: Option, + pub import_pgdata_upcall_api_token: Option, + pub import_pgdata_aws_endpoint_url: Option, + /// If true, pageserver will make best-effort to operate without a control plane: only /// for use in major incidents. pub control_plane_emergency_mode: bool, @@ -328,6 +332,9 @@ impl PageServerConf { control_plane_api, control_plane_api_token, control_plane_emergency_mode, + import_pgdata_upcall_api, + import_pgdata_upcall_api_token, + import_pgdata_aws_endpoint_url, heatmap_upload_concurrency, secondary_download_concurrency, ingest_batch_size, @@ -383,6 +390,9 @@ impl PageServerConf { timeline_offloading, ephemeral_bytes_per_memory_kb, server_side_batch_timeout, + import_pgdata_upcall_api, + import_pgdata_upcall_api_token: import_pgdata_upcall_api_token.map(SecretString::from), + import_pgdata_aws_endpoint_url, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 2bc7f5ad39..7fb9247feb 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -623,6 +623,8 @@ paths: existing_initdb_timeline_id: type: string format: hex + import_pgdata: + $ref: "#/components/schemas/TimelineCreateRequestImportPgdata" responses: "201": description: Timeline was created, or already existed with matching parameters @@ -979,6 +981,34 @@ components: $ref: "#/components/schemas/TenantConfig" effective_config: $ref: "#/components/schemas/TenantConfig" + TimelineCreateRequestImportPgdata: + type: object + required: + - location + - idempotency_key + properties: + idempotency_key: + type: string + location: + $ref: "#/components/schemas/TimelineCreateRequestImportPgdataLocation" + TimelineCreateRequestImportPgdataLocation: + type: object + properties: + AwsS3: + $ref: "#/components/schemas/TimelineCreateRequestImportPgdataLocationAwsS3" + TimelineCreateRequestImportPgdataLocationAwsS3: + type: object + properties: + region: + type: string + bucket: + type: string + key: + type: string + required: + - region + - bucket + - key TimelineInfo: type: 
object required: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 7168850ed6..ceb1c3b012 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -40,6 +40,7 @@ use pageserver_api::models::TenantSorting; use pageserver_api::models::TenantState; use pageserver_api::models::TimelineArchivalConfigRequest; use pageserver_api::models::TimelineCreateRequestMode; +use pageserver_api::models::TimelineCreateRequestModeImportPgdata; use pageserver_api::models::TimelinesInfoAndOffloaded; use pageserver_api::models::TopTenantShardItem; use pageserver_api::models::TopTenantShardsRequest; @@ -81,6 +82,7 @@ use crate::tenant::secondary::SecondaryController; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; use crate::tenant::storage_layer::LayerName; +use crate::tenant::timeline::import_pgdata; use crate::tenant::timeline::offload::offload_timeline; use crate::tenant::timeline::offload::OffloadError; use crate::tenant::timeline::CompactFlags; @@ -580,6 +582,35 @@ async fn timeline_create_handler( ancestor_timeline_id, ancestor_start_lsn, }), + TimelineCreateRequestMode::ImportPgdata { + import_pgdata: + TimelineCreateRequestModeImportPgdata { + location, + idempotency_key, + }, + } => tenant::CreateTimelineParams::ImportPgdata(tenant::CreateTimelineParamsImportPgdata { + idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new( + idempotency_key.0, + ), + new_timeline_id, + location: { + use import_pgdata::index_part_format::Location; + use pageserver_api::models::ImportPgdataLocation; + match location { + #[cfg(feature = "testing")] + ImportPgdataLocation::LocalFs { path } => Location::LocalFs { path }, + ImportPgdataLocation::AwsS3 { + region, + bucket, + key, + } => Location::AwsS3 { + region, + bucket, + key, + }, + } + }, + }), }; let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error); diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 5995d1cc57..f4f184be5a 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -2276,9 +2276,9 @@ impl<'a> Version<'a> { //--- Metadata structs stored in key-value pairs in the repository. #[derive(Debug, Serialize, Deserialize)] -struct DbDirectory { +pub(crate) struct DbDirectory { // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist) - dbdirs: HashMap<(Oid, Oid), bool>, + pub(crate) dbdirs: HashMap<(Oid, Oid), bool>, } // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of @@ -2287,8 +2287,8 @@ struct DbDirectory { // "pg_twophsae/0000000A000002E4". #[derive(Debug, Serialize, Deserialize)] -struct TwoPhaseDirectory { - xids: HashSet, +pub(crate) struct TwoPhaseDirectory { + pub(crate) xids: HashSet, } #[derive(Debug, Serialize, Deserialize)] @@ -2297,12 +2297,12 @@ struct TwoPhaseDirectoryV17 { } #[derive(Debug, Serialize, Deserialize, Default)] -struct RelDirectory { +pub(crate) struct RelDirectory { // Set of relations that exist. (relfilenode, forknum) // // TODO: Store it as a btree or radix tree or something else that spans multiple // key-value pairs, if you have a lot of relations - rels: HashSet<(Oid, u8)>, + pub(crate) rels: HashSet<(Oid, u8)>, } #[derive(Debug, Serialize, Deserialize)] @@ -2311,9 +2311,9 @@ struct RelSizeEntry { } #[derive(Debug, Serialize, Deserialize, Default)] -struct SlruSegmentDirectory { +pub(crate) struct SlruSegmentDirectory { // Set of SLRU segments that exist. 
- segments: HashSet, + pub(crate) segments: HashSet, } #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)] diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 6a4e90dd55..622738022a 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -381,6 +381,8 @@ pub enum TaskKind { UnitTest, DetachAncestor, + + ImportPgdata, } #[derive(Default)] diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 2e5f69e3c9..0214ee68fa 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -43,7 +43,9 @@ use std::sync::atomic::AtomicBool; use std::sync::Weak; use std::time::SystemTime; use storage_broker::BrokerClientChannel; +use timeline::import_pgdata; use timeline::offload::offload_timeline; +use timeline::ShutdownMode; use tokio::io::BufReader; use tokio::sync::watch; use tokio::task::JoinSet; @@ -373,7 +375,6 @@ pub struct Tenant { l0_flush_global_state: L0FlushGlobalState, } - impl std::fmt::Debug for Tenant { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{} ({})", self.tenant_shard_id, self.current_state()) @@ -860,6 +861,7 @@ impl Debug for SetStoppingError { pub(crate) enum CreateTimelineParams { Bootstrap(CreateTimelineParamsBootstrap), Branch(CreateTimelineParamsBranch), + ImportPgdata(CreateTimelineParamsImportPgdata), } #[derive(Debug)] @@ -877,7 +879,14 @@ pub(crate) struct CreateTimelineParamsBranch { pub(crate) ancestor_start_lsn: Option, } -/// What is used to determine idempotency of a [`Tenant::create_timeline`] call in [`Tenant::start_creating_timeline`]. +#[derive(Debug)] +pub(crate) struct CreateTimelineParamsImportPgdata { + pub(crate) new_timeline_id: TimelineId, + pub(crate) location: import_pgdata::index_part_format::Location, + pub(crate) idempotency_key: import_pgdata::index_part_format::IdempotencyKey, +} + +/// What is used to determine idempotency of a [`Tenant::create_timeline`] call in [`Tenant::start_creating_timeline`] in [`Tenant::start_creating_timeline`]. /// /// Each [`Timeline`] object holds [`Self`] as an immutable property in [`Timeline::create_idempotency`]. /// @@ -907,19 +916,50 @@ pub(crate) enum CreateTimelineIdempotency { ancestor_timeline_id: TimelineId, ancestor_start_lsn: Lsn, }, + ImportPgdata(CreatingTimelineIdempotencyImportPgdata), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct CreatingTimelineIdempotencyImportPgdata { + idempotency_key: import_pgdata::index_part_format::IdempotencyKey, } /// What is returned by [`Tenant::start_creating_timeline`]. #[must_use] -enum StartCreatingTimelineResult<'t> { - CreateGuard(TimelineCreateGuard<'t>), +enum StartCreatingTimelineResult { + CreateGuard(TimelineCreateGuard), Idempotent(Arc), } +enum TimelineInitAndSyncResult { + ReadyToActivate(Arc), + NeedsSpawnImportPgdata(TimelineInitAndSyncNeedsSpawnImportPgdata), +} + +impl TimelineInitAndSyncResult { + fn ready_to_activate(self) -> Option> { + match self { + Self::ReadyToActivate(timeline) => Some(timeline), + _ => None, + } + } +} + +#[must_use] +struct TimelineInitAndSyncNeedsSpawnImportPgdata { + timeline: Arc, + import_pgdata: import_pgdata::index_part_format::Root, + guard: TimelineCreateGuard, +} + /// What is returned by [`Tenant::create_timeline`]. enum CreateTimelineResult { Created(Arc), Idempotent(Arc), + /// IMPORTANT: This [`Arc`] object is not in [`Tenant::timelines`] when + /// we return this result, nor will this concrete object ever be added there. 
+ /// Cf method comment on [`Tenant::create_timeline_import_pgdata`]. + ImportSpawned(Arc), } impl CreateTimelineResult { @@ -927,18 +967,19 @@ impl CreateTimelineResult { match self { Self::Created(_) => "Created", Self::Idempotent(_) => "Idempotent", + Self::ImportSpawned(_) => "ImportSpawned", } } fn timeline(&self) -> &Arc { match self { - Self::Created(t) | Self::Idempotent(t) => t, + Self::Created(t) | Self::Idempotent(t) | Self::ImportSpawned(t) => t, } } /// Unit test timelines aren't activated, test has to do it if it needs to. #[cfg(test)] fn into_timeline_for_test(self) -> Arc { match self { - Self::Created(t) | Self::Idempotent(t) => t, + Self::Created(t) | Self::Idempotent(t) | Self::ImportSpawned(t) => t, } } } @@ -962,33 +1003,13 @@ pub enum CreateTimelineError { } #[derive(thiserror::Error, Debug)] -enum InitdbError { - Other(anyhow::Error), +pub enum InitdbError { + #[error("Operation was cancelled")] Cancelled, - Spawn(std::io::Result<()>), - Failed(std::process::ExitStatus, Vec), -} - -impl fmt::Display for InitdbError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - InitdbError::Cancelled => write!(f, "Operation was cancelled"), - InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e), - InitdbError::Failed(status, stderr) => write!( - f, - "Command failed with status {:?}: {}", - status, - String::from_utf8_lossy(stderr) - ), - InitdbError::Other(e) => write!(f, "Error: {:?}", e), - } - } -} - -impl From for InitdbError { - fn from(error: std::io::Error) -> Self { - InitdbError::Spawn(Err(error)) - } + #[error(transparent)] + Other(anyhow::Error), + #[error(transparent)] + Inner(postgres_initdb::Error), } enum CreateTimelineCause { @@ -996,6 +1017,15 @@ enum CreateTimelineCause { Delete, } +enum LoadTimelineCause { + Attach, + Unoffload, + ImportPgdata { + create_guard: TimelineCreateGuard, + activate: ActivateTimelineArgs, + }, +} + #[derive(thiserror::Error, Debug)] pub(crate) enum GcError { // The tenant is shutting down @@ -1072,24 +1102,35 @@ impl Tenant { /// it is marked as Active. 
#[allow(clippy::too_many_arguments)] async fn timeline_init_and_sync( - &self, + self: &Arc, timeline_id: TimelineId, resources: TimelineResources, - index_part: IndexPart, + mut index_part: IndexPart, metadata: TimelineMetadata, ancestor: Option>, - _ctx: &RequestContext, - ) -> anyhow::Result<()> { + cause: LoadTimelineCause, + ctx: &RequestContext, + ) -> anyhow::Result { let tenant_id = self.tenant_shard_id; - let idempotency = if metadata.ancestor_timeline().is_none() { - CreateTimelineIdempotency::Bootstrap { - pg_version: metadata.pg_version(), + let import_pgdata = index_part.import_pgdata.take(); + let idempotency = match &import_pgdata { + Some(import_pgdata) => { + CreateTimelineIdempotency::ImportPgdata(CreatingTimelineIdempotencyImportPgdata { + idempotency_key: import_pgdata.idempotency_key().clone(), + }) } - } else { - CreateTimelineIdempotency::Branch { - ancestor_timeline_id: metadata.ancestor_timeline().unwrap(), - ancestor_start_lsn: metadata.ancestor_lsn(), + None => { + if metadata.ancestor_timeline().is_none() { + CreateTimelineIdempotency::Bootstrap { + pg_version: metadata.pg_version(), + } + } else { + CreateTimelineIdempotency::Branch { + ancestor_timeline_id: metadata.ancestor_timeline().unwrap(), + ancestor_start_lsn: metadata.ancestor_lsn(), + } + } } }; @@ -1121,39 +1162,91 @@ impl Tenant { format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}") })?; - { - // avoiding holding it across awaits - let mut timelines_accessor = self.timelines.lock().unwrap(); - match timelines_accessor.entry(timeline_id) { - // We should never try and load the same timeline twice during startup - Entry::Occupied(_) => { - unreachable!( - "Timeline {tenant_id}/{timeline_id} already exists in the tenant map" - ); + match import_pgdata { + Some(import_pgdata) if !import_pgdata.is_done() => { + match cause { + LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (), + LoadTimelineCause::ImportPgdata { .. } => { + unreachable!("ImportPgdata should not be reloading timeline import is done and persisted as such in s3") + } } - Entry::Vacant(v) => { - v.insert(Arc::clone(&timeline)); - timeline.maybe_spawn_flush_loop(); + let mut guard = self.timelines_creating.lock().unwrap(); + if !guard.insert(timeline_id) { + // We should never try and load the same timeline twice during startup + unreachable!("Timeline {tenant_id}/{timeline_id} is already being created") } + let timeline_create_guard = TimelineCreateGuard { + _tenant_gate_guard: self.gate.enter()?, + owning_tenant: self.clone(), + timeline_id, + idempotency, + // The users of this specific return value don't need the timline_path in there. + timeline_path: timeline + .conf + .timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id), + }; + Ok(TimelineInitAndSyncResult::NeedsSpawnImportPgdata( + TimelineInitAndSyncNeedsSpawnImportPgdata { + timeline, + import_pgdata, + guard: timeline_create_guard, + }, + )) } - }; + Some(_) | None => { + { + let mut timelines_accessor = self.timelines.lock().unwrap(); + match timelines_accessor.entry(timeline_id) { + // We should never try and load the same timeline twice during startup + Entry::Occupied(_) => { + unreachable!( + "Timeline {tenant_id}/{timeline_id} already exists in the tenant map" + ); + } + Entry::Vacant(v) => { + v.insert(Arc::clone(&timeline)); + timeline.maybe_spawn_flush_loop(); + } + } + } - // Sanity check: a timeline should have some content. 
- anyhow::ensure!( - ancestor.is_some() - || timeline - .layers - .read() - .await - .layer_map() - .expect("currently loading, layer manager cannot be shutdown already") - .iter_historic_layers() - .next() - .is_some(), - "Timeline has no ancestor and no layer files" - ); + // Sanity check: a timeline should have some content. + anyhow::ensure!( + ancestor.is_some() + || timeline + .layers + .read() + .await + .layer_map() + .expect("currently loading, layer manager cannot be shutdown already") + .iter_historic_layers() + .next() + .is_some(), + "Timeline has no ancestor and no layer files" + ); - Ok(()) + match cause { + LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (), + LoadTimelineCause::ImportPgdata { + create_guard, + activate, + } => { + // TODO: see the comment in the task code above how I'm not so certain + // it is safe to activate here because of concurrent shutdowns. + match activate { + ActivateTimelineArgs::Yes { broker_client } => { + info!("activating timeline after reload from pgdata import task"); + timeline.activate(self.clone(), broker_client, None, ctx); + } + ActivateTimelineArgs::No => (), + } + drop(create_guard); + } + } + + Ok(TimelineInitAndSyncResult::ReadyToActivate(timeline)) + } + } } /// Attach a tenant that's available in cloud storage. @@ -1578,24 +1671,46 @@ impl Tenant { } // TODO again handle early failure - self.load_remote_timeline( - timeline_id, - index_part, - remote_metadata, - TimelineResources { - remote_client, - timeline_get_throttle: self.timeline_get_throttle.clone(), - l0_flush_global_state: self.l0_flush_global_state.clone(), - }, - ctx, - ) - .await - .with_context(|| { - format!( - "failed to load remote timeline {} for tenant {}", - timeline_id, self.tenant_shard_id + let effect = self + .load_remote_timeline( + timeline_id, + index_part, + remote_metadata, + TimelineResources { + remote_client, + timeline_get_throttle: self.timeline_get_throttle.clone(), + l0_flush_global_state: self.l0_flush_global_state.clone(), + }, + LoadTimelineCause::Attach, + ctx, ) - })?; + .await + .with_context(|| { + format!( + "failed to load remote timeline {} for tenant {}", + timeline_id, self.tenant_shard_id + ) + })?; + + match effect { + TimelineInitAndSyncResult::ReadyToActivate(_) => { + // activation happens later, on Tenant::activate + } + TimelineInitAndSyncResult::NeedsSpawnImportPgdata( + TimelineInitAndSyncNeedsSpawnImportPgdata { + timeline, + import_pgdata, + guard, + }, + ) => { + tokio::task::spawn(self.clone().create_timeline_import_pgdata_task( + timeline, + import_pgdata, + ActivateTimelineArgs::No, + guard, + )); + } + } } // Walk through deleted timelines, resume deletion @@ -1719,13 +1834,14 @@ impl Tenant { #[instrument(skip_all, fields(timeline_id=%timeline_id))] async fn load_remote_timeline( - &self, + self: &Arc, timeline_id: TimelineId, index_part: IndexPart, remote_metadata: TimelineMetadata, resources: TimelineResources, + cause: LoadTimelineCause, ctx: &RequestContext, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { span::debug_assert_current_span_has_tenant_id(); info!("downloading index file for timeline {}", timeline_id); @@ -1752,6 +1868,7 @@ impl Tenant { index_part, remote_metadata, ancestor, + cause, ctx, ) .await @@ -1938,6 +2055,7 @@ impl Tenant { TimelineArchivalError::Other(anyhow::anyhow!("Timeline already exists")) } TimelineExclusionError::Other(e) => TimelineArchivalError::Other(e), + TimelineExclusionError::ShuttingDown => TimelineArchivalError::Cancelled, })?; let timeline_preload = self 
@@ -1976,6 +2094,7 @@ impl Tenant { index_part, remote_metadata, timeline_resources, + LoadTimelineCause::Unoffload, &ctx, ) .await @@ -2213,7 +2332,7 @@ impl Tenant { /// /// Tests should use `Tenant::create_test_timeline` to set up the minimum required metadata keys. pub(crate) async fn create_empty_timeline( - &self, + self: &Arc, new_timeline_id: TimelineId, initdb_lsn: Lsn, pg_version: u32, @@ -2263,7 +2382,7 @@ impl Tenant { // Our current tests don't need the background loops. #[cfg(test)] pub async fn create_test_timeline( - &self, + self: &Arc, new_timeline_id: TimelineId, initdb_lsn: Lsn, pg_version: u32, @@ -2302,7 +2421,7 @@ impl Tenant { #[cfg(test)] #[allow(clippy::too_many_arguments)] pub async fn create_test_timeline_with_layers( - &self, + self: &Arc, new_timeline_id: TimelineId, initdb_lsn: Lsn, pg_version: u32, @@ -2439,6 +2558,16 @@ impl Tenant { self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx) .await? } + CreateTimelineParams::ImportPgdata(params) => { + self.create_timeline_import_pgdata( + params, + ActivateTimelineArgs::Yes { + broker_client: broker_client.clone(), + }, + ctx, + ) + .await? + } }; // At this point we have dropped our guard on [`Self::timelines_creating`], and @@ -2481,11 +2610,202 @@ impl Tenant { ); timeline } + CreateTimelineResult::ImportSpawned(timeline) => { + info!("import task spawned, timeline will become visible and activated once the import is done"); + timeline + } }; Ok(activated_timeline) } + /// The returned [`Arc`] is NOT in the [`Tenant::timelines`] map until the import + /// completes in the background. A DIFFERENT [`Arc`] will be inserted into the + /// [`Tenant::timelines`] map when the import completes. + /// We only return an [`Arc`] here so the API handler can create a [`pageserver_api::models::TimelineInfo`] + /// for the response. + async fn create_timeline_import_pgdata( + self: &Arc, + params: CreateTimelineParamsImportPgdata, + activate: ActivateTimelineArgs, + ctx: &RequestContext, + ) -> Result { + let CreateTimelineParamsImportPgdata { + new_timeline_id, + location, + idempotency_key, + } = params; + + let started_at = chrono::Utc::now().naive_utc(); + + // + // There's probably a simpler way to upload an index part, but, remote_timeline_client + // is the canonical way we do it. + // - create an empty timeline in-memory + // - use its remote_timeline_client to do the upload + // - dispose of the uninit timeline + // - keep the creation guard alive + + let timeline_create_guard = match self + .start_creating_timeline( + new_timeline_id, + CreateTimelineIdempotency::ImportPgdata(CreatingTimelineIdempotencyImportPgdata { + idempotency_key: idempotency_key.clone(), + }), + ) + .await? 
+ { + StartCreatingTimelineResult::CreateGuard(guard) => guard, + StartCreatingTimelineResult::Idempotent(timeline) => { + return Ok(CreateTimelineResult::Idempotent(timeline)) + } + }; + + let mut uninit_timeline = { + let this = &self; + let initdb_lsn = Lsn(0); + let _ctx = ctx; + async move { + let new_metadata = TimelineMetadata::new( + // Initialize disk_consistent LSN to 0, The caller must import some data to + // make it valid, before calling finish_creation() + Lsn(0), + None, + None, + Lsn(0), + initdb_lsn, + initdb_lsn, + 15, + ); + this.prepare_new_timeline( + new_timeline_id, + &new_metadata, + timeline_create_guard, + initdb_lsn, + None, + ) + .await + } + } + .await?; + + let in_progress = import_pgdata::index_part_format::InProgress { + idempotency_key, + location, + started_at, + }; + let index_part = import_pgdata::index_part_format::Root::V1( + import_pgdata::index_part_format::V1::InProgress(in_progress), + ); + uninit_timeline + .raw_timeline() + .unwrap() + .remote_client + .schedule_index_upload_for_import_pgdata_state_update(Some(index_part.clone()))?; + + // wait_completion happens in caller + + let (timeline, timeline_create_guard) = uninit_timeline.finish_creation_myself(); + + tokio::spawn(self.clone().create_timeline_import_pgdata_task( + timeline.clone(), + index_part, + activate, + timeline_create_guard, + )); + + // NB: the timeline doesn't exist in self.timelines at this point + Ok(CreateTimelineResult::ImportSpawned(timeline)) + } + + #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))] + async fn create_timeline_import_pgdata_task( + self: Arc, + timeline: Arc, + index_part: import_pgdata::index_part_format::Root, + activate: ActivateTimelineArgs, + timeline_create_guard: TimelineCreateGuard, + ) { + debug_assert_current_span_has_tenant_and_timeline_id(); + info!("starting"); + scopeguard::defer! {info!("exiting")}; + + let res = self + .create_timeline_import_pgdata_task_impl( + timeline, + index_part, + activate, + timeline_create_guard, + ) + .await; + if let Err(err) = &res { + error!(?err, "task failed"); + // TODO sleep & retry, sensitive to tenant shutdown + // TODO: allow timeline deletion requests => should cancel the task + } + } + + async fn create_timeline_import_pgdata_task_impl( + self: Arc, + timeline: Arc, + index_part: import_pgdata::index_part_format::Root, + activate: ActivateTimelineArgs, + timeline_create_guard: TimelineCreateGuard, + ) -> Result<(), anyhow::Error> { + let ctx = RequestContext::new(TaskKind::ImportPgdata, DownloadBehavior::Warn); + + info!("importing pgdata"); + import_pgdata::doit(&timeline, index_part, &ctx, self.cancel.clone()) + .await + .context("import")?; + info!("import done"); + + // + // Reload timeline from remote. + // This proves that the remote state is attachable, and it reuses the code. + // + // TODO: think about whether this is safe to do with concurrent Tenant::shutdown. + // timeline_create_guard hols the tenant gate open, so, shutdown cannot _complete_ until we exit. + // But our activate() call might launch new background tasks after Tenant::shutdown + // already went past shutting down the Tenant::timelines, which this timeline here is no part of. 
+ // I think the same problem exists with the bootstrap & branch mgmt API tasks (tenant shutting + // down while bootstrapping/branching + activating), but, the race condition is much more likely + // to manifest because of the long runtime of this import task. + + // in theory this shouldn't even .await anything except for coop yield + info!("shutting down timeline"); + timeline.shutdown(ShutdownMode::Hard).await; + info!("timeline shut down, reloading from remote"); + // TODO: we can't do the following check because create_timeline_import_pgdata must return an Arc + // let Some(timeline) = Arc::into_inner(timeline) else { + // anyhow::bail!("implementation error: timeline that we shut down was still referenced from somewhere"); + // }; + let timeline_id = timeline.timeline_id; + + // load from object storage like Tenant::attach does + let resources = self.build_timeline_resources(timeline_id); + let index_part = resources + .remote_client + .download_index_file(&self.cancel) + .await?; + let index_part = match index_part { + MaybeDeletedIndexPart::Deleted(_) => { + // likely concurrent delete call, cplane should prevent this + anyhow::bail!("index part says deleted but we are not done creating yet, this should not happen but") + } + MaybeDeletedIndexPart::IndexPart(p) => p, + }; + let metadata = index_part.metadata.clone(); + self + .load_remote_timeline(timeline_id, index_part, metadata, resources, LoadTimelineCause::ImportPgdata{ + create_guard: timeline_create_guard, activate, }, &ctx) + .await? + .ready_to_activate() + .context("implementation error: reloaded timeline still needs import after import reported success")?; + + anyhow::Ok(()) + } + pub(crate) async fn delete_timeline( self: Arc, timeline_id: TimelineId, @@ -3337,6 +3657,13 @@ where Ok(result) } +enum ActivateTimelineArgs { + Yes { + broker_client: storage_broker::BrokerClientChannel, + }, + No, +} + impl Tenant { pub fn tenant_specific_overrides(&self) -> TenantConfOpt { self.tenant_conf.load().tenant_conf.clone() @@ -3520,6 +3847,7 @@ impl Tenant { /// `validate_ancestor == false` is used when a timeline is created for deletion /// and we might not have the ancestor present anymore which is fine for to be /// deleted timelines. + #[allow(clippy::too_many_arguments)] fn create_timeline_struct( &self, new_timeline_id: TimelineId, @@ -4283,16 +4611,17 @@ impl Tenant { /// If the timeline was already created in the meantime, we check whether this /// request conflicts or is idempotent , based on `state`. 
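The idempotency machinery that `start_creating_timeline` implements (and that the import path reuses via `CreateTimelineIdempotency::ImportPgdata` with a client-supplied key) is easiest to see in isolation. Below is a conceptual sketch with simplified stand-in types, not the pageserver's actual structs: a retry carrying the same idempotency payload is answered with the existing timeline, a mismatching payload is a conflict, and an in-flight creation tells the caller to retry later.

```rust
// Conceptual sketch only; simplified stand-in types, not the pageserver's structs.
use std::collections::{HashMap, HashSet};

#[derive(Clone, Debug, PartialEq, Eq)]
enum Idempotency {
    Bootstrap,
    ImportPgdata { idempotency_key: String },
}

#[derive(Debug)]
enum StartResult {
    CreateGuard,        // caller proceeds with the creation
    Idempotent(String), // stands in for Arc<Timeline> of the already-created timeline
}

#[derive(Debug)]
enum StartError {
    AlreadyCreating,
    Conflict,
}

fn start_creating(
    existing: &HashMap<String, Idempotency>, // timeline_id -> how it was created
    creating: &HashSet<String>,              // timeline_ids with creations in flight
    timeline_id: &str,
    requested: Idempotency,
) -> Result<StartResult, StartError> {
    if creating.contains(timeline_id) {
        return Err(StartError::AlreadyCreating);
    }
    match existing.get(timeline_id) {
        None => Ok(StartResult::CreateGuard),
        Some(prev) if *prev == requested => Ok(StartResult::Idempotent(timeline_id.to_owned())),
        Some(_) => Err(StartError::Conflict),
    }
}
```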
async fn start_creating_timeline( - &self, + self: &Arc, new_timeline_id: TimelineId, idempotency: CreateTimelineIdempotency, - ) -> Result, CreateTimelineError> { + ) -> Result { let allow_offloaded = false; match self.create_timeline_create_guard(new_timeline_id, idempotency, allow_offloaded) { Ok(create_guard) => { pausable_failpoint!("timeline-creation-after-uninit"); Ok(StartCreatingTimelineResult::CreateGuard(create_guard)) } + Err(TimelineExclusionError::ShuttingDown) => Err(CreateTimelineError::ShuttingDown), Err(TimelineExclusionError::AlreadyCreating) => { // Creation is in progress, we cannot create it again, and we cannot // check if this request matches the existing one, so caller must try @@ -4582,7 +4911,7 @@ impl Tenant { &'a self, new_timeline_id: TimelineId, new_metadata: &TimelineMetadata, - create_guard: TimelineCreateGuard<'a>, + create_guard: TimelineCreateGuard, start_lsn: Lsn, ancestor: Option>, ) -> anyhow::Result> { @@ -4642,7 +4971,7 @@ impl Tenant { /// The `allow_offloaded` parameter controls whether to tolerate the existence of /// offloaded timelines or not. fn create_timeline_create_guard( - &self, + self: &Arc, timeline_id: TimelineId, idempotency: CreateTimelineIdempotency, allow_offloaded: bool, @@ -4902,48 +5231,16 @@ async fn run_initdb( let _permit = INIT_DB_SEMAPHORE.acquire().await; - let mut initdb_command = tokio::process::Command::new(&initdb_bin_path); - initdb_command - .args(["--pgdata", initdb_target_dir.as_ref()]) - .args(["--username", &conf.superuser]) - .args(["--encoding", "utf8"]) - .args(["--locale", &conf.locale]) - .arg("--no-instructions") - .arg("--no-sync") - .env_clear() - .env("LD_LIBRARY_PATH", &initdb_lib_dir) - .env("DYLD_LIBRARY_PATH", &initdb_lib_dir) - .stdin(std::process::Stdio::null()) - // stdout invocation produces the same output every time, we don't need it - .stdout(std::process::Stdio::null()) - // we would be interested in the stderr output, if there was any - .stderr(std::process::Stdio::piped()); - - // Before version 14, only the libc provide was available. - if pg_version > 14 { - // Version 17 brought with it a builtin locale provider which only provides - // C and C.UTF-8. While being safer for collation purposes since it is - // guaranteed to be consistent throughout a major release, it is also more - // performant. - let locale_provider = if pg_version >= 17 { "builtin" } else { "libc" }; - - initdb_command.args(["--locale-provider", locale_provider]); - } - - let initdb_proc = initdb_command.spawn()?; - - // Ideally we'd select here with the cancellation token, but the problem is that - // we can't safely terminate initdb: it launches processes of its own, and killing - // initdb doesn't kill them. After we return from this function, we want the target - // directory to be able to be cleaned up. - // See https://github.com/neondatabase/neon/issues/6385 - let initdb_output = initdb_proc.wait_with_output().await?; - if !initdb_output.status.success() { - return Err(InitdbError::Failed( - initdb_output.status, - initdb_output.stderr, - )); - } + let res = postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs { + superuser: &conf.superuser, + locale: &conf.locale, + initdb_bin: &initdb_bin_path, + pg_version, + library_search_path: &initdb_lib_dir, + pgdata: initdb_target_dir, + }) + .await + .map_err(InitdbError::Inner); // This isn't true cancellation support, see above. Still return an error to // excercise the cancellation code path. 
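For orientation, here is a sketch of the extracted initdb runner that the `postgres_initdb::do_run_initdb` call above stands for, reconstructed from the inline code deleted in this hunk; the actual crate may differ in naming and error handling.

```rust
// Sketch of an initdb runner equivalent to the deleted inline code; the real
// postgres_initdb crate may differ in its exact API and error type.
use camino::Utf8Path;

pub struct RunInitdbArgs<'a> {
    pub superuser: &'a str,
    pub locale: &'a str,
    pub initdb_bin: &'a Utf8Path,
    pub pg_version: u32,
    pub library_search_path: &'a Utf8Path,
    pub pgdata: &'a Utf8Path,
}

#[derive(Debug)]
pub enum Error {
    Spawn(std::io::Error),
    WaitOutput(std::io::Error),
    Failed {
        status: std::process::ExitStatus,
        stderr: Vec<u8>,
    },
}

pub async fn do_run_initdb(args: RunInitdbArgs<'_>) -> Result<(), Error> {
    let mut cmd = tokio::process::Command::new(args.initdb_bin);
    cmd.args(["--pgdata", args.pgdata.as_str()])
        .args(["--username", args.superuser])
        .args(["--encoding", "utf8"])
        .args(["--locale", args.locale])
        .arg("--no-instructions")
        .arg("--no-sync")
        .env_clear()
        .env("LD_LIBRARY_PATH", args.library_search_path)
        .env("DYLD_LIBRARY_PATH", args.library_search_path)
        .stdin(std::process::Stdio::null())
        // initdb's stdout is deterministic noise; stderr is what we keep for error reporting
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::piped());
    // Postgres 15+ understands --locale-provider; 17 adds the faster "builtin" provider.
    if args.pg_version > 14 {
        let provider = if args.pg_version >= 17 { "builtin" } else { "libc" };
        cmd.args(["--locale-provider", provider]);
    }
    let child = cmd.spawn().map_err(Error::Spawn)?;
    // initdb spawns children of its own, so killing it on cancellation would leave
    // orphans behind; we wait for completion instead (see neon issue #6385).
    let output = child.wait_with_output().await.map_err(Error::WaitOutput)?;
    if !output.status.success() {
        return Err(Error::Failed {
            status: output.status,
            stderr: output.stderr,
        });
    }
    Ok(())
}
```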
@@ -4951,7 +5248,7 @@ async fn run_initdb( return Err(InitdbError::Cancelled); } - Ok(()) + res } /// Dump contents of a layer file to stdout. diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 4c88282214..007bd3eef0 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -244,6 +244,7 @@ use self::index::IndexPart; use super::config::AttachedLocationConfig; use super::metadata::MetadataUpdate; use super::storage_layer::{Layer, LayerName, ResidentLayer}; +use super::timeline::import_pgdata; use super::upload_queue::{NotInitialized, SetDeletedFlagProgress}; use super::{DeleteTimelineError, Generation}; @@ -813,6 +814,18 @@ impl RemoteTimelineClient { Ok(need_wait) } + /// Launch an index-file upload operation in the background, setting `import_pgdata` field. + pub(crate) fn schedule_index_upload_for_import_pgdata_state_update( + self: &Arc, + state: Option, + ) -> anyhow::Result<()> { + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut()?; + upload_queue.dirty.import_pgdata = state; + self.schedule_index_upload(upload_queue)?; + Ok(()) + } + /// /// Launch an index-file upload operation in the background, if necessary. /// diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index efcd20d1bf..d632e595ad 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -706,7 +706,7 @@ where .and_then(|x| x) } -async fn download_retry_forever( +pub(crate) async fn download_retry_forever( op: O, description: &str, cancel: &CancellationToken, diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index d8a881a2c4..506990fb2f 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -12,6 +12,7 @@ use utils::id::TimelineId; use crate::tenant::metadata::TimelineMetadata; use crate::tenant::storage_layer::LayerName; +use crate::tenant::timeline::import_pgdata; use crate::tenant::Generation; use pageserver_api::shard::ShardIndex; @@ -37,6 +38,13 @@ pub struct IndexPart { #[serde(skip_serializing_if = "Option::is_none")] pub archived_at: Option, + /// This field supports import-from-pgdata ("fast imports" platform feature). + /// We don't currently use fast imports, so, this field is None for all production timelines. + /// See for more information. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub import_pgdata: Option, + /// Per layer file name metadata, which can be present for a present or missing layer file. /// /// Older versions of `IndexPart` will not have this property or have only a part of metadata @@ -90,10 +98,11 @@ impl IndexPart { /// - 7: metadata_bytes is no longer written, but still read /// - 8: added `archived_at` /// - 9: +gc_blocking - const LATEST_VERSION: usize = 9; + /// - 10: +import_pgdata + const LATEST_VERSION: usize = 10; // Versions we may see when reading from a bucket. 
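Because the new `import_pgdata` field is optional, defaulted, and skipped when `None`, pre-v10 index files keep deserializing unchanged and v10 files only carry the field while an import is pending or recorded. A standalone round-trip sketch with simplified stand-in types (strings instead of `chrono::NaiveDateTime`, a small subset of `IndexPart`):

```rust
// Standalone sketch of how the optional field round-trips; simplified types, not the
// pageserver's actual IndexPart. serde's default externally tagged enum representation
// yields the {"V1": {"Done": {...}}} shape seen in the v10 example below.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum Root {
    V1(V1),
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum V1 {
    InProgress { idempotency_key: String, started_at: String },
    Done { idempotency_key: String, started_at: String, finished_at: String },
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct IndexPartSubset {
    version: usize,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    import_pgdata: Option<Root>,
}

fn main() -> serde_json::Result<()> {
    // A pre-v10 index without the field parses to None ...
    let old: IndexPartSubset = serde_json::from_str(r#"{ "version": 9 }"#)?;
    assert_eq!(old.import_pgdata, None);

    // ... and a completed import serializes the nested enum tags.
    let done = IndexPartSubset {
        version: 10,
        import_pgdata: Some(Root::V1(V1::Done {
            idempotency_key: "some-client-chosen-key".into(),
            started_at: "2024-11-13T09:23:42.123".into(),
            finished_at: "2024-11-13T09:42:23.123".into(),
        })),
    };
    let json = serde_json::to_string_pretty(&done)?;
    assert!(json.contains(r#""V1""#) && json.contains(r#""Done""#));
    Ok(())
}
```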
- pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9]; + pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; pub const FILE_NAME: &'static str = "index_part.json"; @@ -108,6 +117,7 @@ impl IndexPart { lineage: Default::default(), gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, } } @@ -381,6 +391,7 @@ mod tests { lineage: Lineage::default(), gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -425,6 +436,7 @@ mod tests { lineage: Lineage::default(), gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -470,6 +482,7 @@ mod tests { lineage: Lineage::default(), gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -518,6 +531,7 @@ mod tests { lineage: Lineage::default(), gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, }; let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap(); @@ -561,6 +575,7 @@ mod tests { lineage: Lineage::default(), gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -607,6 +622,7 @@ mod tests { }, gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -658,6 +674,7 @@ mod tests { }, gc_blocking: None, last_aux_file_policy: Some(AuxFilePolicy::V2), + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -714,6 +731,7 @@ mod tests { lineage: Default::default(), gc_blocking: None, last_aux_file_policy: Default::default(), + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -771,6 +789,7 @@ mod tests { lineage: Default::default(), gc_blocking: None, last_aux_file_policy: Default::default(), + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -833,6 +852,83 @@ mod tests { }), last_aux_file_policy: Default::default(), archived_at: None, + import_pgdata: None, + }; + + let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); + assert_eq!(part, expected); + } + + #[test] + fn v10_importpgdata_is_parsed() { + let example = r#"{ + "version": 10, + "layer_metadata":{ + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 }, + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 } + }, + "disk_consistent_lsn":"0/16960E8", + "metadata": { + "disk_consistent_lsn": "0/16960E8", + "prev_record_lsn": "0/1696070", + "ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e", + "ancestor_lsn": "0/0", + "latest_gc_cutoff_lsn": "0/1696070", + "initdb_lsn": "0/1696070", + "pg_version": 14 + }, + "gc_blocking": { + "started_at": "2024-07-19T09:00:00.123", + "reasons": ["DetachAncestor"] + }, + "import_pgdata": { + "V1": { + "Done": { + "idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5", + "started_at": "2024-11-13T09:23:42.123", + "finished_at": "2024-11-13T09:42:23.123" + } + } + } + }"#; + + let expected = IndexPart { + version: 10, + layer_metadata: HashMap::from([ + 
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata { + file_size: 25600000, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }), + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata { + file_size: 9007199254741001, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }) + ]), + disk_consistent_lsn: "0/16960E8".parse::().unwrap(), + metadata: TimelineMetadata::new( + Lsn::from_str("0/16960E8").unwrap(), + Some(Lsn::from_str("0/1696070").unwrap()), + Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()), + Lsn::INVALID, + Lsn::from_str("0/1696070").unwrap(), + Lsn::from_str("0/1696070").unwrap(), + 14, + ).with_recalculated_checksum().unwrap(), + deleted_at: None, + lineage: Default::default(), + gc_blocking: Some(GcBlocking { + started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"), + reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]), + }), + last_aux_file_policy: Default::default(), + archived_at: None, + import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{ + started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"), + finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"), + idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()), + }))) }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d1285e7c8a..4881be33a6 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4,6 +4,7 @@ pub mod delete; pub(crate) mod detach_ancestor; mod eviction_task; pub(crate) mod handle; +pub(crate) mod import_pgdata; mod init; pub mod layer_manager; pub(crate) mod logical_size; @@ -2708,20 +2709,23 @@ impl Timeline { { Some(cancel) => cancel.cancel(), None => { - let state = self.current_state(); - if matches!( - state, - TimelineState::Broken { .. } | TimelineState::Stopping - ) { - - // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken). - // Don't make noise. - } else { - warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work"); - debug_assert!(false); + match self.current_state() { + TimelineState::Broken { .. } | TimelineState::Stopping => { + // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken). + // Don't make noise. + } + TimelineState::Loading => { + // Import does not return an activated timeline. 
+ info!("discarding priority boost for logical size calculation because timeline is not yet active"); + } + TimelineState::Active => { + // activation should be setting the once cell + warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work"); + debug_assert!(false); + } } } - }; + } } } diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs new file mode 100644 index 0000000000..de56468580 --- /dev/null +++ b/pageserver/src/tenant/timeline/import_pgdata.rs @@ -0,0 +1,218 @@ +use std::sync::Arc; + +use anyhow::{bail, Context}; +use remote_storage::RemotePath; +use tokio_util::sync::CancellationToken; +use tracing::{info, info_span, Instrument}; +use utils::lsn::Lsn; + +use crate::{context::RequestContext, tenant::metadata::TimelineMetadata}; + +use super::Timeline; + +mod flow; +mod importbucket_client; +mod importbucket_format; +pub(crate) mod index_part_format; +pub(crate) mod upcall_api; + +pub async fn doit( + timeline: &Arc, + index_part: index_part_format::Root, + ctx: &RequestContext, + cancel: CancellationToken, +) -> anyhow::Result<()> { + let index_part_format::Root::V1(v1) = index_part; + let index_part_format::InProgress { + location, + idempotency_key, + started_at, + } = match v1 { + index_part_format::V1::Done(_) => return Ok(()), + index_part_format::V1::InProgress(in_progress) => in_progress, + }; + + let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?; + + info!("get spec early so we know we'll be able to upcall when done"); + let Some(spec) = storage.get_spec().await? else { + bail!("spec not found") + }; + + let upcall_client = + upcall_api::Client::new(timeline.conf, cancel.clone()).context("create upcall client")?; + + // + // send an early progress update to clean up k8s job early and generate potentially useful logs + // + info!("send early progress update"); + upcall_client + .send_progress_until_success(&spec) + .instrument(info_span!("early_progress_update")) + .await?; + + let status_prefix = RemotePath::from_string("status").unwrap(); + + // + // See if shard is done. + // TODO: incorporate generations into status key for split brain safety. Figure out together with checkpointing. + // + let shard_status_key = + status_prefix.join(format!("shard-{}", timeline.tenant_shard_id.shard_slug())); + let shard_status: Option = + storage.get_json(&shard_status_key).await?; + info!(?shard_status, "peeking shard status"); + if shard_status.map(|st| st.done).unwrap_or(false) { + info!("shard status indicates that the shard is done, skipping import"); + } else { + // TODO: checkpoint the progress into the IndexPart instead of restarting + // from the beginning. + + // + // Wipe the slate clean - the flow does not allow resuming. + // We can implement resuming in the future by checkpointing the progress into the IndexPart. + // + info!("wipe the slate clean"); + { + // TODO: do we need to hold GC lock for this? 
+ let mut guard = timeline.layers.write().await; + assert!( + guard.layer_map()?.open_layer.is_none(), + "while importing, there should be no in-memory layer" // this just seems like a good place to assert it + ); + let all_layers_keys = guard.all_persistent_layers(); + let all_layers: Vec<_> = all_layers_keys + .iter() + .map(|key| guard.get_from_key(key)) + .collect(); + let open = guard.open_mut().context("open_mut")?; + + timeline.remote_client.schedule_gc_update(&all_layers)?; + open.finish_gc_timeline(&all_layers); + } + + // + // Wait for pgdata to finish uploading + // + info!("wait for pgdata to reach status 'done'"); + let pgdata_status_key = status_prefix.join("pgdata"); + loop { + let res = async { + let pgdata_status: Option = storage + .get_json(&pgdata_status_key) + .await + .context("get pgdata status")?; + info!(?pgdata_status, "peeking pgdata status"); + if pgdata_status.map(|st| st.done).unwrap_or(false) { + Ok(()) + } else { + Err(anyhow::anyhow!("pgdata not done yet")) + } + } + .await; + match res { + Ok(_) => break, + Err(err) => { + info!(?err, "indefintely waiting for pgdata to finish"); + if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled()) + .await + .is_ok() + { + bail!("cancelled while waiting for pgdata"); + } + } + } + } + + // + // Do the import + // + info!("do the import"); + let control_file = storage.get_control_file().await?; + let base_lsn = control_file.base_lsn(); + + info!("update TimelineMetadata based on LSNs from control file"); + { + let pg_version = control_file.pg_version(); + let _ctx: &RequestContext = ctx; + async move { + // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the + // checkpoint record, and prev_record_lsn should point to its beginning. + // We should read the real end of the record from the WAL, but here we + // just fake it. + let disk_consistent_lsn = Lsn(base_lsn.0 + 8); + let prev_record_lsn = base_lsn; + let metadata = TimelineMetadata::new( + disk_consistent_lsn, + Some(prev_record_lsn), + None, // no ancestor + Lsn(0), // no ancestor lsn + base_lsn, // latest_gc_cutoff_lsn + base_lsn, // initdb_lsn + pg_version, + ); + + let _start_lsn = disk_consistent_lsn + 1; + + timeline + .remote_client + .schedule_index_upload_for_full_metadata_update(&metadata)?; + + timeline.remote_client.wait_completion().await?; + + anyhow::Ok(()) + } + } + .await?; + + flow::run( + timeline.clone(), + base_lsn, + control_file, + storage.clone(), + ctx, + ) + .await?; + + // + // Communicate that shard is done. + // + storage + .put_json( + &shard_status_key, + &importbucket_format::ShardStatus { done: true }, + ) + .await + .context("put shard status")?; + } + + // + // Ensure at-least-once deliver of the upcall to cplane + // before we mark the task as done and never come here again. + // + info!("send final progress update"); + upcall_client + .send_progress_until_success(&spec) + .instrument(info_span!("final_progress_update")) + .await?; + + // + // Mark as done in index_part. + // This makes subsequent timeline loads enter the normal load code path + // instead of spawning the import task and calling this here function. 
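The wait-for-pgdata loop above, like the shard-status check before it, boils down to one pattern: poll a small JSON status object in the import bucket and sleep between attempts, waking up early on cancellation. A minimal self-contained sketch of that pattern, with `fetch` as a hypothetical stand-in for the `storage.get_json()` call:

```rust
// Minimal sketch of the poll-until-done pattern; `fetch` is a hypothetical stand-in
// for the async storage.get_json() call used in the real flow.
use std::{future::Future, time::Duration};
use tokio_util::sync::CancellationToken;

#[derive(serde::Deserialize, Debug)]
struct Status {
    done: bool,
}

async fn wait_until_done<F, Fut>(mut fetch: F, cancel: &CancellationToken) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = anyhow::Result<Option<Status>>>,
{
    loop {
        match fetch().await {
            Ok(Some(Status { done: true })) => return Ok(()),
            Ok(_) => tracing::info!("status object missing or not done yet, waiting"),
            Err(err) => tracing::info!(?err, "error fetching status, waiting"),
        }
        // Sleep 10s between polls; if the cancellation token fires first, bail out.
        if tokio::time::timeout(Duration::from_secs(10), cancel.cancelled())
            .await
            .is_ok()
        {
            anyhow::bail!("cancelled while waiting for status");
        }
    }
}
```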
+ // + info!("mark import as complete in index part"); + timeline + .remote_client + .schedule_index_upload_for_import_pgdata_state_update(Some(index_part_format::Root::V1( + index_part_format::V1::Done(index_part_format::Done { + idempotency_key, + started_at, + finished_at: chrono::Utc::now().naive_utc(), + }), + )))?; + + timeline.remote_client.wait_completion().await?; + + Ok(()) +} diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs new file mode 100644 index 0000000000..cbd4168c06 --- /dev/null +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -0,0 +1,798 @@ +//! Import a PGDATA directory into an empty root timeline. +//! +//! This module is adapted hackathon code by Heikki and Stas. +//! Other code in the parent module was written by Christian as part of a customer PoC. +//! +//! The hackathon code produced image layer files as a free-standing program. +//! +//! It has been modified to +//! - run inside a running Pageserver, within the proper lifecycles of Timeline -> Tenant(Shard) +//! - => sharding-awareness: produce image layers with only the data relevant for this shard +//! - => S3 as the source for the PGDATA instead of local filesystem +//! +//! TODOs before productionization: +//! - ChunkProcessingJob size / ImportJob::total_size does not account for sharding. +//! => produced image layers likely too small. +//! - ChunkProcessingJob should cut up an ImportJob to hit exactly target image layer size. +//! - asserts / unwraps need to be replaced with errors +//! - don't trust remote objects will be small (=prevent OOMs in those cases) +//! - limit all in-memory buffers in size, or download to disk and read from there +//! - limit task concurrency +//! - generally play nice with other tenants in the system +//! - importbucket is a different bucket than main pageserver storage, so it should be fine wrt S3 rate limits +//! - but concerns like network bandwidth, local disk write bandwidth, local disk capacity, etc +//! - integrate with layer eviction system +//! - audit for Tenant::cancel and Timeline::cancel responsiveness +//! - audit for Tenant/Timeline gate holding (we spawn tokio tasks during this flow!) +//! +//! An incomplete set of TODOs from the Hackathon: +//! 
- version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest) + +use std::sync::Arc; + +use anyhow::{bail, ensure}; +use bytes::Bytes; + +use itertools::Itertools; +use pageserver_api::{ + key::{rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, DBDIR_KEY}, + reltag::RelTag, + shard::ShardIdentity, +}; +use postgres_ffi::{pg_constants, relfile_utils::parse_relfilename, BLCKSZ}; +use tokio::task::JoinSet; +use tracing::{debug, info_span, instrument, Instrument}; + +use crate::{ + assert_u64_eq_usize::UsizeIsU64, + pgdatadir_mapping::{SlruSegmentDirectory, TwoPhaseDirectory}, +}; +use crate::{ + context::{DownloadBehavior, RequestContext}, + pgdatadir_mapping::{DbDirectory, RelDirectory}, + task_mgr::TaskKind, + tenant::storage_layer::{ImageLayerWriter, Layer}, +}; + +use pageserver_api::key::Key; +use pageserver_api::key::{ + slru_block_to_key, slru_dir_to_key, slru_segment_size_to_key, CHECKPOINT_KEY, CONTROLFILE_KEY, + TWOPHASEDIR_KEY, +}; +use pageserver_api::keyspace::singleton_range; +use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range}; +use pageserver_api::reltag::SlruKind; +use utils::bin_ser::BeSer; +use utils::lsn::Lsn; + +use std::collections::HashSet; +use std::ops::Range; + +use super::{ + importbucket_client::{ControlFile, RemoteStorageWrapper}, + Timeline, +}; + +use remote_storage::RemotePath; + +pub async fn run( + timeline: Arc, + pgdata_lsn: Lsn, + control_file: ControlFile, + storage: RemoteStorageWrapper, + ctx: &RequestContext, +) -> anyhow::Result<()> { + Flow { + timeline, + pgdata_lsn, + control_file, + tasks: Vec::new(), + storage, + } + .run(ctx) + .await +} + +struct Flow { + timeline: Arc, + pgdata_lsn: Lsn, + control_file: ControlFile, + tasks: Vec, + storage: RemoteStorageWrapper, +} + +impl Flow { + /// Perform the ingestion into [`Self::timeline`]. + /// Assumes the timeline is empty (= no layers). + pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> { + let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align(); + + self.pgdata_lsn = pgdata_lsn; + + let datadir = PgDataDir::new(&self.storage).await?; + + // Import dbdir (00:00:00 keyspace) + // This is just constructed here, but will be written to the image layer in the first call to import_db() + let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory { + dbdirs: datadir + .dbs + .iter() + .map(|db| ((db.spcnode, db.dboid), true)) + .collect(), + })?); + self.tasks + .push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into()); + + // Import databases (00:spcnode:dbnode keyspace for each db) + for db in datadir.dbs { + self.import_db(&db).await?; + } + + // Import SLRUs + + // pg_xact (01:00 keyspace) + self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact")) + .await?; + // pg_multixact/members (01:01 keyspace) + self.import_slru( + SlruKind::MultiXactMembers, + &self.storage.pgdata().join("pg_multixact/members"), + ) + .await?; + // pg_multixact/offsets (01:02 keyspace) + self.import_slru( + SlruKind::MultiXactOffsets, + &self.storage.pgdata().join("pg_multixact/offsets"), + ) + .await?; + + // Import pg_twophase. 
+ // TODO: as empty + let twophasedir_buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory { + xids: HashSet::new(), + })?; + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + TWOPHASEDIR_KEY, + Bytes::from(twophasedir_buf), + ))); + + // Controlfile, checkpoint + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + CONTROLFILE_KEY, + self.control_file.control_file_buf().clone(), + ))); + + let checkpoint_buf = self + .control_file + .control_file_data() + .checkPointCopy + .encode()?; + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + CHECKPOINT_KEY, + checkpoint_buf, + ))); + + // Assigns parts of key space to later parallel jobs + let mut last_end_key = Key::MIN; + let mut current_chunk = Vec::new(); + let mut current_chunk_size: usize = 0; + let mut parallel_jobs = Vec::new(); + for task in std::mem::take(&mut self.tasks).into_iter() { + if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 { + let key_range = last_end_key..task.key_range().start; + parallel_jobs.push(ChunkProcessingJob::new( + key_range.clone(), + std::mem::take(&mut current_chunk), + &self, + )); + last_end_key = key_range.end; + current_chunk_size = 0; + } + current_chunk_size += task.total_size(); + current_chunk.push(task); + } + parallel_jobs.push(ChunkProcessingJob::new( + last_end_key..Key::MAX, + current_chunk, + &self, + )); + + // Start all jobs simultaneosly + let mut work = JoinSet::new(); + // TODO: semaphore? + for job in parallel_jobs { + let ctx: RequestContext = + ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error); + work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job"))); + } + let mut results = Vec::new(); + while let Some(result) = work.join_next().await { + match result { + Ok(res) => { + results.push(res); + } + Err(_joinset_err) => { + results.push(Err(anyhow::anyhow!( + "parallel job panicked or cancelled, check pageserver logs" + ))); + } + } + } + + if results.iter().all(|r| r.is_ok()) { + Ok(()) + } else { + let mut msg = String::new(); + for result in results { + if let Err(err) = result { + msg.push_str(&format!("{err:?}\n\n")); + } + } + bail!("Some parallel jobs failed:\n\n{msg}"); + } + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))] + async fn import_db(&mut self, db: &PgDataDirDb) -> anyhow::Result<()> { + debug!("start"); + scopeguard::defer! 
{ + debug!("return"); + } + + // Import relmap (00:spcnode:dbnode:00:*:00) + let relmap_key = relmap_file_key(db.spcnode, db.dboid); + debug!("Constructing relmap entry, key {relmap_key}"); + let relmap_path = db.path.join("pg_filenode.map"); + let relmap_buf = self.storage.get(&relmap_path).await?; + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + relmap_key, relmap_buf, + ))); + + // Import reldir (00:spcnode:dbnode:00:*:01) + let reldir_key = rel_dir_to_key(db.spcnode, db.dboid); + debug!("Constructing reldirs entry, key {reldir_key}"); + let reldir_buf = RelDirectory::ser(&RelDirectory { + rels: db + .files + .iter() + .map(|f| (f.rel_tag.relnode, f.rel_tag.forknum)) + .collect(), + })?; + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + reldir_key, + Bytes::from(reldir_buf), + ))); + + // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last + // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff) + for file in &db.files { + debug!(%file.path, %file.filesize, "importing file"); + let len = file.filesize; + ensure!(len % 8192 == 0); + let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192); + let start_key = rel_block_to_key(file.rel_tag, start_blk); + let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32); + self.tasks + .push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new( + *self.timeline.get_shard_identity(), + start_key..end_key, + &file.path, + self.storage.clone(), + ))); + + // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff) + if let Some(nblocks) = file.nblocks { + let size_key = rel_size_to_key(file.rel_tag); + //debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}"); + let buf = nblocks.to_le_bytes(); + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + size_key, + Bytes::from(buf.to_vec()), + ))); + } + } + + Ok(()) + } + + async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> { + let segments = self.storage.listfilesindir(path).await?; + let segments: Vec<(String, u32, usize)> = segments + .into_iter() + .filter_map(|(path, size)| { + let filename = path.object_name()?; + let segno = u32::from_str_radix(filename, 16).ok()?; + Some((filename.to_string(), segno, size)) + }) + .collect(); + + // Write SlruDir + let slrudir_key = slru_dir_to_key(kind); + let segnos: HashSet = segments + .iter() + .map(|(_path, segno, _size)| *segno) + .collect(); + let slrudir = SlruSegmentDirectory { segments: segnos }; + let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?; + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + slrudir_key, + Bytes::from(slrudir_buf), + ))); + + for (segpath, segno, size) in segments { + // SlruSegBlocks for each segment + let p = path.join(&segpath); + let file_size = size; + ensure!(file_size % 8192 == 0); + let nblocks = u32::try_from(file_size / 8192)?; + let start_key = slru_block_to_key(kind, segno, 0); + let end_key = slru_block_to_key(kind, segno, nblocks); + debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment"); + self.tasks + .push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new( + *self.timeline.get_shard_identity(), + start_key..end_key, + &p, + self.storage.clone(), + ))); + + // Followed by SlruSegSize + let segsize_key = slru_segment_size_to_key(kind, segno); + let segsize_buf = nblocks.to_le_bytes(); + self.tasks + 
.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + segsize_key, + Bytes::copy_from_slice(&segsize_buf), + ))); + } + Ok(()) + } +} + +// +// dbdir iteration tools +// + +struct PgDataDir { + pub dbs: Vec, // spcnode, dboid, path +} + +struct PgDataDirDb { + pub spcnode: u32, + pub dboid: u32, + pub path: RemotePath, + pub files: Vec, +} + +struct PgDataDirDbFile { + pub path: RemotePath, + pub rel_tag: RelTag, + pub segno: u32, + pub filesize: usize, + // Cummulative size of the given fork, set only for the last segment of that fork + pub nblocks: Option, +} + +impl PgDataDir { + async fn new(storage: &RemoteStorageWrapper) -> anyhow::Result { + let datadir_path = storage.pgdata(); + // Import ordinary databases, DEFAULTTABLESPACE_OID is smaller than GLOBALTABLESPACE_OID, so import them first + // Traverse database in increasing oid order + + let basedir = &datadir_path.join("base"); + let db_oids: Vec<_> = storage + .listdir(basedir) + .await? + .into_iter() + .filter_map(|path| path.object_name().and_then(|name| name.parse::().ok())) + .sorted() + .collect(); + debug!(?db_oids, "found databases"); + let mut databases = Vec::new(); + for dboid in db_oids { + databases.push( + PgDataDirDb::new( + storage, + &basedir.join(dboid.to_string()), + pg_constants::DEFAULTTABLESPACE_OID, + dboid, + &datadir_path, + ) + .await?, + ); + } + + // special case for global catalogs + databases.push( + PgDataDirDb::new( + storage, + &datadir_path.join("global"), + postgres_ffi::pg_constants::GLOBALTABLESPACE_OID, + 0, + &datadir_path, + ) + .await?, + ); + + databases.sort_by_key(|db| (db.spcnode, db.dboid)); + + Ok(Self { dbs: databases }) + } +} + +impl PgDataDirDb { + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%dboid, %db_path))] + async fn new( + storage: &RemoteStorageWrapper, + db_path: &RemotePath, + spcnode: u32, + dboid: u32, + datadir_path: &RemotePath, + ) -> anyhow::Result { + let mut files: Vec = storage + .listfilesindir(db_path) + .await? + .into_iter() + .filter_map(|(path, size)| { + debug!(%path, %size, "found file in dbdir"); + path.object_name().and_then(|name| { + // returns (relnode, forknum, segno) + parse_relfilename(name).ok().map(|x| (size, x)) + }) + }) + .sorted_by_key(|(_, relfilename)| *relfilename) + .map(|(filesize, (relnode, forknum, segno))| { + let rel_tag = RelTag { + spcnode, + dbnode: dboid, + relnode, + forknum, + }; + + let path = datadir_path.join(rel_tag.to_segfile_name(segno)); + assert!(filesize % BLCKSZ as usize == 0); // TODO: this should result in an error + let nblocks = filesize / BLCKSZ as usize; + + PgDataDirDbFile { + path, + filesize, + rel_tag, + segno, + nblocks: Some(nblocks), // first non-cummulative sizes + } + }) + .collect(); + + // Set cummulative sizes. Do all of that math here, so that later we could easier + // parallelize over segments and know with which segments we need to write relsize + // entry. 
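The loop that follows implements this bookkeeping: only the last segment of each relation ends up with `Some(cumulative)` block count, which later becomes the relsize entry. A self-contained sketch of the same logic with simplified types:

```rust
// Self-contained sketch (simplified types): segments arrive sorted by relation, each
// with its own block count, and only the last segment of a relation keeps the
// cumulative count used for the relsize key.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct RelId(u32);

#[derive(Debug)]
struct Segment {
    rel: RelId,
    segno: u32,
    own_nblocks: usize,
    // Filled in below: Some(cumulative) on the last segment of `rel`, None otherwise.
    nblocks: Option<usize>,
}

fn set_cumulative_sizes(files: &mut [Segment]) {
    let mut cumulative = 0usize;
    let mut prev: Option<RelId> = None;
    for i in 0..files.len() {
        if prev == Some(files[i].rel) {
            cumulative += files[i].own_nblocks;
        } else {
            cumulative = files[i].own_nblocks;
        }
        let is_last_of_rel = i + 1 == files.len() || files[i + 1].rel != files[i].rel;
        files[i].nblocks = if is_last_of_rel { Some(cumulative) } else { None };
        prev = Some(files[i].rel);
    }
}

fn main() {
    let mut files = vec![
        Segment { rel: RelId(16384), segno: 0, own_nblocks: 131072, nblocks: None },
        Segment { rel: RelId(16384), segno: 1, own_nblocks: 42, nblocks: None },
        Segment { rel: RelId(16385), segno: 0, own_nblocks: 7, nblocks: None },
    ];
    set_cumulative_sizes(&mut files);
    assert_eq!(files[0].nblocks, None);
    assert_eq!(files[1].nblocks, Some(131072 + 42));
    assert_eq!(files[2].nblocks, Some(7));
}
```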
+ let mut cumulative_nblocks: usize = 0; + let mut prev_rel_tag: Option = None; + for i in 0..files.len() { + if prev_rel_tag == Some(files[i].rel_tag) { + cumulative_nblocks += files[i].nblocks.unwrap(); + } else { + cumulative_nblocks = files[i].nblocks.unwrap(); + } + + files[i].nblocks = if i == files.len() - 1 || files[i + 1].rel_tag != files[i].rel_tag { + Some(cumulative_nblocks) + } else { + None + }; + + prev_rel_tag = Some(files[i].rel_tag); + } + + Ok(PgDataDirDb { + files, + path: db_path.clone(), + spcnode, + dboid, + }) + } +} + +trait ImportTask { + fn key_range(&self) -> Range; + + fn total_size(&self) -> usize { + // TODO: revisit this + if is_contiguous_range(&self.key_range()) { + contiguous_range_len(&self.key_range()) as usize * 8192 + } else { + u32::MAX as usize + } + } + + async fn doit( + self, + layer_writer: &mut ImageLayerWriter, + ctx: &RequestContext, + ) -> anyhow::Result; +} + +struct ImportSingleKeyTask { + key: Key, + buf: Bytes, +} + +impl ImportSingleKeyTask { + fn new(key: Key, buf: Bytes) -> Self { + ImportSingleKeyTask { key, buf } + } +} + +impl ImportTask for ImportSingleKeyTask { + fn key_range(&self) -> Range { + singleton_range(self.key) + } + + async fn doit( + self, + layer_writer: &mut ImageLayerWriter, + ctx: &RequestContext, + ) -> anyhow::Result { + layer_writer.put_image(self.key, self.buf, ctx).await?; + Ok(1) + } +} + +struct ImportRelBlocksTask { + shard_identity: ShardIdentity, + key_range: Range, + path: RemotePath, + storage: RemoteStorageWrapper, +} + +impl ImportRelBlocksTask { + fn new( + shard_identity: ShardIdentity, + key_range: Range, + path: &RemotePath, + storage: RemoteStorageWrapper, + ) -> Self { + ImportRelBlocksTask { + shard_identity, + key_range, + path: path.clone(), + storage, + } + } +} + +impl ImportTask for ImportRelBlocksTask { + fn key_range(&self) -> Range { + self.key_range.clone() + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%self.path))] + async fn doit( + self, + layer_writer: &mut ImageLayerWriter, + ctx: &RequestContext, + ) -> anyhow::Result { + debug!("Importing relation file"); + + let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?; + let (rel_tag_end, end_blk) = self.key_range.end.to_rel_block()?; + assert_eq!(rel_tag, rel_tag_end); + + let ranges = (start_blk..end_blk) + .enumerate() + .filter_map(|(i, blknum)| { + let key = rel_block_to_key(rel_tag, blknum); + if self.shard_identity.is_key_disposable(&key) { + return None; + } + let file_offset = i.checked_mul(8192).unwrap(); + Some(( + vec![key], + file_offset, + file_offset.checked_add(8192).unwrap(), + )) + }) + .coalesce(|(mut acc, acc_start, acc_end), (mut key, start, end)| { + assert_eq!(key.len(), 1); + assert!(!acc.is_empty()); + assert!(acc_end > acc_start); + if acc_end == start /* TODO additional max range check here, to limit memory consumption per task to X */ { + acc.push(key.pop().unwrap()); + Ok((acc, acc_start, end)) + } else { + Err(((acc, acc_start, acc_end), (key, start, end))) + } + }); + + let mut nimages = 0; + for (keys, range_start, range_end) in ranges { + let range_buf = self + .storage + .get_range(&self.path, range_start.into_u64(), range_end.into_u64()) + .await?; + let mut buf = Bytes::from(range_buf); + // TODO: batched writes + for key in keys { + let image = buf.split_to(8192); + layer_writer.put_image(key, image, ctx).await?; + nimages += 1; + } + } + + Ok(nimages) + } +} + +struct ImportSlruBlocksTask { + shard_identity: ShardIdentity, + key_range: Range, + path: RemotePath, + 
storage: RemoteStorageWrapper, +} + +impl ImportSlruBlocksTask { + fn new( + shard_identity: ShardIdentity, + key_range: Range, + path: &RemotePath, + storage: RemoteStorageWrapper, + ) -> Self { + ImportSlruBlocksTask { + shard_identity, + key_range, + path: path.clone(), + storage, + } + } +} + +impl ImportTask for ImportSlruBlocksTask { + fn key_range(&self) -> Range { + self.key_range.clone() + } + + async fn doit( + self, + layer_writer: &mut ImageLayerWriter, + ctx: &RequestContext, + ) -> anyhow::Result { + debug!("Importing SLRU segment file {}", self.path); + let buf = self.storage.get(&self.path).await?; + + let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?; + let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?; + let mut blknum = start_blk; + let mut nimages = 0; + let mut file_offset = 0; + while blknum < end_blk { + let key = slru_block_to_key(kind, segno, blknum); + assert!( + !self.shard_identity.is_key_disposable(&key), + "SLRU keys need to go into every shard" + ); + let buf = &buf[file_offset..(file_offset + 8192)]; + file_offset += 8192; + layer_writer + .put_image(key, Bytes::copy_from_slice(buf), ctx) + .await?; + blknum += 1; + nimages += 1; + } + Ok(nimages) + } +} + +enum AnyImportTask { + SingleKey(ImportSingleKeyTask), + RelBlocks(ImportRelBlocksTask), + SlruBlocks(ImportSlruBlocksTask), +} + +impl ImportTask for AnyImportTask { + fn key_range(&self) -> Range { + match self { + Self::SingleKey(t) => t.key_range(), + Self::RelBlocks(t) => t.key_range(), + Self::SlruBlocks(t) => t.key_range(), + } + } + /// returns the number of images put into the `layer_writer` + async fn doit( + self, + layer_writer: &mut ImageLayerWriter, + ctx: &RequestContext, + ) -> anyhow::Result { + match self { + Self::SingleKey(t) => t.doit(layer_writer, ctx).await, + Self::RelBlocks(t) => t.doit(layer_writer, ctx).await, + Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await, + } + } +} + +impl From for AnyImportTask { + fn from(t: ImportSingleKeyTask) -> Self { + Self::SingleKey(t) + } +} + +impl From for AnyImportTask { + fn from(t: ImportRelBlocksTask) -> Self { + Self::RelBlocks(t) + } +} + +impl From for AnyImportTask { + fn from(t: ImportSlruBlocksTask) -> Self { + Self::SlruBlocks(t) + } +} + +struct ChunkProcessingJob { + timeline: Arc, + range: Range, + tasks: Vec, + + pgdata_lsn: Lsn, +} + +impl ChunkProcessingJob { + fn new(range: Range, tasks: Vec, env: &Flow) -> Self { + assert!(env.pgdata_lsn.is_valid()); + Self { + timeline: env.timeline.clone(), + range, + tasks, + pgdata_lsn: env.pgdata_lsn, + } + } + + async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> { + let mut writer = ImageLayerWriter::new( + self.timeline.conf, + self.timeline.timeline_id, + self.timeline.tenant_shard_id, + &self.range, + self.pgdata_lsn, + ctx, + ) + .await?; + + let mut nimages = 0; + for task in self.tasks { + nimages += task.doit(&mut writer, ctx).await?; + } + + let resident_layer = if nimages > 0 { + let (desc, path) = writer.finish(ctx).await?; + Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)? + } else { + // dropping the writer cleans up + return Ok(()); + }; + + // this is sharing the same code as create_image_layers + let mut guard = self.timeline.layers.write().await; + guard + .open_mut()? 
+ .track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics); + crate::tenant::timeline::drop_wlock(guard); + + // Schedule the layer for upload but don't add barriers such as + // wait for completion or index upload, so we don't inhibit upload parallelism. + // TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?) + // TODO: or regulate parallelism by upload queue depth? Prob should happen at a higher level. + self.timeline + .remote_client + .schedule_layer_file_upload(resident_layer)?; + + Ok(()) + } +} diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs new file mode 100644 index 0000000000..8d5ab1780f --- /dev/null +++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs @@ -0,0 +1,315 @@ +use std::{ops::Bound, sync::Arc}; + +use anyhow::Context; +use bytes::Bytes; +use postgres_ffi::ControlFileData; +use remote_storage::{ + Download, DownloadError, DownloadOpts, GenericRemoteStorage, Listing, ListingObject, RemotePath, +}; +use serde::de::DeserializeOwned; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, instrument}; +use utils::lsn::Lsn; + +use crate::{assert_u64_eq_usize::U64IsUsize, config::PageServerConf}; + +use super::{importbucket_format, index_part_format}; + +pub async fn new( + conf: &'static PageServerConf, + location: &index_part_format::Location, + cancel: CancellationToken, +) -> Result { + // FIXME: we probably want some timeout, and we might be able to assume the max file + // size on S3 is 1GiB (postgres segment size). But the problem is that the individual + // downloaders don't know enough about concurrent downloads to make a guess on the + // expected bandwidth and resulting best timeout. + let timeout = std::time::Duration::from_secs(24 * 60 * 60); + let location_storage = match location { + #[cfg(feature = "testing")] + index_part_format::Location::LocalFs { path } => { + GenericRemoteStorage::LocalFs(remote_storage::LocalFs::new(path.clone(), timeout)?) + } + index_part_format::Location::AwsS3 { + region, + bucket, + key, + } => { + // TODO: think about security implications of letting the client specify the bucket & prefix. + // It's the most flexible right now, but, possibly we want to move bucket name into PS conf + // and force the timeline_id into the prefix? + GenericRemoteStorage::AwsS3(Arc::new( + remote_storage::S3Bucket::new( + &remote_storage::S3Config { + bucket_name: bucket.clone(), + prefix_in_bucket: Some(key.clone()), + bucket_region: region.clone(), + endpoint: conf + .import_pgdata_aws_endpoint_url + .clone() + .map(|url| url.to_string()), // by specifying None here, remote_storage/aws-sdk-rust will infer from env + concurrency_limit: 100.try_into().unwrap(), // TODO: think about this + max_keys_per_list_response: Some(1000), // TODO: think about this + upload_storage_class: None, // irrelevant + }, + timeout, + ) + .await + .context("setup s3 bucket")?, + )) + } + }; + let storage_wrapper = RemoteStorageWrapper::new(location_storage, cancel); + Ok(storage_wrapper) +} + +/// Wrap [`remote_storage`] APIs to make it look a bit more like a filesystem API +/// such as [`tokio::fs`], which was used in the original implementation of the import code. 
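All accessors on the wrapper below funnel through `download_retry_forever` (made `pub(crate)` earlier in this patch) or `utils::backoff::retry`. The general shape of such a retry-with-cancellation helper is sketched here as self-contained code; the real pageserver implementations differ in backoff parameters and error classification.

```rust
// Self-contained sketch of a retry-forever helper with cancellation; not the
// pageserver's actual download_retry_forever / utils::backoff implementation.
use std::{future::Future, time::Duration};
use tokio_util::sync::CancellationToken;

pub async fn retry_forever<T, E, F, Fut>(
    mut op: F,
    description: &str,
    cancel: &CancellationToken,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
    E: std::fmt::Debug,
{
    let mut attempt: u32 = 0;
    loop {
        match op().await {
            Ok(value) => return Ok(value),
            // Once cancellation is requested, surface the last error instead of spinning.
            Err(err) if cancel.is_cancelled() => return Err(err),
            Err(err) => {
                attempt += 1;
                tracing::warn!(?err, attempt, "{description} failed, retrying");
                // Exponential backoff capped at 10s; wake early if cancelled.
                let backoff = Duration::from_millis(100u64 << attempt.min(7))
                    .min(Duration::from_secs(10));
                let _ = tokio::time::timeout(backoff, cancel.cancelled()).await;
            }
        }
    }
}
```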
+#[derive(Clone)] +pub struct RemoteStorageWrapper { + storage: GenericRemoteStorage, + cancel: CancellationToken, +} + +impl RemoteStorageWrapper { + pub fn new(storage: GenericRemoteStorage, cancel: CancellationToken) -> Self { + Self { storage, cancel } + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] + pub async fn listfilesindir( + &self, + path: &RemotePath, + ) -> Result, DownloadError> { + assert!( + path.object_name().is_some(), + "must specify dirname, without trailing slash" + ); + let path = path.add_trailing_slash(); + + let res = crate::tenant::remote_timeline_client::download::download_retry_forever( + || async { + let Listing { keys, prefixes: _ } = self + .storage + .list( + Some(&path), + remote_storage::ListingMode::WithDelimiter, + None, + &self.cancel, + ) + .await?; + let res = keys + .into_iter() + .map(|ListingObject { key, size, .. }| (key, size.into_usize())) + .collect(); + Ok(res) + }, + &format!("listfilesindir {path:?}"), + &self.cancel, + ) + .await; + debug!(?res, "returning"); + res + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] + pub async fn listdir(&self, path: &RemotePath) -> Result, DownloadError> { + assert!( + path.object_name().is_some(), + "must specify dirname, without trailing slash" + ); + let path = path.add_trailing_slash(); + + let res = crate::tenant::remote_timeline_client::download::download_retry_forever( + || async { + let Listing { keys, prefixes } = self + .storage + .list( + Some(&path), + remote_storage::ListingMode::WithDelimiter, + None, + &self.cancel, + ) + .await?; + let res = keys + .into_iter() + .map(|ListingObject { key, .. }| key) + .chain(prefixes.into_iter()) + .collect(); + Ok(res) + }, + &format!("listdir {path:?}"), + &self.cancel, + ) + .await; + debug!(?res, "returning"); + res + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] + pub async fn get(&self, path: &RemotePath) -> Result { + let res = crate::tenant::remote_timeline_client::download::download_retry_forever( + || async { + let Download { + download_stream, .. + } = self + .storage + .download(path, &DownloadOpts::default(), &self.cancel) + .await?; + let mut reader = tokio_util::io::StreamReader::new(download_stream); + + // XXX optimize this, can we get the capacity hint from somewhere? 
+ let mut buf = Vec::new(); + tokio::io::copy_buf(&mut reader, &mut buf).await?; + Ok(Bytes::from(buf)) + }, + &format!("download {path:?}"), + &self.cancel, + ) + .await; + debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done"); + res + } + + pub async fn get_spec(&self) -> Result, anyhow::Error> { + self.get_json(&RemotePath::from_string("spec.json").unwrap()) + .await + .context("get spec") + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] + pub async fn get_json( + &self, + path: &RemotePath, + ) -> Result, DownloadError> { + let buf = match self.get(path).await { + Ok(buf) => buf, + Err(DownloadError::NotFound) => return Ok(None), + Err(err) => return Err(err), + }; + let res = serde_json::from_slice(&buf) + .context("serialize") + // TODO: own error type + .map_err(DownloadError::Other)?; + Ok(Some(res)) + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] + pub async fn put_json(&self, path: &RemotePath, value: &T) -> anyhow::Result<()> + where + T: serde::Serialize, + { + let buf = serde_json::to_vec(value)?; + let bytes = Bytes::from(buf); + utils::backoff::retry( + || async { + let size = bytes.len(); + let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone()))); + self.storage + .upload_storage_object(bytes, size, path, &self.cancel) + .await + }, + remote_storage::TimeoutOrCancel::caused_by_cancel, + 1, + u32::MAX, + &format!("put json {path}"), + &self.cancel, + ) + .await + .expect("practically infinite retries") + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] + pub async fn get_range( + &self, + path: &RemotePath, + start_inclusive: u64, + end_exclusive: u64, + ) -> Result, DownloadError> { + let len = end_exclusive + .checked_sub(start_inclusive) + .unwrap() + .into_usize(); + let res = crate::tenant::remote_timeline_client::download::download_retry_forever( + || async { + let Download { + download_stream, .. + } = self + .storage + .download( + path, + &DownloadOpts { + etag: None, + byte_start: Bound::Included(start_inclusive), + byte_end: Bound::Excluded(end_exclusive) + }, + &self.cancel) + .await?; + let mut reader = tokio_util::io::StreamReader::new(download_stream); + + let mut buf = Vec::with_capacity(len); + tokio::io::copy_buf(&mut reader, &mut buf).await?; + Ok(buf) + }, + &format!("download range len=0x{len:x} [0x{start_inclusive:x},0x{end_exclusive:x}) from {path:?}"), + &self.cancel, + ) + .await; + debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done"); + res + } + + pub fn pgdata(&self) -> RemotePath { + RemotePath::from_string("pgdata").unwrap() + } + + pub async fn get_control_file(&self) -> Result { + let control_file_path = self.pgdata().join("global/pg_control"); + info!("get control file from {control_file_path}"); + let control_file_buf = self.get(&control_file_path).await?; + ControlFile::new(control_file_buf) + } +} + +pub struct ControlFile { + control_file_data: ControlFileData, + control_file_buf: Bytes, +} + +impl ControlFile { + pub(crate) fn new(control_file_buf: Bytes) -> Result { + // XXX ControlFileData is version-specific, we're always using v14 here. v17 had changes. 
+ let control_file_data = ControlFileData::decode(&control_file_buf)?; + let control_file = ControlFile { + control_file_data, + control_file_buf, + }; + control_file.try_pg_version()?; // so that we can offer infallible pg_version() + Ok(control_file) + } + pub(crate) fn base_lsn(&self) -> Lsn { + Lsn(self.control_file_data.checkPoint).align() + } + pub(crate) fn pg_version(&self) -> u32 { + self.try_pg_version() + .expect("prepare() checks that try_pg_version doesn't error") + } + pub(crate) fn control_file_data(&self) -> &ControlFileData { + &self.control_file_data + } + pub(crate) fn control_file_buf(&self) -> &Bytes { + &self.control_file_buf + } + fn try_pg_version(&self) -> anyhow::Result { + Ok(match self.control_file_data.catalog_version_no { + // thesea are from catversion.h + 202107181 => 14, + 202209061 => 15, + 202307071 => 16, + /* XXX pg17 */ + catversion => { + anyhow::bail!("unrecognized catalog version {catversion}") + } + }) + } +} diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs new file mode 100644 index 0000000000..04ba3c6f1f --- /dev/null +++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs @@ -0,0 +1,20 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] +pub struct PgdataStatus { + pub done: bool, + // TODO: remaining fields +} + +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] +pub struct ShardStatus { + pub done: bool, + // TODO: remaining fields +} + +// TODO: dedupe with fast_import code +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] +pub struct Spec { + pub project_id: String, + pub branch_id: String, +} diff --git a/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs b/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs new file mode 100644 index 0000000000..310d97a6a9 --- /dev/null +++ b/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs @@ -0,0 +1,68 @@ +use serde::{Deserialize, Serialize}; + +#[cfg(feature = "testing")] +use camino::Utf8PathBuf; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub enum Root { + V1(V1), +} +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub enum V1 { + InProgress(InProgress), + Done(Done), +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(transparent)] +pub struct IdempotencyKey(String); + +impl IdempotencyKey { + pub fn new(s: String) -> Self { + Self(s) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct InProgress { + pub idempotency_key: IdempotencyKey, + pub location: Location, + pub started_at: chrono::NaiveDateTime, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct Done { + pub idempotency_key: IdempotencyKey, + pub started_at: chrono::NaiveDateTime, + pub finished_at: chrono::NaiveDateTime, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub enum Location { + #[cfg(feature = "testing")] + LocalFs { path: Utf8PathBuf }, + AwsS3 { + region: String, + bucket: String, + key: String, + }, +} + +impl Root { + pub fn is_done(&self) -> bool { + match self { + Root::V1(v1) => match v1 { + V1::Done(_) => true, + V1::InProgress(_) => false, + }, + } + } + pub fn idempotency_key(&self) -> &IdempotencyKey { + match self { + Root::V1(v1) => match v1 { + V1::InProgress(in_progress) => &in_progress.idempotency_key, + V1::Done(done) => 
+            },
+        }
+    }
+}
diff --git a/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs b/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs
new file mode 100644
index 0000000000..c5210f9a30
--- /dev/null
+++ b/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs
@@ -0,0 +1,119 @@
+//! FIXME: most of this is copy-paste from mgmt_api.rs; dedupe into a `reqwest_utils::Client` crate.
+use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
+use serde::{Deserialize, Serialize};
+use tokio_util::sync::CancellationToken;
+use tracing::error;
+
+use crate::config::PageServerConf;
+use reqwest::Method;
+
+use super::importbucket_format::Spec;
+
+pub struct Client {
+    base_url: String,
+    authorization_header: Option<String>,
+    client: reqwest::Client,
+    cancel: CancellationToken,
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+#[derive(Serialize, Deserialize, Debug)]
+struct ImportProgressRequest {
+    // no fields yet, not sure if there ever will be any
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct ImportProgressResponse {
+    // we don't care
+}
+
+impl Client {
+    pub fn new(conf: &PageServerConf, cancel: CancellationToken) -> anyhow::Result<Self> {
+        let Some(ref base_url) = conf.import_pgdata_upcall_api else {
+            anyhow::bail!("import_pgdata_upcall_api is not configured")
+        };
+        Ok(Self {
+            base_url: base_url.to_string(),
+            client: reqwest::Client::new(),
+            cancel,
+            authorization_header: conf
+                .import_pgdata_upcall_api_token
+                .as_ref()
+                .map(|secret_string| secret_string.get_contents())
+                .map(|jwt| format!("Bearer {jwt}")),
+        })
+    }
+
+    fn start_request<U: reqwest::IntoUrl>(
+        &self,
+        method: Method,
+        uri: U,
+    ) -> reqwest::RequestBuilder {
+        let req = self.client.request(method, uri);
+        if let Some(value) = &self.authorization_header {
+            req.header(reqwest::header::AUTHORIZATION, value)
+        } else {
+            req
+        }
+    }
+
+    async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
+        &self,
+        method: Method,
+        uri: U,
+        body: B,
+    ) -> Result<reqwest::Response> {
+        self.start_request(method, uri)
+            .json(&body)
+            .send()
+            .await
+            .map_err(Error::ReceiveBody)
+    }
+
+    async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
+        &self,
+        method: Method,
+        uri: U,
+        body: B,
+    ) -> Result<reqwest::Response> {
+        let res = self.request_noerror(method, uri, body).await?;
+        let response = res.error_from_body().await?;
+        Ok(response)
+    }
+
+    pub async fn send_progress_once(&self, spec: &Spec) -> Result<()> {
+        let url = format!(
+            "{}/projects/{}/branches/{}/import_progress",
+            self.base_url, spec.project_id, spec.branch_id
+        );
+        let ImportProgressResponse {} = self
+            .request(Method::POST, url, &ImportProgressRequest {})
+            .await?
+            .json()
+            .await
+            .map_err(Error::ReceiveBody)?;
+        Ok(())
+    }
+
+    pub async fn send_progress_until_success(&self, spec: &Spec) -> anyhow::Result<()> {
+        loop {
+            match self.send_progress_once(spec).await {
+                Ok(()) => return Ok(()),
+                Err(Error::Cancelled) => return Err(anyhow::anyhow!("cancelled")),
+                Err(err) => {
+                    error!(?err, "error sending progress, retrying");
+                    if tokio::time::timeout(
+                        std::time::Duration::from_secs(10),
+                        self.cancel.cancelled(),
+                    )
+                    .await
+                    .is_ok()
+                    {
+                        anyhow::bail!("cancelled while sending early progress update");
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/pageserver/src/tenant/timeline/uninit.rs b/pageserver/src/tenant/timeline/uninit.rs
index a93bdde3f8..80a09b4840 100644
--- a/pageserver/src/tenant/timeline/uninit.rs
+++ b/pageserver/src/tenant/timeline/uninit.rs
@@ -3,7 +3,7 @@ use std::{collections::hash_map::Entry, fs, sync::Arc};
 use anyhow::Context;
 use camino::Utf8PathBuf;
 use tracing::{error, info, info_span};
-use utils::{fs_ext, id::TimelineId, lsn::Lsn};
+use utils::{fs_ext, id::TimelineId, lsn::Lsn, sync::gate::GateGuard};
 
 use crate::{
     context::RequestContext,
@@ -23,14 +23,14 @@ use super::Timeline;
 pub struct UninitializedTimeline<'t> {
     pub(crate) owning_tenant: &'t Tenant,
     timeline_id: TimelineId,
-    raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard<'t>)>,
+    raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
 }
 
 impl<'t> UninitializedTimeline<'t> {
     pub(crate) fn new(
         owning_tenant: &'t Tenant,
         timeline_id: TimelineId,
-        raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard<'t>)>,
+        raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
     ) -> Self {
         Self {
             owning_tenant,
@@ -87,6 +87,10 @@ impl<'t> UninitializedTimeline<'t> {
         }
     }
 
+    pub(crate) fn finish_creation_myself(&mut self) -> (Arc<Timeline>, TimelineCreateGuard) {
+        self.raw_timeline.take().expect("already checked")
+    }
+
     /// Prepares timeline data by loading it from the basebackup archive.
     pub(crate) async fn import_basebackup_from_tar(
         self,
@@ -167,9 +171,10 @@ pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
 /// A guard for timeline creations in process: as long as this object exists, the timeline ID
 /// is kept in `[Tenant::timelines_creating]` to exclude concurrent attempts to create the same timeline.
 #[must_use]
-pub(crate) struct TimelineCreateGuard<'t> {
-    owning_tenant: &'t Tenant,
-    timeline_id: TimelineId,
+pub(crate) struct TimelineCreateGuard {
+    pub(crate) _tenant_gate_guard: GateGuard,
+    pub(crate) owning_tenant: Arc<Tenant>,
+    pub(crate) timeline_id: TimelineId,
     pub(crate) timeline_path: Utf8PathBuf,
     pub(crate) idempotency: CreateTimelineIdempotency,
 }
@@ -184,20 +189,27 @@ pub(crate) enum TimelineExclusionError {
     },
     #[error("Already creating")]
     AlreadyCreating,
+    #[error("Shutting down")]
+    ShuttingDown,
     // e.g. I/O errors, or some failure deep in postgres initdb
     #[error(transparent)]
     Other(#[from] anyhow::Error),
 }
 
-impl<'t> TimelineCreateGuard<'t> {
+impl TimelineCreateGuard {
     pub(crate) fn new(
-        owning_tenant: &'t Tenant,
+        owning_tenant: &Arc<Tenant>,
         timeline_id: TimelineId,
         timeline_path: Utf8PathBuf,
         idempotency: CreateTimelineIdempotency,
         allow_offloaded: bool,
     ) -> Result<Self, TimelineExclusionError> {
+        let _tenant_gate_guard = owning_tenant
+            .gate
+            .enter()
+            .map_err(|_| TimelineExclusionError::ShuttingDown)?;
+
+        // Lock order: this is the only place we take both locks.
During drop() we only // lock creating_timelines let timelines = owning_tenant.timelines.lock().unwrap(); @@ -225,8 +237,12 @@ impl<'t> TimelineCreateGuard<'t> { return Err(TimelineExclusionError::AlreadyCreating); } creating_timelines.insert(timeline_id); + drop(creating_timelines); + drop(timelines_offloaded); + drop(timelines); Ok(Self { - owning_tenant, + _tenant_gate_guard, + owning_tenant: Arc::clone(owning_tenant), timeline_id, timeline_path, idempotency, @@ -234,7 +250,7 @@ impl<'t> TimelineCreateGuard<'t> { } } -impl Drop for TimelineCreateGuard<'_> { +impl Drop for TimelineCreateGuard { fn drop(&mut self) { self.owning_tenant .timelines_creating diff --git a/test_runner/fixtures/common_types.py b/test_runner/fixtures/common_types.py index c73d5411fa..6c22b31e00 100644 --- a/test_runner/fixtures/common_types.py +++ b/test_runner/fixtures/common_types.py @@ -190,6 +190,25 @@ class TenantTimelineId: ) +@dataclass +class ShardIndex: + shard_number: int + shard_count: int + + # cf impl Display for ShardIndex + @override + def __str__(self) -> str: + return f"{self.shard_number:02x}{self.shard_count:02x}" + + @classmethod + def parse(cls: type[ShardIndex], input: str) -> ShardIndex: + assert len(input) == 4 + return cls( + shard_number=int(input[0:2], 16), + shard_count=int(input[2:4], 16), + ) + + class TenantShardId: def __init__(self, tenant_id: TenantId, shard_number: int, shard_count: int): self.tenant_id = tenant_id @@ -222,6 +241,10 @@ class TenantShardId: # Unsharded case: equivalent of Rust TenantShardId::unsharded(tenant_id) return str(self.tenant_id) + @property + def shard_index(self) -> ShardIndex: + return ShardIndex(self.shard_number, self.shard_count) + @override def __repr__(self): return self.__str__() diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index d8d2b87b4e..78e2422171 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1883,6 +1883,20 @@ class NeonStorageController(MetricsGetter, LogUtils): response.raise_for_status() log.info(f"tenant_create success: {response.json()}") + def timeline_create( + self, + tenant_id: TenantId, + body: dict[str, Any], + ): + response = self.request( + "POST", + f"{self.api}/v1/tenant/{tenant_id}/timeline", + json=body, + headers=self.headers(TokenScope.PAGE_SERVER_API), + ) + response.raise_for_status() + log.info(f"timeline_create success: {response.json()}") + def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]: """ :return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr": str, "listen_http_port": int} diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 56386fdd37..4cf3ece396 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -1,5 +1,9 @@ from __future__ import annotations +import dataclasses +import json +import random +import string import time from collections import defaultdict from dataclasses import dataclass @@ -10,7 +14,14 @@ import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry -from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineArchivalState, TimelineId +from fixtures.common_types import ( + Id, + Lsn, + TenantId, + TenantShardId, + TimelineArchivalState, + TimelineId, +) from fixtures.log_helper import log from fixtures.metrics import Metrics, MetricsGetter, parse_metrics from fixtures.pg_version 
import PgVersion @@ -24,6 +35,69 @@ class PageserverApiException(Exception): self.status_code = status_code +@dataclass +class ImportPgdataIdemptencyKey: + key: str + + @staticmethod + def random() -> ImportPgdataIdemptencyKey: + return ImportPgdataIdemptencyKey( + "".join(random.choices(string.ascii_letters + string.digits, k=20)) + ) + + +@dataclass +class LocalFs: + path: str + + +@dataclass +class AwsS3: + region: str + bucket: str + key: str + + +@dataclass +class ImportPgdataLocation: + LocalFs: None | LocalFs = None + AwsS3: None | AwsS3 = None + + +@dataclass +class TimelineCreateRequestModeImportPgdata: + location: ImportPgdataLocation + idempotency_key: ImportPgdataIdemptencyKey + + +@dataclass +class TimelineCreateRequestMode: + Branch: None | dict[str, Any] = None + Bootstrap: None | dict[str, Any] = None + ImportPgdata: None | TimelineCreateRequestModeImportPgdata = None + + +@dataclass +class TimelineCreateRequest: + new_timeline_id: TimelineId + mode: TimelineCreateRequestMode + + def to_json(self) -> str: + class EnhancedJSONEncoder(json.JSONEncoder): + def default(self, o): + if dataclasses.is_dataclass(o) and not isinstance(o, type): + return dataclasses.asdict(o) + elif isinstance(o, Id): + return o.id.hex() + return super().default(o) + + # mode is flattened + this = dataclasses.asdict(self) + mode = this.pop("mode") + this.update(mode) + return json.dumps(self, cls=EnhancedJSONEncoder) + + class TimelineCreate406(PageserverApiException): def __init__(self, res: requests.Response): assert res.status_code == 406 diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 010801be6c..30720e648d 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -674,6 +674,13 @@ def run_only_on_default_postgres(reason: str): ) +def run_only_on_postgres(versions: Iterable[PgVersion], reason: str): + return pytest.mark.skipif( + PgVersion(os.getenv("DEFAULT_PG_VERSION", PgVersion.DEFAULT)) not in versions, + reason=reason, + ) + + def skip_in_debug_build(reason: str): return pytest.mark.skipif( os.getenv("BUILD_TYPE", "debug") == "debug", diff --git a/test_runner/regress/test_import_pgdata.py b/test_runner/regress/test_import_pgdata.py new file mode 100644 index 0000000000..29229b73c1 --- /dev/null +++ b/test_runner/regress/test_import_pgdata.py @@ -0,0 +1,307 @@ +import json +import re +import time +from enum import Enum + +import psycopg2 +import psycopg2.errors +import pytest +from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnvBuilder, VanillaPostgres +from fixtures.pageserver.http import ( + ImportPgdataIdemptencyKey, + PageserverApiException, +) +from fixtures.pg_version import PgVersion +from fixtures.remote_storage import RemoteStorageKind +from fixtures.utils import run_only_on_postgres +from pytest_httpserver import HTTPServer +from werkzeug.wrappers.request import Request +from werkzeug.wrappers.response import Response + +num_rows = 1000 + + +class RelBlockSize(Enum): + ONE_STRIPE_SIZE = 1 + TWO_STRPES_PER_SHARD = 2 + MULTIPLE_RELATION_SEGMENTS = 3 + + +smoke_params = [ + # unsharded (the stripe size needs to be given for rel block size calculations) + *[(None, 1024, s) for s in RelBlockSize], + # many shards, small stripe size to speed up test + *[(8, 1024, s) for s in RelBlockSize], +] + + +@run_only_on_postgres( + [PgVersion.V14, PgVersion.V15, PgVersion.V16], + "newer control file catalog version and struct format isn't 
supported", +) +@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params) +def test_pgdata_import_smoke( + vanilla_pg: VanillaPostgres, + neon_env_builder: NeonEnvBuilder, + shard_count: int | None, + stripe_size: int, + rel_block_size: RelBlockSize, + make_httpserver: HTTPServer, +): + # + # Setup fake control plane for import progress + # + def handler(request: Request) -> Response: + log.info(f"control plane request: {request.json}") + return Response(json.dumps({}), status=200) + + cplane_mgmt_api_server = make_httpserver + cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler) + + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + env = neon_env_builder.init_start() + + env.pageserver.patch_config_toml_nonrecursive( + { + "import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api" + } + ) + env.pageserver.stop() + env.pageserver.start() + + # + # Put data in vanilla pg + # + + vanilla_pg.start() + vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser") + + log.info("create relblock data") + if rel_block_size == RelBlockSize.ONE_STRIPE_SIZE: + target_relblock_size = stripe_size * 8192 + elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD: + target_relblock_size = (shard_count or 1) * stripe_size * 8192 * 2 + elif rel_block_size == RelBlockSize.MULTIPLE_RELATION_SEGMENTS: + target_relblock_size = int(((2.333 * 1024 * 1024 * 1024) // 8192) * 8192) + else: + raise ValueError + + # fillfactor so we don't need to produce that much data + # 900 byte per row is > 10% => 1 row per page + vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""") + + nrows = 0 + while True: + relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')") + log.info( + f"relblock size: {relblock_size/8192} pages (target: {target_relblock_size//8192}) pages" + ) + if relblock_size >= target_relblock_size: + break + addrows = int((target_relblock_size - relblock_size) // 8192) + assert addrows >= 1, "forward progress" + vanilla_pg.safe_psql(f"insert into t select generate_series({nrows+1}, {nrows + addrows})") + nrows += addrows + expect_nrows = nrows + expect_sum = ( + (nrows) * (nrows + 1) // 2 + ) # https://stackoverflow.com/questions/43901484/sum-of-the-integers-from-1-to-n + + def validate_vanilla_equivalence(ep): + # TODO: would be nicer to just compare pgdump + assert ep.safe_psql("select count(*), sum(data::bigint)::bigint from t") == [ + (expect_nrows, expect_sum) + ] + + validate_vanilla_equivalence(vanilla_pg) + + vanilla_pg.stop() + + # + # We have a Postgres data directory now. + # Make a localfs remote storage that looks like how after `fast_import` ran. 
+ # TODO: actually exercise fast_import here + # TODO: test s3 remote storage + # + importbucket = neon_env_builder.repo_dir / "importbucket" + importbucket.mkdir() + # what cplane writes before scheduling fast_import + specpath = importbucket / "spec.json" + specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"})) + # what fast_import writes + vanilla_pg.pgdatadir.rename(importbucket / "pgdata") + statusdir = importbucket / "status" + statusdir.mkdir() + (statusdir / "pgdata").write_text(json.dumps({"done": True})) + + # + # Do the import + # + + tenant_id = TenantId.generate() + env.storage_controller.tenant_create( + tenant_id, shard_count=shard_count, shard_stripe_size=stripe_size + ) + + timeline_id = TimelineId.generate() + log.info("starting import") + start = time.monotonic() + + idempotency = ImportPgdataIdemptencyKey.random() + log.info(f"idempotency key {idempotency}") + # TODO: teach neon_local CLI about the idempotency & 429 error so we can run inside the loop + # and check for 429 + + import_branch_name = "imported" + env.storage_controller.timeline_create( + tenant_id, + { + "new_timeline_id": str(timeline_id), + "import_pgdata": { + "idempotency_key": str(idempotency), + "location": {"LocalFs": {"path": str(importbucket.absolute())}}, + }, + }, + ) + env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id) + + while True: + locations = env.storage_controller.locate(tenant_id) + active_count = 0 + for location in locations: + shard_id = TenantShardId.parse(location["shard_id"]) + ps = env.get_pageserver(location["node_id"]) + try: + detail = ps.http_client().timeline_detail(shard_id, timeline_id) + state = detail["state"] + log.info(f"shard {shard_id} state: {state}") + if state == "Active": + active_count += 1 + except PageserverApiException as e: + if e.status_code == 404: + log.info("not found, import is in progress") + continue + elif e.status_code == 429: + log.info("import is in progress") + continue + else: + raise + + shard_status_file = statusdir / f"shard-{shard_id.shard_index}" + if state == "Active": + shard_status_file_contents = ( + shard_status_file.read_text() + ) # Active state implies import is done + shard_status = json.loads(shard_status_file_contents) + assert shard_status["done"] is True + + if active_count == len(locations): + log.info("all shards are active") + break + time.sleep(1) + + import_duration = time.monotonic() - start + log.info(f"import complete; duration={import_duration:.2f}s") + + # + # Get some timeline details for later. 
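+    # The imported timeline has not ingested any WAL of its own, so the LSNs
+    # inspected below are expected to sit at (or just past) the initdb/import LSN.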
+ # + locations = env.storage_controller.locate(tenant_id) + [shard_zero] = [ + loc for loc in locations if TenantShardId.parse(loc["shard_id"]).shard_number == 0 + ] + shard_zero_ps = env.get_pageserver(shard_zero["node_id"]) + shard_zero_http = shard_zero_ps.http_client() + shard_zero_timeline_info = shard_zero_http.timeline_detail(shard_zero["shard_id"], timeline_id) + initdb_lsn = Lsn(shard_zero_timeline_info["initdb_lsn"]) + latest_gc_cutoff_lsn = Lsn(shard_zero_timeline_info["latest_gc_cutoff_lsn"]) + last_record_lsn = Lsn(shard_zero_timeline_info["last_record_lsn"]) + disk_consistent_lsn = Lsn(shard_zero_timeline_info["disk_consistent_lsn"]) + _remote_consistent_lsn = Lsn(shard_zero_timeline_info["remote_consistent_lsn"]) + remote_consistent_lsn_visible = Lsn(shard_zero_timeline_info["remote_consistent_lsn_visible"]) + # assert remote_consistent_lsn_visible == remote_consistent_lsn TODO: this fails initially and after restart, presumably because `UploadQueue::clean.1` is still `None` + assert remote_consistent_lsn_visible == disk_consistent_lsn + assert initdb_lsn == latest_gc_cutoff_lsn + assert disk_consistent_lsn == initdb_lsn + 8 + assert last_record_lsn == disk_consistent_lsn + # TODO: assert these values are the same everywhere + + # + # Validate the resulting remote storage state. + # + + # + # Validate the imported data + # + + ro_endpoint = env.endpoints.create_start( + branch_name=import_branch_name, endpoint_id="ro", tenant_id=tenant_id, lsn=last_record_lsn + ) + + validate_vanilla_equivalence(ro_endpoint) + + # ensure the import survives restarts + ro_endpoint.stop() + env.pageserver.stop(immediate=True) + env.pageserver.start() + ro_endpoint.start() + validate_vanilla_equivalence(ro_endpoint) + + # + # validate the layer files in each shard only have the shard-specific data + # (the implementation would be functional but not efficient without this characteristic) + # + + shards = env.storage_controller.locate(tenant_id) + for shard in shards: + shard_ps = env.get_pageserver(shard["node_id"]) + result = shard_ps.timeline_scan_no_disposable_keys(shard["shard_id"], timeline_id) + assert result.tally.disposable_count == 0 + assert ( + result.tally.not_disposable_count > 0 + ), "sanity check, each shard should have some data" + + # + # validate that we can write + # + rw_endpoint = env.endpoints.create_start( + branch_name=import_branch_name, endpoint_id="rw", tenant_id=tenant_id + ) + rw_endpoint.safe_psql("create table othertable(values text)") + rw_lsn = Lsn(rw_endpoint.safe_psql_scalar("select pg_current_wal_flush_lsn()")) + + # TODO: consider using `class Workload` here + # to do compaction and whatnot? + + # + # validate that we can branch (important use case) + # + + # ... at the tip + _ = env.create_branch( + new_branch_name="br-tip", + ancestor_branch_name=import_branch_name, + tenant_id=tenant_id, + ancestor_start_lsn=rw_lsn, + ) + br_tip_endpoint = env.endpoints.create_start( + branch_name="br-tip", endpoint_id="br-tip-ro", tenant_id=tenant_id + ) + validate_vanilla_equivalence(br_tip_endpoint) + br_tip_endpoint.safe_psql("select * from othertable") + + # ... 
at the initdb lsn + _ = env.create_branch( + new_branch_name="br-initdb", + ancestor_branch_name=import_branch_name, + tenant_id=tenant_id, + ancestor_start_lsn=initdb_lsn, + ) + br_initdb_endpoint = env.endpoints.create_start( + branch_name="br-initdb", endpoint_id="br-initdb-ro", tenant_id=tenant_id + ) + validate_vanilla_equivalence(br_initdb_endpoint) + with pytest.raises(psycopg2.errors.UndefinedTable): + br_initdb_endpoint.safe_psql("select * from othertable") diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index 667d54df02..a73d9d6352 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -19,7 +19,8 @@ ahash = { version = "0.8" } anyhow = { version = "1", features = ["backtrace"] } axum = { version = "0.7", features = ["ws"] } axum-core = { version = "0.4", default-features = false, features = ["tracing"] } -base64 = { version = "0.21", features = ["alloc"] } +base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] } +base64-647d43efb71741da = { package = "base64", version = "0.21", features = ["alloc"] } base64ct = { version = "1", default-features = false, features = ["std"] } bytes = { version = "1", features = ["serde"] } camino = { version = "1", default-features = false, features = ["serde1"] }