Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-19 10:30:37 +00:00)

Compare commits: luist18/po ... vlad/for_g

33 Commits
| SHA1 |
|---|
| 6c1de868dc |
| 156b798d1d |
| b6c01925b7 |
| b79e5f52ab |
| 80fa55bc16 |
| 6cd4b9cb70 |
| 8cd8164926 |
| 99a4588c02 |
| 0aa4fea432 |
| fd09d3164c |
| 7754a28cf5 |
| 6ace425d3d |
| cf8654b3e5 |
| 0e405444b2 |
| 0dda2ad7ec |
| 047b986f7f |
| ed189d733f |
| 14318afcc0 |
| ebc4735bf4 |
| 4c2ee6a011 |
| c09d817c98 |
| a80dcfa544 |
| 337ad52c37 |
| fd0acb6195 |
| a7f8b9f6b5 |
| 1e9707f7ee |
| 6d297857a7 |
| 6e5a0add43 |
| e291fb7edc |
| ebe26e218b |
| 2f0a127e0c |
| 131ab74be8 |
| 3a66eebf4e |
Cargo.lock (generated), 4 changed lines:
```diff
@@ -6774,7 +6774,7 @@ dependencies = [
 [[package]]
 name = "tokio-epoll-uring"
 version = "0.1.0"
-source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
+source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#33e00106a268644d02ba0461bbd64476073b0ee1"
 dependencies = [
  "futures",
  "nix 0.26.4",
@@ -7369,7 +7369,7 @@ dependencies = [
 [[package]]
 name = "uring-common"
 version = "0.1.0"
-source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
+source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#33e00106a268644d02ba0461bbd64476073b0ee1"
 dependencies = [
  "bytes",
  "io-uring",
```
The fast_import tool itself (Rust):

```diff
@@ -31,6 +31,7 @@ use camino::{Utf8Path, Utf8PathBuf};
 use clap::Parser;
 use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion};
 use nix::unistd::Pid;
+use tokio::io::BufReader;
 use tracing::{error, info, info_span, warn, Instrument};
 use utils::fs_ext::is_directory_empty;
@@ -52,6 +53,8 @@ struct Args {
     s3_prefix: Option<s3_uri::S3Uri>,
     #[clap(long)]
     source_connection_string: Option<String>,
+    #[clap(long)]
+    restore_connection_string: Option<String>, // will not run postgres if specified, will do pg_restore to this connection string
     #[clap(short, long)]
     interactive: bool,
     #[clap(long)]
@@ -68,6 +71,8 @@ struct Spec {
     encryption_secret: EncryptionSecret,
     #[serde_as(as = "serde_with::base64::Base64")]
     source_connstring_ciphertext_base64: Vec<u8>,
+    #[serde_as(as = "Option<serde_with::base64::Base64>")]
+    restore_connstring_ciphertext_base64: Option<Vec<u8>>,
 }

 #[derive(serde::Deserialize)]
```
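Taken together, the `Args` and `Spec` additions mean the S3 spec can now carry an optional encrypted restore connection string. A minimal sketch (not part of the diff) of the JSON shape the extended `Spec` deserializes, with placeholder ciphertext values:

```rust
// Hedged sketch, not from the diff: the spec.json shape the new Spec accepts.
// Ciphertext values here are placeholders, not real KMS output.
use serde::Deserialize;
use serde_with::serde_as;

#[derive(Deserialize)]
enum EncryptionSecret {
    KMS { key_id: String },
}

#[serde_as]
#[derive(Deserialize)]
struct Spec {
    encryption_secret: EncryptionSecret,
    #[serde_as(as = "serde_with::base64::Base64")]
    source_connstring_ciphertext_base64: Vec<u8>,
    #[serde_as(as = "Option<serde_with::base64::Base64>")]
    restore_connstring_ciphertext_base64: Option<Vec<u8>>,
}

fn main() {
    // The restore connstring may be null; fast_import then runs its own
    // local postgres and restores into that instead.
    let spec: Spec = serde_json::from_str(
        r#"{
            "encryption_secret": { "KMS": { "key_id": "placeholder-key-id" } },
            "source_connstring_ciphertext_base64": "AAAA",
            "restore_connstring_ciphertext_base64": null
        }"#,
    )
    .expect("parse spec.json");
    assert!(spec.restore_connstring_ciphertext_base64.is_none());
}
```

When `restore_connstring_ciphertext_base64` is null, fast_import falls back to running its own local postgres, as the `main` changes below show.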
```diff
@@ -83,6 +88,189 @@ const DEFAULT_LOCALE: &str = if cfg!(target_os = "macos") {
     "C.UTF-8"
 };

+async fn decode_connstring(
+    kms_client: &aws_sdk_kms::Client,
+    key_id: &String,
+    connstring_ciphertext_base64: Vec<u8>,
+) -> Result<String, anyhow::Error> {
+    let mut output = kms_client
+        .decrypt()
+        .key_id(key_id)
+        .ciphertext_blob(aws_sdk_s3::primitives::Blob::new(
+            connstring_ciphertext_base64,
+        ))
+        .send()
+        .await
+        .context("decrypt connection string")?;
+
+    let plaintext = output
+        .plaintext
+        .take()
+        .context("get plaintext connection string")?;
+
+    String::from_utf8(plaintext.into_inner()).context("parse connection string as utf8")
+}
```
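The ciphertext this helper consumes is produced by a plain KMS `Encrypt` call; the Python test at the bottom of this diff does exactly that with boto3. A hedged Rust sketch of the full round trip, reusing `decode_connstring`; the key id and connection string are placeholders, and credentials/endpoint are assumed to come from the environment:

```rust
// Hedged sketch, not from the diff: KMS round trip for a connection string,
// mirroring what the Python test below does via boto3.
async fn roundtrip(kms: &aws_sdk_kms::Client, key_id: &str) -> anyhow::Result<()> {
    use anyhow::Context;

    let plaintext = "host=localhost port=5432 user=cloud_admin dbname=neondb";
    let encrypted = kms
        .encrypt()
        .key_id(key_id)
        .plaintext(aws_sdk_kms::primitives::Blob::new(plaintext))
        .send()
        .await
        .context("encrypt connection string")?;

    // The spec carries this blob base64-encoded; serde_with decodes it back
    // to Vec<u8> before it reaches decode_connstring.
    let ciphertext = encrypted
        .ciphertext_blob
        .context("get ciphertext blob")?
        .into_inner();

    let decoded = decode_connstring(kms, &key_id.to_string(), ciphertext).await?;
    assert_eq!(decoded, plaintext);
    Ok(())
}
```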
```diff
+struct PostgresProcess {
+    pgdata_dir: Utf8PathBuf,
+    pg_bin_dir: Utf8PathBuf,
+    pgbin: Utf8PathBuf,
+    pg_lib_dir: Utf8PathBuf,
+    postgres_proc: Option<tokio::process::Child>,
+}
+
+impl PostgresProcess {
+    fn new(pgdata_dir: Utf8PathBuf, pg_bin_dir: Utf8PathBuf, pg_lib_dir: Utf8PathBuf) -> Self {
+        Self {
+            pgdata_dir,
+            pgbin: pg_bin_dir.join("postgres"),
+            pg_bin_dir,
+            pg_lib_dir,
+            postgres_proc: None,
+        }
+    }
+
+    async fn prepare(&self, initdb_user: &str) -> Result<(), anyhow::Error> {
+        tokio::fs::create_dir(&self.pgdata_dir)
+            .await
+            .context("create pgdata directory")?;
+
+        let pg_version = match get_pg_version(self.pgbin.as_ref()) {
+            PostgresMajorVersion::V14 => 14,
+            PostgresMajorVersion::V15 => 15,
+            PostgresMajorVersion::V16 => 16,
+            PostgresMajorVersion::V17 => 17,
+        };
+        postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
+            superuser: initdb_user,
+            locale: DEFAULT_LOCALE, // XXX: this shouldn't be hard-coded,
+            pg_version,
+            initdb_bin: self.pg_bin_dir.join("initdb").as_ref(),
+            library_search_path: &self.pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
+            pgdata: &self.pgdata_dir,
+        })
+        .await
+        .context("initdb")
+    }
+
+    async fn start(
+        &mut self,
+        initdb_user: &str,
+        port: u16,
+        nproc: usize,
+    ) -> Result<&tokio::process::Child, anyhow::Error> {
+        self.prepare(initdb_user).await?;
+
+        //
+        // Launch postgres process
+        //
+        let mut proc = tokio::process::Command::new(&self.pgbin)
+            .arg("-D")
+            .arg(&self.pgdata_dir)
+            .args(["-p", &format!("{port}")])
+            .args(["-c", "wal_level=minimal"])
+            .args(["-c", "shared_buffers=10GB"])
+            .args(["-c", "max_wal_senders=0"])
+            .args(["-c", "fsync=off"])
+            .args(["-c", "full_page_writes=off"])
+            .args(["-c", "synchronous_commit=off"])
+            .args(["-c", "maintenance_work_mem=8388608"])
+            .args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
+            .args(["-c", &format!("max_parallel_workers={nproc}")])
+            .args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
+            .args(["-c", &format!("max_worker_processes={nproc}")])
+            .args(["-c", "effective_io_concurrency=100"])
+            .env_clear()
+            .env("LD_LIBRARY_PATH", &self.pg_lib_dir)
+            .stdout(std::process::Stdio::piped())
+            .stderr(std::process::Stdio::piped())
+            .spawn()
+            .context("spawn postgres")?;
+
+        info!("spawned postgres, waiting for it to become ready");
+        tokio::spawn(
+            child_stdio_to_log::relay_process_output(proc.stdout.take(), proc.stderr.take())
+                .instrument(info_span!("postgres")),
+        );
+
+        self.postgres_proc = Some(proc);
+        Ok(self.postgres_proc.as_ref().unwrap())
+    }
+
+    async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
+        let proc: &mut tokio::process::Child = self.postgres_proc.as_mut().unwrap();
+        info!("shutdown postgres");
+        {
+            nix::sys::signal::kill(
+                Pid::from_raw(i32::try_from(proc.id().unwrap()).expect("convert child pid to i32")),
+                nix::sys::signal::SIGTERM,
+            )
+            .context("signal postgres to shut down")?;
+            proc.wait()
+                .await
+                .context("wait for postgres to shut down")?;
+        }
+        Ok(())
+    }
+}
```
```diff
+async fn wait_until_ready(connstring: String, create_dbname: String) {
+    // Create neondb database in the running postgres
+    let start_time = std::time::Instant::now();
+
+    loop {
+        if start_time.elapsed() > PG_WAIT_TIMEOUT {
+            error!(
+                "timeout exceeded: failed to poll postgres and create database within 10 minutes"
+            );
+            std::process::exit(1);
+        }
+
+        match tokio_postgres::connect(
+            &connstring.replace("dbname=neondb", "dbname=postgres"),
+            tokio_postgres::NoTls,
+        )
+        .await
+        {
+            Ok((client, connection)) => {
+                // Spawn the connection handling task to maintain the connection
+                tokio::spawn(async move {
+                    if let Err(e) = connection.await {
+                        warn!("connection error: {}", e);
+                    }
+                });
+
+                match client
+                    .simple_query(format!("CREATE DATABASE {create_dbname};").as_str())
+                    .await
+                {
+                    Ok(_) => {
+                        info!("created {} database", create_dbname);
+                        break;
+                    }
+                    Err(e) => {
+                        warn!(
+                            "failed to create database: {}, retrying in {}s",
+                            e,
+                            PG_WAIT_RETRY_INTERVAL.as_secs_f32()
+                        );
+                        tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await;
+                        continue;
+                    }
+                }
+            }
+            Err(_) => {
+                info!(
+                    "postgres not ready yet, retrying in {}s",
+                    PG_WAIT_RETRY_INTERVAL.as_secs_f32()
+                );
+                tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await;
+                continue;
+            }
+        }
+    }
+}

 #[tokio::main]
 pub(crate) async fn main() -> anyhow::Result<()> {
     utils::logging::init(
```
```diff
@@ -106,10 +294,6 @@ pub(crate) async fn main() -> anyhow::Result<()> {
     let working_directory = args.working_directory;
     let pg_bin_dir = args.pg_bin_dir;
     let pg_lib_dir = args.pg_lib_dir;
-    let pg_port = args.pg_port.unwrap_or_else(|| {
-        info!("pg_port not specified, using default 5432");
-        5432
-    });

     // Initialize AWS clients only if s3_prefix is specified
     let (aws_config, kms_client) = if args.s3_prefix.is_some() {
@@ -120,8 +304,18 @@ pub(crate) async fn main() -> anyhow::Result<()> {
         (None, None)
     };

-    // Get source connection string either from S3 spec or direct argument
-    let source_connection_string = if let Some(s3_prefix) = &args.s3_prefix {
+    let superuser = "cloud_admin";
+    let pg_port = || {
+        args.pg_port.unwrap_or_else(|| {
+            info!("pg_port not specified, using default 5432");
+            5432
+        })
+    };
+
+    let mut run_postgres = true;
+
+    // Get connection strings either from S3 spec or direct arguments
+    let (source_connstring, restore_connstring) = if let Some(s3_prefix) = &args.s3_prefix {
         let spec: Spec = {
             let spec_key = s3_prefix.append("/spec.json");
             let s3_client = aws_sdk_s3::Client::new(aws_config.as_ref().unwrap());
```
```diff
@@ -141,28 +335,47 @@ pub(crate) async fn main() -> anyhow::Result<()> {

         match spec.encryption_secret {
             EncryptionSecret::KMS { key_id } => {
-                let mut output = kms_client
-                    .unwrap()
-                    .decrypt()
-                    .key_id(key_id)
-                    .ciphertext_blob(aws_sdk_s3::primitives::Blob::new(
-                        spec.source_connstring_ciphertext_base64,
-                    ))
-                    .send()
-                    .await
-                    .context("decrypt source connection string")?;
-                let plaintext = output
-                    .plaintext
-                    .take()
-                    .context("get plaintext source connection string")?;
-                String::from_utf8(plaintext.into_inner())
-                    .context("parse source connection string as utf8")?
+                let source = decode_connstring(
+                    kms_client.as_ref().unwrap(),
+                    &key_id,
+                    spec.source_connstring_ciphertext_base64,
+                )
+                .await?;
+
+                let restore =
+                    if let Some(restore_ciphertext) = spec.restore_connstring_ciphertext_base64 {
+                        run_postgres = false;
+                        decode_connstring(kms_client.as_ref().unwrap(), &key_id, restore_ciphertext)
+                            .await?
+                    } else {
+                        // restoring to local postgres otherwise
+                        format!(
+                            "host=localhost port={} user={} dbname=neondb",
+                            pg_port(),
+                            superuser
+                        )
+                    };
+
+                (source, restore)
             }
         }
     } else {
-        args.source_connection_string.unwrap()
+        (
+            args.source_connection_string.unwrap(),
+            if let Some(val) = args.restore_connection_string {
+                run_postgres = false;
+                val
+            } else {
+                format!(
+                    "host=localhost port={} user={} dbname=neondb",
+                    pg_port(),
+                    superuser
+                )
+            },
+        )
     };

+    // unused if run_postgres is false, but needed for shutdown
     match tokio::fs::create_dir(&working_directory).await {
         Ok(()) => {}
         Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
```
```diff
@@ -177,127 +390,21 @@ pub(crate) async fn main() -> anyhow::Result<()> {
         }
         Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
     }

     let pgdata_dir = working_directory.join("pgdata");
-    tokio::fs::create_dir(&pgdata_dir)
-        .await
-        .context("create pgdata directory")?;
-
-    let pgbin = pg_bin_dir.join("postgres");
-    let pg_version = match get_pg_version(pgbin.as_ref()) {
-        PostgresMajorVersion::V14 => 14,
-        PostgresMajorVersion::V15 => 15,
-        PostgresMajorVersion::V16 => 16,
-        PostgresMajorVersion::V17 => 17,
-    };
-    let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded
-    postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
-        superuser,
-        locale: DEFAULT_LOCALE, // XXX: this shouldn't be hard-coded,
-        pg_version,
-        initdb_bin: pg_bin_dir.join("initdb").as_ref(),
-        library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
-        pgdata: &pgdata_dir,
-    })
-    .await
-    .context("initdb")?;
-
-    let nproc = num_cpus::get();
-
-    //
-    // Launch postgres process
-    //
-    let mut postgres_proc = tokio::process::Command::new(pgbin)
-        .arg("-D")
-        .arg(&pgdata_dir)
-        .args(["-p", &format!("{pg_port}")])
-        .args(["-c", "wal_level=minimal"])
-        .args(["-c", "shared_buffers=10GB"])
-        .args(["-c", "max_wal_senders=0"])
-        .args(["-c", "fsync=off"])
-        .args(["-c", "full_page_writes=off"])
-        .args(["-c", "synchronous_commit=off"])
-        .args(["-c", "maintenance_work_mem=8388608"])
-        .args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
-        .args(["-c", &format!("max_parallel_workers={nproc}")])
-        .args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
-        .args(["-c", &format!("max_worker_processes={nproc}")])
-        .args([
-            "-c",
-            &format!(
-                "effective_io_concurrency={}",
-                if cfg!(target_os = "macos") { 0 } else { 100 }
-            ),
-        ])
-        .env_clear()
-        .env("LD_LIBRARY_PATH", &pg_lib_dir)
-        .stdout(std::process::Stdio::piped())
-        .stderr(std::process::Stdio::piped())
-        .spawn()
-        .context("spawn postgres")?;
-
-    info!("spawned postgres, waiting for it to become ready");
-    tokio::spawn(
-        child_stdio_to_log::relay_process_output(
-            postgres_proc.stdout.take(),
-            postgres_proc.stderr.take(),
-        )
-        .instrument(info_span!("postgres")),
-    );
-
-    // Create neondb database in the running postgres
-    let restore_pg_connstring =
-        format!("host=localhost port={pg_port} user={superuser} dbname=postgres");
-
-    let start_time = std::time::Instant::now();
-
-    loop {
-        if start_time.elapsed() > PG_WAIT_TIMEOUT {
-            error!(
-                "timeout exceeded: failed to poll postgres and create database within 10 minutes"
-            );
-            std::process::exit(1);
-        }
-
-        match tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await {
-            Ok((client, connection)) => {
-                // Spawn the connection handling task to maintain the connection
-                tokio::spawn(async move {
-                    if let Err(e) = connection.await {
-                        warn!("connection error: {}", e);
-                    }
-                });
-
-                match client.simple_query("CREATE DATABASE neondb;").await {
-                    Ok(_) => {
-                        info!("created neondb database");
-                        break;
-                    }
-                    Err(e) => {
-                        warn!(
-                            "failed to create database: {}, retying in {}s",
-                            e,
-                            PG_WAIT_RETRY_INTERVAL.as_secs_f32()
-                        );
-                        tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await;
-                        continue;
-                    }
-                }
-            }
-            Err(_) => {
-                info!(
-                    "postgres not ready yet, retrying in {}s",
-                    PG_WAIT_RETRY_INTERVAL.as_secs_f32()
-                );
-                tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await;
-                continue;
-            }
-        }
-    }
-
-    let restore_pg_connstring = restore_pg_connstring.replace("dbname=postgres", "dbname=neondb");
+    let postgres_proc = if run_postgres {
+        assert!(restore_connstring.contains("host=localhost"));
+        let mut proc =
+            PostgresProcess::new(pgdata_dir.clone(), pg_bin_dir.clone(), pg_lib_dir.clone());
+        let nproc = num_cpus::get();
+        proc.start(superuser, pg_port(), nproc).await?;
+        wait_until_ready(restore_connstring.clone(), "neondb".to_string()).await;
+
+        Some(proc)
+    } else {
+        info!("restore_connection_string specified, not running postgres process");
+        None
+    };

     let dumpdir = working_directory.join("dumpdir");

     let common_args = [
         // schema mapping (prob suffices to specify them on one side)
```
```diff
@@ -309,64 +416,66 @@ pub(crate) async fn main() -> anyhow::Result<()> {
         "--no-tablespaces".to_string(),
         // format
         "--format".to_string(),
-        "directory".to_string(),
+        "custom".to_string(),
         // concurrency
-        "--jobs".to_string(),
-        num_cpus::get().to_string(),
+        // "--jobs".to_string(),
+        // num_cpus::get().to_string(),
         // progress updates
         "--verbose".to_string(),
     ];

     info!("dump into the working directory");
-    {
-        let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump"))
-            .args(&common_args)
-            .arg("-f")
-            .arg(&dumpdir)
-            .arg("--no-sync")
-            // POSITIONAL args
-            // source db (db name included in connection string)
-            .arg(&source_connection_string)
-            // how we run it
-            .env_clear()
-            .env("LD_LIBRARY_PATH", &pg_lib_dir)
-            .kill_on_drop(true)
-            .stdout(std::process::Stdio::piped())
-            .stderr(std::process::Stdio::piped())
-            .spawn()
-            .context("spawn pg_dump")?;
+    let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump"))
+        .args(&common_args)
+        .arg("--no-sync")
+        // POSITIONAL args
+        // source db (db name included in connection string)
+        .arg(&source_connstring)
+        // how we run it
+        .env_clear()
+        .env("LD_LIBRARY_PATH", &pg_lib_dir)
+        .kill_on_drop(true)
+        .stdout(std::process::Stdio::piped())
+        .stderr(std::process::Stdio::piped())
+        .spawn()
+        .context("spawn pg_dump")?;

+    const BUF_SIZE: usize = 64 * 1024 * 1024;
+    let mut buf = BufReader::with_capacity(BUF_SIZE, pg_dump.stdout.take().unwrap());

+    tokio::spawn(async move {
         info!(pid=%pg_dump.id().unwrap(), "spawned pg_dump");

-        tokio::spawn(
-            child_stdio_to_log::relay_process_output(pg_dump.stdout.take(), pg_dump.stderr.take())
-                .instrument(info_span!("pg_dump")),
-        );
+        tokio::spawn(
+            child_stdio_to_log::relay_process_output(None, pg_dump.stderr.take())
+                .instrument(info_span!("pg_dump")),
+        );

-        let st = pg_dump.wait().await.context("wait for pg_dump")?;
+        let st = pg_dump.wait().await.expect("wait for pg_dump");
         info!(status=?st, "pg_dump exited");
         if !st.success() {
             warn!(status=%st, "pg_dump failed, restore will likely fail as well");
         }
-    }
+    });

-    // TODO: do it in a streaming way, plenty of internal research done on this already
     // TODO: do the unlogged table trick

     info!("restore from working directory into vanilla postgres");
     {
         let mut pg_restore = tokio::process::Command::new(pg_bin_dir.join("pg_restore"))
             .args(&common_args)
             .arg("-d")
-            .arg(&restore_pg_connstring)
-            // POSITIONAL args
-            .arg(&dumpdir)
+            .arg(&restore_connstring)
+            .arg("--clean")
+            .arg("--if-exists")
             // how we run it
             .env_clear()
             .env("LD_LIBRARY_PATH", &pg_lib_dir)
             .kill_on_drop(true)
             .stdout(std::process::Stdio::piped())
             .stderr(std::process::Stdio::piped())
+            .stdin(std::process::Stdio::piped())
             .spawn()
             .context("spawn pg_restore")?;
```
```diff
@@ -378,51 +487,48 @@ pub(crate) async fn main() -> anyhow::Result<()> {
             )
             .instrument(info_span!("pg_restore")),
         );

+        let mut restore_stdin = pg_restore.stdin.take().unwrap();
+
+        tokio::spawn(async move {
+            tokio::io::copy_buf(&mut buf, &mut restore_stdin)
+                .await
+                .expect("pg_restore failed to read from pg_dump");
+        });
+
         let st = pg_restore.wait().await.context("wait for pg_restore")?;
         info!(status=?st, "pg_restore exited");
         if !st.success() {
             warn!(status=%st, "pg_restore failed, restore will likely fail as well");
         }
     }

-    // If interactive mode, wait for Ctrl+C
-    if args.interactive {
-        info!("Running in interactive mode. Press Ctrl+C to shut down.");
-        tokio::signal::ctrl_c().await.context("wait for ctrl-c")?;
-    }
-
-    info!("shutdown postgres");
-    {
-        nix::sys::signal::kill(
-            Pid::from_raw(
-                i32::try_from(postgres_proc.id().unwrap()).expect("convert child pid to i32"),
-            ),
-            nix::sys::signal::SIGTERM,
-        )
-        .context("signal postgres to shut down")?;
-        postgres_proc
-            .wait()
-            .await
-            .context("wait for postgres to shut down")?;
-    }
-
-    // Only sync if s3_prefix was specified
-    if let Some(s3_prefix) = args.s3_prefix {
-        info!("upload pgdata");
-        aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
-            .await
-            .context("sync dump directory to destination")?;
-
-        info!("write status");
-        {
-            let status_dir = working_directory.join("status");
-            std::fs::create_dir(&status_dir).context("create status directory")?;
-            let status_file = status_dir.join("pgdata");
-            std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
-                .context("write status file")?;
-            aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
-                .await
-                .context("sync status directory to destination")?;
-        }
-    }
+    if let Some(mut proc) = postgres_proc {
+        // If interactive mode, wait for Ctrl+C
+        if args.interactive {
+            info!("Running in interactive mode. Press Ctrl+C to shut down.");
+            tokio::signal::ctrl_c().await.context("wait for ctrl-c")?;
+        }
+
+        proc.shutdown().await?;
+
+        // Only sync if s3_prefix was specified
+        if let Some(s3_prefix) = args.s3_prefix {
+            info!("upload pgdata");
+            aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
+                .await
+                .context("sync dump directory to destination")?;
+
+            info!("write status");
+            {
+                let status_dir = working_directory.join("status");
+                std::fs::create_dir(&status_dir).context("create status directory")?;
+                let status_file = status_dir.join("pgdata");
+                std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
+                    .context("write status file")?;
+                aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
+                    .await
+                    .context("sync status directory to destination")?;
+            }
+        }
+    }
```
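The structural change in the hunks above is that `pg_dump` no longer writes a `--format directory` archive into `dumpdir` for `pg_restore` to read back; with `--format custom`, its stdout is buffered and pumped straight into `pg_restore`'s stdin via `tokio::io::copy_buf`. A minimal standalone sketch of that pattern, with `echo` and `cat` standing in for the real commands:

```rust
// Hedged sketch, not from the diff: stream one child's stdout into another's
// stdin, the pattern used above for pg_dump -> pg_restore.
use tokio::io::BufReader;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut producer = tokio::process::Command::new("echo")
        .arg("hello")
        .stdout(std::process::Stdio::piped())
        .spawn()?;
    let mut consumer = tokio::process::Command::new("cat")
        .stdin(std::process::Stdio::piped())
        .spawn()?;

    // Buffer between the two processes; the diff uses a 64 MiB buffer.
    let mut buf = BufReader::with_capacity(64 * 1024, producer.stdout.take().unwrap());
    let mut consumer_stdin = consumer.stdin.take().unwrap();

    // Pump bytes across; dropping consumer_stdin closes the pipe so the
    // consumer sees EOF and can exit.
    tokio::io::copy_buf(&mut buf, &mut consumer_stdin).await?;
    drop(consumer_stdin);

    producer.wait().await?;
    consumer.wait().await?;
    Ok(())
}
```

The `--clean`/`--if-exists` flags added to `pg_restore` fit the new target: the restore database may already contain objects, so they are dropped (if present) before being recreated.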
poetry.lock (generated), 25 changed lines:
```diff
@@ -395,6 +395,7 @@ files = [

 [package.dependencies]
 botocore-stubs = "*"
+mypy-boto3-kms = {version = ">=1.26.0,<1.27.0", optional = true, markers = "extra == \"kms\""}
 mypy-boto3-s3 = {version = ">=1.26.0,<1.27.0", optional = true, markers = "extra == \"s3\""}
 types-s3transfer = "*"
 typing-extensions = ">=4.1.0"
@@ -1948,6 +1949,17 @@ install-types = ["pip"]
 mypyc = ["setuptools (>=50)"]
 reports = ["lxml"]

+[[package]]
+name = "mypy-boto3-kms"
+version = "1.26.147"
+description = "Type annotations for boto3.KMS 1.26.147 service generated with mypy-boto3-builder 7.14.5"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "mypy-boto3-kms-1.26.147.tar.gz", hash = "sha256:816a4d1bb0585e1b9620a3f96c1d69a06f53b7b5621858579dd77c60dbb5fa5c"},
+    {file = "mypy_boto3_kms-1.26.147-py3-none-any.whl", hash = "sha256:493f0db674a25c88769f5cb8ab8ac00d3dda5dfc903d5cda34c990ee64689f79"},
+]
+
 [[package]]
 name = "mypy-boto3-s3"
 version = "1.26.0.post1"
@@ -2286,6 +2298,7 @@ files = [
     {file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:bb89f0a835bcfc1d42ccd5f41f04870c1b936d8507c6df12b7737febc40f0909"},
     {file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:f0c2d907a1e102526dd2986df638343388b94c33860ff3bbe1384130828714b1"},
     {file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:f8157bed2f51db683f31306aa497311b560f2265998122abe1dce6428bd86567"},
+    {file = "psycopg2_binary-2.9.10-cp313-cp313-win_amd64.whl", hash = "sha256:27422aa5f11fbcd9b18da48373eb67081243662f9b46e6fd07c3eb46e4535142"},
     {file = "psycopg2_binary-2.9.10-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:eb09aa7f9cecb45027683bb55aebaaf45a0df8bf6de68801a6afdc7947bb09d4"},
     {file = "psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b73d6d7f0ccdad7bc43e6d34273f70d587ef62f824d7261c4ae9b8b1b6af90e8"},
     {file = "psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ce5ab4bf46a211a8e924d307c1b1fcda82368586a19d0a24f8ae166f5c784864"},
@@ -3386,6 +3399,16 @@ files = [
     {file = "wrapt-1.14.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8ad85f7f4e20964db4daadcab70b47ab05c7c1cf2a7c1e51087bfaa83831854c"},
     {file = "wrapt-1.14.1-cp310-cp310-win32.whl", hash = "sha256:a9a52172be0b5aae932bef82a79ec0a0ce87288c7d132946d645eba03f0ad8a8"},
     {file = "wrapt-1.14.1-cp310-cp310-win_amd64.whl", hash = "sha256:6d323e1554b3d22cfc03cd3243b5bb815a51f5249fdcbb86fda4bf62bab9e164"},
+    {file = "wrapt-1.14.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ecee4132c6cd2ce5308e21672015ddfed1ff975ad0ac8d27168ea82e71413f55"},
+    {file = "wrapt-1.14.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2020f391008ef874c6d9e208b24f28e31bcb85ccff4f335f15a3251d222b92d9"},
+    {file = "wrapt-1.14.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2feecf86e1f7a86517cab34ae6c2f081fd2d0dac860cb0c0ded96d799d20b335"},
+    {file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:240b1686f38ae665d1b15475966fe0472f78e71b1b4903c143a842659c8e4cb9"},
+    {file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a9008dad07d71f68487c91e96579c8567c98ca4c3881b9b113bc7b33e9fd78b8"},
+    {file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:6447e9f3ba72f8e2b985a1da758767698efa72723d5b59accefd716e9e8272bf"},
+    {file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:acae32e13a4153809db37405f5eba5bac5fbe2e2ba61ab227926a22901051c0a"},
+    {file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:49ef582b7a1152ae2766557f0550a9fcbf7bbd76f43fbdc94dd3bf07cc7168be"},
+    {file = "wrapt-1.14.1-cp311-cp311-win32.whl", hash = "sha256:358fe87cc899c6bb0ddc185bf3dbfa4ba646f05b1b0b9b5a27c2cb92c2cea204"},
+    {file = "wrapt-1.14.1-cp311-cp311-win_amd64.whl", hash = "sha256:26046cd03936ae745a502abf44dac702a5e6880b2b01c29aea8ddf3353b68224"},
     {file = "wrapt-1.14.1-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:43ca3bbbe97af00f49efb06e352eae40434ca9d915906f77def219b88e85d907"},
     {file = "wrapt-1.14.1-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:6b1a564e6cb69922c7fe3a678b9f9a3c54e72b469875aa8018f18b4d1dd1adf3"},
     {file = "wrapt-1.14.1-cp35-cp35m-manylinux2010_i686.whl", hash = "sha256:00b6d4ea20a906c0ca56d84f93065b398ab74b927a7a3dbd470f6fc503f95dc3"},
@@ -3655,4 +3678,4 @@ cffi = ["cffi (>=1.11)"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.11"
-content-hash = "e6904aca09abc6c805604b21a5702a97e0056406f9ec7469b091d35ee10a6b16"
+content-hash = "b25702660d965fa9f21db0b20fa07b545d199d8ccb303c67eece77b338b08494"
```
The matching dependency declaration change (the Poetry manifest):

```diff
@@ -17,7 +17,7 @@ Jinja2 = "^3.1.5"
 types-requests = "^2.31.0.0"
 types-psycopg2 = "^2.9.21.20241019"
 boto3 = "^1.34.11"
-boto3-stubs = {extras = ["s3"], version = "^1.26.16"}
+boto3-stubs = {extras = ["s3", "kms"], version = "^1.26.16"}
 moto = {extras = ["server"], version = "^5.0.6"}
 backoff = "^2.2.1"
 pytest-lazy-fixture = "^0.6.3"
```
The FastImport test wrapper (Python):

```diff
@@ -50,8 +50,9 @@ class FastImport(AbstractNeonCli):

     def run(
         self,
-        pg_port: int,
+        pg_port: int | None = None,
         source_connection_string: str | None = None,
+        restore_connection_string: str | None = None,
         s3prefix: str | None = None,
         interactive: bool = False,
     ) -> subprocess.CompletedProcess[str]:
@@ -60,11 +61,14 @@ class FastImport(AbstractNeonCli):
         args = [
             f"--pg-bin-dir={self.pg_bin}",
             f"--pg-lib-dir={self.pg_lib}",
-            f"--pg-port={pg_port}",
             f"--working-directory={self.workdir}",
         ]
+        if pg_port is not None:
+            args.append(f"--pg-port={pg_port}")
         if source_connection_string is not None:
             args.append(f"--source-connection-string={source_connection_string}")
+        if restore_connection_string is not None:
+            args.append(f"--restore-connection-string={restore_connection_string}")
         if s3prefix is not None:
             args.append(f"--s3-prefix={s3prefix}")
         if interactive:
```
Test fixtures:

```diff
@@ -26,6 +26,7 @@ from urllib.parse import quote, urlparse

 import asyncpg
 import backoff
+import boto3
 import httpx
 import psycopg2
 import psycopg2.sql
@@ -36,6 +37,8 @@ from _pytest.config import Config
 from _pytest.config.argparsing import Parser
 from _pytest.fixtures import FixtureRequest
 from jwcrypto import jwk
+from mypy_boto3_kms import KMSClient
+from mypy_boto3_s3 import S3Client

 # Type-related stuff
 from psycopg2.extensions import connection as PgConnection
@@ -198,6 +201,30 @@ def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
     mock_s3_server.kill()


+@pytest.fixture(scope="session")
+def mock_kms(mock_s3_server: MockS3Server) -> Iterator[KMSClient]:
+    yield boto3.client(
+        "kms",
+        endpoint_url=mock_s3_server.endpoint(),
+        region_name=mock_s3_server.region(),
+        aws_access_key_id=mock_s3_server.access_key(),
+        aws_secret_access_key=mock_s3_server.secret_key(),
+        aws_session_token=mock_s3_server.session_token(),
+    )
+
+
+@pytest.fixture(scope="session")
+def mock_s3_client(mock_s3_server: MockS3Server) -> Iterator[S3Client]:
+    yield boto3.client(
+        "s3",
+        endpoint_url=mock_s3_server.endpoint(),
+        region_name=mock_s3_server.region(),
+        aws_access_key_id=mock_s3_server.access_key(),
+        aws_secret_access_key=mock_s3_server.secret_key(),
+        aws_session_token=mock_s3_server.session_token(),
+    )
+
+
 class PgProtocol:
     """Reusable connection logic"""
```
Tests:

```diff
@@ -1,7 +1,9 @@
+import base64
+import json
 import re
 import time
 from enum import Enum
 from pathlib import Path

 import psycopg2
 import psycopg2.errors
@@ -16,8 +18,11 @@ from fixtures.pageserver.http import (
 )
 from fixtures.pg_version import PgVersion
 from fixtures.port_distributor import PortDistributor
-from fixtures.remote_storage import RemoteStorageKind
+from fixtures.remote_storage import MockS3Server, RemoteStorageKind
 from fixtures.utils import run_only_on_postgres
+from mypy_boto3_kms import KMSClient
+from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
+from mypy_boto3_s3 import S3Client
 from pytest_httpserver import HTTPServer
 from werkzeug.wrappers.request import Request
 from werkzeug.wrappers.response import Response
@@ -100,13 +105,15 @@ def test_pgdata_import_smoke(
     while True:
         relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
         log.info(
-            f"relblock size: {relblock_size/8192} pages (target: {target_relblock_size//8192}) pages"
+            f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
         )
         if relblock_size >= target_relblock_size:
             break
         addrows = int((target_relblock_size - relblock_size) // 8192)
         assert addrows >= 1, "forward progress"
-        vanilla_pg.safe_psql(f"insert into t select generate_series({nrows+1}, {nrows + addrows})")
+        vanilla_pg.safe_psql(
+            f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
+        )
         nrows += addrows
     expect_nrows = nrows
     expect_sum = (
@@ -317,10 +324,6 @@ def test_pgdata_import_smoke(
     br_initdb_endpoint.safe_psql("select * from othertable")


-@run_only_on_postgres(
-    [PgVersion.V14, PgVersion.V15, PgVersion.V16],
-    "newer control file catalog version and struct format isn't supported",
-)
 def test_fast_import_binary(
     test_output_dir,
     vanilla_pg: VanillaPostgres,
@@ -347,6 +350,104 @@ def test_fast_import_binary(
     assert res[0][0] == 10


+def test_fast_import_restore_to_connstring(
+    test_output_dir,
+    vanilla_pg: VanillaPostgres,
+    port_distributor: PortDistributor,
+    fast_import: FastImport,
+    pg_distrib_dir: Path,
+    pg_version: PgVersion,
+):
+    vanilla_pg.start()
+    vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
+
+    pgdatadir = test_output_dir / "restore-pgdata"
+    pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
+    port = port_distributor.get_port()
+    with VanillaPostgres(pgdatadir, pg_bin, port) as restore_vanilla_pg:
+        restore_vanilla_pg.configure(["shared_preload_libraries='neon_rmgr'"])
+        restore_vanilla_pg.start()
+
+        fast_import.run(
+            source_connection_string=vanilla_pg.connstr(),
+            restore_connection_string=restore_vanilla_pg.connstr(),
+        )
+        vanilla_pg.stop()
+
+        res = restore_vanilla_pg.safe_psql("SELECT count(*) FROM foo;")
+        log.info(f"Result: {res}")
+        assert res[0][0] == 10
+
+
+def test_fast_import_restore_to_connstring_from_s3_spec(
+    test_output_dir,
+    vanilla_pg: VanillaPostgres,
+    port_distributor: PortDistributor,
+    fast_import: FastImport,
+    pg_distrib_dir: Path,
+    pg_version: PgVersion,
+    mock_s3_server: MockS3Server,
+    mock_kms: KMSClient,
+    mock_s3_client: S3Client,
+):
+    # Prepare KMS and S3
+    key_response = mock_kms.create_key(
+        Description="Test key",
+        KeyUsage="ENCRYPT_DECRYPT",
+        Origin="AWS_KMS",
+    )
+    key_id = key_response["KeyMetadata"]["KeyId"]
+
+    def encrypt(x: str) -> EncryptResponseTypeDef:
+        return mock_kms.encrypt(KeyId=key_id, Plaintext=x)
+
+    # Start source postgres and ingest data
+    vanilla_pg.start()
+    vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
+
+    # Start target postgres
+    pgdatadir = test_output_dir / "restore-pgdata"
+    pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
+    port = port_distributor.get_port()
+    with VanillaPostgres(pgdatadir, pg_bin, port) as restore_vanilla_pg:
+        restore_vanilla_pg.configure(["shared_preload_libraries='neon_rmgr'"])
+        restore_vanilla_pg.start()
+
+        # Encrypt connstrings and put spec into S3
+        source_connstring_encrypted = encrypt(vanilla_pg.connstr())
+        restore_connstring_encrypted = encrypt(restore_vanilla_pg.connstr())
+        spec = {
+            "encryption_secret": {"KMS": {"key_id": key_id}},
+            "source_connstring_ciphertext_base64": base64.b64encode(
+                source_connstring_encrypted["CiphertextBlob"]
+            ).decode("utf-8"),
+            "restore_connstring_ciphertext_base64": base64.b64encode(
+                restore_connstring_encrypted["CiphertextBlob"]
+            ).decode("utf-8"),
+        }
+
+        mock_s3_client.create_bucket(Bucket="test-bucket")
+        mock_s3_client.put_object(
+            Bucket="test-bucket", Key="test-prefix/spec.json", Body=json.dumps(spec)
+        )
+
+        # Run fast_import
+        if fast_import.extra_env is None:
+            fast_import.extra_env = {}
+        fast_import.extra_env["AWS_ACCESS_KEY_ID"] = mock_s3_server.access_key()
+        fast_import.extra_env["AWS_SECRET_ACCESS_KEY"] = mock_s3_server.secret_key()
+        fast_import.extra_env["AWS_SESSION_TOKEN"] = mock_s3_server.session_token()
+        fast_import.extra_env["AWS_REGION"] = mock_s3_server.region()
+        fast_import.extra_env["AWS_ENDPOINT_URL"] = mock_s3_server.endpoint()
+        fast_import.extra_env["RUST_LOG"] = "aws_config=debug,aws_sdk_kms=debug"
+        fast_import.run(s3prefix="s3://test-bucket/test-prefix")
+        vanilla_pg.stop()
+
+        res = restore_vanilla_pg.safe_psql("SELECT count(*) FROM foo;")
+        log.info(f"Result: {res}")
+        assert res[0][0] == 10
+
+
 # TODO: Maybe test with pageserver?
 # 1. run whole neon env
 # 2. create timeline with some s3 path???
```