diff --git a/pageserver/src/bin/import.rs b/pageserver/src/bin/import.rs index a4d96bcc9c..7873f4d5d6 100644 --- a/pageserver/src/bin/import.rs +++ b/pageserver/src/bin/import.rs @@ -2,8 +2,11 @@ use anyhow; use camino::Utf8PathBuf; use clap::Parser; use pageserver::{pg_import, virtual_file::{self, api::IoEngineKind}}; +use utils::id::{TenantId, TimelineId}; use utils::logging::{self, LogFormat, TracingErrorLayerEnablement}; +use std::str::FromStr; + //project_git_version!(GIT_VERSION); #[derive(Parser)] @@ -18,6 +21,11 @@ struct CliOpts { /// Path to local dir where the layer files will be stored dest_path: Utf8PathBuf, + + #[arg(long, default_value_t = TenantId::from_str("42424242424242424242424242424242").unwrap())] + tenant_id: TenantId, + #[arg(long, default_value_t = TimelineId::from_str("42424242424242424242424242424242").unwrap())] + timeline_id: TimelineId, } fn main() -> anyhow::Result<()> { @@ -45,7 +53,9 @@ fn main() -> anyhow::Result<()> { } async fn async_main(cli: CliOpts) -> anyhow::Result<()> { - let mut import = pg_import::PgImportEnv::init().await?; - import.import_datadir(&cli.pgdata, &cli.dest_path).await?; + let mut import = pg_import::PgImportEnv::init(&cli.dest_path, cli.tenant_id, cli.timeline_id).await?; + + import.import_datadir(&cli.pgdata).await?; + Ok(()) } diff --git a/pageserver/src/pg_import.rs b/pageserver/src/pg_import.rs index 9c459f0152..4ef896c375 100644 --- a/pageserver/src/pg_import.rs +++ b/pageserver/src/pg_import.rs @@ -16,6 +16,14 @@ use crate::{context::{DownloadBehavior, RequestContext}, import_datadir, pgdatad use crate::config::PageServerConf; use tokio::io::AsyncReadExt; +use crate::tenant::storage_layer::PersistentLayerDesc; +use utils::generation::Generation; +use utils::lsn::Lsn; +use crate::tenant::IndexPart; +use crate::tenant::metadata::TimelineMetadata; +use crate::tenant::remote_timeline_client; +use crate::tenant::remote_timeline_client::LayerFileMetadata; +use pageserver_api::shard::ShardIndex; use 
pageserver_api::key::Key; use utils::bin_ser::BeSer; @@ -24,24 +32,24 @@ pub struct PgImportEnv { conf: &'static PageServerConf, tli: TimelineId, tsi: TenantShardId, + + pgdata_lsn: Lsn, } impl PgImportEnv { - pub async fn init() -> anyhow::Result { + pub async fn init(dstdir: &Utf8Path, tenant_id: TenantId, timeline_id: TimelineId) -> anyhow::Result { let ctx: RequestContext = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); let config = toml_edit::Document::new(); let conf = PageServerConf::parse_and_validate( NodeId(42), &config, - &Utf8PathBuf::from("layers") + dstdir )?; let conf = Box::leak(Box::new(conf)); - let tni = TenantId::from_str("42424242424242424242424242424242")?; - let tli = TimelineId::from_str("42424242424242424242424242424242")?; let tsi = TenantShardId { - tenant_id: tni, + tenant_id, shard_number: ShardNumber(0), shard_count: ShardCount(0), }; @@ -49,16 +57,20 @@ impl PgImportEnv { Ok(PgImportEnv { ctx, conf, - tli, + tli: timeline_id, tsi, + pgdata_lsn: Lsn(0), // Will be filled in later, when the control file is imported }) } - pub async fn import_datadir(&mut self, pgdata_path: &Utf8PathBuf, _tenant_path: &Utf8Path) -> anyhow::Result<()> { + pub async fn import_datadir(&mut self, pgdata_path: &Utf8PathBuf) -> anyhow::Result<()> { let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align(); - println!("Importing {pgdata_path} to {_tenant_path} as lsn {pgdata_lsn}..."); + let timeline_path = self.conf.timeline_path(&self.tsi, &self.tli); + + println!("Importing {pgdata_path} to {timeline_path} as lsn {pgdata_lsn}..."); + self.pgdata_lsn = pgdata_lsn; let datadir = PgDataDir::new(pgdata_path); @@ -85,10 +97,13 @@ impl PgImportEnv { self.import_db(&mut one_big_layer, &db).await?; } - one_big_layer.finish_layer(&self.ctx).await?; + let layerdesc = one_big_layer.finish_layer(&self.ctx).await?; // should we anything about the wal? 
+ // Create index_part.json file + self.create_index_part(&[layerdesc]).await?; + Ok(()) } @@ -168,6 +183,40 @@ impl PgImportEnv { Ok(()) } + /// Serializes an `IndexPart` that lists `layers` and writes it to the remote index path joined onto `conf.workdir`. + /// + /// Fails if two layers share a name, or if serializing or writing the index fails. + async fn create_index_part(&mut self, layers: &[PersistentLayerDesc]) -> anyhow::Result<()> { + let dstdir = &self.conf.workdir; + + let metadata = TimelineMetadata::new( + self.pgdata_lsn, + None, // prev_record_lsn + None, // no ancestor + Lsn(0), + self.pgdata_lsn, // latest_gc_cutoff_lsn + self.pgdata_lsn, // initdb_lsn + 16, // FIXME: Postgres version. Read from control file + ); + let generation = Generation::none(); + let mut index_part = IndexPart::empty(metadata); + + for l in layers { + let name = l.layer_name(); + let metadata = LayerFileMetadata::new(l.file_size, generation, ShardIndex::unsharded()); + if index_part.layer_metadata.insert(name.clone(), metadata).is_some() { + bail!("duplicate layer filename {name}"); + } + } + + let data = index_part.to_s3_bytes()?; + let path = remote_timeline_client::remote_index_path(&self.tsi, &self.tli, generation); + let path = dstdir.join(path.get_path()); + std::fs::write(&path, data) + .with_context(|| format!("could not write {path}"))?; + + Ok(()) + } } // diff --git a/test_runner/regress/test_pg_import.py b/test_runner/regress/test_pg_import.py index 88e722347b..10248f77e4 100644 --- a/test_runner/regress/test_pg_import.py +++ b/test_runner/regress/test_pg_import.py @@ -1,14 +1,16 @@ import os from pathlib import Path -from fixtures.common_types import Lsn +from fixtures.common_types import Lsn, TenantId, TimelineId from fixtures.log_helper import log from fixtures.neon_fixtures import ( + AbstractNeonCli, NeonEnvBuilder, PgBin, VanillaPostgres, ) from fixtures.port_distributor import PortDistributor +from fixtures.remote_storage import RemoteStorageKind from fixtures.utils import query_scalar, subprocess_capture num_rows = 1000 @@ -20,8 +22,8 @@ class ImportCli(AbstractNeonCli): COMMAND = "import" - def import(self, pgdatadir: Path, dest_dir: Path): - res = 
self.raw_cli([str(pgdatadir), str(dest_dir)]) + def run_import(self, pgdatadir: Path, dest_dir: Path, tenant_id: TenantId, timeline_id: TimelineId): + res = self.raw_cli(["--tenant-id", str(tenant_id), "--timeline-id", str(timeline_id), str(pgdatadir), str(dest_dir)]) res.check_returncode() @@ -39,18 +41,33 @@ def test_pg_import(test_output_dir, pg_bin, vanilla_pg, neon_env_builder): # We have a Postgres data directory to import now + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) env = neon_env_builder.init_start() # Set up pageserver for import - neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) # Run pg_import utility, pointing directly to a directory in the remote storage dir - tenant = TenantId.generate() - timeline = TimelineId.generate() + tenant_id = TenantId.generate() + timeline_id = TimelineId.generate() - timelinedir = env.pagserver.workdir() / "tenants" / tenant_id / "timelines" / timeline + dst_path = env.pageserver_remote_storage.root + tline_path = env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id) + tline_path.mkdir(parents=True) cli = ImportCli(env) - cli.import(vanilla_pg.pgdata(), + cli.run_import(vanilla_pg.pgdatadir, dst_path, tenant_id=tenant_id, timeline_id=timeline_id) # TODO: tell pageserver / storage controller that the tenant/timeline now exists + env.pageserver.tenant_attach( + tenant_id, + generation=100, + override_storage_controller_generation=True, + ) + + env.neon_cli.map_branch("imported", tenant_id, timeline_id) + + endpoint = env.endpoints.create_start(branch_name="imported", tenant_id=tenant_id) + conn = endpoint.connect() + cur = conn.cursor() + + assert endpoint.safe_psql("select count(*) from t") == [(300000,)]