mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 18:32:56 +00:00
Compare commits
19 Commits
hyper1
...
f88fe021-i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8c3b4fab14 | ||
|
|
bacc52cf03 | ||
|
|
5bcc5e4891 | ||
|
|
05e3d16a18 | ||
|
|
4724e4b2d1 | ||
|
|
971c03873f | ||
|
|
ffd778a4a2 | ||
|
|
a25ccce3c8 | ||
|
|
1389d6b6a5 | ||
|
|
8ca3faa61e | ||
|
|
f88fe0218d | ||
|
|
cc856eca85 | ||
|
|
cf350c6002 | ||
|
|
0ce6b6a0a3 | ||
|
|
73f247d537 | ||
|
|
960be82183 | ||
|
|
806e5a6c19 | ||
|
|
8d5df07cce | ||
|
|
df7a9d1407 |
@@ -3,27 +3,8 @@
|
||||
set -e
|
||||
|
||||
RELEASE=${RELEASE:-false}
|
||||
|
||||
# look at docker hub for latest tag for neon docker image
|
||||
if [ "${RELEASE}" = "true" ]; then
|
||||
echo "search latest relase tag"
|
||||
VERSION=$(curl -s https://registry.hub.docker.com/v1/repositories/neondatabase/neon/tags |jq -r -S '.[].name' | grep release | sed 's/release-//g' | grep -E '^[0-9]+$' | sort -n | tail -1)
|
||||
if [ -z "${VERSION}" ]; then
|
||||
echo "no any docker tags found, exiting..."
|
||||
exit 1
|
||||
else
|
||||
TAG="release-${VERSION}"
|
||||
fi
|
||||
else
|
||||
echo "search latest dev tag"
|
||||
VERSION=$(curl -s https://registry.hub.docker.com/v1/repositories/neondatabase/neon/tags |jq -r -S '.[].name' | grep -E '^[0-9]+$' | sort -n | tail -1)
|
||||
if [ -z "${VERSION}" ]; then
|
||||
echo "no any docker tags found, exiting..."
|
||||
exit 1
|
||||
else
|
||||
TAG="${VERSION}"
|
||||
fi
|
||||
fi
|
||||
VERSION="1"
|
||||
TAG="backport-f88fe021-import-patch-${VERSION}"
|
||||
|
||||
echo "found ${VERSION}"
|
||||
|
||||
|
||||
@@ -1,11 +1,7 @@
|
||||
[pageservers]
|
||||
#zenith-1-ps-1 console_region_id=1
|
||||
zenith-1-ps-2 console_region_id=1
|
||||
zenith-1-ps-3 console_region_id=1
|
||||
|
||||
[safekeepers]
|
||||
zenith-1-sk-1 console_region_id=1
|
||||
zenith-1-sk-2 console_region_id=1
|
||||
zenith-1-sk-3 console_region_id=1
|
||||
|
||||
[storage:children]
|
||||
pageservers
|
||||
|
||||
@@ -462,9 +462,6 @@ jobs:
|
||||
- checkout
|
||||
- setup_remote_docker:
|
||||
docker_layer_caching: true
|
||||
# Build neondatabase/compute-tools:latest image and push it to Docker hub
|
||||
# TODO: this should probably also use versioned tag, not just :latest.
|
||||
# XXX: but should it? We build and use it only locally now.
|
||||
- run:
|
||||
name: Build and push compute-tools Docker image
|
||||
command: |
|
||||
@@ -472,7 +469,10 @@ jobs:
|
||||
docker build \
|
||||
--build-arg AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}" \
|
||||
--build-arg AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}" \
|
||||
--tag neondatabase/compute-tools:latest -f Dockerfile.compute-tools .
|
||||
--tag neondatabase/compute-tools:local \
|
||||
--tag neondatabase/compute-tools:latest \
|
||||
-f Dockerfile.compute-tools .
|
||||
# Only push :latest image
|
||||
docker push neondatabase/compute-tools:latest
|
||||
- run:
|
||||
name: Init postgres submodule
|
||||
@@ -482,7 +482,9 @@ jobs:
|
||||
command: |
|
||||
echo $NEON_DOCKER_PWD | docker login -u $NEON_DOCKER_LOGIN --password-stdin
|
||||
DOCKER_TAG=$(git log --oneline|wc -l)
|
||||
docker build --tag neondatabase/compute-node:${DOCKER_TAG} --tag neondatabase/compute-node:latest vendor/postgres
|
||||
docker build --tag neondatabase/compute-node:${DOCKER_TAG} \
|
||||
--tag neondatabase/compute-node:latest vendor/postgres \
|
||||
--build-arg COMPUTE_TOOLS_TAG=local
|
||||
docker push neondatabase/compute-node:${DOCKER_TAG}
|
||||
docker push neondatabase/compute-node:latest
|
||||
|
||||
@@ -501,15 +503,14 @@ jobs:
|
||||
name: Build and push Docker image
|
||||
command: |
|
||||
echo $NEON_DOCKER_PWD | docker login -u $NEON_DOCKER_LOGIN --password-stdin
|
||||
DOCKER_TAG="release-$(git log --oneline|wc -l)"
|
||||
DOCKER_TAG="backport-f88fe021-import-patch-1"
|
||||
docker build \
|
||||
--pull \
|
||||
--build-arg GIT_VERSION=${CIRCLE_SHA1} \
|
||||
--build-arg AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}" \
|
||||
--build-arg AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}" \
|
||||
--tag neondatabase/neon:${DOCKER_TAG} --tag neondatabase/neon:release .
|
||||
--tag neondatabase/neon:${DOCKER_TAG} .
|
||||
docker push neondatabase/neon:${DOCKER_TAG}
|
||||
docker push neondatabase/neon:release
|
||||
|
||||
# Build production neondatabase/compute-node:release image and push it to Docker hub
|
||||
docker-image-compute-release:
|
||||
@@ -519,18 +520,19 @@ jobs:
|
||||
- checkout
|
||||
- setup_remote_docker:
|
||||
docker_layer_caching: true
|
||||
# Build neondatabase/compute-tools:release image and push it to Docker hub
|
||||
# TODO: this should probably also use versioned tag, not just :latest.
|
||||
# XXX: but should it? We build and use it only locally now.
|
||||
- run:
|
||||
name: Build and push compute-tools Docker image
|
||||
command: |
|
||||
echo $NEON_DOCKER_PWD | docker login -u $NEON_DOCKER_LOGIN --password-stdin
|
||||
docker build \
|
||||
--build-arg AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}" \
|
||||
--build-arg AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}" \
|
||||
--tag neondatabase/compute-tools:release -f Dockerfile.compute-tools .
|
||||
docker push neondatabase/compute-tools:release
|
||||
# don't build new compute image
|
||||
# docker build \
|
||||
# --build-arg AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}" \
|
||||
# --build-arg AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}" \
|
||||
# --tag neondatabase/compute-tools:release \
|
||||
# --tag neondatabase/compute-tools:local \
|
||||
# -f Dockerfile.compute-tools .
|
||||
# # Only push :release image
|
||||
# docker push neondatabase/compute-tools:release
|
||||
- run:
|
||||
name: Init postgres submodule
|
||||
command: git submodule update --init --depth 1
|
||||
@@ -539,9 +541,12 @@ jobs:
|
||||
command: |
|
||||
echo $NEON_DOCKER_PWD | docker login -u $NEON_DOCKER_LOGIN --password-stdin
|
||||
DOCKER_TAG="release-$(git log --oneline|wc -l)"
|
||||
docker build --tag neondatabase/compute-node:${DOCKER_TAG} --tag neondatabase/compute-node:release vendor/postgres
|
||||
docker push neondatabase/compute-node:${DOCKER_TAG}
|
||||
docker push neondatabase/compute-node:release
|
||||
# don't build new compute image
|
||||
# docker build --tag neondatabase/compute-node:${DOCKER_TAG} \
|
||||
# --tag neondatabase/compute-node:release vendor/postgres \
|
||||
# --build-arg COMPUTE_TOOLS_TAG=local
|
||||
# docker push neondatabase/compute-node:${DOCKER_TAG}
|
||||
# docker push neondatabase/compute-node:release
|
||||
|
||||
deploy-staging:
|
||||
docker:
|
||||
@@ -676,22 +681,7 @@ jobs:
|
||||
KUBECONFIG: .kubeconfig
|
||||
steps:
|
||||
- checkout
|
||||
- run:
|
||||
name: Store kubeconfig file
|
||||
command: |
|
||||
echo "${PRODUCTION_KUBECONFIG_DATA}" | base64 --decode > ${KUBECONFIG}
|
||||
chmod 0600 ${KUBECONFIG}
|
||||
- run:
|
||||
name: Setup helm v3
|
||||
command: |
|
||||
curl -s https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
|
||||
helm repo add neondatabase https://neondatabase.github.io/helm-charts
|
||||
- run:
|
||||
name: Re-deploy proxy
|
||||
command: |
|
||||
DOCKER_TAG="release-$(git log --oneline|wc -l)"
|
||||
helm upgrade neon-proxy neondatabase/neon-proxy --install -f .circleci/helm-values/production.proxy.yaml --set image.tag=${DOCKER_TAG} --wait
|
||||
helm upgrade neon-proxy-scram neondatabase/neon-proxy --install -f .circleci/helm-values/production.proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait
|
||||
# don't deploy proxy
|
||||
|
||||
# Trigger a new remote CI job
|
||||
remote-ci-trigger:
|
||||
@@ -850,11 +840,11 @@ workflows:
|
||||
- docker-image-release:
|
||||
# Context gives an ability to login
|
||||
context: Docker Hub
|
||||
# Build image only for commits to main
|
||||
# Build image only for commits to f88fe021-import-patch
|
||||
filters:
|
||||
branches:
|
||||
only:
|
||||
- release
|
||||
- f88fe021-import-patch
|
||||
requires:
|
||||
- pg_regress-tests-release
|
||||
- other-tests-release
|
||||
@@ -872,11 +862,11 @@ workflows:
|
||||
- deploy-release:
|
||||
# Context gives an ability to login
|
||||
context: Docker Hub
|
||||
# deploy only for commits to main
|
||||
# deploy only for commits to f88fe021-import-patch
|
||||
filters:
|
||||
branches:
|
||||
only:
|
||||
- release
|
||||
- f88fe021-import-patch
|
||||
requires:
|
||||
- docker-image-release
|
||||
- deploy-release-proxy:
|
||||
|
||||
23
.github/workflows/testing.yml
vendored
23
.github/workflows/testing.yml
vendored
@@ -34,11 +34,11 @@ jobs:
|
||||
if: matrix.os == 'ubuntu-latest'
|
||||
run: |
|
||||
sudo apt update
|
||||
sudo apt install build-essential libreadline-dev zlib1g-dev flex bison libseccomp-dev
|
||||
sudo apt install build-essential libreadline-dev zlib1g-dev flex bison libseccomp-dev libssl-dev
|
||||
|
||||
- name: Install macOs postgres dependencies
|
||||
- name: Install macOS postgres dependencies
|
||||
if: matrix.os == 'macos-latest'
|
||||
run: brew install flex bison
|
||||
run: brew install flex bison openssl
|
||||
|
||||
- name: Set pg revision for caching
|
||||
id: pg_ver
|
||||
@@ -52,10 +52,27 @@ jobs:
|
||||
tmp_install/
|
||||
key: ${{ runner.os }}-pg-${{ steps.pg_ver.outputs.pg_rev }}
|
||||
|
||||
- name: Set extra env for macOS
|
||||
if: matrix.os == 'macos-latest'
|
||||
run: |
|
||||
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
|
||||
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
|
||||
|
||||
- name: Build postgres
|
||||
if: steps.cache_pg.outputs.cache-hit != 'true'
|
||||
run: make postgres
|
||||
|
||||
# Plain configure output can contain weird errors like 'error: C compiler cannot create executables'
|
||||
# and the real cause will be inside config.log
|
||||
- name: Print configure logs in case of failure
|
||||
if: failure()
|
||||
continue-on-error: true
|
||||
run: |
|
||||
echo '' && echo '=== config.log ===' && echo ''
|
||||
cat tmp_install/build/config.log
|
||||
echo '' && echo '=== configure.log ===' && echo ''
|
||||
cat tmp_install/build/configure.log
|
||||
|
||||
- name: Cache cargo deps
|
||||
id: cache_cargo
|
||||
uses: actions/cache@v2
|
||||
|
||||
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -363,6 +363,16 @@ dependencies = [
|
||||
"textwrap 0.14.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "close_fds"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3bc416f33de9d59e79e57560f450d21ff8393adcf1cdfc3e6d8fb93d5f88a2ed"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cmake"
|
||||
version = "0.1.48"
|
||||
@@ -1789,6 +1799,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"chrono",
|
||||
"clap 3.0.14",
|
||||
"close_fds",
|
||||
"const_format",
|
||||
"crc32c",
|
||||
"crossbeam-utils",
|
||||
@@ -1830,6 +1841,7 @@ dependencies = [
|
||||
"tracing",
|
||||
"url",
|
||||
"utils",
|
||||
"walkdir",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
|
||||
@@ -10,5 +10,6 @@ pub mod logger;
|
||||
pub mod compute;
|
||||
pub mod monitor;
|
||||
pub mod params;
|
||||
#[allow(clippy::format_push_string)] // Clippy's suggestion doesn't actually work
|
||||
pub mod pg_helpers;
|
||||
pub mod spec;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io::Write;
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, Write};
|
||||
use std::net::TcpStream;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
@@ -492,6 +493,56 @@ impl PageServerNode {
|
||||
|
||||
Ok(timeline_info_response)
|
||||
}
|
||||
|
||||
/// Import a basebackup prepared using either:
|
||||
/// a) `pg_basebackup -F tar`, or
|
||||
/// b) The `fullbackup` pageserver endpoint
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `tenant_id` - tenant to import into. Created if not exists
|
||||
/// * `timeline_id` - id to assign to imported timeline
|
||||
/// * `base` - (start lsn of basebackup, path to `base.tar` file)
|
||||
/// * `pg_wal` - if there's any wal to import: (end lsn, path to `pg_wal.tar`)
|
||||
pub fn timeline_import(
|
||||
&self,
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
base: (Lsn, PathBuf),
|
||||
pg_wal: Option<(Lsn, PathBuf)>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut client = self.pg_connection_config.connect(NoTls).unwrap();
|
||||
|
||||
// Init base reader
|
||||
let (start_lsn, base_tarfile_path) = base;
|
||||
let base_tarfile = File::open(base_tarfile_path)?;
|
||||
let mut base_reader = BufReader::new(base_tarfile);
|
||||
|
||||
// Init wal reader if necessary
|
||||
let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
|
||||
let wal_tarfile = File::open(wal_tarfile_path)?;
|
||||
let wal_reader = BufReader::new(wal_tarfile);
|
||||
(end_lsn, Some(wal_reader))
|
||||
} else {
|
||||
(start_lsn, None)
|
||||
};
|
||||
|
||||
// Import base
|
||||
let import_cmd =
|
||||
format!("import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
|
||||
let mut writer = client.copy_in(&import_cmd)?;
|
||||
io::copy(&mut base_reader, &mut writer)?;
|
||||
writer.finish()?;
|
||||
|
||||
// Import wal if necessary
|
||||
if let Some(mut wal_reader) = wal_reader {
|
||||
let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
|
||||
let mut writer = client.copy_in(&import_cmd)?;
|
||||
io::copy(&mut wal_reader, &mut writer)?;
|
||||
writer.finish()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
|
||||
|
||||
@@ -14,7 +14,7 @@ use safekeeper::defaults::{
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
|
||||
};
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::exit;
|
||||
use std::str::FromStr;
|
||||
use utils::{
|
||||
@@ -159,6 +159,20 @@ fn main() -> Result<()> {
|
||||
.about("Create a new blank timeline")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(branch_name_arg.clone()))
|
||||
.subcommand(App::new("import")
|
||||
.about("Import timeline from basebackup directory")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(timeline_id_arg.clone())
|
||||
.arg(Arg::new("node-name").long("node-name").takes_value(true)
|
||||
.help("Name to assign to the imported timeline"))
|
||||
.arg(Arg::new("base-tarfile").long("base-tarfile").takes_value(true)
|
||||
.help("Basebackup tarfile to import"))
|
||||
.arg(Arg::new("base-lsn").long("base-lsn").takes_value(true)
|
||||
.help("Lsn the basebackup starts at"))
|
||||
.arg(Arg::new("wal-tarfile").long("wal-tarfile").takes_value(true)
|
||||
.help("Wal to add after base"))
|
||||
.arg(Arg::new("end-lsn").long("end-lsn").takes_value(true)
|
||||
.help("Lsn the basebackup ends at")))
|
||||
).subcommand(
|
||||
App::new("tenant")
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
@@ -613,6 +627,43 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
||||
timeline.timeline_id, last_record_lsn, tenant_id,
|
||||
);
|
||||
}
|
||||
Some(("import", import_match)) => {
|
||||
let tenant_id = get_tenant_id(import_match, env)?;
|
||||
let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
|
||||
let name = import_match
|
||||
.value_of("node-name")
|
||||
.ok_or_else(|| anyhow!("No node name provided"))?;
|
||||
|
||||
// Parse base inputs
|
||||
let base_tarfile = import_match
|
||||
.value_of("base-tarfile")
|
||||
.map(|s| PathBuf::from_str(s).unwrap())
|
||||
.ok_or_else(|| anyhow!("No base-tarfile provided"))?;
|
||||
let base_lsn = Lsn::from_str(
|
||||
import_match
|
||||
.value_of("base-lsn")
|
||||
.ok_or_else(|| anyhow!("No base-lsn provided"))?,
|
||||
)?;
|
||||
let base = (base_lsn, base_tarfile);
|
||||
|
||||
// Parse pg_wal inputs
|
||||
let wal_tarfile = import_match
|
||||
.value_of("wal-tarfile")
|
||||
.map(|s| PathBuf::from_str(s).unwrap());
|
||||
let end_lsn = import_match
|
||||
.value_of("end-lsn")
|
||||
.map(|s| Lsn::from_str(s).unwrap());
|
||||
// TODO validate both or none are provided
|
||||
let pg_wal = end_lsn.zip(wal_tarfile);
|
||||
|
||||
let mut cplane = ComputeControlPlane::load(env.clone())?;
|
||||
println!("Importing timeline into pageserver ...");
|
||||
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal)?;
|
||||
println!("Creating node for imported timeline ...");
|
||||
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
|
||||
cplane.new_node(tenant_id, name, timeline_id, None, None)?;
|
||||
println!("Done");
|
||||
}
|
||||
Some(("branch", branch_match)) => {
|
||||
let tenant_id = get_tenant_id(branch_match, env)?;
|
||||
let new_branch_name = branch_match
|
||||
|
||||
@@ -60,6 +60,8 @@ metrics = { path = "../libs/metrics" }
|
||||
utils = { path = "../libs/utils" }
|
||||
remote_storage = { path = "../libs/remote_storage" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
close_fds = "0.3.2"
|
||||
walkdir = "2.3.2"
|
||||
|
||||
[dev-dependencies]
|
||||
hex-literal = "0.3"
|
||||
|
||||
@@ -97,7 +97,9 @@ impl<'a> Basebackup<'a> {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn send_tarball(&mut self) -> anyhow::Result<()> {
|
||||
pub fn send_tarball(mut self) -> anyhow::Result<()> {
|
||||
// TODO include checksum
|
||||
|
||||
// Create pgdata subdirs structure
|
||||
for dir in pg_constants::PGDATA_SUBDIRS.iter() {
|
||||
let header = new_tar_header_dir(*dir)?;
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
//! Import data and WAL from a PostgreSQL data directory and WAL segments into
|
||||
//! a zenith Timeline.
|
||||
//!
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::{Read, Seek, SeekFrom};
|
||||
use std::path::{Path, PathBuf};
|
||||
@@ -10,16 +9,18 @@ use std::path::{Path, PathBuf};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use tracing::*;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::pgdatadir_mapping::*;
|
||||
use crate::reltag::{RelTag, SlruKind};
|
||||
use crate::repository::Repository;
|
||||
use crate::repository::Timeline;
|
||||
use crate::walingest::WalIngest;
|
||||
use postgres_ffi::relfile_utils::*;
|
||||
use postgres_ffi::waldecoder::*;
|
||||
use postgres_ffi::xlog_utils::*;
|
||||
use postgres_ffi::Oid;
|
||||
use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
|
||||
use postgres_ffi::{Oid, TransactionId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
///
|
||||
@@ -35,100 +36,30 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
|
||||
) -> Result<()> {
|
||||
let mut pg_control: Option<ControlFileData> = None;
|
||||
|
||||
// TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
|
||||
// Then fishing out pg_control would be unnecessary
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification.init_empty()?;
|
||||
|
||||
// Scan 'global'
|
||||
let mut relfiles: Vec<PathBuf> = Vec::new();
|
||||
for direntry in fs::read_dir(path.join("global"))? {
|
||||
let direntry = direntry?;
|
||||
match direntry.file_name().to_str() {
|
||||
None => continue,
|
||||
// Import all but pg_wal
|
||||
let all_but_wal = WalkDir::new(path)
|
||||
.into_iter()
|
||||
.filter_entry(|entry| !entry.path().ends_with("pg_wal"));
|
||||
for entry in all_but_wal {
|
||||
let entry = entry?;
|
||||
let metadata = entry.metadata().expect("error getting dir entry metadata");
|
||||
if metadata.is_file() {
|
||||
let absolute_path = entry.path();
|
||||
let relative_path = absolute_path.strip_prefix(path)?;
|
||||
|
||||
Some("pg_control") => {
|
||||
pg_control = Some(import_control_file(&mut modification, &direntry.path())?);
|
||||
let file = File::open(absolute_path)?;
|
||||
let len = metadata.len() as usize;
|
||||
if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? {
|
||||
pg_control = Some(control_file);
|
||||
}
|
||||
Some("pg_filenode.map") => {
|
||||
import_relmap_file(
|
||||
&mut modification,
|
||||
pg_constants::GLOBALTABLESPACE_OID,
|
||||
0,
|
||||
&direntry.path(),
|
||||
)?;
|
||||
}
|
||||
|
||||
// Load any relation files into the page server (but only after the other files)
|
||||
_ => relfiles.push(direntry.path()),
|
||||
modification.flush()?;
|
||||
}
|
||||
}
|
||||
for relfile in relfiles {
|
||||
import_relfile(
|
||||
&mut modification,
|
||||
&relfile,
|
||||
pg_constants::GLOBALTABLESPACE_OID,
|
||||
0,
|
||||
)?;
|
||||
}
|
||||
|
||||
// Scan 'base'. It contains database dirs, the database OID is the filename.
|
||||
// E.g. 'base/12345', where 12345 is the database OID.
|
||||
for direntry in fs::read_dir(path.join("base"))? {
|
||||
let direntry = direntry?;
|
||||
|
||||
//skip all temporary files
|
||||
if direntry.file_name().to_string_lossy() == "pgsql_tmp" {
|
||||
continue;
|
||||
}
|
||||
|
||||
let dboid = direntry.file_name().to_string_lossy().parse::<u32>()?;
|
||||
|
||||
let mut relfiles: Vec<PathBuf> = Vec::new();
|
||||
for direntry in fs::read_dir(direntry.path())? {
|
||||
let direntry = direntry?;
|
||||
match direntry.file_name().to_str() {
|
||||
None => continue,
|
||||
|
||||
Some("PG_VERSION") => {
|
||||
//modification.put_dbdir_creation(pg_constants::DEFAULTTABLESPACE_OID, dboid)?;
|
||||
}
|
||||
Some("pg_filenode.map") => import_relmap_file(
|
||||
&mut modification,
|
||||
pg_constants::DEFAULTTABLESPACE_OID,
|
||||
dboid,
|
||||
&direntry.path(),
|
||||
)?,
|
||||
|
||||
// Load any relation files into the page server
|
||||
_ => relfiles.push(direntry.path()),
|
||||
}
|
||||
}
|
||||
for relfile in relfiles {
|
||||
import_relfile(
|
||||
&mut modification,
|
||||
&relfile,
|
||||
pg_constants::DEFAULTTABLESPACE_OID,
|
||||
dboid,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
for entry in fs::read_dir(path.join("pg_xact"))? {
|
||||
let entry = entry?;
|
||||
import_slru_file(&mut modification, SlruKind::Clog, &entry.path())?;
|
||||
}
|
||||
for entry in fs::read_dir(path.join("pg_multixact").join("members"))? {
|
||||
let entry = entry?;
|
||||
import_slru_file(&mut modification, SlruKind::MultiXactMembers, &entry.path())?;
|
||||
}
|
||||
for entry in fs::read_dir(path.join("pg_multixact").join("offsets"))? {
|
||||
let entry = entry?;
|
||||
import_slru_file(&mut modification, SlruKind::MultiXactOffsets, &entry.path())?;
|
||||
}
|
||||
for entry in fs::read_dir(path.join("pg_twophase"))? {
|
||||
let entry = entry?;
|
||||
let xid = u32::from_str_radix(&entry.path().to_string_lossy(), 16)?;
|
||||
import_twophase_file(&mut modification, xid, &entry.path())?;
|
||||
}
|
||||
// TODO: Scan pg_tblspc
|
||||
|
||||
// We're done importing all the data files.
|
||||
modification.commit()?;
|
||||
@@ -158,31 +89,30 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
|
||||
}
|
||||
|
||||
// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
|
||||
fn import_relfile<R: Repository>(
|
||||
fn import_rel<R: Repository, Reader: Read>(
|
||||
modification: &mut DatadirModification<R>,
|
||||
path: &Path,
|
||||
spcoid: Oid,
|
||||
dboid: Oid,
|
||||
mut reader: Reader,
|
||||
len: usize,
|
||||
) -> anyhow::Result<()> {
|
||||
// Does it look like a relation file?
|
||||
trace!("importing rel file {}", path.display());
|
||||
|
||||
let (relnode, forknum, segno) = parse_relfilename(&path.file_name().unwrap().to_string_lossy())
|
||||
.map_err(|e| {
|
||||
warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
|
||||
e
|
||||
})?;
|
||||
let filename = &path
|
||||
.file_name()
|
||||
.expect("missing rel filename")
|
||||
.to_string_lossy();
|
||||
let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
|
||||
warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
|
||||
e
|
||||
})?;
|
||||
|
||||
let mut file = File::open(path)?;
|
||||
let mut buf: [u8; 8192] = [0u8; 8192];
|
||||
|
||||
let len = file.metadata().unwrap().len();
|
||||
ensure!(len % pg_constants::BLCKSZ as u64 == 0);
|
||||
let nblocks = len / pg_constants::BLCKSZ as u64;
|
||||
|
||||
if segno != 0 {
|
||||
todo!();
|
||||
}
|
||||
ensure!(len % pg_constants::BLCKSZ as usize == 0);
|
||||
let nblocks = len / pg_constants::BLCKSZ as usize;
|
||||
|
||||
let rel = RelTag {
|
||||
spcnode: spcoid,
|
||||
@@ -190,11 +120,22 @@ fn import_relfile<R: Repository>(
|
||||
relnode,
|
||||
forknum,
|
||||
};
|
||||
modification.put_rel_creation(rel, nblocks as u32)?;
|
||||
|
||||
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
|
||||
|
||||
// Call put_rel_creation for every segment of the relation,
|
||||
// because there is no guarantee about the order in which we are processing segments.
|
||||
// ignore "relation already exists" error
|
||||
if let Err(e) = modification.put_rel_creation(rel, nblocks as u32) {
|
||||
if e.to_string().contains("already exists") {
|
||||
debug!("relation {} already exists. we must be extending it", rel);
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let r = file.read_exact(&mut buf);
|
||||
let r = reader.read_exact(&mut buf);
|
||||
match r {
|
||||
Ok(_) => {
|
||||
modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
|
||||
@@ -204,7 +145,9 @@ fn import_relfile<R: Repository>(
|
||||
Err(err) => match err.kind() {
|
||||
std::io::ErrorKind::UnexpectedEof => {
|
||||
// reached EOF. That's expected.
|
||||
ensure!(blknum == nblocks as u32, "unexpected EOF");
|
||||
let relative_blknum =
|
||||
blknum - segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
|
||||
ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
@@ -215,96 +158,43 @@ fn import_relfile<R: Repository>(
|
||||
blknum += 1;
|
||||
}
|
||||
|
||||
// Update relation size
|
||||
//
|
||||
// If we process rel segments out of order,
|
||||
// put_rel_extend will skip the update.
|
||||
modification.put_rel_extend(rel, blknum)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Import a relmapper (pg_filenode.map) file into the repository
|
||||
fn import_relmap_file<R: Repository>(
|
||||
modification: &mut DatadirModification<R>,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
path: &Path,
|
||||
) -> Result<()> {
|
||||
let mut file = File::open(path)?;
|
||||
let mut buffer = Vec::new();
|
||||
// read the whole file
|
||||
file.read_to_end(&mut buffer)?;
|
||||
|
||||
trace!("importing relmap file {}", path.display());
|
||||
|
||||
modification.put_relmap_file(spcnode, dbnode, Bytes::copy_from_slice(&buffer[..]))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Import a twophase state file (pg_twophase/<xid>) into the repository
|
||||
fn import_twophase_file<R: Repository>(
|
||||
modification: &mut DatadirModification<R>,
|
||||
xid: TransactionId,
|
||||
path: &Path,
|
||||
) -> Result<()> {
|
||||
let mut file = File::open(path)?;
|
||||
let mut buffer = Vec::new();
|
||||
// read the whole file
|
||||
file.read_to_end(&mut buffer)?;
|
||||
|
||||
trace!("importing non-rel file {}", path.display());
|
||||
|
||||
modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Import pg_control file into the repository.
|
||||
///
|
||||
/// The control file is imported as is, but we also extract the checkpoint record
|
||||
/// from it and store it separated.
|
||||
fn import_control_file<R: Repository>(
|
||||
modification: &mut DatadirModification<R>,
|
||||
path: &Path,
|
||||
) -> Result<ControlFileData> {
|
||||
let mut file = File::open(path)?;
|
||||
let mut buffer = Vec::new();
|
||||
// read the whole file
|
||||
file.read_to_end(&mut buffer)?;
|
||||
|
||||
trace!("importing control file {}", path.display());
|
||||
|
||||
// Import it as ControlFile
|
||||
modification.put_control_file(Bytes::copy_from_slice(&buffer[..]))?;
|
||||
|
||||
// Extract the checkpoint record and import it separately.
|
||||
let pg_control = ControlFileData::decode(&buffer)?;
|
||||
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
|
||||
modification.put_checkpoint(checkpoint_bytes)?;
|
||||
|
||||
Ok(pg_control)
|
||||
}
|
||||
|
||||
///
|
||||
/// Import an SLRU segment file
|
||||
///
|
||||
fn import_slru_file<R: Repository>(
|
||||
fn import_slru<R: Repository, Reader: Read>(
|
||||
modification: &mut DatadirModification<R>,
|
||||
slru: SlruKind,
|
||||
path: &Path,
|
||||
mut reader: Reader,
|
||||
len: usize,
|
||||
) -> Result<()> {
|
||||
trace!("importing slru file {}", path.display());
|
||||
|
||||
let mut file = File::open(path)?;
|
||||
let mut buf: [u8; 8192] = [0u8; 8192];
|
||||
let segno = u32::from_str_radix(&path.file_name().unwrap().to_string_lossy(), 16)?;
|
||||
let filename = &path
|
||||
.file_name()
|
||||
.expect("missing slru filename")
|
||||
.to_string_lossy();
|
||||
let segno = u32::from_str_radix(filename, 16)?;
|
||||
|
||||
let len = file.metadata().unwrap().len();
|
||||
ensure!(len % pg_constants::BLCKSZ as u64 == 0); // we assume SLRU block size is the same as BLCKSZ
|
||||
let nblocks = len / pg_constants::BLCKSZ as u64;
|
||||
ensure!(len % pg_constants::BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
|
||||
let nblocks = len / pg_constants::BLCKSZ as usize;
|
||||
|
||||
ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as u64);
|
||||
ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
|
||||
|
||||
modification.put_slru_segment_creation(slru, segno, nblocks as u32)?;
|
||||
|
||||
let mut rpageno = 0;
|
||||
loop {
|
||||
let r = file.read_exact(&mut buf);
|
||||
let r = reader.read_exact(&mut buf);
|
||||
match r {
|
||||
Ok(_) => {
|
||||
modification.put_slru_page_image(
|
||||
@@ -396,10 +286,272 @@ fn import_wal<R: Repository>(
|
||||
}
|
||||
|
||||
if last_lsn != startpoint {
|
||||
debug!("reached end of WAL at {}", last_lsn);
|
||||
info!("reached end of WAL at {}", last_lsn);
|
||||
} else {
|
||||
info!("no WAL to import at {}", last_lsn);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
|
||||
tline: &mut DatadirTimeline<R>,
|
||||
reader: Reader,
|
||||
base_lsn: Lsn,
|
||||
) -> Result<()> {
|
||||
info!("importing base at {}", base_lsn);
|
||||
let mut modification = tline.begin_modification(base_lsn);
|
||||
modification.init_empty()?;
|
||||
|
||||
let mut pg_control: Option<ControlFileData> = None;
|
||||
|
||||
// Import base
|
||||
for base_tar_entry in tar::Archive::new(reader).entries()? {
|
||||
let entry = base_tar_entry?;
|
||||
let header = entry.header();
|
||||
let len = header.entry_size()? as usize;
|
||||
let file_path = header.path()?.into_owned();
|
||||
|
||||
match header.entry_type() {
|
||||
tar::EntryType::Regular => {
|
||||
if let Some(res) = import_file(&mut modification, file_path.as_ref(), entry, len)? {
|
||||
// We found the pg_control file.
|
||||
pg_control = Some(res);
|
||||
}
|
||||
modification.flush()?;
|
||||
}
|
||||
tar::EntryType::Directory => {
|
||||
debug!("directory {:?}", file_path);
|
||||
}
|
||||
_ => {
|
||||
panic!("tar::EntryType::?? {}", file_path.display());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sanity check: ensure that pg_control is loaded
|
||||
let _pg_control = pg_control.context("pg_control file not found")?;
|
||||
|
||||
modification.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn import_wal_from_tar<R: Repository, Reader: Read>(
|
||||
tline: &mut DatadirTimeline<R>,
|
||||
reader: Reader,
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
) -> Result<()> {
|
||||
// Set up walingest mutable state
|
||||
let mut waldecoder = WalStreamDecoder::new(start_lsn);
|
||||
let mut segno = start_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE);
|
||||
let mut offset = start_lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
|
||||
let mut last_lsn = start_lsn;
|
||||
let mut walingest = WalIngest::new(tline, start_lsn)?;
|
||||
|
||||
// Ingest wal until end_lsn
|
||||
info!("importing wal until {}", end_lsn);
|
||||
let mut pg_wal_tar = tar::Archive::new(reader);
|
||||
let mut pg_wal_entries_iter = pg_wal_tar.entries()?;
|
||||
while last_lsn <= end_lsn {
|
||||
let bytes = {
|
||||
let entry = pg_wal_entries_iter.next().expect("expected more wal")?;
|
||||
let header = entry.header();
|
||||
let file_path = header.path()?.into_owned();
|
||||
|
||||
match header.entry_type() {
|
||||
tar::EntryType::Regular => {
|
||||
// FIXME: assume postgresql tli 1 for now
|
||||
let expected_filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
|
||||
let file_name = file_path
|
||||
.file_name()
|
||||
.expect("missing wal filename")
|
||||
.to_string_lossy();
|
||||
ensure!(expected_filename == file_name);
|
||||
|
||||
debug!("processing wal file {:?}", file_path);
|
||||
read_all_bytes(entry)?
|
||||
}
|
||||
tar::EntryType::Directory => {
|
||||
debug!("directory {:?}", file_path);
|
||||
continue;
|
||||
}
|
||||
_ => {
|
||||
panic!("tar::EntryType::?? {}", file_path.display());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
waldecoder.feed_bytes(&bytes[offset..]);
|
||||
|
||||
while last_lsn <= end_lsn {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
walingest.ingest_record(tline, recdata, lsn)?;
|
||||
last_lsn = lsn;
|
||||
|
||||
debug!("imported record at {} (end {})", lsn, end_lsn);
|
||||
}
|
||||
}
|
||||
|
||||
debug!("imported records up to {}", last_lsn);
|
||||
segno += 1;
|
||||
offset = 0;
|
||||
}
|
||||
|
||||
if last_lsn != start_lsn {
|
||||
info!("reached end of WAL at {}", last_lsn);
|
||||
} else {
|
||||
info!("there was no WAL to import at {}", last_lsn);
|
||||
}
|
||||
|
||||
// Log any extra unused files
|
||||
for e in &mut pg_wal_entries_iter {
|
||||
let entry = e?;
|
||||
let header = entry.header();
|
||||
let file_path = header.path()?.into_owned();
|
||||
info!("skipping {:?}", file_path);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn import_file<R: Repository, Reader: Read>(
|
||||
modification: &mut DatadirModification<R>,
|
||||
file_path: &Path,
|
||||
reader: Reader,
|
||||
len: usize,
|
||||
) -> Result<Option<ControlFileData>> {
|
||||
debug!("looking at {:?}", file_path);
|
||||
|
||||
if file_path.starts_with("global") {
|
||||
let spcnode = pg_constants::GLOBALTABLESPACE_OID;
|
||||
let dbnode = 0;
|
||||
|
||||
match file_path
|
||||
.file_name()
|
||||
.expect("missing filename")
|
||||
.to_string_lossy()
|
||||
.as_ref()
|
||||
{
|
||||
"pg_control" => {
|
||||
let bytes = read_all_bytes(reader)?;
|
||||
|
||||
// Extract the checkpoint record and import it separately.
|
||||
let pg_control = ControlFileData::decode(&bytes[..])?;
|
||||
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
|
||||
modification.put_checkpoint(checkpoint_bytes)?;
|
||||
debug!("imported control file");
|
||||
|
||||
// Import it as ControlFile
|
||||
modification.put_control_file(bytes)?;
|
||||
return Ok(Some(pg_control));
|
||||
}
|
||||
"pg_filenode.map" => {
|
||||
let bytes = read_all_bytes(reader)?;
|
||||
modification.put_relmap_file(spcnode, dbnode, bytes)?;
|
||||
debug!("imported relmap file")
|
||||
}
|
||||
"PG_VERSION" => {
|
||||
debug!("ignored");
|
||||
}
|
||||
_ => {
|
||||
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
|
||||
debug!("imported rel creation");
|
||||
}
|
||||
}
|
||||
} else if file_path.starts_with("base") {
|
||||
let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
|
||||
let dbnode: u32 = file_path
|
||||
.iter()
|
||||
.nth(1)
|
||||
.expect("invalid file path, expected dbnode")
|
||||
.to_string_lossy()
|
||||
.parse()?;
|
||||
|
||||
match file_path
|
||||
.file_name()
|
||||
.expect("missing base filename")
|
||||
.to_string_lossy()
|
||||
.as_ref()
|
||||
{
|
||||
"pg_filenode.map" => {
|
||||
let bytes = read_all_bytes(reader)?;
|
||||
modification.put_relmap_file(spcnode, dbnode, bytes)?;
|
||||
debug!("imported relmap file")
|
||||
}
|
||||
"PG_VERSION" => {
|
||||
debug!("ignored");
|
||||
}
|
||||
_ => {
|
||||
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
|
||||
debug!("imported rel creation");
|
||||
}
|
||||
}
|
||||
} else if file_path.starts_with("pg_xact") {
|
||||
let slru = SlruKind::Clog;
|
||||
|
||||
import_slru(modification, slru, file_path, reader, len)?;
|
||||
debug!("imported clog slru");
|
||||
} else if file_path.starts_with("pg_multixact/offsets") {
|
||||
let slru = SlruKind::MultiXactOffsets;
|
||||
|
||||
import_slru(modification, slru, file_path, reader, len)?;
|
||||
debug!("imported multixact offsets slru");
|
||||
} else if file_path.starts_with("pg_multixact/members") {
|
||||
let slru = SlruKind::MultiXactMembers;
|
||||
|
||||
import_slru(modification, slru, file_path, reader, len)?;
|
||||
debug!("imported multixact members slru");
|
||||
} else if file_path.starts_with("pg_twophase") {
|
||||
let file_name = &file_path
|
||||
.file_name()
|
||||
.expect("missing twophase filename")
|
||||
.to_string_lossy();
|
||||
let xid = u32::from_str_radix(file_name, 16)?;
|
||||
|
||||
let bytes = read_all_bytes(reader)?;
|
||||
modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?;
|
||||
debug!("imported twophase file");
|
||||
} else if file_path.starts_with("pg_wal") {
|
||||
debug!("found wal file in base section. ignore it");
|
||||
} else if file_path.starts_with("zenith.signal") {
|
||||
// Parse zenith signal file to set correct previous LSN
|
||||
let bytes = read_all_bytes(reader)?;
|
||||
// zenith.signal format is "PREV LSN: prev_lsn"
|
||||
// TODO write serialization and deserialization in the same place.
|
||||
let zenith_signal = std::str::from_utf8(&bytes)?.trim();
|
||||
let prev_lsn = match zenith_signal {
|
||||
"PREV LSN: none" => Lsn(0),
|
||||
"PREV LSN: invalid" => Lsn(0),
|
||||
other => {
|
||||
let split = other.split(':').collect::<Vec<_>>();
|
||||
split[1]
|
||||
.trim()
|
||||
.parse::<Lsn>()
|
||||
.context("can't parse zenith.signal")?
|
||||
}
|
||||
};
|
||||
|
||||
// zenith.signal is not necessarily the last file, that we handle
|
||||
// but it is ok to call `finish_write()`, because final `modification.commit()`
|
||||
// will update lsn once more to the final one.
|
||||
let writer = modification.tline.tline.writer();
|
||||
writer.finish_write(prev_lsn);
|
||||
|
||||
debug!("imported zenith signal {}", prev_lsn);
|
||||
} else if file_path.starts_with("pg_tblspc") {
|
||||
// TODO Backups exported from neon won't have pg_tblspc, but we will need
|
||||
// this to import arbitrary postgres databases.
|
||||
bail!("Importing pg_tblspc is not implemented");
|
||||
} else {
|
||||
debug!("ignored");
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn read_all_bytes<Reader: Read>(mut reader: Reader) -> Result<Bytes> {
|
||||
let mut buf: Vec<u8> = vec![];
|
||||
reader.read_to_end(&mut buf)?;
|
||||
Ok(Bytes::copy_from_slice(&buf[..]))
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::KeySpace;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::storage_sync::index::RemoteIndex;
|
||||
use crate::tenant_config::{TenantConf, TenantConfOpt};
|
||||
|
||||
@@ -242,15 +242,15 @@ impl Repository for LayeredRepository {
|
||||
);
|
||||
timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn);
|
||||
|
||||
// Insert if not exists
|
||||
let timeline = Arc::new(timeline);
|
||||
let r = timelines.insert(
|
||||
timelineid,
|
||||
LayeredTimelineEntry::Loaded(Arc::clone(&timeline)),
|
||||
);
|
||||
ensure!(
|
||||
r.is_none(),
|
||||
"assertion failure, inserted duplicate timeline"
|
||||
);
|
||||
match timelines.entry(timelineid) {
|
||||
Entry::Occupied(_) => bail!("Timeline already exists"),
|
||||
Entry::Vacant(vacant) => {
|
||||
vacant.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)))
|
||||
}
|
||||
};
|
||||
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -1562,7 +1562,7 @@ impl LayeredTimeline {
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
layer.put_value(key, lsn, val)?;
|
||||
@@ -1690,26 +1690,28 @@ impl LayeredTimeline {
|
||||
|
||||
/// Flush one frozen in-memory layer to disk, as a new delta layer.
|
||||
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> Result<()> {
|
||||
let new_delta = frozen_layer.write_to_disk()?;
|
||||
let new_delta_path = new_delta.path();
|
||||
// As a special case, when we have just imported an image into the repository,
|
||||
// instead of writing out a L0 delta layer, we directly write out image layer
|
||||
// files instead. This is possible as long as *all* the data imported into the
|
||||
// repository have the same LSN.
|
||||
let lsn_range = frozen_layer.get_lsn_range();
|
||||
|
||||
// Sync the new layer to disk.
|
||||
//
|
||||
// We must also fsync the timeline dir to ensure the directory entries for
|
||||
// new layer files are durable
|
||||
//
|
||||
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
|
||||
// files to flush, it might be better to first write them all, and then fsync
|
||||
// them all in parallel.
|
||||
par_fsync::par_fsync(&[
|
||||
new_delta_path.clone(),
|
||||
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
|
||||
])?;
|
||||
let layer_paths_to_upload = if lsn_range.start == self.initdb_lsn
|
||||
&& lsn_range.end == Lsn(self.initdb_lsn.0 + 1)
|
||||
{
|
||||
let pgdir = tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)?;
|
||||
let (partitioning, _lsn) =
|
||||
pgdir.repartition(self.initdb_lsn, self.get_compaction_target_size())?;
|
||||
self.create_image_layers(&partitioning, self.initdb_lsn, true)?
|
||||
} else {
|
||||
// normal case, write out a L0 delta layer file.
|
||||
let delta_path = self.create_delta_layer(&frozen_layer)?;
|
||||
HashSet::from([delta_path])
|
||||
};
|
||||
fail_point!("checkpoint-before-sync");
|
||||
|
||||
fail_point!("flush-frozen");
|
||||
|
||||
// Finally, replace the frozen in-memory layer with the new on-disk layer
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
// in-memory layer from the map now.
|
||||
{
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let l = layers.frozen_layers.pop_front();
|
||||
@@ -1719,19 +1721,27 @@ impl LayeredTimeline {
|
||||
// layer to disk at the same time, that would not work.
|
||||
assert!(Arc::ptr_eq(&l.unwrap(), &frozen_layer));
|
||||
|
||||
// Add the new delta layer to the LayerMap
|
||||
layers.insert_historic(Arc::new(new_delta));
|
||||
|
||||
// release lock on 'layers'
|
||||
}
|
||||
|
||||
fail_point!("checkpoint-after-sync");
|
||||
|
||||
// Update the metadata file, with new 'disk_consistent_lsn'
|
||||
//
|
||||
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
|
||||
// *all* the layers, to avoid fsyncing the file multiple times.
|
||||
let disk_consistent_lsn = Lsn(frozen_layer.get_lsn_range().end.0 - 1);
|
||||
fail_point!("checkpoint-after-sync");
|
||||
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
self.update_disk_consistent_lsn(disk_consistent_lsn, layer_paths_to_upload)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update metadata file
|
||||
fn update_disk_consistent_lsn(
|
||||
&self,
|
||||
disk_consistent_lsn: Lsn,
|
||||
layer_paths_to_upload: HashSet<PathBuf>,
|
||||
) -> Result<()> {
|
||||
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
|
||||
// After crash, we will restart WAL streaming and processing from that point.
|
||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
@@ -1781,14 +1791,11 @@ impl LayeredTimeline {
|
||||
false,
|
||||
)?;
|
||||
|
||||
NUM_PERSISTENT_FILES_CREATED.inc_by(1);
|
||||
PERSISTENT_BYTES_WRITTEN.inc_by(new_delta_path.metadata()?.len());
|
||||
|
||||
if self.upload_layers.load(atomic::Ordering::Relaxed) {
|
||||
storage_sync::schedule_layer_upload(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
HashSet::from([new_delta_path]),
|
||||
layer_paths_to_upload,
|
||||
Some(metadata),
|
||||
);
|
||||
}
|
||||
@@ -1800,6 +1807,37 @@ impl LayeredTimeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Write out the given frozen in-memory layer as a new L0 delta file
|
||||
fn create_delta_layer(&self, frozen_layer: &InMemoryLayer) -> Result<PathBuf> {
|
||||
// Write it out
|
||||
let new_delta = frozen_layer.write_to_disk()?;
|
||||
let new_delta_path = new_delta.path();
|
||||
|
||||
// Sync it to disk.
|
||||
//
|
||||
// We must also fsync the timeline dir to ensure the directory entries for
|
||||
// new layer files are durable
|
||||
//
|
||||
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
|
||||
// files to flush, it might be better to first write them all, and then fsync
|
||||
// them all in parallel.
|
||||
par_fsync::par_fsync(&[
|
||||
new_delta_path.clone(),
|
||||
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
|
||||
])?;
|
||||
|
||||
// Add it to the layer map
|
||||
{
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
layers.insert_historic(Arc::new(new_delta));
|
||||
}
|
||||
|
||||
NUM_PERSISTENT_FILES_CREATED.inc_by(1);
|
||||
PERSISTENT_BYTES_WRITTEN.inc_by(new_delta_path.metadata()?.len());
|
||||
|
||||
Ok(new_delta_path)
|
||||
}
|
||||
|
||||
pub fn compact(&self) -> Result<()> {
|
||||
//
|
||||
// High level strategy for compaction / image creation:
|
||||
@@ -1843,29 +1881,23 @@ impl LayeredTimeline {
|
||||
if let Ok(pgdir) =
|
||||
tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)
|
||||
{
|
||||
// 2. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let (partitioning, lsn) = pgdir.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
)?;
|
||||
let timer = self.create_images_time_histo.start_timer();
|
||||
// 2. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let mut layer_paths_to_upload = HashSet::with_capacity(partitioning.parts.len());
|
||||
for part in partitioning.parts.iter() {
|
||||
if self.time_for_new_image_layer(part, lsn)? {
|
||||
let new_path = self.create_image_layer(part, lsn)?;
|
||||
layer_paths_to_upload.insert(new_path);
|
||||
}
|
||||
}
|
||||
if self.upload_layers.load(atomic::Ordering::Relaxed) {
|
||||
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
|
||||
if !layer_paths_to_upload.is_empty()
|
||||
&& self.upload_layers.load(atomic::Ordering::Relaxed)
|
||||
{
|
||||
storage_sync::schedule_layer_upload(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
layer_paths_to_upload,
|
||||
HashSet::from_iter(layer_paths_to_upload),
|
||||
None,
|
||||
);
|
||||
}
|
||||
timer.stop_and_record();
|
||||
|
||||
// 3. Compact
|
||||
let timer = self.compact_time_histo.start_timer();
|
||||
@@ -1906,21 +1938,40 @@ impl LayeredTimeline {
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
fn create_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<PathBuf> {
|
||||
let img_range =
|
||||
partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
|
||||
let mut image_layer_writer =
|
||||
ImageLayerWriter::new(self.conf, self.timeline_id, self.tenant_id, &img_range, lsn)?;
|
||||
fn create_image_layers(
|
||||
&self,
|
||||
partitioning: &KeyPartitioning,
|
||||
lsn: Lsn,
|
||||
force: bool,
|
||||
) -> Result<HashSet<PathBuf>> {
|
||||
let timer = self.create_images_time_histo.start_timer();
|
||||
let mut image_layers: Vec<ImageLayer> = Vec::new();
|
||||
let mut layer_paths_to_upload = HashSet::new();
|
||||
for partition in partitioning.parts.iter() {
|
||||
if force || self.time_for_new_image_layer(partition, lsn)? {
|
||||
let img_range =
|
||||
partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
&img_range,
|
||||
lsn,
|
||||
)?;
|
||||
|
||||
for range in &partition.ranges {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
let img = self.get(key, lsn)?;
|
||||
image_layer_writer.put_image(key, &img)?;
|
||||
key = key.next();
|
||||
for range in &partition.ranges {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
let img = self.get(key, lsn)?;
|
||||
image_layer_writer.put_image(key, &img)?;
|
||||
key = key.next();
|
||||
}
|
||||
}
|
||||
let image_layer = image_layer_writer.finish()?;
|
||||
layer_paths_to_upload.insert(image_layer.path());
|
||||
image_layers.push(image_layer);
|
||||
}
|
||||
}
|
||||
let image_layer = image_layer_writer.finish()?;
|
||||
|
||||
// Sync the new layer to disk before adding it to the layer map, to make sure
|
||||
// we don't garbage collect something based on the new layer, before it has
|
||||
@@ -1931,19 +1982,18 @@ impl LayeredTimeline {
|
||||
//
|
||||
// Compaction creates multiple image layers. It would be better to create them all
|
||||
// and fsync them all in parallel.
|
||||
par_fsync::par_fsync(&[
|
||||
image_layer.path(),
|
||||
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
|
||||
])?;
|
||||
|
||||
// FIXME: Do we need to do something to upload it to remote storage here?
|
||||
let mut all_paths = Vec::from_iter(layer_paths_to_upload.clone());
|
||||
all_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id));
|
||||
par_fsync::par_fsync(&all_paths)?;
|
||||
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let new_path = image_layer.path();
|
||||
layers.insert_historic(Arc::new(image_layer));
|
||||
for l in image_layers {
|
||||
layers.insert_historic(Arc::new(l));
|
||||
}
|
||||
drop(layers);
|
||||
timer.stop_and_record();
|
||||
|
||||
Ok(new_path)
|
||||
Ok(layer_paths_to_upload)
|
||||
}
|
||||
|
||||
///
|
||||
@@ -2190,6 +2240,9 @@ impl LayeredTimeline {
|
||||
LsnForTimestamp::Past(lsn) => {
|
||||
debug!("past({})", lsn);
|
||||
}
|
||||
LsnForTimestamp::NoData(lsn) => {
|
||||
debug!("nodata({})", lsn);
|
||||
}
|
||||
}
|
||||
debug!("pitr_cutoff_lsn = {:?}", pitr_cutoff_lsn)
|
||||
}
|
||||
@@ -2463,7 +2516,7 @@ impl Deref for LayeredTimelineWriter<'_> {
|
||||
}
|
||||
|
||||
impl<'a> TimelineWriter<'_> for LayeredTimelineWriter<'a> {
|
||||
fn put(&self, key: Key, lsn: Lsn, value: Value) -> Result<()> {
|
||||
fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()> {
|
||||
self.tl.put_value(key, lsn, value)
|
||||
}
|
||||
|
||||
@@ -2605,7 +2658,7 @@ pub mod tests {
|
||||
let TEST_KEY: Key = Key::from_hex("112222222233333333444444445500000001").unwrap();
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(TEST_KEY, Lsn(0x10), Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
writer.put(TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
writer.finish_write(Lsn(0x10));
|
||||
drop(writer);
|
||||
|
||||
@@ -2613,7 +2666,7 @@ pub mod tests {
|
||||
tline.compact()?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(TEST_KEY, Lsn(0x20), Value::Image(TEST_IMG("foo at 0x20")))?;
|
||||
writer.put(TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
|
||||
writer.finish_write(Lsn(0x20));
|
||||
drop(writer);
|
||||
|
||||
@@ -2621,7 +2674,7 @@ pub mod tests {
|
||||
tline.compact()?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(TEST_KEY, Lsn(0x30), Value::Image(TEST_IMG("foo at 0x30")))?;
|
||||
writer.put(TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?;
|
||||
writer.finish_write(Lsn(0x30));
|
||||
drop(writer);
|
||||
|
||||
@@ -2629,7 +2682,7 @@ pub mod tests {
|
||||
tline.compact()?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(TEST_KEY, Lsn(0x40), Value::Image(TEST_IMG("foo at 0x40")))?;
|
||||
writer.put(TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?;
|
||||
writer.finish_write(Lsn(0x40));
|
||||
drop(writer);
|
||||
|
||||
@@ -2667,7 +2720,7 @@ pub mod tests {
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
@@ -2713,7 +2766,7 @@ pub mod tests {
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
writer.finish_write(lsn);
|
||||
updated[blknum] = lsn;
|
||||
@@ -2731,7 +2784,7 @@ pub mod tests {
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
@@ -2783,7 +2836,7 @@ pub mod tests {
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
writer.finish_write(lsn);
|
||||
updated[blknum] = lsn;
|
||||
@@ -2807,7 +2860,7 @@ pub mod tests {
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
println!("updating {} at {}", blknum, lsn);
|
||||
writer.finish_write(lsn);
|
||||
@@ -2866,7 +2919,7 @@ pub mod tests {
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
|
||||
&Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
|
||||
)?;
|
||||
println!("updating [{}][{}] at {}", idx, blknum, lsn);
|
||||
writer.finish_write(lsn);
|
||||
|
||||
@@ -34,7 +34,7 @@ pub trait BlobCursor {
|
||||
) -> Result<(), std::io::Error>;
|
||||
}
|
||||
|
||||
impl<'a, R> BlobCursor for BlockCursor<R>
|
||||
impl<R> BlobCursor for BlockCursor<R>
|
||||
where
|
||||
R: BlockReader,
|
||||
{
|
||||
|
||||
@@ -267,13 +267,13 @@ impl InMemoryLayer {
|
||||
|
||||
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
||||
/// Adds the page version to the in-memory tree
|
||||
pub fn put_value(&self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
|
||||
pub fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
|
||||
trace!("put_value key {} at {}/{}", key, self.timelineid, lsn);
|
||||
let mut inner = self.inner.write().unwrap();
|
||||
|
||||
inner.assert_writeable();
|
||||
|
||||
let off = inner.file.write_blob(&Value::ser(&val)?)?;
|
||||
let off = inner.file.write_blob(&Value::ser(val)?)?;
|
||||
|
||||
let vec_map = inner.index.entry(key).or_default();
|
||||
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
|
||||
|
||||
@@ -14,7 +14,7 @@ use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
use std::io;
|
||||
use std::io::{self, Read};
|
||||
use std::net::TcpListener;
|
||||
use std::str;
|
||||
use std::str::FromStr;
|
||||
@@ -30,6 +30,8 @@ use utils::{
|
||||
|
||||
use crate::basebackup;
|
||||
use crate::config::{PageServerConf, ProfilingConfig};
|
||||
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
|
||||
use crate::layered_repository::LayeredRepository;
|
||||
use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp};
|
||||
use crate::profiling::profpoint_start;
|
||||
use crate::reltag::RelTag;
|
||||
@@ -202,6 +204,96 @@ impl PagestreamBeMessage {
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements Read for the server side of CopyIn
|
||||
struct CopyInReader<'a> {
|
||||
pgb: &'a mut PostgresBackend,
|
||||
|
||||
/// Overflow buffer for bytes sent in CopyData messages
|
||||
/// that the reader (caller of read) hasn't asked for yet.
|
||||
/// TODO use BytesMut?
|
||||
buf: Vec<u8>,
|
||||
|
||||
/// Bytes before `buf_begin` are considered as dropped.
|
||||
/// This allows us to implement O(1) pop_front on Vec<u8>.
|
||||
/// The Vec won't grow large because we only add to it
|
||||
/// when it's empty.
|
||||
buf_begin: usize,
|
||||
}
|
||||
|
||||
impl<'a> CopyInReader<'a> {
|
||||
// NOTE: pgb should be in copy in state already
|
||||
fn new(pgb: &'a mut PostgresBackend) -> Self {
|
||||
Self {
|
||||
pgb,
|
||||
buf: Vec::<_>::new(),
|
||||
buf_begin: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Drop for CopyInReader<'a> {
|
||||
fn drop(&mut self) {
|
||||
// Finalize copy protocol so that self.pgb can be reused
|
||||
// TODO instead, maybe take ownership of pgb and give it back at the end
|
||||
let mut buf: Vec<u8> = vec![];
|
||||
let _ = self.read_to_end(&mut buf);
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Read for CopyInReader<'a> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
while !thread_mgr::is_shutdown_requested() {
|
||||
// Return from buffer if nonempty
|
||||
if self.buf_begin < self.buf.len() {
|
||||
let bytes_to_read = std::cmp::min(buf.len(), self.buf.len() - self.buf_begin);
|
||||
buf[..bytes_to_read].copy_from_slice(&self.buf[self.buf_begin..][..bytes_to_read]);
|
||||
self.buf_begin += bytes_to_read;
|
||||
return Ok(bytes_to_read);
|
||||
}
|
||||
|
||||
// Delete garbage
|
||||
self.buf.clear();
|
||||
self.buf_begin = 0;
|
||||
|
||||
// Wait for client to send CopyData bytes
|
||||
match self.pgb.read_message() {
|
||||
Ok(Some(message)) => {
|
||||
let copy_data_bytes = match message {
|
||||
FeMessage::CopyData(bytes) => bytes,
|
||||
FeMessage::CopyDone => return Ok(0),
|
||||
FeMessage::Sync => continue,
|
||||
m => {
|
||||
let msg = format!("unexpected message {:?}", m);
|
||||
self.pgb.write_message(&BeMessage::ErrorResponse(&msg))?;
|
||||
return Err(io::Error::new(io::ErrorKind::Other, msg));
|
||||
}
|
||||
};
|
||||
|
||||
// Return as much as we can, saving the rest in self.buf
|
||||
let mut reader = copy_data_bytes.reader();
|
||||
let bytes_read = reader.read(buf)?;
|
||||
reader.read_to_end(&mut self.buf)?;
|
||||
return Ok(bytes_read);
|
||||
}
|
||||
Ok(None) => {
|
||||
let msg = "client closed connection";
|
||||
self.pgb.write_message(&BeMessage::ErrorResponse(msg))?;
|
||||
return Err(io::Error::new(io::ErrorKind::Other, msg));
|
||||
}
|
||||
Err(e) => {
|
||||
if !is_socket_read_timed_out(&e) {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutting down
|
||||
let msg = "Importer thread was shut down";
|
||||
Err(io::Error::new(io::ErrorKind::Other, msg))
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
///
|
||||
@@ -342,6 +434,7 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::drop_non_drop)] // Only complains in CI, not sure why
|
||||
fn handle_pagerequests(
|
||||
&self,
|
||||
pgb: &mut PostgresBackend,
|
||||
@@ -423,6 +516,96 @@ impl PageServerHandler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_import_basebackup(
|
||||
&self,
|
||||
pgb: &mut PostgresBackend,
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
base_lsn: Lsn,
|
||||
_end_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let _enter =
|
||||
info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered();
|
||||
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
|
||||
let timeline = repo.create_empty_timeline(timeline_id, base_lsn)?;
|
||||
let repartition_distance = repo.get_checkpoint_distance();
|
||||
let mut datadir_timeline =
|
||||
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
|
||||
|
||||
// TODO mark timeline as not ready until it reaches end_lsn.
|
||||
// We might have some wal to import as well, and we should prevent compute
|
||||
// from connecting before that and writing conflicting wal.
|
||||
//
|
||||
// This is not relevant for pageserver->pageserver migrations, since there's
|
||||
// no wal to import. But should be fixed if we want to import from postgres.
|
||||
|
||||
// TODO leave clean state on error. For now you can use detach to clean
|
||||
// up broken state from a failed import.
|
||||
|
||||
// Import basebackup provided via CopyData
|
||||
info!("importing basebackup");
|
||||
pgb.write_message(&BeMessage::CopyInResponse)?;
|
||||
let reader = CopyInReader::new(pgb);
|
||||
import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?;
|
||||
|
||||
// TODO check checksum
|
||||
// Meanwhile you can verify client-side by taking fullbackup
|
||||
// and checking that it matches in size with what was imported.
|
||||
// It wouldn't work if base came from vanilla postgres though,
|
||||
// since we discard some log files.
|
||||
|
||||
// Flush data to disk, then upload to s3
|
||||
info!("flushing layers");
|
||||
datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
|
||||
|
||||
info!("done");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_import_wal(
|
||||
&self,
|
||||
pgb: &mut PostgresBackend,
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let _enter =
|
||||
info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered();
|
||||
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
|
||||
let timeline = repo.get_timeline_load(timeline_id)?;
|
||||
ensure!(timeline.get_last_record_lsn() == start_lsn);
|
||||
|
||||
let repartition_distance = repo.get_checkpoint_distance();
|
||||
let mut datadir_timeline =
|
||||
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
|
||||
|
||||
// TODO leave clean state on error. For now you can use detach to clean
|
||||
// up broken state from a failed import.
|
||||
|
||||
// Import wal provided via CopyData
|
||||
info!("importing wal");
|
||||
pgb.write_message(&BeMessage::CopyInResponse)?;
|
||||
let reader = CopyInReader::new(pgb);
|
||||
import_wal_from_tar(&mut datadir_timeline, reader, start_lsn, end_lsn)?;
|
||||
|
||||
// TODO Does it make sense to overshoot?
|
||||
ensure!(datadir_timeline.tline.get_last_record_lsn() >= end_lsn);
|
||||
|
||||
// Flush data to disk, then upload to s3. No need for a forced checkpoint.
|
||||
// We only want to persist the data, and it doesn't matter if it's in the
|
||||
// shape of deltas or images.
|
||||
info!("flushing layers");
|
||||
datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
|
||||
|
||||
info!("done");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper function to handle the LSN from client request.
|
||||
///
|
||||
/// Each GetPage (and Exists and Nblocks) request includes information about
|
||||
@@ -593,7 +776,7 @@ impl PageServerHandler {
|
||||
/* Send a tarball of the latest layer on the timeline */
|
||||
{
|
||||
let mut writer = CopyDataSink { pgb };
|
||||
let mut basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?;
|
||||
let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?;
|
||||
span.record("lsn", &basebackup.lsn.to_string().as_str());
|
||||
basebackup.send_tarball()?;
|
||||
}
|
||||
@@ -718,6 +901,57 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr)?;
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.starts_with("import basebackup ") {
|
||||
// Import the `base` section (everything but the wal) of a basebackup.
|
||||
// Assumes the tenant already exists on this pageserver.
|
||||
//
|
||||
// Files are scheduled to be persisted to remote storage, and the
|
||||
// caller should poll the http api to check when that is done.
|
||||
//
|
||||
// Example import command:
|
||||
// 1. Get start/end LSN from backup_manifest file
|
||||
// 2. Run:
|
||||
// cat my_backup/base.tar | psql -h $PAGESERVER \
|
||||
// -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN"
|
||||
let (_, params_raw) = query_string.split_at("import basebackup ".len());
|
||||
let params = params_raw.split_whitespace().collect::<Vec<_>>();
|
||||
ensure!(params.len() == 4);
|
||||
let tenant = ZTenantId::from_str(params[0])?;
|
||||
let timeline = ZTimelineId::from_str(params[1])?;
|
||||
let base_lsn = Lsn::from_str(params[2])?;
|
||||
let end_lsn = Lsn::from_str(params[3])?;
|
||||
|
||||
self.check_permission(Some(tenant))?;
|
||||
|
||||
match self.handle_import_basebackup(pgb, tenant, timeline, base_lsn, end_lsn) {
|
||||
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
|
||||
Err(e) => {
|
||||
error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?
|
||||
}
|
||||
};
|
||||
} else if query_string.starts_with("import wal ") {
|
||||
// Import the `pg_wal` section of a basebackup.
|
||||
//
|
||||
// Files are scheduled to be persisted to remote storage, and the
|
||||
// caller should poll the http api to check when that is done.
|
||||
let (_, params_raw) = query_string.split_at("import wal ".len());
|
||||
let params = params_raw.split_whitespace().collect::<Vec<_>>();
|
||||
ensure!(params.len() == 4);
|
||||
let tenant = ZTenantId::from_str(params[0])?;
|
||||
let timeline = ZTimelineId::from_str(params[1])?;
|
||||
let start_lsn = Lsn::from_str(params[2])?;
|
||||
let end_lsn = Lsn::from_str(params[3])?;
|
||||
|
||||
self.check_permission(Some(tenant))?;
|
||||
|
||||
match self.handle_import_wal(pgb, tenant, timeline, start_lsn, end_lsn) {
|
||||
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
|
||||
Err(e) => {
|
||||
error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?
|
||||
}
|
||||
};
|
||||
} else if query_string.to_ascii_lowercase().starts_with("set ") {
|
||||
// important because psycopg2 executes "SET datestyle TO 'ISO'"
|
||||
// on connect
|
||||
@@ -898,6 +1132,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
|
||||
LsnForTimestamp::Future(_lsn) => "future".into(),
|
||||
LsnForTimestamp::Past(_lsn) => "past".into(),
|
||||
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
|
||||
};
|
||||
pgb.write_message_noflush(&BeMessage::DataRow(&[Some(result.as_bytes())]))?;
|
||||
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
|
||||
@@ -51,6 +51,7 @@ pub enum LsnForTimestamp {
|
||||
Present(Lsn),
|
||||
Future(Lsn),
|
||||
Past(Lsn),
|
||||
NoData(Lsn),
|
||||
}
|
||||
|
||||
impl<R: Repository> DatadirTimeline<R> {
|
||||
@@ -250,7 +251,7 @@ impl<R: Repository> DatadirTimeline<R> {
|
||||
(false, false) => {
|
||||
// This can happen if no commit records have been processed yet, e.g.
|
||||
// just after importing a cluster.
|
||||
bail!("no commit timestamps found");
|
||||
Ok(LsnForTimestamp::NoData(max_lsn))
|
||||
}
|
||||
(true, false) => {
|
||||
// Didn't find any commit timestamps larger than the request
|
||||
@@ -749,6 +750,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
|
||||
}
|
||||
|
||||
/// Extend relation
|
||||
/// If new size is smaller, do nothing.
|
||||
pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
|
||||
ensure!(rel.relnode != 0, "invalid relnode");
|
||||
|
||||
@@ -756,10 +758,13 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let old_size = self.get(size_key)?.get_u32_le();
|
||||
|
||||
let buf = nblocks.to_le_bytes();
|
||||
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
||||
// only extend relation here. never decrease the size
|
||||
if nblocks > old_size {
|
||||
let buf = nblocks.to_le_bytes();
|
||||
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
||||
|
||||
self.pending_nblocks += nblocks as isize - old_size as isize;
|
||||
self.pending_nblocks += nblocks as isize - old_size as isize;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -879,6 +884,57 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Flush changes accumulated so far to the underlying repository.
|
||||
///
|
||||
/// Usually, changes made in DatadirModification are atomic, but this allows
|
||||
/// you to flush them to the underlying repository before the final `commit`.
|
||||
/// That allows to free up the memory used to hold the pending changes.
|
||||
///
|
||||
/// Currently only used during bulk import of a data directory. In that
|
||||
/// context, breaking the atomicity is OK. If the import is interrupted, the
|
||||
/// whole import fails and the timeline will be deleted anyway.
|
||||
/// (Or to be precise, it will be left behind for debugging purposes and
|
||||
/// ignored, see https://github.com/neondatabase/neon/pull/1809)
|
||||
///
|
||||
/// Note: A consequence of flushing the pending operations is that they
|
||||
/// won't be visible to subsequent operations until `commit`. The function
|
||||
/// retains all the metadata, but data pages are flushed. That's again OK
|
||||
/// for bulk import, where you are just loading data pages and won't try to
|
||||
/// modify the same pages twice.
|
||||
pub fn flush(&mut self) -> Result<()> {
|
||||
// Unless we have accumulated a decent amount of changes, it's not worth it
|
||||
// to scan through the pending_updates list.
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
if pending_nblocks < 10000 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let writer = self.tline.tline.writer();
|
||||
|
||||
// Flush relation and SLRU data blocks, keep metadata.
|
||||
let mut result: Result<()> = Ok(());
|
||||
self.pending_updates.retain(|&key, value| {
|
||||
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
|
||||
result = writer.put(key, self.lsn, value);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
result?;
|
||||
|
||||
if pending_nblocks != 0 {
|
||||
self.tline.current_logical_size.fetch_add(
|
||||
pending_nblocks * pg_constants::BLCKSZ as isize,
|
||||
Ordering::SeqCst,
|
||||
);
|
||||
self.pending_nblocks = 0;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Finish this atomic update, writing all the updated keys to the
|
||||
/// underlying timeline.
|
||||
@@ -889,7 +945,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
|
||||
for (key, value) in self.pending_updates {
|
||||
writer.put(key, self.lsn, value)?;
|
||||
writer.put(key, self.lsn, &value)?;
|
||||
}
|
||||
for key_range in self.pending_deletions {
|
||||
writer.delete(key_range.clone(), self.lsn)?;
|
||||
@@ -1294,6 +1350,10 @@ pub fn key_to_rel_block(key: Key) -> Result<(RelTag, BlockNumber)> {
|
||||
})
|
||||
}
|
||||
|
||||
fn is_rel_block_key(key: Key) -> bool {
|
||||
key.field1 == 0x00 && key.field4 != 0
|
||||
}
|
||||
|
||||
pub fn key_to_slru_block(key: Key) -> Result<(SlruKind, u32, BlockNumber)> {
|
||||
Ok(match key.field1 {
|
||||
0x01 => {
|
||||
@@ -1312,6 +1372,12 @@ pub fn key_to_slru_block(key: Key) -> Result<(SlruKind, u32, BlockNumber)> {
|
||||
})
|
||||
}
|
||||
|
||||
fn is_slru_block_key(key: Key) -> bool {
|
||||
key.field1 == 0x01 // SLRU-related
|
||||
&& key.field3 == 0x00000001 // but not SlruDir
|
||||
&& key.field6 != 0xffffffff // and not SlruSegSize
|
||||
}
|
||||
|
||||
//
|
||||
//-- Tests that should work the same with any Repository/Timeline implementation.
|
||||
//
|
||||
|
||||
@@ -406,7 +406,7 @@ pub trait TimelineWriter<'a> {
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
/// current end-of-file.
|
||||
fn put(&self, key: Key, lsn: Lsn, value: Value) -> Result<()>;
|
||||
fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()>;
|
||||
|
||||
fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()>;
|
||||
|
||||
@@ -616,12 +616,12 @@ mod tests {
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x10), Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
writer.finish_write(Lsn(0x10));
|
||||
drop(writer);
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x20), Value::Image(TEST_IMG("foo at 0x20")))?;
|
||||
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
|
||||
writer.finish_write(Lsn(0x20));
|
||||
drop(writer);
|
||||
|
||||
@@ -655,13 +655,13 @@ mod tests {
|
||||
let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap();
|
||||
|
||||
// Insert a value on the timeline
|
||||
writer.put(TEST_KEY_A, Lsn(0x20), test_value("foo at 0x20"))?;
|
||||
writer.put(TEST_KEY_B, Lsn(0x20), test_value("foobar at 0x20"))?;
|
||||
writer.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))?;
|
||||
writer.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))?;
|
||||
writer.finish_write(Lsn(0x20));
|
||||
|
||||
writer.put(TEST_KEY_A, Lsn(0x30), test_value("foo at 0x30"))?;
|
||||
writer.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))?;
|
||||
writer.finish_write(Lsn(0x30));
|
||||
writer.put(TEST_KEY_A, Lsn(0x40), test_value("foo at 0x40"))?;
|
||||
writer.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))?;
|
||||
writer.finish_write(Lsn(0x40));
|
||||
|
||||
//assert_current_logical_size(&tline, Lsn(0x40));
|
||||
@@ -672,7 +672,7 @@ mod tests {
|
||||
.get_timeline_load(NEW_TIMELINE_ID)
|
||||
.expect("Should have a local timeline");
|
||||
let new_writer = newtline.writer();
|
||||
new_writer.put(TEST_KEY_A, Lsn(0x40), test_value("bar at 0x40"))?;
|
||||
new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?;
|
||||
new_writer.finish_write(Lsn(0x40));
|
||||
|
||||
// Check page contents on both branches
|
||||
@@ -703,14 +703,14 @@ mod tests {
|
||||
writer.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)?;
|
||||
writer.finish_write(lsn);
|
||||
lsn += 0x10;
|
||||
writer.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)?;
|
||||
writer.finish_write(lsn);
|
||||
lsn += 0x10;
|
||||
@@ -721,14 +721,14 @@ mod tests {
|
||||
writer.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)?;
|
||||
writer.finish_write(lsn);
|
||||
lsn += 0x10;
|
||||
writer.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)?;
|
||||
writer.finish_write(lsn);
|
||||
}
|
||||
|
||||
@@ -115,7 +115,7 @@ mod tests {
|
||||
Ok(())
|
||||
});
|
||||
|
||||
let () = waiter.await?;
|
||||
waiter.await?;
|
||||
notifier.await?
|
||||
}
|
||||
}
|
||||
|
||||
104
test_runner/batch_others/test_import.py
Normal file
104
test_runner/batch_others/test_import.py
Normal file
@@ -0,0 +1,104 @@
|
||||
import pytest
|
||||
from fixtures.zenith_fixtures import ZenithEnvBuilder, wait_for_upload, wait_for_last_record_lsn
|
||||
from fixtures.utils import lsn_from_hex, lsn_to_hex
|
||||
from uuid import UUID, uuid4
|
||||
import tarfile
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
import json
|
||||
from fixtures.utils import subprocess_capture
|
||||
from fixtures.log_helper import log
|
||||
from contextlib import closing
|
||||
from fixtures.zenith_fixtures import pg_distrib_dir
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, zenith_env_builder):
|
||||
# Put data in vanilla pg
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create user zenith_admin with password 'postgres' superuser")
|
||||
vanilla_pg.safe_psql('''create table t as select 'long string to consume some space' || g
|
||||
from generate_series(1,300000) g''')
|
||||
assert vanilla_pg.safe_psql('select count(*) from t') == [(300000, )]
|
||||
|
||||
# Take basebackup
|
||||
basebackup_dir = os.path.join(test_output_dir, "basebackup")
|
||||
base_tar = os.path.join(basebackup_dir, "base.tar")
|
||||
wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
|
||||
os.mkdir(basebackup_dir)
|
||||
vanilla_pg.safe_psql("CHECKPOINT")
|
||||
pg_bin.run([
|
||||
"pg_basebackup",
|
||||
"-F",
|
||||
"tar",
|
||||
"-d",
|
||||
vanilla_pg.connstr(),
|
||||
"-D",
|
||||
basebackup_dir,
|
||||
])
|
||||
|
||||
# Make corrupt base tar with missing pg_control
|
||||
unpacked_base = os.path.join(basebackup_dir, "unpacked-base")
|
||||
corrupt_base_tar = os.path.join(unpacked_base, "corrupt-base.tar")
|
||||
os.mkdir(unpacked_base, 0o750)
|
||||
subprocess_capture(str(test_output_dir), ["tar", "-xf", base_tar, "-C", unpacked_base])
|
||||
os.remove(os.path.join(unpacked_base, "global/pg_control"))
|
||||
subprocess_capture(str(test_output_dir),
|
||||
["tar", "-cf", "corrupt-base.tar"] + os.listdir(unpacked_base),
|
||||
cwd=unpacked_base)
|
||||
|
||||
# Get start_lsn and end_lsn
|
||||
with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
|
||||
manifest = json.load(f)
|
||||
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
|
||||
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
|
||||
|
||||
node_name = "import_from_vanilla"
|
||||
tenant = uuid4()
|
||||
timeline = uuid4()
|
||||
|
||||
# Set up pageserver for import
|
||||
zenith_env_builder.enable_local_fs_remote_storage()
|
||||
env = zenith_env_builder.init_start()
|
||||
env.pageserver.http_client().tenant_create(tenant)
|
||||
|
||||
def import_tar(base, wal):
|
||||
env.zenith_cli.raw_cli([
|
||||
"timeline",
|
||||
"import",
|
||||
"--tenant-id",
|
||||
tenant.hex,
|
||||
"--timeline-id",
|
||||
timeline.hex,
|
||||
"--node-name",
|
||||
node_name,
|
||||
"--base-lsn",
|
||||
start_lsn,
|
||||
"--base-tarfile",
|
||||
base,
|
||||
"--end-lsn",
|
||||
end_lsn,
|
||||
"--wal-tarfile",
|
||||
wal,
|
||||
])
|
||||
|
||||
# Importing corrupt backup fails
|
||||
with pytest.raises(Exception):
|
||||
import_tar(corrupt_base_tar, wal_tar)
|
||||
|
||||
# Clean up
|
||||
# TODO it should clean itself
|
||||
client = env.pageserver.http_client()
|
||||
client.timeline_detach(tenant, timeline)
|
||||
|
||||
# Importing correct backup works
|
||||
import_tar(base_tar, wal_tar)
|
||||
|
||||
# Wait for data to land in s3
|
||||
wait_for_last_record_lsn(client, tenant, timeline, lsn_from_hex(end_lsn))
|
||||
wait_for_upload(client, tenant, timeline, lsn_from_hex(end_lsn))
|
||||
|
||||
# Check it worked
|
||||
pg = env.postgres.create_start(node_name, tenant_id=tenant)
|
||||
assert pg.safe_psql('select count(*) from t') == [(300000, )]
|
||||
@@ -1349,12 +1349,12 @@ class VanillaPostgres(PgProtocol):
|
||||
if log_path is None:
|
||||
log_path = os.path.join(self.pgdatadir, "pg.log")
|
||||
|
||||
self.pg_bin.run_capture(['pg_ctl', '-D', self.pgdatadir, '-l', log_path, 'start'])
|
||||
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', self.pgdatadir, '-l', log_path, 'start'])
|
||||
|
||||
def stop(self):
|
||||
assert self.running
|
||||
self.running = False
|
||||
self.pg_bin.run_capture(['pg_ctl', '-D', self.pgdatadir, 'stop'])
|
||||
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', self.pgdatadir, 'stop'])
|
||||
|
||||
def get_subdir_size(self, subdir) -> int:
|
||||
"""Return size of pgdatadir subdirectory in bytes."""
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: 038b2b98e5...dba273190e
Reference in New Issue
Block a user