Compare commits

...

35 Commits

Author SHA1 Message Date
Stas Kelvich
2bf9a350ef Process chunks in parallel 2024-09-14 14:07:35 +01:00
Heikki Linnakangas
2e7e5f4f3a Refactor how the image layer partitioning is done
It got pretty ugly after the last commit. Refactor it so that we first
collect all the key ranges that need to be written out into a list of
tasks, then partition the tasks into image layers, and then write them
out. This will be much easier to parallelize, but that's not included
in this commit yet.
2024-09-13 05:41:35 +03:00
Heikki Linnakangas
9f3d5826be Fix with files > 4 GB 2024-09-13 02:27:26 +03:00
Heikki Linnakangas
4f39501641 Fix handling relations > 1 GB 2024-09-13 02:04:48 +03:00
Heikki Linnakangas
67d1606f82 Write out multiple image layers 2024-09-13 01:47:10 +03:00
Heikki Linnakangas
9351ba26ff Fake LSN
Test passes, yay!
2024-09-12 21:44:49 +03:00
Stas Kelvich
8df388330b Merge branch 'hack/fast-import' of github.com:neondatabase/neon into hack/fast-import 2024-09-12 19:26:16 +01:00
Stas Kelvich
357c07dd35 track rel file import time 2024-09-12 19:26:03 +01:00
Heikki Linnakangas
7b90ec6e19 Create controlfile and checkpoint entries
XXX: untested, not sure if it works..
2024-09-12 21:01:04 +03:00
Heikki Linnakangas
85f4e966e8 Import dummy pg_twophase dir entry 2024-09-12 20:54:16 +03:00
Heikki Linnakangas
4d27048d6d Import SLRUs 2024-09-12 20:46:20 +03:00
Stas Kelvich
3a452d8f56 remove old timeline init code 2024-09-12 18:20:13 +01:00
Stas Kelvich
b81dbc887b import relation sizes 2024-09-12 18:19:25 +01:00
Stas Kelvich
80fed9cfb1 fix order of insertion for relmaps and reldirs 2024-09-12 15:43:54 +01:00
Stas Kelvich
189386b22f Merge branch 'hack/fast-import' of github.com:neondatabase/neon into hack/fast-import 2024-09-12 13:52:11 +01:00
Stas Kelvich
38dfecb026 clean imports 2024-09-12 13:51:48 +01:00
Stas Kelvich
be28bd8312 merge 2024-09-12 13:49:34 +01:00
Heikki Linnakangas
9759d6ec72 Rename the image layer to not have the temp suffix 2024-09-12 15:49:21 +03:00
Stas Kelvich
0c64d55a6b Import dbdir, relmaps, reldirs 2024-09-12 13:48:29 +01:00
Heikki Linnakangas
578da1dc02 Parse postgres version from control file 2024-09-12 15:21:59 +03:00
Stas Kelvich
842ac7cfda resolve conflicts 2024-09-12 13:13:16 +01:00
Stas Kelvich
71340e3c00 common iterators for pg data dirs 2024-09-12 13:10:35 +01:00
Heikki Linnakangas
e6e0b27dc3 Create index_part.json 2024-09-12 14:53:29 +03:00
Heikki Linnakangas
04ec8bd7de test: Attach the tenant, start endpoint on it
Doesn't work yet, I think because index_part.json is missing
2024-09-12 13:52:14 +03:00
Heikki Linnakangas
6563be1a4c Test passes now
It runs the command successfully. Doesn't try to attach it to the
pageserver on it yet

    BUILD_TYPE=debug DEFAULT_PG_VERSION=16 poetry run pytest --preserve-database-files test_runner/regress/test_pg_import.py
2024-09-12 13:36:42 +03:00
Heikki Linnakangas
fe975acc71 Add --tenant-id and --timeline-id options 2024-09-12 13:28:12 +03:00
Heikki Linnakangas
abed35589b Test fix 2024-09-12 12:59:45 +03:00
Stas Kelvich
3fe8b69968 Merge branch 'hack/fast-import' of github.com:neondatabase/neon into hack/fast-import 2024-09-12 10:59:24 +01:00
Stas Kelvich
0c856443c4 now it produces an image layer 2024-09-12 10:57:50 +01:00
Heikki Linnakangas
0fc584ef9a Add python test 2024-09-12 12:43:12 +03:00
Stas Kelvich
daedec65ac fix awaits 2024-09-12 10:42:08 +01:00
Stas Kelvich
94c393bf8f resolve conflicts 2024-09-12 10:37:07 +01:00
Stas Kelvich
28616b0907 compiles 2024-09-12 10:33:14 +01:00
Heikki Linnakangas
241724f3fc CLI args parsing 2024-09-12 12:31:07 +03:00
Stas Kelvich
98d128d993 first sketch 2024-09-12 09:59:36 +01:00
7 changed files with 846 additions and 14 deletions
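
The series lands the structure described in commit 2e7e5f4f3a: collect the import work into a task list, partition the tasks into roughly layer-sized chunks, then write each chunk out as an image layer (in parallel, as of 2bf9a350ef). A minimal, self-contained sketch of the partitioning step, with illustrative names rather than the real API (the actual implementation is in pg_import.rs below):

    // Greedy size-based partitioning over an ordered task list (sketch only).
    struct Task { size: usize }

    fn partition(tasks: Vec<Task>, budget: usize) -> Vec<Vec<Task>> {
        let mut jobs = Vec::new();
        let mut chunk: Vec<Task> = Vec::new();
        let mut chunk_size = 0;
        for task in tasks {
            // Close the current chunk once adding this task would exceed the budget.
            if chunk_size + task.size > budget && !chunk.is_empty() {
                jobs.push(std::mem::take(&mut chunk));
                chunk_size = 0;
            }
            chunk_size += task.size;
            chunk.push(task);
        }
        if !chunk.is_empty() {
            jobs.push(chunk);
        }
        jobs
    }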

View File

@@ -48,7 +48,7 @@ pub struct ShardedRange<'a> {
// Calculate the size of a range within the blocks of the same relation, or spanning only the
// top page in the previous relation's space.
-fn contiguous_range_len(range: &Range<Key>) -> u32 {
+pub fn contiguous_range_len(range: &Range<Key>) -> u32 {
debug_assert!(is_contiguous_range(range));
if range.start.field6 == 0xffffffff {
range.end.field6 + 1
@@ -67,7 +67,7 @@ fn contiguous_range_len(range: &Range<Key>) -> u32 {
/// This matters, because:
/// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse.
/// - Within such ranges, we may calculate distances using simple subtraction of field6.
-fn is_contiguous_range(range: &Range<Key>) -> bool {
+pub fn is_contiguous_range(range: &Range<Key>) -> bool {
range.start.field1 == range.end.field1
&& range.start.field2 == range.end.field2
&& range.start.field3 == range.end.field3
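
Making these two helpers pub lets the importer estimate how much data a key range covers. A minimal sketch of that use, mirroring ImportTask::total_size() in pg_import.rs below (8192 is the Postgres block size):

    // Sketch: estimated on-disk size of a key range, assuming 8 KiB blocks.
    fn estimated_size(range: &Range<Key>) -> usize {
        if is_contiguous_range(range) {
            contiguous_range_len(range) as usize * 8192
        } else {
            u32::MAX as usize // sparse range: conservative upper bound
        }
    }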

View File

@@ -0,0 +1,61 @@
use anyhow;
use camino::Utf8PathBuf;
use clap::Parser;
use pageserver::{pg_import, virtual_file::{self, api::IoEngineKind}};
use utils::id::{TenantId, TimelineId};
use utils::logging::{self, LogFormat, TracingErrorLayerEnablement};
use std::str::FromStr;
//project_git_version!(GIT_VERSION);
#[derive(Parser)]
#[command(
//version = GIT_VERSION,
about = "Utility to import a Postgres data directory directly into image layers",
//long_about = "..."
)]
struct CliOpts {
/// Input Postgres data directory
pgdata: Utf8PathBuf,
/// Path to local dir where the layer files will be stored
dest_path: Utf8PathBuf,
#[arg(long, default_value_t = TenantId::from_str("42424242424242424242424242424242").unwrap())]
tenant_id: TenantId,
#[arg(long, default_value_t = TimelineId::from_str("42424242424242424242424242424242").unwrap())]
timeline_id: TimelineId,
}
fn main() -> anyhow::Result<()> {
logging::init(
LogFormat::Plain,
TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)?;
virtual_file::init(
100,
IoEngineKind::StdFs,
512,
);
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
let cli = CliOpts::parse();
rt.block_on(async_main(cli))?;
Ok(())
}
async fn async_main(cli: CliOpts) -> anyhow::Result<()> {
let mut import = pg_import::PgImportEnv::init(&cli.dest_path, cli.tenant_id, cli.timeline_id).await?;
import.import_datadir(&cli.pgdata).await?;
Ok(())
}
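
Both IDs default to the 4242... values above, so only the two paths are required. A hypothetical invocation (the binary name is assumed here from the test wrapper's COMMAND = "import" below; the paths are placeholders):

    import --tenant-id 42424242424242424242424242424242 \
           --timeline-id 42424242424242424242424242424242 \
           /path/to/pgdata /path/to/dest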

View File

@@ -32,6 +32,7 @@ pub mod virtual_file;
pub mod walingest;
pub mod walrecord;
pub mod walredo;
pub mod pg_import;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;

pageserver/src/pg_import.rs
View File

@@ -0,0 +1,650 @@
use std::fs::metadata;
use anyhow::{bail, ensure, Context};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use itertools::Itertools;
use pageserver_api::{key::{rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, DBDIR_KEY}, reltag::RelTag};
use postgres_ffi::{pg_constants, relfile_utils::parse_relfilename, ControlFileData, BLCKSZ};
use tokio::{io::AsyncRead, task::{self, JoinHandle}};
use tracing::debug;
use utils::{id::{NodeId, TenantId, TimelineId}, shard::{ShardCount, ShardNumber, TenantShardId}};
use walkdir::WalkDir;
use crate::{context::{DownloadBehavior, RequestContext}, pgdatadir_mapping::{DbDirectory, RelDirectory}, task_mgr::TaskKind, tenant::storage_layer::ImageLayerWriter};
use crate::pgdatadir_mapping::{SlruSegmentDirectory, TwoPhaseDirectory};
use crate::config::PageServerConf;
use tokio::io::AsyncReadExt;
use crate::tenant::storage_layer::PersistentLayerDesc;
use utils::generation::Generation;
use utils::lsn::Lsn;
use crate::tenant::IndexPart;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client;
use crate::tenant::remote_timeline_client::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{is_contiguous_range, contiguous_range_len};
use pageserver_api::keyspace::singleton_range;
use pageserver_api::reltag::SlruKind;
use pageserver_api::key::{slru_block_to_key, slru_dir_to_key, slru_segment_size_to_key, TWOPHASEDIR_KEY, CONTROLFILE_KEY, CHECKPOINT_KEY};
use utils::bin_ser::BeSer;
use std::collections::HashSet;
use std::ops::Range;
pub struct PgImportEnv {
conf: &'static PageServerConf,
tli: TimelineId,
tsi: TenantShardId,
pgdata_lsn: Lsn,
tasks: Vec<AnyImportTask>,
layers: Vec<PersistentLayerDesc>,
}
impl PgImportEnv {
pub async fn init(dstdir: &Utf8Path, tenant_id: TenantId, timeline_id: TimelineId) -> anyhow::Result<PgImportEnv> {
let config = toml_edit::Document::new();
let conf = PageServerConf::parse_and_validate(
NodeId(42),
&config,
dstdir
)?;
let conf = Box::leak(Box::new(conf));
let tsi = TenantShardId {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
};
Ok(PgImportEnv {
conf,
tli: timeline_id,
tsi,
pgdata_lsn: Lsn(0), // Will be filled in later, when the control file is imported
tasks: Vec::new(),
layers: Vec::new(),
})
}
pub async fn import_datadir(&mut self, pgdata_path: &Utf8PathBuf) -> anyhow::Result<()> {
// Read control file
let controlfile_path = pgdata_path.join("global").join("pg_control");
let controlfile_buf = std::fs::read(&controlfile_path)
.with_context(|| format!("reading controlfile: {controlfile_path}"))?;
let control_file = ControlFileData::decode(&controlfile_buf)?;
let pgdata_lsn = Lsn(control_file.checkPoint).align();
let timeline_path = self.conf.timeline_path(&self.tsi, &self.tli);
println!("Importing {pgdata_path} to {timeline_path} as lsn {pgdata_lsn}...");
self.pgdata_lsn = pgdata_lsn;
let datadir = PgDataDir::new(pgdata_path);
// Import dbdir (00:00:00 keyspace)
// This is just constructed here; it will be written out later by one of the chunk processing jobs, together with the other queued tasks
let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory {
dbdirs: datadir.dbs.iter().map(|db| ((db.spcnode, db.dboid), true)).collect(),
})?);
self.tasks.push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into());
// Import databases (00:spcnode:dbnode keyspace for each db)
for db in datadir.dbs {
self.import_db(&db).await?;
}
// Import SLRUs
// pg_xact (01:00 keyspace)
self.import_slru(SlruKind::Clog, &pgdata_path.join("pg_xact")).await?;
// pg_multixact/members (01:01 keyspace)
self.import_slru(SlruKind::MultiXactMembers, &pgdata_path.join("pg_multixact/members")).await?;
// pg_multixact/offsets (01:02 keyspace)
self.import_slru(SlruKind::MultiXactOffsets, &pgdata_path.join("pg_multixact/offsets")).await?;
// Import pg_twophase.
// TODO: it is currently always imported as empty
let twophasedir_buf = TwoPhaseDirectory::ser(
&TwoPhaseDirectory { xids: HashSet::new() }
)?;
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(TWOPHASEDIR_KEY, Bytes::from(twophasedir_buf))));
// Controlfile, checkpoint
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(CONTROLFILE_KEY, Bytes::from(controlfile_buf))));
let checkpoint_buf = control_file.checkPointCopy.encode()?;
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(CHECKPOINT_KEY, checkpoint_buf)));
// Assign parts of the key space to the parallel jobs below
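// Greedily pack tasks into chunks of roughly 1 GiB of input data. Each chunk
// covers a disjoint, consecutive key range and is written into its own image
// layer by a separate tokio task.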
let mut last_end_key = Key::MIN;
let mut current_chunk = Vec::new();
let mut current_chunk_size: usize = 0;
let mut parallel_jobs = Vec::new();
for task in std::mem::take(&mut self.tasks).into_iter() {
if current_chunk_size + task.total_size() > 1024*1024*1024 {
let key_range = last_end_key..task.key_range().start;
parallel_jobs.push(ChunkProcessingJob::new(
key_range.clone(),
std::mem::take(&mut current_chunk),
self
));
last_end_key = key_range.end;
current_chunk_size = 0;
}
current_chunk_size += task.total_size();
current_chunk.push(task);
}
parallel_jobs.push(ChunkProcessingJob::new(
last_end_key..Key::NON_L0_MAX,
current_chunk,
self
));
// Start all jobs simultaneously
// TODO: semaphore?
let mut handles = vec![];
for job in parallel_jobs {
let handle: JoinHandle<anyhow::Result<PersistentLayerDesc>> = task::spawn(async move {
let layerdesc = job.run().await?;
Ok(layerdesc)
});
handles.push(handle);
}
// Wait for all jobs to complete
for handle in handles {
let layerdesc = handle.await??;
self.layers.push(layerdesc);
}
// Create index_part.json file
self.create_index_part(&control_file).await?;
Ok(())
}
async fn import_db(
&mut self,
db: &PgDataDirDb,
) -> anyhow::Result<()> {
debug!(
"Importing database (path={}, tablespace={}, dboid={})",
db.path, db.spcnode, db.dboid
);
// Import relmap (00:spcnode:dbnode:00:*:00)
let relmap_key = relmap_file_key(db.spcnode, db.dboid);
debug!("Constructing relmap entry, key {relmap_key}");
let mut relmap_file = tokio::fs::File::open(&db.path.join("pg_filenode.map")).await?;
let relmap_buf = read_all_bytes(&mut relmap_file).await?;
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(relmap_key, relmap_buf)));
// Import reldir (00:spcnode:dbnode:00:*:01)
let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
debug!("Constructing reldirs entry, key {reldir_key}");
let reldir_buf = RelDirectory::ser(&RelDirectory {
rels: db.files.iter().map(|f| (f.rel_tag.relnode, f.rel_tag.forknum)).collect(),
})?;
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(reldir_key, Bytes::from(reldir_buf))));
// Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
// segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
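// A relation is split into 1 GiB segment files, each covering
// 1 GiB / 8 KiB = 131072 blocks, so segment `segno` starts at
// block segno * 131072.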
for file in &db.files {
let len = metadata(&file.path)?.len() as usize;
ensure!(len % 8192 == 0);
let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192);
let start_key = rel_block_to_key(file.rel_tag, start_blk);
let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
self.tasks.push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(start_key..end_key, &file.path)));
// Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
if let Some(nblocks) = file.nblocks {
let size_key = rel_size_to_key(file.rel_tag);
//debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
let buf = nblocks.to_le_bytes();
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(size_key, Bytes::from(buf.to_vec()))));
}
}
Ok(())
}
async fn import_slru(
&mut self,
kind: SlruKind,
path: &Utf8PathBuf,
) -> anyhow::Result<()> {
let segments: Vec<(String, u32)> = WalkDir::new(path)
.max_depth(1)
.into_iter()
.filter_map(|entry| {
let entry = entry.ok()?;
let filename = entry.file_name();
let filename = filename.to_string_lossy();
let segno = u32::from_str_radix(&filename, 16).ok()?;
Some((filename.to_string(), segno))
}).collect();
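// SLRU segment files are named with the segment number in hex (e.g. "0000");
// directory entries that don't parse as hex are skipped.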
// Write SlruDir
let slrudir_key = slru_dir_to_key(kind);
let segnos: HashSet<u32> = segments.iter().map(|(_path, segno)| { *segno }).collect();
let slrudir = SlruSegmentDirectory {
segments: segnos,
};
let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?;
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(slrudir_key, Bytes::from(slrudir_buf))));
for (segpath, segno) in segments {
// SlruSegBlocks for each segment
let p = path.join(Utf8PathBuf::from(segpath));
let file_size = std::fs::metadata(&p)?.len();
ensure!(file_size % 8192 == 0);
let nblocks = u32::try_from(file_size / 8192)?;
let start_key = slru_block_to_key(kind, segno, 0);
let end_key = slru_block_to_key(kind, segno, nblocks);
self.tasks.push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(start_key..end_key, &p)));
// Followed by SlruSegSize
let segsize_key = slru_segment_size_to_key(kind, segno);
let segsize_buf = nblocks.to_le_bytes();
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(segsize_key, Bytes::copy_from_slice(&segsize_buf))));
}
Ok(())
}
async fn create_index_part(&mut self, control_file: &ControlFileData) -> anyhow::Result<()> {
let dstdir = &self.conf.workdir;
let pg_version = match control_file.catalog_version_no {
// these are from catversion.h
202107181 => 14,
202209061 => 15,
202307071 => 16,
catversion => { bail!("unrecognized catalog version {catversion}")},
};
let metadata = TimelineMetadata::new(
// FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
// checkpoint record, and prev_record_lsn should point to its beginning.
// We should read the real end of the record from the WAL, but here we
// just fake it.
Lsn(self.pgdata_lsn.0 + 8),
Some(self.pgdata_lsn),
None, // no ancestor
Lsn(0),
self.pgdata_lsn, // latest_gc_cutoff_lsn
self.pgdata_lsn, // initdb_lsn
pg_version,
);
let generation = Generation::none();
let mut index_part = IndexPart::empty(metadata);
for l in self.layers.iter() {
let name = l.layer_name();
let metadata = LayerFileMetadata::new(l.file_size, generation, ShardIndex::unsharded());
if let Some(_) = index_part.layer_metadata.insert(name.clone(), metadata) {
bail!("duplicate layer filename {name}");
}
}
let data = index_part.to_s3_bytes()?;
let path = remote_timeline_client::remote_index_path(&self.tsi, &self.tli, generation);
let path = dstdir.join(path.get_path());
std::fs::write(&path, data)
.context("could not write {path}")?;
Ok(())
}
}
//
// dbdir iteration tools
//
struct PgDataDir {
pub dbs: Vec<PgDataDirDb> // spcnode, dboid, path
}
struct PgDataDirDb {
pub spcnode: u32,
pub dboid: u32,
pub path: Utf8PathBuf,
pub files: Vec<PgDataDirDbFile>
}
struct PgDataDirDbFile {
pub path: Utf8PathBuf,
pub rel_tag: RelTag,
pub segno: u32,
// Cumulative size of the given fork, set only for the last segment of that fork
pub nblocks: Option<usize>,
}
impl PgDataDir {
fn new(datadir_path: &Utf8PathBuf) -> Self {
// Import ordinary databases first; DEFAULTTABLESPACE_OID is smaller than GLOBALTABLESPACE_OID, so they sort before the global catalogs
// Traverse databases in increasing oid order
let mut databases = WalkDir::new(datadir_path.join("base"))
.max_depth(1)
.into_iter()
.filter_map(|entry| {
entry.ok().and_then(|path| {
path.file_name().to_string_lossy().parse::<u32>().ok()
})
})
.sorted()
.map(|dboid| {
PgDataDirDb::new(
datadir_path.join("base").join(dboid.to_string()),
pg_constants::DEFAULTTABLESPACE_OID,
dboid,
datadir_path
)
})
.collect::<Vec<_>>();
// special case for global catalogs
databases.push(PgDataDirDb::new(
datadir_path.join("global"),
postgres_ffi::pg_constants::GLOBALTABLESPACE_OID,
0,
datadir_path,
));
databases.sort_by_key(|db| (db.spcnode, db.dboid));
Self {
dbs: databases
}
}
}
impl PgDataDirDb {
fn new(db_path: Utf8PathBuf, spcnode: u32, dboid: u32, datadir_path: &Utf8PathBuf) -> Self {
let mut files: Vec<PgDataDirDbFile> = WalkDir::new(&db_path)
.min_depth(1)
.max_depth(2)
.into_iter()
.filter_map(|entry| {
entry.ok().and_then(|path| {
let relfile = path.file_name().to_string_lossy();
// returns (relnode, forknum, segno)
parse_relfilename(&relfile).ok()
})
})
.sorted()
.map(|(relnode, forknum, segno)| {
let rel_tag = RelTag {
spcnode,
dbnode: dboid,
relnode,
forknum,
};
let path = datadir_path.join(rel_tag.to_segfile_name(segno));
let len = metadata(&path).unwrap().len() as usize;
assert!(len % BLCKSZ as usize == 0);
let nblocks = len / BLCKSZ as usize;
PgDataDirDbFile {
path,
rel_tag,
segno,
nblocks: Some(nblocks), // per-segment sizes at first; made cumulative below
}
})
.collect();
// Set cumulative sizes. Do all of that math here, so that later we can
// parallelize over segments more easily and know for which segments we need to
// write a relsize entry.
let mut cumulative_nblocks: usize = 0;
let mut prev_rel_tag: Option<RelTag> = None;
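// For example, a fork stored as segments of [131072, 131072, 1000] blocks
// ends up with nblocks = [None, None, Some(263144)]: only the last segment
// of a fork carries the fork's total size.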
for i in 0..files.len() {
if prev_rel_tag == Some(files[i].rel_tag) {
cumulative_nblocks += files[i].nblocks.unwrap();
} else {
cumulative_nblocks = files[i].nblocks.unwrap();
}
files[i].nblocks = if i == files.len() - 1 || files[i+1].rel_tag != files[i].rel_tag {
Some(cumulative_nblocks)
} else {
None
};
prev_rel_tag = Some(files[i].rel_tag);
}
PgDataDirDb {
files,
path: db_path,
spcnode,
dboid,
}
}
}
async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> anyhow::Result<Bytes> {
let mut buf: Vec<u8> = vec![];
reader.read_to_end(&mut buf).await?;
Ok(Bytes::from(buf))
}
trait ImportTask {
fn key_range(&self) -> Range<Key>;
fn total_size(&self) -> usize {
if is_contiguous_range(&self.key_range()) {
contiguous_range_len(&self.key_range()) as usize * 8192
} else {
u32::MAX as usize
}
}
async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()>;
}
struct ImportSingleKeyTask {
key: Key,
buf: Bytes,
}
impl ImportSingleKeyTask {
fn new(key: Key, buf: Bytes) -> Self {
ImportSingleKeyTask { key, buf }
}
}
impl ImportTask for ImportSingleKeyTask {
fn key_range(&self) -> Range<Key> {
singleton_range(self.key)
}
async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> {
layer_writer.put_image(self.key, self.buf, ctx).await?;
Ok(())
}
}
struct ImportRelBlocksTask {
key_range: Range<Key>,
path: Utf8PathBuf,
}
impl ImportRelBlocksTask {
fn new(key_range: Range<Key>, path: &Utf8Path) -> Self {
ImportRelBlocksTask {
key_range,
path: path.into()
}
}
}
impl ImportTask for ImportRelBlocksTask {
fn key_range(&self) -> Range<Key> {
self.key_range.clone()
}
async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> {
debug!("Importing relation file {}", self.path);
let mut reader = tokio::fs::File::open(&self.path).await?;
let mut buf: [u8; 8192] = [0u8; 8192];
let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
let (_rel_tag, end_blk) = self.key_range.end.to_rel_block()?;
let mut blknum = start_blk;
while blknum < end_blk {
reader.read_exact(&mut buf).await?;
let key = rel_block_to_key(rel_tag.clone(), blknum);
layer_writer.put_image(key, Bytes::copy_from_slice(&buf), ctx).await?;
blknum += 1;
}
Ok(())
}
}
struct ImportSlruBlocksTask {
key_range: Range<Key>,
path: Utf8PathBuf,
}
impl ImportSlruBlocksTask {
fn new(key_range: Range<Key>, path: &Utf8Path) -> Self {
ImportSlruBlocksTask {
key_range,
path: path.into()
}
}
}
impl ImportTask for ImportSlruBlocksTask {
fn key_range(&self) -> Range<Key> {
self.key_range.clone()
}
async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> {
debug!("Importing SLRU segment file {}", self.path);
let mut reader = tokio::fs::File::open(&self.path).await
.context(format!("opening {}", &self.path))?;
let mut buf: [u8; 8192] = [0u8; 8192];
let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
let mut blknum = start_blk;
while blknum < end_blk {
reader.read_exact(&mut buf).await?;
let key = slru_block_to_key(kind, segno, blknum);
layer_writer.put_image(key, Bytes::copy_from_slice(&buf), ctx).await?;
blknum += 1;
}
Ok(())
}
}
enum AnyImportTask {
SingleKey(ImportSingleKeyTask),
RelBlocks(ImportRelBlocksTask),
SlruBlocks(ImportSlruBlocksTask),
}
impl ImportTask for AnyImportTask {
fn key_range(&self) -> Range<Key> {
match self {
Self::SingleKey(t) => t.key_range(),
Self::RelBlocks(t) => t.key_range(),
Self::SlruBlocks(t) => t.key_range()
}
}
async fn doit(self, layer_writer: &mut ImageLayerWriter, ctx: &RequestContext) -> anyhow::Result<()> {
match self {
Self::SingleKey(t) => t.doit(layer_writer, ctx).await,
Self::RelBlocks(t) => t.doit(layer_writer, ctx).await,
Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await,
}
}
}
impl From<ImportSingleKeyTask> for AnyImportTask {
fn from(t: ImportSingleKeyTask) -> Self {
Self::SingleKey(t)
}
}
impl From<ImportRelBlocksTask> for AnyImportTask {
fn from(t: ImportRelBlocksTask) -> Self {
Self::RelBlocks(t)
}
}
impl From<ImportSlruBlocksTask> for AnyImportTask {
fn from(t: ImportSlruBlocksTask) -> Self {
Self::SlruBlocks(t)
}
}
struct ChunkProcessingJob {
range: Range<Key>,
tasks: Vec<AnyImportTask>,
dstdir: Utf8PathBuf,
tenant_id: TenantId,
timeline_id: TimelineId,
pgdata_lsn: Lsn,
}
impl ChunkProcessingJob {
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &PgImportEnv) -> Self {
assert!(env.pgdata_lsn.is_valid());
Self {
range,
tasks,
dstdir: env.conf.workdir.clone(),
tenant_id: env.tsi.tenant_id,
timeline_id: env.tli,
pgdata_lsn: env.pgdata_lsn,
}
}
async fn run(self) -> anyhow::Result<PersistentLayerDesc> {
let ctx: RequestContext = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let config = toml_edit::Document::new();
let conf: &'static PageServerConf = Box::leak(Box::new(PageServerConf::parse_and_validate(
NodeId(42),
&config,
&self.dstdir
)?));
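// Each job re-parses and leaks its own PageServerConf to get the &'static
// reference needed inside the spawned task (a per-job copy of what
// PgImportEnv::init() built).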
let tsi = TenantShardId {
tenant_id: self.tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
};
let mut layer = ImageLayerWriter::new(
&conf,
self.timeline_id,
tsi,
&self.range,
self.pgdata_lsn,
&ctx,
).await?;
for task in self.tasks {
task.doit(&mut layer, &ctx).await?;
}
let layerdesc = layer.finish_raw(&ctx).await?;
Ok(layerdesc)
}
}

View File

@@ -1982,23 +1982,23 @@ impl<'a> Version<'a> {
//--- Metadata structs stored in key-value pairs in the repository.
#[derive(Debug, Serialize, Deserialize)]
-struct DbDirectory {
+pub struct DbDirectory {
// (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
-dbdirs: HashMap<(Oid, Oid), bool>,
+pub dbdirs: HashMap<(Oid, Oid), bool>,
}
#[derive(Debug, Serialize, Deserialize)]
-struct TwoPhaseDirectory {
-xids: HashSet<TransactionId>,
+pub(crate) struct TwoPhaseDirectory {
+pub(crate) xids: HashSet<TransactionId>,
}
#[derive(Debug, Serialize, Deserialize, Default)]
-struct RelDirectory {
+pub struct RelDirectory {
// Set of relations that exist. (relfilenode, forknum)
//
// TODO: Store it as a btree or radix tree or something else that spans multiple
// key-value pairs, if you have a lot of relations
-rels: HashSet<(Oid, u8)>,
+pub rels: HashSet<(Oid, u8)>,
}
#[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
@@ -2022,9 +2022,9 @@ struct RelSizeEntry {
}
#[derive(Debug, Serialize, Deserialize, Default)]
-struct SlruSegmentDirectory {
+pub(crate) struct SlruSegmentDirectory {
// Set of SLRU segments that exist.
-segments: HashSet<u32>,
+pub(crate) segments: HashSet<u32>,
}
#[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]

View File

@@ -796,12 +796,11 @@ impl ImageLayerWriterInner {
///
/// Finish writing the image layer.
///
-async fn finish(
+async fn finish_layer(
self,
-timeline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Option<Key>,
-) -> anyhow::Result<ResidentLayer> {
+) -> anyhow::Result<PersistentLayerDesc> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -877,8 +876,22 @@ impl ImageLayerWriterInner {
// fsync the file
file.sync_all().await?;
+Ok(desc)
+}
+async fn finish(
+self,
+timeline: &Arc<Timeline>,
+ctx: &RequestContext,
+end_key: Option<Key>,
+) -> anyhow::Result<ResidentLayer> {
+let path = self.path.clone();
+let conf = self.conf;
+let desc = self.finish_layer(ctx, end_key).await?;
// FIXME: why not carry the virtualfile here, it supports renaming?
-let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
+let layer = Layer::finish_creating(conf, timeline, desc, &path)?;
info!("created image layer {}", layer.local_path());
@@ -967,6 +980,32 @@ impl ImageLayerWriter {
self.inner.take().unwrap().finish(timeline, ctx, None).await
}
+/// Like finish(), but doesn't create the ResidentLayer struct. This can be used
+/// by utilities that don't have a full-blown Timeline.
+pub(crate) async fn finish_raw(
+mut self,
+ctx: &RequestContext,
+) -> anyhow::Result<super::PersistentLayerDesc> {
+let inner = self.inner.take().unwrap();
+let name = ImageLayerName {
+key_range: inner.key_range.clone(),
+lsn: inner.lsn,
+};
+let temp_path = inner.path.clone();
+let final_path = inner.conf.timeline_path(&inner.tenant_shard_id, &inner.timeline_id)
+.join(name.to_string());
+let desc = inner.finish_layer(ctx, None).await?;
+// Rename the file to its final name, like Layer::finish_creating() does
+utils::fs_ext::rename_noreplace(temp_path.as_std_path(), final_path.as_std_path())
+.with_context(|| format!("rename temporary file as {final_path}"))?;
+Ok(desc)
+}
/// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
pub(super) async fn finish_with_end_key(
mut self,

View File

@@ -0,0 +1,81 @@
import os
from pathlib import Path
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
AbstractNeonCli,
NeonEnvBuilder,
PgBin,
VanillaPostgres,
)
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import query_scalar, subprocess_capture
num_rows = 1000
class ImportCli(AbstractNeonCli):
"""
A typed wrapper around the `import` utility CLI tool.
"""
COMMAND = "import"
def run_import(self, pgdatadir: Path, dest_dir: Path, tenant_id: TenantId, timeline_id: TimelineId):
res = self.raw_cli(["--tenant-id", str(tenant_id), "--timeline-id", str(timeline_id), str(pgdatadir), str(dest_dir)])
res.check_returncode()
def test_pg_import(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
# Put data in vanilla pg
vanilla_pg.start()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
vanilla_pg.safe_psql(
"""create table t as select 'long string to consume some space' || g
from generate_series(1,300000) g"""
)
assert vanilla_pg.safe_psql("select count(*) from t") == [(300000,)]
vanilla_pg.stop()
# We have a Postgres data directory to import now
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
# Set up pageserver for import
# Run pg_import utility, pointing directly to a directory in the remote storage dir
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
dst_path = env.pageserver_remote_storage.root
tline_path = env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id)
tline_path.mkdir(parents=True)
cli = ImportCli(env)
cli.run_import(vanilla_pg.pgdatadir, dst_path, tenant_id=tenant_id, timeline_id=timeline_id)
# TODO: tell pageserver / storage controller that the tenant/timeline now exists
env.pageserver.tenant_attach(
tenant_id,
generation=100,
override_storage_controller_generation=True,
)
env.neon_cli.map_branch("imported", tenant_id, timeline_id)
endpoint = env.endpoints.create_start(branch_name="imported", tenant_id=tenant_id)
conn = endpoint.connect()
cur = conn.cursor()
assert endpoint.safe_psql("select count(*) from t") == [(300000,)]
# test writing after the import
endpoint.safe_psql("insert into t select g from generate_series(1, 1000) g")
assert endpoint.safe_psql("select count(*) from t") == [(301000,)]
endpoint.stop()
endpoint.start()
assert endpoint.safe_psql("select count(*) from t") == [(301000,)]
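
The test can be run the same way as noted in commit 6563be1a4c:

    BUILD_TYPE=debug DEFAULT_PG_VERSION=16 poetry run pytest --preserve-database-files test_runner/regress/test_pg_import.py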