Mirror of https://github.com/neondatabase/neon.git
Synced 2026-01-29 16:20:37 +00:00

Compare commits: dkr/toggle...hotfix_pub (32 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 26e1e430c0 | |
| | f3aac81d19 | |
| | 2709878b8b | |
| | 39e4bdb99e | |
| | 52e75fead9 | |
| | a347d2b6ac | |
| | fc4ea3553e | |
| | cca1ace651 | |
| | 30984c163c | |
| | 7404777efc | |
| | eb1bdcc6cf | |
| | 979ad60c19 | |
| | 9316cb1b1f | |
| | e7939a527a | |
| | 36d26665e1 | |
| | 873347f977 | |
| | e814ac16f9 | |
| | ad3055d386 | |
| | 94e03eb452 | |
| | 380f26ef79 | |
| | 3c5b7f59d7 | |
| | fee89f80b5 | |
| | 41cce8eaf1 | |
| | f88fe0218d | |
| | cc856eca85 | |
| | cf350c6002 | |
| | 0ce6b6a0a3 | |
| | 73f247d537 | |
| | 960be82183 | |
| | 806e5a6c19 | |
| | 8d5df07cce | |
| | df7a9d1407 | |
.github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml (vendored, new file, 31 lines)

```diff
@@ -0,0 +1,31 @@
+# Helm chart values for neon-proxy-scram.
+# This is a YAML-formatted file.
+
+image:
+  repository: neondatabase/neon
+
+settings:
+  authBackend: "console"
+  authEndpoint: "http://console-staging.local/management/api/v2"
+  domain: "*.us-east-2.aws.neon.build"
+
+# -- Additional labels for neon-proxy pods
+podLabels:
+  zenith_service: proxy-scram
+  zenith_env: dev
+  zenith_region: us-east-2
+  zenith_region_slug: us-east-2
+
+exposedService:
+  annotations:
+    service.beta.kubernetes.io/aws-load-balancer-type: external
+    service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
+    service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
+    external-dns.alpha.kubernetes.io/hostname: us-east-2.aws.neon.build
+
+#metrics:
+#  enabled: true
+#  serviceMonitor:
+#    enabled: true
+#    selector:
+#      release: kube-prometheus-stack
```
.github/workflows/build_and_test.yml (vendored, 70 lines changed)

```diff
@@ -481,6 +481,7 @@ jobs:

   neon-image:
     runs-on: dev
+    needs: [ tag ]
     container: gcr.io/kaniko-project/executor:v1.9.0-debug

     steps:
@@ -494,10 +495,11 @@ jobs:
         run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json

       - name: Kaniko build neon
-        run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID
+        run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}}

   compute-tools-image:
     runs-on: dev
+    needs: [ tag ]
     container: gcr.io/kaniko-project/executor:v1.9.0-debug

     steps:
@@ -508,11 +510,12 @@ jobs:
         run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json

       - name: Kaniko build compute tools
-        run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID
+        run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}

   compute-node-image:
     runs-on: dev
     container: gcr.io/kaniko-project/executor:v1.9.0-debug
+    needs: [ tag ]
     steps:
       - name: Checkout
         uses: actions/checkout@v1 # v3 won't work with kaniko
@@ -527,11 +530,12 @@ jobs:
       # cloud repo depends on this image name, thus duplicating it
       # remove compute-node when cloud repo is updated
       - name: Kaniko build compute node with extensions v14 (compatibility)
-        run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID
+        run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:${{needs.tag.outputs.build-tag}}

   compute-node-image-v14:
     runs-on: dev
     container: gcr.io/kaniko-project/executor:v1.9.0-debug
+    needs: [ tag ]
     steps:
       - name: Checkout
         uses: actions/checkout@v1 # v3 won't work with kaniko
@@ -543,12 +547,13 @@ jobs:
         run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json

       - name: Kaniko build compute node with extensions v14
-        run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:$GITHUB_RUN_ID
+        run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}}


   compute-node-image-v15:
     runs-on: dev
     container: gcr.io/kaniko-project/executor:v1.9.0-debug
+    needs: [ tag ]
     steps:
       - name: Checkout
         uses: actions/checkout@v1 # v3 won't work with kaniko
@@ -560,11 +565,11 @@ jobs:
         run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json

       - name: Kaniko build compute node with extensions v15
-        run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v15 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:$GITHUB_RUN_ID
+        run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v15 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}}

   promote-images:
     runs-on: dev
-    needs: [ neon-image, compute-node-image, compute-node-image-v14, compute-node-image-v15, compute-tools-image ]
+    needs: [ tag, neon-image, compute-node-image, compute-node-image-v14, compute-node-image-v15, compute-tools-image ]
     if: github.event_name != 'workflow_dispatch'
     container: amazon/aws-cli
     strategy:
@@ -577,8 +582,9 @@ jobs:

     steps:
       - name: Promote image to latest
-        run:
-          MANIFEST=$(aws ecr batch-get-image --repository-name ${{ matrix.name }} --image-ids imageTag=$GITHUB_RUN_ID --query 'images[].imageManifest' --output text) && aws ecr put-image --repository-name ${{ matrix.name }} --image-tag latest --image-manifest "$MANIFEST"
+        run: |
+          export MANIFEST=$(aws ecr batch-get-image --repository-name ${{ matrix.name }} --image-ids imageTag=${{needs.tag.outputs.build-tag}} --query 'images[].imageManifest' --output text)
+          aws ecr put-image --repository-name ${{ matrix.name }} --image-tag latest --image-manifest "$MANIFEST"

   push-docker-hub:
     runs-on: dev
@@ -597,19 +603,19 @@ jobs:
          echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json

       - name: Pull neon image from ECR
-        run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:latest neon
+        run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} neon

       - name: Pull compute tools image from ECR
-        run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest compute-tools
+        run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} compute-tools

       - name: Pull compute node image from ECR
-        run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:latest compute-node
+        run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:${{needs.tag.outputs.build-tag}} compute-node

       - name: Pull compute node v14 image from ECR
-        run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:latest compute-node-v14
+        run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} compute-node-v14

       - name: Pull compute node v15 image from ECR
-        run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:latest compute-node-v15
+        run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} compute-node-v15

       - name: Pull rust image from ECR
         run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
@@ -619,11 +625,11 @@ jobs:
           (github.ref_name == 'main' || github.ref_name == 'release') &&
           github.event_name != 'workflow_dispatch'
         run: |
-          crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/neon:latest
-          crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-tools:latest
-          crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node:latest
-          crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node-v14:latest
-          crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node-v15:latest
+          crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/neon:latest
+          crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-tools:latest
+          crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node:latest
+          crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node-v14:latest
+          crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node-v15:latest

       - name: Configure Docker Hub login
         run: |
@@ -819,3 +825,31 @@ jobs:
           DOCKER_TAG=${{needs.tag.outputs.build-tag}}
           helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
           helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
+
+  deploy-proxy-new:
+    runs-on: dev
+    container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
+    # Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
+    needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
+    if: |
+      (github.ref_name == 'main' || github.ref_name == 'release') &&
+      github.event_name != 'workflow_dispatch'
+    defaults:
+      run:
+        shell: bash
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+        with:
+          submodules: true
+          fetch-depth: 0
+
+      - name: Configure environment
+        run: |
+          helm repo add neondatabase https://neondatabase.github.io/helm-charts
+          aws --region us-east-2 eks update-kubeconfig --name dev-us-east-2-beta --role-arn arn:aws:iam::369495373322:role/github-runner
+
+      - name: Re-deploy proxy
+        run: |
+          DOCKER_TAG=${{needs.tag.outputs.build-tag}}
+          helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install -f .github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
```
```diff
@@ -425,7 +425,28 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {

         // Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
         // This is needed since postgres 15, where this privilege is removed by default.
-        let grant_query: String = "GRANT CREATE ON SCHEMA public TO web_access".to_string();
+        let grant_query = "DO $$\n\
+            BEGIN\n\
+                IF EXISTS(\n\
+                    SELECT nspname\n\
+                    FROM pg_catalog.pg_namespace\n\
+                    WHERE nspname = 'public'\n\
+                ) AND\n\
+                version() LIKE 'PostgreSQL 15%'\n\
+                THEN\n\
+                    IF EXISTS(\n\
+                        SELECT rolname\n\
+                        FROM pg_catalog.pg_roles\n\
+                        WHERE rolname = 'web_access'\n\
+                    )\n\
+                    THEN\n\
+                        GRANT CREATE ON SCHEMA public TO web_access;\n\
+                    END IF;\n\
+                END IF;\n\
+            END\n\
+            $$;"
+        .to_string();
+

         info!("grant query for db {} : {}", &db.name, &grant_query);
         db_client.simple_query(&grant_query)?;
     }
```
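For reference, a minimal sketch of how a guarded grant like this can be issued with the `postgres` crate. The standalone function and the connection URL are assumptions for illustration; only the DO-block shape and the `simple_query` call mirror the diff above.

```rust
use postgres::{Client, NoTls};

/// Minimal sketch: run the version/role-guarded GRANT against one database.
/// `db_url` is a hypothetical connection string supplied by the caller.
fn grant_web_access_create(db_url: &str) -> anyhow::Result<()> {
    let mut client = Client::connect(db_url, NoTls)?;

    // The DO block makes the statement a no-op when the 'public' schema or the
    // 'web_access' role is missing, or when the server is not PostgreSQL 15.
    let grant_query = "DO $$
        BEGIN
            IF EXISTS(SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname = 'public')
               AND version() LIKE 'PostgreSQL 15%'
            THEN
                IF EXISTS(SELECT rolname FROM pg_catalog.pg_roles WHERE rolname = 'web_access')
                THEN
                    GRANT CREATE ON SCHEMA public TO web_access;
                END IF;
            END IF;
        END
        $$;";

    client.simple_query(grant_query)?;
    Ok(())
}
```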
```diff
@@ -183,18 +183,18 @@ impl PostgresNode {
     }

     fn sync_safekeepers(&self, auth_token: &Option<String>, pg_version: u32) -> Result<Lsn> {
-        let pg_path = self.env.pg_bin_dir(pg_version).join("postgres");
+        let pg_path = self.env.pg_bin_dir(pg_version)?.join("postgres");
         let mut cmd = Command::new(&pg_path);

         cmd.arg("--sync-safekeepers")
             .env_clear()
             .env(
                 "LD_LIBRARY_PATH",
-                self.env.pg_lib_dir(pg_version).to_str().unwrap(),
+                self.env.pg_lib_dir(pg_version)?.to_str().unwrap(),
             )
             .env(
                 "DYLD_LIBRARY_PATH",
-                self.env.pg_lib_dir(pg_version).to_str().unwrap(),
+                self.env.pg_lib_dir(pg_version)?.to_str().unwrap(),
             )
             .env("PGDATA", self.pgdata().to_str().unwrap())
             .stdout(Stdio::piped())
@@ -422,7 +422,7 @@ impl PostgresNode {
     }

     fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
-        let pg_ctl_path = self.env.pg_bin_dir(self.pg_version).join("pg_ctl");
+        let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
         let mut cmd = Command::new(pg_ctl_path);
         cmd.args(
             [
@@ -440,11 +440,11 @@ impl PostgresNode {
         .env_clear()
         .env(
             "LD_LIBRARY_PATH",
-            self.env.pg_lib_dir(self.pg_version).to_str().unwrap(),
+            self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
         )
         .env(
             "DYLD_LIBRARY_PATH",
-            self.env.pg_lib_dir(self.pg_version).to_str().unwrap(),
+            self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
         );
         if let Some(token) = auth_token {
             cmd.env("ZENITH_AUTH_TOKEN", token);
```
```diff
@@ -201,28 +201,28 @@ impl LocalEnv {
         self.pg_distrib_dir.clone()
     }

-    pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
+    pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
         let path = self.pg_distrib_dir.clone();

         match pg_version {
-            14 => path.join(format!("v{pg_version}")),
-            15 => path.join(format!("v{pg_version}")),
-            _ => panic!("Unsupported postgres version: {}", pg_version),
+            14 => Ok(path.join(format!("v{pg_version}"))),
+            15 => Ok(path.join(format!("v{pg_version}"))),
+            _ => bail!("Unsupported postgres version: {}", pg_version),
         }
     }

-    pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
+    pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
         match pg_version {
-            14 => self.pg_distrib_dir(pg_version).join("bin"),
-            15 => self.pg_distrib_dir(pg_version).join("bin"),
-            _ => panic!("Unsupported postgres version: {}", pg_version),
+            14 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
+            15 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
+            _ => bail!("Unsupported postgres version: {}", pg_version),
         }
     }
-    pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
+    pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
         match pg_version {
-            14 => self.pg_distrib_dir(pg_version).join("lib"),
-            15 => self.pg_distrib_dir(pg_version).join("lib"),
-            _ => panic!("Unsupported postgres version: {}", pg_version),
+            14 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
+            15 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
+            _ => bail!("Unsupported postgres version: {}", pg_version),
         }
     }

@@ -422,10 +422,10 @@ impl LocalEnv {
             "directory '{}' already exists. Perhaps already initialized?",
             base_path.display()
         );
-        if !self.pg_bin_dir(pg_version).join("postgres").exists() {
+        if !self.pg_bin_dir(pg_version)?.join("postgres").exists() {
             bail!(
                 "Can't find postgres binary at {}",
-                self.pg_bin_dir(pg_version).display()
+                self.pg_bin_dir(pg_version)?.display()
             );
         }
         for binary in ["pageserver", "safekeeper"] {
```
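The same change repeats across `LocalEnv`, `Conf`, and `PageServerConf` below: unsupported Postgres versions now surface as `anyhow` errors instead of panics, and call sites propagate them with `?`. A standalone sketch of the pattern, with a hypothetical install root:

```rust
use std::path::PathBuf;
use anyhow::bail;

/// Sketch of the fallible path-helper pattern from the diff above:
/// an unsupported version becomes a recoverable error rather than a panic.
fn pg_bin_dir(pg_distrib_dir: &PathBuf, pg_version: u32) -> anyhow::Result<PathBuf> {
    match pg_version {
        14 | 15 => Ok(pg_distrib_dir.join(format!("v{pg_version}")).join("bin")),
        _ => bail!("Unsupported postgres version: {}", pg_version),
    }
}

fn main() -> anyhow::Result<()> {
    // Callers use `?`, so a bad version propagates as an error instead of aborting.
    let root = PathBuf::from("/usr/local/neon/pg_install"); // hypothetical install root
    let initdb = pg_bin_dir(&root, 14)?.join("initdb");
    println!("{}", initdb.display());
    assert!(pg_bin_dir(&root, 13).is_err());
    Ok(())
}
```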
```diff
@@ -37,22 +37,22 @@ pub static REQUIRED_POSTGRES_CONFIG: Lazy<Vec<&'static str>> = Lazy::new(|| {
 });

 impl Conf {
-    pub fn pg_distrib_dir(&self) -> PathBuf {
+    pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
         let path = self.pg_distrib_dir.clone();

         match self.pg_version {
-            14 => path.join(format!("v{}", self.pg_version)),
-            15 => path.join(format!("v{}", self.pg_version)),
-            _ => panic!("Unsupported postgres version: {}", self.pg_version),
+            14 => Ok(path.join(format!("v{}", self.pg_version))),
+            15 => Ok(path.join(format!("v{}", self.pg_version))),
+            _ => bail!("Unsupported postgres version: {}", self.pg_version),
         }
     }

-    fn pg_bin_dir(&self) -> PathBuf {
-        self.pg_distrib_dir().join("bin")
+    fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
+        Ok(self.pg_distrib_dir()?.join("bin"))
     }

-    fn pg_lib_dir(&self) -> PathBuf {
-        self.pg_distrib_dir().join("lib")
+    fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
+        Ok(self.pg_distrib_dir()?.join("lib"))
     }

     pub fn wal_dir(&self) -> PathBuf {
@@ -60,12 +60,12 @@ impl Conf {
     }

     fn new_pg_command(&self, command: impl AsRef<Path>) -> Result<Command> {
-        let path = self.pg_bin_dir().join(command);
+        let path = self.pg_bin_dir()?.join(command);
         ensure!(path.exists(), "Command {:?} does not exist", path);
         let mut cmd = Command::new(path);
         cmd.env_clear()
-            .env("LD_LIBRARY_PATH", self.pg_lib_dir())
-            .env("DYLD_LIBRARY_PATH", self.pg_lib_dir());
+            .env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
+            .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
         Ok(cmd)
     }

```
```diff
@@ -387,28 +387,28 @@ impl PageServerConf {
     //
     // Postgres distribution paths
     //
-    pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
+    pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
         let path = self.pg_distrib_dir.clone();

         match pg_version {
-            14 => path.join(format!("v{pg_version}")),
-            15 => path.join(format!("v{pg_version}")),
-            _ => panic!("Unsupported postgres version: {}", pg_version),
+            14 => Ok(path.join(format!("v{pg_version}"))),
+            15 => Ok(path.join(format!("v{pg_version}"))),
+            _ => bail!("Unsupported postgres version: {}", pg_version),
         }
     }

-    pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
+    pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
         match pg_version {
-            14 => self.pg_distrib_dir(pg_version).join("bin"),
-            15 => self.pg_distrib_dir(pg_version).join("bin"),
-            _ => panic!("Unsupported postgres version: {}", pg_version),
+            14 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
+            15 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
+            _ => bail!("Unsupported postgres version: {}", pg_version),
         }
     }
-    pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
+    pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
         match pg_version {
-            14 => self.pg_distrib_dir(pg_version).join("lib"),
-            15 => self.pg_distrib_dir(pg_version).join("lib"),
-            _ => panic!("Unsupported postgres version: {}", pg_version),
+            14 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
+            15 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
+            _ => bail!("Unsupported postgres version: {}", pg_version),
         }
     }

```
```diff
@@ -46,6 +46,8 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;

 pub const LOG_FILE_NAME: &str = "pageserver.log";

+static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
+
 /// Config for the Repository checkpointer
 #[derive(Debug, Clone, Copy)]
 pub enum CheckpointConfig {
```
```diff
@@ -1373,6 +1373,17 @@ fn is_rel_block_key(key: Key) -> bool {
     key.field1 == 0x00 && key.field4 != 0
 }

+pub fn is_rel_fsm_block_key(key: Key) -> bool {
+    key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
+}
+
+pub fn is_rel_vm_block_key(key: Key) -> bool {
+    key.field1 == 0x00
+        && key.field4 != 0
+        && key.field5 == VISIBILITYMAP_FORKNUM
+        && key.field6 != 0xffffffff
+}
+
 pub fn key_to_slru_block(key: Key) -> Result<(SlruKind, u32, BlockNumber)> {
     Ok(match key.field1 {
         0x01 => {
```
```diff
@@ -11,7 +11,7 @@
 //! parent timeline, and the last LSN that has been written to disk.
 //!

-use anyhow::{bail, ensure, Context, Result};
+use anyhow::{bail, ensure, Context};
 use tokio::sync::watch;
 use tracing::*;
 use utils::crashsafe::path_with_suffix_extension;
```
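Many of the remaining hunks in this file only swap `Result<T>` for `anyhow::Result<T>`, which becomes necessary once the blanket `use anyhow::Result` import is dropped. A tiny illustrative sketch of the two equivalent spellings (function names are made up):

```rust
// With the re-export in scope, `Result<T>` silently means `anyhow::Result<T>`,
// which is easy to confuse with `std::result::Result`.
use anyhow::Result;
fn load_a() -> Result<u32> { Ok(1) }

// The style adopted by this diff: no blanket import, the alias is written out
// at each signature so the error type is visible at a glance.
fn load_b() -> anyhow::Result<u32> { Ok(2) }

fn main() -> anyhow::Result<()> {
    let total = load_a()? + load_b()?;
    assert_eq!(total, 3);
    Ok(())
}
```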
```diff
@@ -25,7 +25,6 @@ use std::fs::File;
 use std::fs::OpenOptions;
 use std::io;
 use std::io::Write;
-use std::num::NonZeroU64;
 use std::ops::Bound::Included;
 use std::path::Path;
 use std::path::PathBuf;
@@ -191,9 +190,7 @@ impl UninitializedTimeline<'_> {
                 )
             })?;
             v.insert(Arc::clone(&new_timeline));
-            new_timeline.launch_wal_receiver().with_context(|| {
-                format!("Failed to launch walreceiver for timeline {tenant_id}/{timeline_id}")
-            })?;
+            new_timeline.launch_wal_receiver();
         }
     }

@@ -294,7 +291,7 @@ impl TimelineUninitMark {
         Ok(())
     }

-    fn delete_mark_file_if_present(&mut self) -> Result<(), anyhow::Error> {
+    fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
         let uninit_mark_file = &self.uninit_mark_path;
         let uninit_mark_parent = uninit_mark_file
             .parent()
@@ -472,7 +469,7 @@ impl Tenant {
         horizon: u64,
         pitr: Duration,
         checkpoint_before_gc: bool,
-    ) -> Result<GcResult> {
+    ) -> anyhow::Result<GcResult> {
         let timeline_str = target_timeline_id
             .map(|x| x.to_string())
             .unwrap_or_else(|| "-".to_string());
@@ -488,7 +485,7 @@ impl Tenant {
     /// This function is periodically called by compactor task.
     /// Also it can be explicitly requested per timeline through page server
    /// api's 'compact' command.
-    pub fn compaction_iteration(&self) -> Result<()> {
+    pub fn compaction_iteration(&self) -> anyhow::Result<()> {
        // Scan through the hashmap and collect a list of all the timelines,
        // while holding the lock. Then drop the lock and actually perform the
        // compactions. We don't want to block everything else while the
@@ -512,7 +509,7 @@ impl Tenant {
     ///
     /// Used at graceful shutdown.
     ///
-    pub fn checkpoint(&self) -> Result<()> {
+    pub fn checkpoint(&self) -> anyhow::Result<()> {
        // Scan through the hashmap and collect a list of all the timelines,
        // while holding the lock. Then drop the lock and actually perform the
        // checkpoints. We don't want to block everything else while the
@@ -683,7 +680,7 @@ impl Tenant {
     /// before the children.
     fn tree_sort_timelines(
         timelines: HashMap<TimelineId, TimelineMetadata>,
-    ) -> Result<Vec<(TimelineId, TimelineMetadata)>> {
+    ) -> anyhow::Result<Vec<(TimelineId, TimelineMetadata)>> {
         let mut result = Vec::with_capacity(timelines.len());

         let mut now = Vec::with_capacity(timelines.len());
@@ -786,27 +783,6 @@ impl Tenant {
             .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
     }

-    pub fn get_wal_receiver_connect_timeout(&self) -> Duration {
-        let tenant_conf = self.tenant_conf.read().unwrap();
-        tenant_conf
-            .walreceiver_connect_timeout
-            .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout)
-    }
-
-    pub fn get_lagging_wal_timeout(&self) -> Duration {
-        let tenant_conf = self.tenant_conf.read().unwrap();
-        tenant_conf
-            .lagging_wal_timeout
-            .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout)
-    }
-
-    pub fn get_max_lsn_wal_lag(&self) -> NonZeroU64 {
-        let tenant_conf = self.tenant_conf.read().unwrap();
-        tenant_conf
-            .max_lsn_wal_lag
-            .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
-    }
-
     pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
         self.tenant_conf.write().unwrap().update(&new_tenant_conf);
     }
@@ -838,7 +814,7 @@ impl Tenant {
         ))
     }

-    pub fn new(
+    pub(super) fn new(
         conf: &'static PageServerConf,
         tenant_conf: TenantConfOpt,
         walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
@@ -861,7 +837,7 @@ impl Tenant {
     }

     /// Locate and load config
-    pub fn load_tenant_config(
+    pub(super) fn load_tenant_config(
         conf: &'static PageServerConf,
         tenant_id: TenantId,
     ) -> anyhow::Result<TenantConfOpt> {
@@ -903,7 +879,7 @@ impl Tenant {
         Ok(tenant_conf)
     }

-    pub fn persist_tenant_config(
+    pub(super) fn persist_tenant_config(
         target_config_path: &Path,
         tenant_conf: TenantConfOpt,
         first_save: bool,
@@ -996,7 +972,7 @@ impl Tenant {
         horizon: u64,
         pitr: Duration,
         checkpoint_before_gc: bool,
-    ) -> Result<GcResult> {
+    ) -> anyhow::Result<GcResult> {
         let mut totals: GcResult = Default::default();
         let now = Instant::now();

@@ -1413,9 +1389,9 @@ fn run_initdb(
     conf: &'static PageServerConf,
     initdb_target_dir: &Path,
     pg_version: u32,
-) -> Result<()> {
-    let initdb_bin_path = conf.pg_bin_dir(pg_version).join("initdb");
-    let initdb_lib_dir = conf.pg_lib_dir(pg_version);
+) -> anyhow::Result<()> {
+    let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
+    let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
     info!(
         "running {} in {}, libdir: {}",
         initdb_bin_path.display(),
@@ -1459,7 +1435,7 @@ impl Drop for Tenant {
     }
 }
 /// Dump contents of a layer file to stdout.
-pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> {
+pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> anyhow::Result<()> {
     use std::os::unix::fs::FileExt;

     // All layer files start with a two-byte "magic" value, to identify the kind of
@@ -1564,13 +1540,13 @@ pub mod harness {
     }

     impl<'a> TenantHarness<'a> {
-        pub fn create(test_name: &'static str) -> Result<Self> {
+        pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
             Self::create_internal(test_name, false)
         }
-        pub fn create_exclusive(test_name: &'static str) -> Result<Self> {
+        pub fn create_exclusive(test_name: &'static str) -> anyhow::Result<Self> {
             Self::create_internal(test_name, true)
         }
-        fn create_internal(test_name: &'static str, exclusive: bool) -> Result<Self> {
+        fn create_internal(test_name: &'static str, exclusive: bool) -> anyhow::Result<Self> {
             let lock_guard = if exclusive {
                 (None, Some(LOCK.write().unwrap()))
             } else {
@@ -1604,7 +1580,7 @@ pub mod harness {
             self.try_load().expect("failed to load test tenant")
         }

-        pub fn try_load(&self) -> Result<Tenant> {
+        pub fn try_load(&self) -> anyhow::Result<Tenant> {
             let walredo_mgr = Arc::new(TestRedoManager);

             let tenant = Tenant::new(
@@ -1684,7 +1660,7 @@ pub mod harness {
                 },
                 records.len()
             );
-            println!("{}", s);
+            println!("{s}");

             Ok(TEST_IMG(&s))
         }
@@ -1708,7 +1684,7 @@ mod tests {
         Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));

     #[test]
-    fn test_basic() -> Result<()> {
+    fn test_basic() -> anyhow::Result<()> {
         let tenant = TenantHarness::create("test_basic")?.load();
         let tline = tenant
             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -1732,7 +1708,7 @@ mod tests {
     }

     #[test]
-    fn no_duplicate_timelines() -> Result<()> {
+    fn no_duplicate_timelines() -> anyhow::Result<()> {
         let tenant = TenantHarness::create("no_duplicate_timelines")?.load();
         let _ = tenant
             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -1763,7 +1739,7 @@ mod tests {
     /// Test branch creation
     ///
     #[test]
-    fn test_branch() -> Result<()> {
+    fn test_branch() -> anyhow::Result<()> {
         let tenant = TenantHarness::create("test_branch")?.load();
         let tline = tenant
             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -1816,7 +1792,7 @@ mod tests {
         Ok(())
     }

-    fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> Result<()> {
+    fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
         let mut lsn = start_lsn;
         #[allow(non_snake_case)]
         {
@@ -1858,7 +1834,7 @@ mod tests {
     }

     #[test]
-    fn test_prohibit_branch_creation_on_garbage_collected_data() -> Result<()> {
+    fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
         let tenant =
             TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
                 .load();
@@ -1890,7 +1866,7 @@ mod tests {
     }

     #[test]
-    fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> Result<()> {
+    fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
         let tenant =
             TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load();

@@ -1917,7 +1893,7 @@ mod tests {
     // FIXME: This currently fails to error out. Calling GC doesn't currently
     // remove the old value, we'd need to work a little harder
     #[test]
-    fn test_prohibit_get_for_garbage_collected_data() -> Result<()> {
+    fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
         let repo =
             RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
             .load();
@@ -1937,7 +1913,7 @@ mod tests {
     */

     #[test]
-    fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> {
+    fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
         let tenant =
             TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
         let tline = tenant
@@ -1956,7 +1932,7 @@ mod tests {
         Ok(())
     }
     #[test]
-    fn test_parent_keeps_data_forever_after_branching() -> Result<()> {
+    fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
         let tenant =
             TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
         let tline = tenant
@@ -1984,7 +1960,7 @@ mod tests {
     }

     #[test]
-    fn timeline_load() -> Result<()> {
+    fn timeline_load() -> anyhow::Result<()> {
         const TEST_NAME: &str = "timeline_load";
         let harness = TenantHarness::create(TEST_NAME)?;
         {
@@ -2005,7 +1981,7 @@ mod tests {
     }

     #[test]
-    fn timeline_load_with_ancestor() -> Result<()> {
+    fn timeline_load_with_ancestor() -> anyhow::Result<()> {
         const TEST_NAME: &str = "timeline_load_with_ancestor";
         let harness = TenantHarness::create(TEST_NAME)?;
         // create two timelines
@@ -2044,7 +2020,7 @@ mod tests {
     }

     #[test]
-    fn corrupt_metadata() -> Result<()> {
+    fn corrupt_metadata() -> anyhow::Result<()> {
         const TEST_NAME: &str = "corrupt_metadata";
         let harness = TenantHarness::create(TEST_NAME)?;
         let tenant = harness.load();
@@ -2086,7 +2062,7 @@ mod tests {
     }

     #[test]
-    fn test_images() -> Result<()> {
+    fn test_images() -> anyhow::Result<()> {
         let tenant = TenantHarness::create("test_images")?.load();
         let tline = tenant
             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2138,7 +2114,7 @@ mod tests {
     // repeat 50 times.
     //
     #[test]
-    fn test_bulk_insert() -> Result<()> {
+    fn test_bulk_insert() -> anyhow::Result<()> {
         let tenant = TenantHarness::create("test_bulk_insert")?.load();
         let tline = tenant
             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2180,7 +2156,7 @@ mod tests {
     }

     #[test]
-    fn test_random_updates() -> Result<()> {
+    fn test_random_updates() -> anyhow::Result<()> {
         let tenant = TenantHarness::create("test_random_updates")?.load();
         let tline = tenant
             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2252,7 +2228,7 @@ mod tests {
     }

     #[test]
-    fn test_traverse_branches() -> Result<()> {
+    fn test_traverse_branches() -> anyhow::Result<()> {
         let tenant = TenantHarness::create("test_traverse_branches")?.load();
         let mut tline = tenant
             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2333,7 +2309,7 @@ mod tests {
     }

     #[test]
-    fn test_traverse_ancestors() -> Result<()> {
+    fn test_traverse_ancestors() -> anyhow::Result<()> {
         let tenant = TenantHarness::create("test_traverse_ancestors")?.load();
         let mut tline = tenant
             .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
```
```diff
@@ -1,6 +1,6 @@
 //!

-use anyhow::{anyhow, bail, ensure, Context, Result};
+use anyhow::{anyhow, bail, ensure, Context};
 use bytes::Bytes;
 use fail::fail_point;
 use itertools::Itertools;
@@ -34,6 +34,7 @@ use crate::keyspace::{KeyPartitioning, KeySpace};
 use crate::metrics::TimelineMetrics;
 use crate::pgdatadir_mapping::BlockNumber;
 use crate::pgdatadir_mapping::LsnForTimestamp;
+use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
 use crate::reltag::RelTag;
 use crate::tenant_config::TenantConfOpt;

@@ -52,6 +53,7 @@ use crate::task_mgr::TaskKind;
 use crate::walreceiver::{is_etcd_client_initialized, spawn_connection_manager_task};
 use crate::walredo::WalRedoManager;
 use crate::CheckpointConfig;
+use crate::ZERO_PAGE;
 use crate::{
     page_cache,
     storage_sync::{self, index::LayerFileMetadata},
@@ -305,10 +307,6 @@ pub struct GcInfo {

 /// Public interface functions
 impl Timeline {
-    //------------------------------------------------------------------------------
-    // Public GET functions
-    //------------------------------------------------------------------------------
-
     /// Get the LSN where this branch was created
     pub fn get_ancestor_lsn(&self) -> Lsn {
         self.ancestor_lsn
@@ -443,7 +441,7 @@ impl Timeline {
         &self,
         lsn: Lsn,
         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
-    ) -> Result<()> {
+    ) -> anyhow::Result<()> {
         ensure!(
             lsn >= **latest_gc_cutoff_lsn,
             "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
@@ -453,12 +451,6 @@ impl Timeline {
         Ok(())
     }

-    //------------------------------------------------------------------------------
-    // Public PUT functions, to update the repository with new page versions.
-    //
-    // These are called by the WAL receiver to digest WAL records.
-    //------------------------------------------------------------------------------
-
     /// Flush to disk all data that was written with the put_* functions
     ///
     /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
@@ -477,6 +469,91 @@ impl Timeline {
         }
     }

+    pub fn compact(&self) -> anyhow::Result<()> {
+        let last_record_lsn = self.get_last_record_lsn();
+
+        // Last record Lsn could be zero in case the timelie was just created
+        if !last_record_lsn.is_valid() {
+            warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
+            return Ok(());
+        }
+
+        //
+        // High level strategy for compaction / image creation:
+        //
+        // 1. First, calculate the desired "partitioning" of the
+        // currently in-use key space. The goal is to partition the
+        // key space into roughly fixed-size chunks, but also take into
+        // account any existing image layers, and try to align the
+        // chunk boundaries with the existing image layers to avoid
+        // too much churn. Also try to align chunk boundaries with
+        // relation boundaries. In principle, we don't know about
+        // relation boundaries here, we just deal with key-value
+        // pairs, and the code in pgdatadir_mapping.rs knows how to
+        // map relations into key-value pairs. But in practice we know
+        // that 'field6' is the block number, and the fields 1-5
+        // identify a relation. This is just an optimization,
+        // though.
+        //
+        // 2. Once we know the partitioning, for each partition,
+        // decide if it's time to create a new image layer. The
+        // criteria is: there has been too much "churn" since the last
+        // image layer? The "churn" is fuzzy concept, it's a
+        // combination of too many delta files, or too much WAL in
+        // total in the delta file. Or perhaps: if creating an image
+        // file would allow to delete some older files.
+        //
+        // 3. After that, we compact all level0 delta files if there
+        // are too many of them. While compacting, we also garbage
+        // collect any page versions that are no longer needed because
+        // of the new image layers we created in step 2.
+        //
+        // TODO: This high level strategy hasn't been implemented yet.
+        // Below are functions compact_level0() and create_image_layers()
+        // but they are a bit ad hoc and don't quite work like it's explained
+        // above. Rewrite it.
+        let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
+
+        let target_file_size = self.get_checkpoint_distance();
+
+        // Define partitioning schema if needed
+
+        match self.repartition(
+            self.get_last_record_lsn(),
+            self.get_compaction_target_size(),
+        ) {
+            Ok((partitioning, lsn)) => {
+                // 2. Create new image layers for partitions that have been modified
+                // "enough".
+                let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
+                if !layer_paths_to_upload.is_empty()
+                    && self.upload_layers.load(atomic::Ordering::Relaxed)
+                {
+                    storage_sync::schedule_layer_upload(
+                        self.tenant_id,
+                        self.timeline_id,
+                        layer_paths_to_upload,
+                        None,
+                    );
+                }
+
+                // 3. Compact
+                let timer = self.metrics.compact_time_histo.start_timer();
+                self.compact_level0(target_file_size)?;
+                timer.stop_and_record();
+            }
+            Err(err) => {
+                // no partitioning? This is normal, if the timeline was just created
+                // as an empty timeline. Also in unit tests, when we use the timeline
+                // as a simple key-value store, ignoring the datadir layout. Log the
+                // error but continue.
+                error!("could not compact, repartitioning keyspace failed: {err:?}");
+            }
+        };
+
+        Ok(())
+    }
+
     /// Mutate the timeline with a [`TimelineWriter`].
     pub fn writer(&self) -> TimelineWriter<'_> {
         TimelineWriter {
```
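The three-step plan spelled out in the comment block above (partition the key space, decide per partition whether a new image layer is worth creating, then compact the L0 deltas) is easier to see on toy data. A hedged sketch of step 1 only; `partition_keys` and the fixed chunk size are illustrative and not the real `KeyPartitioning` logic:

```rust
/// Toy version of step 1: split a sorted key list into chunks of at most
/// `target_size` keys; each chunk would then be a candidate for a new image layer.
fn partition_keys(sorted_keys: &[u64], target_size: usize) -> Vec<Vec<u64>> {
    sorted_keys
        .chunks(target_size.max(1))
        .map(|chunk| chunk.to_vec())
        .collect()
}

fn main() {
    let keys: Vec<u64> = (0..10).collect();
    let parts = partition_keys(&keys, 4);
    assert_eq!(parts.len(), 3); // [0..4), [4..8), [8..10)
    println!("{parts:?}");
}
```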
```diff
@@ -484,6 +561,80 @@ impl Timeline {
             _write_guard: self.write_lock.lock().unwrap(),
         }
     }

+    /// Retrieve current logical size of the timeline.
+    ///
+    /// The size could be lagging behind the actual number, in case
+    /// the initial size calculation has not been run (gets triggered on the first size access).
+    pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
+        let current_size = self.current_logical_size.current_size()?;
+        debug!("Current size: {current_size:?}");
+
+        let size = current_size.size();
+        if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
+            (current_size, self.current_logical_size.initial_part_end)
+        {
+            self.try_spawn_size_init_task(init_lsn);
+        }
+
+        Ok(size)
+    }
+
+    /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
+    /// the in-memory layer, and initiate flushing it if so.
+    ///
+    /// Also flush after a period of time without new data -- it helps
+    /// safekeepers to regard pageserver as caught up and suspend activity.
+    pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
+        let last_lsn = self.get_last_record_lsn();
+        let layers = self.layers.read().unwrap();
+        if let Some(open_layer) = &layers.open_layer {
+            let open_layer_size = open_layer.size()?;
+            drop(layers);
+            let last_freeze_at = self.last_freeze_at.load();
+            let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
+            let distance = last_lsn.widening_sub(last_freeze_at);
+            // Checkpointing the open layer can be triggered by layer size or LSN range.
+            // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
+            // we want to stay below that with a big margin. The LSN distance determines how
+            // much WAL the safekeepers need to store.
+            if distance >= self.get_checkpoint_distance().into()
+                || open_layer_size > self.get_checkpoint_distance()
+                || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
+            {
+                info!(
+                    "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
+                    distance,
+                    open_layer_size,
+                    last_freeze_ts.elapsed()
+                );
+
+                self.freeze_inmem_layer(true);
+                self.last_freeze_at.store(last_lsn);
+                *(self.last_freeze_ts.write().unwrap()) = Instant::now();
+
+                // Launch a task to flush the frozen layer to disk, unless
+                // a task was already running. (If the task was running
+                // at the time that we froze the layer, it must've seen the
+                // the layer we just froze before it exited; see comments
+                // in flush_frozen_layers())
+                if let Ok(guard) = self.layer_flush_lock.try_lock() {
+                    drop(guard);
+                    let self_clone = Arc::clone(self);
+                    task_mgr::spawn(
+                        task_mgr::BACKGROUND_RUNTIME.handle(),
+                        task_mgr::TaskKind::LayerFlushTask,
+                        Some(self.tenant_id),
+                        Some(self.timeline_id),
+                        "layer flush task",
+                        false,
+                        async move { self_clone.flush_frozen_layers(false) },
+                    );
+                }
+            }
+        }
+        Ok(())
+    }
 }

 // Private functions
```
|
||||
///
|
||||
/// Loads the metadata for the timeline into memory, but not the layer map.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
pub(super) fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<RwLock<TenantConfOpt>>,
|
||||
metadata: TimelineMetadata,
|
||||
@@ -600,11 +751,11 @@ impl Timeline {
|
||||
result
|
||||
}
|
||||
|
||||
pub fn launch_wal_receiver(self: &Arc<Self>) -> anyhow::Result<()> {
|
||||
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
|
||||
if !is_etcd_client_initialized() {
|
||||
if cfg!(test) {
|
||||
info!("not launching WAL receiver because etcd client hasn't been initialized");
|
||||
return Ok(());
|
||||
return;
|
||||
} else {
|
||||
panic!("etcd client not initialized");
|
||||
}
|
||||
@@ -632,16 +783,14 @@ impl Timeline {
|
||||
walreceiver_connect_timeout,
|
||||
lagging_wal_timeout,
|
||||
max_lsn_wal_lag,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
);
|
||||
}
|
||||
|
||||
///
|
||||
/// Scan the timeline directory to populate the layer map.
|
||||
/// Returns all timeline-related files that were found and loaded.
|
||||
///
|
||||
pub fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut num_layers = 0;
|
||||
|
||||
@@ -727,30 +876,12 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
|
||||
pub(super) fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
|
||||
self.layer_removal_cs
|
||||
.try_lock()
|
||||
.map_err(|e| anyhow!("cannot lock compaction critical section {e}"))
|
||||
}
|
||||
|
||||
/// Retrieve current logical size of the timeline.
|
||||
///
|
||||
/// The size could be lagging behind the actual number, in case
|
||||
/// the initial size calculation has not been run (gets triggered on the first size access).
|
||||
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
|
||||
let current_size = self.current_logical_size.current_size()?;
|
||||
debug!("Current size: {current_size:?}");
|
||||
|
||||
let size = current_size.size();
|
||||
if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
|
||||
(current_size, self.current_logical_size.initial_part_end)
|
||||
{
|
||||
self.try_spawn_size_init_task(init_lsn);
|
||||
}
|
||||
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
fn try_spawn_size_init_task(self: &Arc<Self>, init_lsn: Lsn) {
|
||||
let timeline_id = self.timeline_id;
|
||||
|
||||
@@ -971,7 +1102,7 @@ impl Timeline {
|
||||
Some((lsn, img))
|
||||
}
|
||||
|
||||
fn get_ancestor_timeline(&self) -> Result<Arc<Timeline>> {
|
||||
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
|
||||
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
|
||||
format!(
|
||||
"Ancestor is missing. Timeline id: {} Ancestor id {:?}",
|
||||
@@ -1030,14 +1161,14 @@ impl Timeline {
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
layer.put_value(key, lsn, val)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()> {
|
||||
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
layer.put_tombstone(key_range, lsn)?;
|
||||
|
||||
@@ -1076,64 +1207,6 @@ impl Timeline {
|
||||
drop(layers);
|
||||
}
|
||||
|
||||
///
|
||||
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
|
||||
/// the in-memory layer, and initiate flushing it if so.
|
||||
///
|
||||
/// Also flush after a period of time without new data -- it helps
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
///
|
||||
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap();
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_size = open_layer.size()?;
|
||||
drop(layers);
|
||||
let last_freeze_at = self.last_freeze_at.load();
|
||||
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
|
||||
let distance = last_lsn.widening_sub(last_freeze_at);
|
||||
// Checkpointing the open layer can be triggered by layer size or LSN range.
|
||||
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
|
||||
// we want to stay below that with a big margin. The LSN distance determines how
|
||||
// much WAL the safekeepers need to store.
|
||||
if distance >= self.get_checkpoint_distance().into()
|
||||
|| open_layer_size > self.get_checkpoint_distance()
|
||||
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
|
||||
{
|
||||
info!(
|
||||
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
|
||||
distance,
|
||||
open_layer_size,
|
||||
last_freeze_ts.elapsed()
|
||||
);
|
||||
|
||||
self.freeze_inmem_layer(true);
|
||||
self.last_freeze_at.store(last_lsn);
|
||||
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
|
||||
|
||||
// Launch a task to flush the frozen layer to disk, unless
|
||||
// a task was already running. (If the task was running
|
||||
// at the time that we froze the layer, it must've seen the
|
||||
// the layer we just froze before it exited; see comments
|
||||
// in flush_frozen_layers())
|
||||
if let Ok(guard) = self.layer_flush_lock.try_lock() {
|
||||
drop(guard);
|
||||
let self_clone = Arc::clone(self);
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::LayerFlushTask,
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
"layer flush task",
|
||||
false,
|
||||
async move { self_clone.flush_frozen_layers(false) },
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flush all frozen layers to disk.
|
||||
///
|
||||
/// Only one task at a time can be doing layer-flushing for a
|
||||
@@ -1141,7 +1214,7 @@ impl Timeline {
|
||||
/// currently doing the flushing, this function will wait for it
|
||||
/// to finish. If 'wait' is false, this function will return
|
||||
/// immediately instead.
|
fn flush_frozen_layers(&self, wait: bool) -> Result<()> {
fn flush_frozen_layers(&self, wait: bool) -> anyhow::Result<()> {
let flush_lock_guard = if wait {
self.layer_flush_lock.lock().unwrap()
} else {
@@ -1180,7 +1253,7 @@ impl Timeline {
}

/// Flush one frozen in-memory layer to disk, as a new delta layer.
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> Result<()> {
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
// As a special case, when we have just imported an image into the repository,
// instead of writing out a L0 delta layer, we directly write out image layer
// files instead. This is possible as long as *all* the data imported into the
@@ -1238,7 +1311,7 @@ impl Timeline {
&self,
disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<PathBuf, LayerFileMetadata>,
) -> Result<()> {
) -> anyhow::Result<()> {
// We can only save a valid 'prev_record_lsn' value on disk if we
// flushed *all* in-memory changes to disk. We only track
// 'prev_record_lsn' in memory for the latest processed record, so we
@@ -1299,7 +1372,7 @@ impl Timeline {
fn create_delta_layer(
&self,
frozen_layer: &InMemoryLayer,
) -> Result<(PathBuf, LayerFileMetadata)> {
) -> anyhow::Result<(PathBuf, LayerFileMetadata)> {
// Write it out
let new_delta = frozen_layer.write_to_disk()?;
let new_delta_path = new_delta.path();
@@ -1334,92 +1407,7 @@ impl Timeline {
Ok((new_delta_path, LayerFileMetadata::new(sz)))
}

pub fn compact(&self) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();

// Last record Lsn could be zero in case the timelie was just created
if !last_record_lsn.is_valid() {
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
return Ok(());
}

//
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
// currently in-use key space. The goal is to partition the
// key space into roughly fixed-size chunks, but also take into
// account any existing image layers, and try to align the
// chunk boundaries with the existing image layers to avoid
// too much churn. Also try to align chunk boundaries with
// relation boundaries. In principle, we don't know about
// relation boundaries here, we just deal with key-value
// pairs, and the code in pgdatadir_mapping.rs knows how to
// map relations into key-value pairs. But in practice we know
// that 'field6' is the block number, and the fields 1-5
// identify a relation. This is just an optimization,
// though.
//
// 2. Once we know the partitioning, for each partition,
// decide if it's time to create a new image layer. The
// criteria is: there has been too much "churn" since the last
// image layer? The "churn" is fuzzy concept, it's a
// combination of too many delta files, or too much WAL in
// total in the delta file. Or perhaps: if creating an image
// file would allow to delete some older files.
//
// 3. After that, we compact all level0 delta files if there
// are too many of them. While compacting, we also garbage
// collect any page versions that are no longer needed because
// of the new image layers we created in step 2.
//
// TODO: This high level strategy hasn't been implemented yet.
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();

let target_file_size = self.get_checkpoint_distance();

// Define partitioning schema if needed

match self.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
) {
Ok((partitioning, lsn)) => {
// 2. Create new image layers for partitions that have been modified
// "enough".
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
if !layer_paths_to_upload.is_empty()
&& self.upload_layers.load(atomic::Ordering::Relaxed)
{
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
layer_paths_to_upload,
None,
);
}

// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline
// as a simple key-value store, ignoring the datadir layout. Log the
// error but continue.
error!("could not compact, repartitioning keyspace failed: {err:?}");
}
};

Ok(())
}

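The three-step strategy described in the comment above can be condensed into a short sketch. This is a hypothetical paraphrase, not pageserver code: the trait, placeholder struct and churn threshold below are invented for illustration, while the phase names mirror repartition(), create_image_layers() and compact_level0() from this diff.

    use anyhow::Result;

    // Placeholder for one partition of the key space.
    struct KeyRangePart;

    // Hypothetical interface capturing the three phases described above.
    trait CompactionPhases {
        fn repartition(&self) -> Result<Vec<KeyRangePart>>;
        fn churn_since_last_image(&self, part: &KeyRangePart) -> u64;
        fn create_image_layer(&self, part: &KeyRangePart) -> Result<()>;
        fn compact_level0(&self) -> Result<()>;
    }

    fn run_compaction<T: CompactionPhases>(tl: &T, churn_threshold: u64) -> Result<()> {
        // 1. Partition the in-use key space into roughly fixed-size chunks.
        let parts = tl.repartition()?;
        // 2. Materialize a new image layer only for partitions with enough churn.
        for part in &parts {
            if tl.churn_since_last_image(part) > churn_threshold {
                tl.create_image_layer(part)?;
            }
        }
        // 3. Fold L0 delta layers into L1 files, dropping page versions that the
        //    new image layers made redundant.
        tl.compact_level0()
    }
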
fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> {
fn repartition(&self, lsn: Lsn, partition_size: u64) -> anyhow::Result<(KeyPartitioning, Lsn)> {
let mut partitioning_guard = self.partitioning.lock().unwrap();
if partitioning_guard.1 == Lsn(0)
|| lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
@@ -1433,7 +1421,7 @@ impl Timeline {
}

// Is it time to create a new image layer for the given partition?
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> Result<bool> {
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
let layers = self.layers.read().unwrap();

for part_range in &partition.ranges {
@@ -1478,7 +1466,7 @@ impl Timeline {
partitioning: &KeyPartitioning,
lsn: Lsn,
force: bool,
) -> Result<HashMap<PathBuf, LayerFileMetadata>> {
) -> anyhow::Result<HashMap<PathBuf, LayerFileMetadata>> {
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers: Vec<ImageLayer> = Vec::new();
for partition in partitioning.parts.iter() {
@@ -1496,7 +1484,32 @@ impl Timeline {
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
let img = self.get(key, lsn)?;
let img = match self.get(key, lsn) {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(err);
}
}
};
image_layer_writer.put_image(key, &img)?;
key = key.next();
}
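The fallback added in this hunk can also be read on its own as a small pattern: try to reconstruct the page, and only for FSM/VM blocks substitute an all-zeros page instead of failing. The sketch below is a hypothetical restatement with invented parameter names; is_rel_fsm_block_key, is_rel_vm_block_key and ZERO_PAGE are the real identifiers visible in the diff, here reduced to a boolean flag and an argument.

    use anyhow::Result;
    use bytes::Bytes;

    // Hypothetical helper mirroring the fallback above: reconstruct a page image,
    // but tolerate failures for FSM/VM blocks by filling the page with zeros.
    fn page_or_zeros(
        reconstruct: impl FnOnce() -> Result<Bytes>,
        key_is_fsm_or_vm_block: bool,
        zero_page: &Bytes,
    ) -> Result<Bytes> {
        match reconstruct() {
            Ok(img) => Ok(img),
            // Zeroing an FSM or VM page loses no user data, which beats failing
            // image creation over and over (see the issue referenced above).
            Err(err) if key_is_fsm_or_vm_block => {
                eprintln!("could not reconstruct FSM/VM page, filling with zeros: {err:?}");
                Ok(zero_page.clone())
            }
            Err(err) => Err(err),
        }
    }
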
@@ -1546,7 +1559,7 @@ impl Timeline {
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
/// as Level 1 files.
///
fn compact_level0(&self, target_file_size: u64) -> Result<()> {
fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> {
let layers = self.layers.read().unwrap();
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
@@ -1856,12 +1869,12 @@ impl Timeline {
///
/// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
/// whether a record is needed for PITR.
pub fn update_gc_info(
pub(super) fn update_gc_info(
&self,
retain_lsns: Vec<Lsn>,
cutoff_horizon: Lsn,
pitr: Duration,
) -> Result<()> {
) -> anyhow::Result<()> {
let mut gc_info = self.gc_info.write().unwrap();

gc_info.horizon_cutoff = cutoff_horizon;
@@ -1916,7 +1929,7 @@ impl Timeline {
/// within a layer file. We can only remove the whole file if it's fully
/// obsolete.
///
pub fn gc(&self) -> Result<GcResult> {
pub(super) fn gc(&self) -> anyhow::Result<GcResult> {
let mut result: GcResult = Default::default();
let now = SystemTime::now();

@@ -1959,10 +1972,10 @@ impl Timeline {
new_gc_cutoff
);
write_guard.store_and_unlock(new_gc_cutoff).wait();

// Persist metadata file
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
}
// Persist the new GC cutoff value in the metadata file, before
// we actually remove anything.
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;

info!("GC starting");

@@ -2089,15 +2102,12 @@ impl Timeline {
}

info!(
"GC completed removing {} layers, cuttof {}",
"GC completed removing {} layers, cutoff {}",
result.layers_removed, new_gc_cutoff
);

if result.layers_removed != 0 {
fail_point!("gc-before-save-metadata", |_| {
info!("Abnormaly terinate pageserver at gc-before-save-metadata fail point");
std::process::abort();
});
return Ok(result);
fail_point!("after-timeline-gc-removed-layers");
}

if self.upload_layers.load(atomic::Ordering::Relaxed) {
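The gc() hunk above makes the ordering explicit: the new GC cutoff is written to the metadata file before any layer files are removed, and the failpoint now fires after the removal (matching the test_gc_cutoff changes further down). A minimal sketch of that ordering, assuming the fail crate that provides the fail_point! macro used in the diff; the closures and return type are invented for illustration.

    use anyhow::Result;

    // Hypothetical condensed flow of the GC tail end shown above.
    fn gc_tail(
        persist_new_cutoff: impl FnOnce() -> Result<()>,
        remove_obsolete_layers: impl FnOnce() -> usize,
    ) -> Result<usize> {
        // Persist the new GC cutoff first, so a crash between the two steps never
        // leaves layer files deleted under a cutoff that was not yet durable.
        persist_new_cutoff()?;
        let removed = remove_obsolete_layers();
        if removed != 0 {
            // Test hook used by test_gc_cutoff to simulate a crash right here.
            fail::fail_point!("after-timeline-gc-removed-layers");
        }
        Ok(removed)
    }
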
@@ -2239,11 +2249,11 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()> {
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value)
}

pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()> {
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn)
}

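A change that recurs throughout the timeline.rs hunks above is writing anyhow::Result<_> explicitly in signatures instead of a bare Result<_> alias. A minimal illustration of the spelled-out form, using a hypothetical function that is not repository code:

    use anyhow::Context;

    // The explicit return type makes it clear at the signature that this function
    // produces an anyhow::Error, rather than some crate-local Result alias.
    fn load_metadata(path: &std::path::Path) -> anyhow::Result<String> {
        std::fs::read_to_string(path)
            .with_context(|| format!("failed to read metadata file {}", path.display()))
    }
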
@@ -34,6 +34,7 @@ use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use crate::walrecord::*;
use crate::ZERO_PAGE;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
@@ -43,8 +44,6 @@ use postgres_ffi::TransactionId;
use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;

static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);

pub struct WalIngest<'a> {
timeline: &'a Timeline,


@@ -47,7 +47,7 @@ pub fn spawn_connection_manager_task(
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
) -> anyhow::Result<()> {
) {
let mut etcd_client = get_etcd_client().clone();

let tenant_id = timeline.tenant_id;
@@ -95,7 +95,6 @@ pub fn spawn_connection_manager_task(
info_span!("wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id),
),
);
Ok(())
}

/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.

@@ -610,13 +610,26 @@ impl PostgresRedoProcess {
);
fs::remove_dir_all(&datadir)?;
}
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).map_err(|e| {
Error::new(
ErrorKind::Other,
format!("incorrect pg_bin_dir path: {}", e),
)
})?;
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).map_err(|e| {
Error::new(
ErrorKind::Other,
format!("incorrect pg_lib_dir path: {}", e),
)
})?;

info!("running initdb in {}", datadir.display());
let initdb = Command::new(conf.pg_bin_dir(pg_version).join("initdb"))
let initdb = Command::new(pg_bin_dir_path.join("initdb"))
.args(&["-D", &datadir.to_string_lossy()])
.arg("-N")
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path) // macOS
.close_fds()
.output()
.map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {e}")))?;
@@ -642,14 +655,14 @@ impl PostgresRedoProcess {
}

// Start postgres itself
let mut child = Command::new(conf.pg_bin_dir(pg_version).join("postgres"))
let mut child = Command::new(pg_bin_dir_path.join("postgres"))
.arg("--wal-redo")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
.env("PGDATA", &datadir)
// The redo process is not trusted, so it runs in seccomp mode
// (see seccomp in zenith_wal_redo.c). We have to make sure it doesn't

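The walredo hunk above resolves the Postgres bin and lib directories once, converting the configuration error into a std::io::Error so it fits the surrounding function's error type, and then reuses the resolved paths for both the initdb and postgres --wal-redo child processes. A small sketch of that map_err pattern, with a hypothetical lookup function and version values that are not repository code:

    use std::io::{Error, ErrorKind};
    use std::path::PathBuf;

    // Hypothetical stand-in for a config lookup that fails with a non-IO error.
    fn pg_bin_dir(pg_version: u32) -> Result<PathBuf, String> {
        match pg_version {
            14 | 15 => Ok(PathBuf::from(format!("/usr/local/v{pg_version}/bin"))),
            other => Err(format!("unsupported Postgres version {other}")),
        }
    }

    // Same shape as the diff: wrap the config error into an io::Error so it can be
    // returned from a function whose error type is std::io::Error, and resolve it
    // once before spawning any child processes.
    fn resolve_bin_dir(pg_version: u32) -> std::io::Result<PathBuf> {
        pg_bin_dir(pg_version)
            .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {}", e)))
    }
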
@@ -1,14 +1,13 @@
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
from performance.test_perf_pgbench import get_scales_matrix


# Test gc_cuttoff
# Test gc_cutoff
#
# This test set fail point after at the end of GC and checks
# that pageserver normally restarts after it
@pytest.mark.parametrize("scale", get_scales_matrix(10))
def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, scale: int):
# This test sets fail point at the end of GC, and checks that pageserver
# normally restarts after it. Also, there should be GC ERRORs in the log,
# but the fixture checks the log for any unexpected ERRORs after every
# test anyway, so it doesn't need any special attention here.
def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()

@@ -18,21 +17,23 @@ def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, scale: int):
"gc_period": "10 s",
"gc_horizon": f"{1024 ** 2}",
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
"compaction_period": "5 s",
# set PITR interval to be small, so we can do GC
"pitr_interval": "1 s",
"compaction_threshold": "3",
"image_creation_threshold": "2",
}
)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
connstr = pg.connstr()
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
connstr = pg.connstr(options="-csynchronous_commit=off")
pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])

pageserver_http.configure_failpoints(("gc-before-save-metadata", "return"))
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))

for i in range(5):
try:
pg_bin.run_capture(["pgbench", "-T100", connstr])
pg_bin.run_capture(["pgbench", "-N", "-c5", "-T100", "-Mprepared", connstr])
except Exception:
env.pageserver.stop()
env.pageserver.start()
pageserver_http.configure_failpoints(("gc-before-save-metadata", "return"))
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 19d948fd47...bdd502a8da
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 339f2d642d...f7c5269e9c