resolve conflicts

This commit is contained in:
Stas Kelvich
2024-09-12 13:13:16 +01:00
3 changed files with 92 additions and 19 deletions

View File

@@ -2,8 +2,11 @@ use anyhow;
use camino::Utf8PathBuf;
use clap::Parser;
use pageserver::{pg_import, virtual_file::{self, api::IoEngineKind}};
use utils::id::{TenantId, TimelineId};
use utils::logging::{self, LogFormat, TracingErrorLayerEnablement};
use std::str::FromStr;
//project_git_version!(GIT_VERSION);
#[derive(Parser)]
@@ -18,6 +21,11 @@ struct CliOpts {
/// Path to local dir where the layer files will be stored
dest_path: Utf8PathBuf,
#[arg(long, default_value_t = TenantId::from_str("42424242424242424242424242424242").unwrap())]
tenant_id: TenantId,
#[arg(long, default_value_t = TimelineId::from_str("42424242424242424242424242424242").unwrap())]
timeline_id: TimelineId,
}
fn main() -> anyhow::Result<()> {
@@ -45,7 +53,9 @@ fn main() -> anyhow::Result<()> {
}
async fn async_main(cli: CliOpts) -> anyhow::Result<()> {
let mut import = pg_import::PgImportEnv::init().await?;
import.import_datadir(&cli.pgdata, &cli.dest_path).await?;
let mut import = pg_import::PgImportEnv::init(&cli.dest_path, cli.tenant_id, cli.timeline_id).await?;
import.import_datadir(&cli.pgdata).await?;
Ok(())
}

View File

@@ -16,6 +16,14 @@ use crate::{context::{DownloadBehavior, RequestContext}, import_datadir, pgdatad
use crate::config::PageServerConf;
use tokio::io::AsyncReadExt;
use crate::tenant::storage_layer::PersistentLayerDesc;
use utils::generation::Generation;
use utils::lsn::Lsn;
use crate::tenant::IndexPart;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client;
use crate::tenant::remote_timeline_client::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
use pageserver_api::key::Key;
use utils::bin_ser::BeSer;
@@ -24,24 +32,24 @@ pub struct PgImportEnv {
conf: &'static PageServerConf,
tli: TimelineId,
tsi: TenantShardId,
pgdata_lsn: Lsn,
}
impl PgImportEnv {
pub async fn init() -> anyhow::Result<PgImportEnv> {
pub async fn init(dstdir: &Utf8Path, tenant_id: TenantId, timeline_id: TimelineId) -> anyhow::Result<PgImportEnv> {
let ctx: RequestContext = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let config = toml_edit::Document::new();
let conf = PageServerConf::parse_and_validate(
NodeId(42),
&config,
&Utf8PathBuf::from("layers")
dstdir
)?;
let conf = Box::leak(Box::new(conf));
let tni = TenantId::from_str("42424242424242424242424242424242")?;
let tli = TimelineId::from_str("42424242424242424242424242424242")?;
let tsi = TenantShardId {
tenant_id: tni,
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
};
@@ -49,16 +57,20 @@ impl PgImportEnv {
Ok(PgImportEnv {
ctx,
conf,
tli,
tli: timeline_id,
tsi,
pgdata_lsn: Lsn(0), // Will be filled in later, when the control file is imported
})
}
pub async fn import_datadir(&mut self, pgdata_path: &Utf8PathBuf, _tenant_path: &Utf8Path) -> anyhow::Result<()> {
pub async fn import_datadir(&mut self, pgdata_path: &Utf8PathBuf) -> anyhow::Result<()> {
let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
println!("Importing {pgdata_path} to {_tenant_path} as lsn {pgdata_lsn}...");
let timeline_path = self.conf.timeline_path(&self.tsi, &self.tli);
println!("Importing {pgdata_path} to {timeline_path} as lsn {pgdata_lsn}...");
self.pgdata_lsn = pgdata_lsn;
let datadir = PgDataDir::new(pgdata_path);
@@ -85,10 +97,13 @@ impl PgImportEnv {
self.import_db(&mut one_big_layer, &db).await?;
}
one_big_layer.finish_layer(&self.ctx).await?;
let layerdesc = one_big_layer.finish_layer(&self.ctx).await?;
// Should we do anything about the WAL?
// Create index_part.json file
self.create_index_part(&[layerdesc]).await?;
Ok(())
}
@@ -168,6 +183,37 @@ impl PgImportEnv {
Ok(())
}
async fn create_index_part(&mut self, layers: &[PersistentLayerDesc]) -> anyhow::Result<()> {
let dstdir = &self.conf.workdir;
let metadata = TimelineMetadata::new(
self.pgdata_lsn,
None, // prev_record_lsn
None, // no ancestor
Lsn(0),
self.pgdata_lsn, // latest_gc_cutoff_lsn
self.pgdata_lsn, // initdb_lsn
16, // FIXME: Postgres version. Read from control file
);
let generation = Generation::none();
let mut index_part = IndexPart::empty(metadata);
for l in layers {
let name = l.layer_name();
let metadata = LayerFileMetadata::new(l.file_size, generation, ShardIndex::unsharded());
if let Some(_) = index_part.layer_metadata.insert(name.clone(), metadata) {
bail!("duplicate layer filename {name}");
}
}
let data = index_part.to_s3_bytes()?;
let path = remote_timeline_client::remote_index_path(&self.tsi, &self.tli, generation);
let path = dstdir.join(path.get_path());
std::fs::write(&path, data)
.context("could not write {path}")?;
Ok(())
}
}
//

View File

@@ -1,14 +1,16 @@
import os
from pathlib import Path
from fixtures.common_types import Lsn
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
AbstractNeonCli,
NeonEnvBuilder,
PgBin,
VanillaPostgres,
)
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import query_scalar, subprocess_capture
num_rows = 1000
@@ -20,8 +22,8 @@ class ImportCli(AbstractNeonCli):
COMMAND = "import"
def import(self, pgdatadir: Path, dest_dir: Path):
res = self.raw_cli([str(pgdatadir), str(dest_dir)])
def run_import(self, pgdatadir: Path, dest_dir: Path, tenant_id: TenantId, timeline_id: TimelineId):
res = self.raw_cli(["--tenant-id", str(tenant_id), "--timeline-id", str(timeline_id), str(pgdatadir), str(dest_dir)])
res.check_returncode()
@@ -39,18 +41,33 @@ def test_pg_import(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
# We have a Postgres data directory to import now
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
# Set up pageserver for import
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
# Run pg_import utility, pointing directly to a directory in the remote storage dir
tenant = TenantId.generate()
timeline = TimelineId.generate()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
timelinedir = env.pagserver.workdir() / "tenants" / tenant_id / "timelines" / timeline
dst_path = env.pageserver_remote_storage.root
tline_path = env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id)
tline_path.mkdir(parents=True)
cli = ImportCli(env)
cli.import(vanilla_pg.pgdata(),
cli.run_import(vanilla_pg.pgdatadir, dst_path, tenant_id=tenant_id, timeline_id=timeline_id)
# TODO: tell pageserver / storage controller that the tenant/timeline now exists
env.pageserver.tenant_attach(
tenant_id,
generation=100,
override_storage_controller_generation=True,
)
env.neon_cli.map_branch("imported", tenant_id, timeline_id)
endpoint = env.endpoints.create_start(branch_name="imported", tenant_id=tenant_id)
conn = endpoint.connect()
cur = conn.cursor()
assert endpoint.safe_psql("select count(*) from t") == [(300000,)]