Compare commits

...

14 Commits

Author SHA1 Message Date
Alek Westover
04ba161703 Merge branch 'extension_server_rebased' into extension_server_alek 2023-07-04 10:51:04 -04:00
Alek Westover
53db2baed8 remote test only changes 2023-07-04 10:42:04 -04:00
Alek Westover
1f905b114d Fix paths to match infra more closely. Make extension_server actually async. Handle more complex cases of extensions with their dependencies. 2023-07-04 10:29:35 -04:00
Alek Westover
36da6a322b infra paths are slightly different than our testing environment. added test against actual infra, and modified our tests to have the same structure as infra so the tests should still pass 2023-06-29 15:00:40 -04:00
Anastasia Lubennikova
d726c70722 Support custom extensions.
Add infrastructure to dynamically load postgres extensions and shared libraries from remote extension storage.

Before postgres start  downloads list of available remote extensions and libraries, and also downloads 'shared_preload_libraries'.
After postgres is running, 'compute_ctl' listens for HTTP requests to load files.

Postgres has new GUC 'extension_server_port' to specify port on which 'compute_ctl' listens for requests.

When PostgreSQL requests a file, 'compute_ctl'  downloads it.

See more details about feature design and remote extension storage layout in docs/rfcs/024-extension-loading.md
2023-06-29 19:08:57 +03:00
Alexander Bayandin
b2a5e91a88 Upload custom extensions to S3 (#4585)
## Problem

We want to have a number of custom extensions that will not be available
by default, as an example of such is [Postgres
Anonymizer](https://postgresql-anonymizer.readthedocs.io/en/stable/)
(please note that the extension should be added to
`shared_preload_libraries`). To distinguish them, custom extensions
should be added to a different S3 path:

```
s3://<bucket>/<release version>/<postgres_version>/<ext_name>/share/extensions/
s3://<bucket>/<release version>/<postgres_version>/<ext_name>/lib

where <ext_name> is an extension name
```

Resolves https://github.com/neondatabase/neon/issues/4582

## Summary of changes
- Add Postgres Anonymizer extension to Dockerfile (it's included only to
postgres-extensions target)
- Build extensions image from postgres-extensions target in a workflow
- Upload custom extensions to S3 (different directory)
2023-06-29 16:33:26 +03:00
Joonas Koivunen
44e7d5132f fix: hide token from logs (#4584)
fixes #4583 and also changes all needlessly arg listing places to use
`skip_all`.
2023-06-29 15:53:16 +03:00
Alex Chi Z
c19681bc12 neon_local: support force init (#4363)
When we use local SSD for bench and create `.neon` directory before we
do `cargo neon init`, the initialization process will error due to
directory already exists. This PR adds a flag `--force` that removes
everything inside the directory if `.neon` already exists.

---------

Signed-off-by: Alex Chi Z. <chi@neon.tech>
2023-06-28 11:39:07 -04:00
Shany Pozin
ec9b585837 Add Activating as a possible state for attaching a tenant in test_tenant_relocation validation (#4581)
Fix flakyness of test_tenant_relocation
2023-06-28 12:35:52 +03:00
Joonas Koivunen
02ef246db6 refactor: to pattern of await after timeout (#4432)
Refactor the `!completed` to be about `Option<_>` instead, side-stepping
any boolean true/false or false/true. As discussed on
https://github.com/neondatabase/neon/pull/4399#discussion_r1219321848
2023-06-28 06:18:45 +00:00
Konstantin Knizhnik
195d4932c6 Set LwLSN after WAL records when redo is performed or skipped (#4579)
## Problem

See #4516

Inspecting log it is possible to notice that if lwlsn is set to the
beginning of applied WAL record, then
incorrect version of the page is loaded:

```
2023-06-27 18:36:51.930 GMT [3273945] CONTEXT:  WAL redo at 0/14AF6F0 for Heap/INSERT: off 2 flags 0x01; blkref #0: rel 1663/5/1259, blk 0 FPW
2023-06-27 18:36:51.930 GMT [3273945] LOG:  Do REDO block 0 of rel 1663/5/1259 fork 0 at LSN 0/**014AF6F0**..0/014AFA60
2023-06-27 18:37:02.173 GMT [3273963] LOG:  Read blk 0 in rel 1663/5/1259 fork 0 (request LSN 0/**014AF6F0**): lsn=0/**0143C7F8** at character 22
2023-06-27 18:37:47.780 GMT [3273945] LOG:  apply WAL record at 0/1BB8F38 xl_tot_len=188, xl_prev=0/1BB8EF8 
2023-06-27 18:37:47.780 GMT [3273945] CONTEXT:  WAL redo at 0/1BB8F38 for Heap/INPLACE: off 2; blkref #0: rel 1663/5/1259, blk 0
2023-06-27 18:37:47.780 GMT [3273945] LOG:  Do REDO block 0 of rel 1663/5/1259 fork 0 at LSN 0/01BB8F38..0/01BB8FF8
2023-06-27 18:37:47.780 GMT [3273945] CONTEXT:  WAL redo at 0/1BB8F38 for Heap/INPLACE: off 2; blkref #0: rel 1663/5/1259, blk 0
2023-06-27 18:37:47.780 GMT [3273945] PANIC:  invalid lp
```

## Summary of changes

1. Use end record LSN  for both cases
2. Update lwlsn for relation metadata

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist
2023-06-28 09:04:13 +03:00
Alexander Bayandin
7fe0a4bf1a Fix promote-images job (#4577)
## Problem

```
+ crane tag neondatabase/extensions:3337 latest
Error: fetching "neondatabase/extensions:3337": GET https://index.docker.io/v2/neondatabase/extensions/manifests/3337: MANIFEST_UNKNOWN: manifest unknown; unknown tag=3337
```

We don't build `neondatabase/extensions` image yet (broken in
https://github.com/neondatabase/neon/pull/4505)

## Summary of changes
- Do not try to interact with `neondatabase/extensions`
2023-06-27 20:05:10 +03:00
bojanserafimov
ef2b9ffbcb Basebackup forward compatibility (#4572) 2023-06-27 12:05:27 -04:00
Alexander Bayandin
250a27fb85 Upload Postgres Extensions to S3 (#4505)
## Problem

We want to store Postgres Extensions in S3 (resolves
https://github.com/neondatabase/neon/issues/4493).

Proposed solution:
- Create a separate docker image (from scratch) that contains only
extensions
- Do not include extensions into compute-node (except for neon
extensions)*
- For release and main builds upload extract extension from the image
and upload to S3 (`s3://<bucket>/<release
version>/<postgres_version>/...`)**

*) We're not doing it until the feature is not fully implemented
**) This differs from the initial proposal in
https://github.com/neondatabase/neon/issues/4493 of putting extensions
straight into `s3://<bucket>/<postgres_version>/...`, because we can't
upload directory atomicly. A drawback of this is that we end up with
unnecessary copies of files ~2.1 GB per release (i.e. +2.1 GB for each
commit in main also). We don't really need to update extensions for each
release if there're no relevant changes, but this requires extra work.

## Summary of changes
- Created a separate stage in Dockerfile.compute-node
`postgres-extensions` that contains only extensions
- Added a separate step in a workflow that builds `postgres-extensions`
image (because of a bug in kaniko this step is commented out because it
takes way too long to get built)
- Extract extensions from the image and upload files to S3 for release
and main builds
- Upload extenstions only for staging (for now)
2023-06-27 16:23:22 +01:00
37 changed files with 1637 additions and 74 deletions

View File

@@ -722,6 +722,35 @@ jobs:
--dockerfile Dockerfile.compute-node
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
--destination neondatabase/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
--cleanup
# Due to a kaniko bug, we can't use cache for extensions image, thus it takes about the same amount of time as compute-node image to build (~10 min)
# During the transition period we need to have extensions in both places (in S3 and in compute-node image),
# so we won't build extension twice, but extract them from compute-node.
#
# For now we use extensions image only for new custom extensitons
- name: Kaniko build extensions only
run: |
# Kaniko is suposed to clean up after itself if --cleanup flag is set, but it doesn't.
# Despite some fixes were made in https://github.com/GoogleContainerTools/kaniko/pull/2504 (in kaniko v1.11.0),
# it still fails with error:
# error building image: could not save file: copying file: symlink postgres /kaniko/1/usr/local/pgsql/bin/postmaster: file exists
#
# Ref https://github.com/GoogleContainerTools/kaniko/issues/1406
find /kaniko -maxdepth 1 -mindepth 1 -type d -regex "/kaniko/[0-9]*" -exec rm -rv {} \;
/kaniko/executor --reproducible --snapshot-mode=redo --skip-unused-stages --cache=true \
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache \
--context . \
--build-arg GIT_VERSION=${{ github.sha }} \
--build-arg PG_VERSION=${{ matrix.version }} \
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}} \
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com \
--dockerfile Dockerfile.compute-node \
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/extensions-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} \
--destination neondatabase/extensions-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} \
--cleanup \
--target postgres-extensions
# Cleanup script fails otherwise - rm: cannot remove '/nvme/actions-runner/_work/_temp/_github_home/.ecr': Permission denied
- name: Cleanup ECR folder
@@ -840,8 +869,10 @@ jobs:
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/extensions-v14:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/extensions-v15:${{needs.tag.outputs.build-tag}} latest
- name: Push images to production ECR
if: |
@@ -852,8 +883,10 @@ jobs:
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v14:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/extensions-v14:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/extensions-v14:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-v15:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/extensions-v15:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/extensions-v15:latest
- name: Configure Docker Hub login
run: |
@@ -875,16 +908,89 @@ jobs:
crane tag neondatabase/compute-tools:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/vm-compute-node-v14:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/extensions-v14:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/vm-compute-node-v15:${{needs.tag.outputs.build-tag}} latest
crane tag neondatabase/extensions-v15:${{needs.tag.outputs.build-tag}} latest
- name: Cleanup ECR folder
run: rm -rf ~/.ecr
upload-postgres-extensions-to-s3:
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
runs-on: ${{ github.ref_name == 'release' && fromJSON('["self-hosted", "prod", "x64"]') || fromJSON('["self-hosted", "gen3", "small"]') }}
needs: [ tag, promote-images ]
strategy:
fail-fast: false
matrix:
version: [ v14, v15 ]
env:
# While on transition period we extract public extensions from compute-node image and custom extensions from extensions image.
# Later all the extensions will be moved to extensions image.
EXTENSIONS_IMAGE: ${{ github.ref_name == 'release' && '093970136003' || '369495373322'}}.dkr.ecr.eu-central-1.amazonaws.com/extensions-${{ matrix.version }}:latest
COMPUTE_NODE_IMAGE: ${{ github.ref_name == 'release' && '093970136003' || '369495373322'}}.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:latest
AWS_ACCESS_KEY_ID: ${{ github.ref_name == 'release' && secrets.AWS_ACCESS_KEY_PROD || secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ github.ref_name == 'release' && secrets.AWS_SECRET_KEY_PROD || secrets.AWS_SECRET_KEY_DEV }}
S3_BUCKETS: |
${{ github.ref_name == 'release' &&
'neon-prod-extensions-ap-southeast-1 neon-prod-extensions-eu-central-1 neon-prod-extensions-us-east-1 neon-prod-extensions-us-east-2 neon-prod-extensions-us-west-2' ||
'neon-dev-extensions-eu-central-1 neon-dev-extensions-eu-west-1 neon-dev-extensions-us-east-2' }}
steps:
- name: Pull postgres-extensions image
run: |
docker pull ${EXTENSIONS_IMAGE}
docker pull ${COMPUTE_NODE_IMAGE}
- name: Create postgres-extensions container
id: create-container
run: |
EID=$(docker create ${EXTENSIONS_IMAGE} true)
echo "EID=${EID}" >> $GITHUB_OUTPUT
CID=$(docker create ${COMPUTE_NODE_IMAGE} true)
echo "CID=${CID}" >> $GITHUB_OUTPUT
- name: Extract postgres-extensions from container
run: |
rm -rf ./extensions-to-upload ./custom-extensions # Just in case
# In compute image we have a bit different directory layout
mkdir -p extensions-to-upload/share
docker cp ${{ steps.create-container.outputs.CID }}:/usr/local/share/extension ./extensions-to-upload/share/extension
docker cp ${{ steps.create-container.outputs.CID }}:/usr/local/lib ./extensions-to-upload/lib
# Delete Neon extensitons (they always present on compute-node image)
rm -rf ./extensions-to-upload/share/extension/neon*
rm -rf ./extensions-to-upload/lib/neon*
docker cp ${{ steps.create-container.outputs.EID }}:/extensions ./custom-extensions
for EXT_NAME in $(ls ./custom-extensions); do
mkdir -p ./extensions-to-upload/${EXT_NAME}/share
mv ./custom-extensions/${EXT_NAME}/share/extension ./extensions-to-upload/${EXT_NAME}/share/extension
mv ./custom-extensions/${EXT_NAME}/lib ./extensions-to-upload/${EXT_NAME}/lib
done
- name: Upload postgres-extensions to S3
run: |
for BUCKET in $(echo ${S3_BUCKETS}); do
aws s3 cp --recursive --only-show-errors ./extensions-to-upload s3://${BUCKET}/${{ needs.tag.outputs.build-tag }}/${{ matrix.version }}
done
- name: Cleanup
if: ${{ always() && (steps.create-container.outputs.CID || steps.create-container.outputs.EID) }}
run: |
docker rm ${{ steps.create-container.outputs.CID }} || true
docker rm ${{ steps.create-container.outputs.EID }} || true
deploy:
runs-on: [ self-hosted, gen3, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
needs: [ promote-images, tag, regress-tests ]
needs: [ upload-postgres-extensions-to-s3, promote-images, tag, regress-tests ]
if: ( github.ref_name == 'main' || github.ref_name == 'release' ) && github.event_name != 'workflow_dispatch'
steps:
- name: Fix git ownership

3
Cargo.lock generated
View File

@@ -924,12 +924,14 @@ dependencies = [
"opentelemetry",
"postgres",
"regex",
"remote_storage",
"reqwest",
"serde",
"serde_json",
"tar",
"tokio",
"tokio-postgres",
"toml_edit",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
@@ -997,6 +999,7 @@ dependencies = [
"tar",
"thiserror",
"toml",
"tracing",
"url",
"utils",
"workspace_hack",

View File

@@ -515,6 +515,26 @@ RUN wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/roaringbitmap.control
#########################################################################################
#
# Layer "pg-anon-pg-build"
# compile anon extension
#
#########################################################################################
FROM build-deps AS pg-anon-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# Kaniko doesn't allow to do `${from#/usr/local/pgsql/}`, so we use `${from:17}` instead
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/1.1.0/postgresql_anonymizer-1.1.0.tar.gz -O pg_anon.tar.gz && \
echo "08b09d2ff9b962f96c60db7e6f8e79cf7253eb8772516998fc35ece08633d3ad pg_anon.tar.gz" | sha256sum --check && \
mkdir pg_anon-src && cd pg_anon-src && tar xvzf ../pg_anon.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sort > /before.txt && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control && \
find /usr/local/pgsql -type f | sort > /after.txt && \
/bin/bash -c 'for from in $(comm -13 /before.txt /after.txt); do to=/extensions/anon/${from:17} && mkdir -p $(dirname ${to}) && cp -a ${from} ${to}; done'
#########################################################################################
#
# Layer "rust extensions"
@@ -623,6 +643,7 @@ RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.0.tar.gz -
#
#########################################################################################
FROM build-deps AS neon-pg-ext-build
# Public extensions
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=postgis-build /sfcgal/* /
COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
@@ -698,6 +719,22 @@ RUN rm -r /usr/local/pgsql/include
# if they were to be used by other libraries.
RUN rm /usr/local/pgsql/lib/lib*.a
#########################################################################################
#
# Extenstion only
#
#########################################################################################
FROM scratch AS postgres-extensions
# After the transition this layer will include all extensitons.
# As for now, it's only for new custom ones
#
# # Default extensions
# COPY --from=postgres-cleanup-layer /usr/local/pgsql/share/extension /usr/local/pgsql/share/extension
# COPY --from=postgres-cleanup-layer /usr/local/pgsql/lib /usr/local/pgsql/lib
# Custom extensions
COPY --from=pg-anon-pg-build /extensions/anon/lib/ /extensions/anon/lib
COPY --from=pg-anon-pg-build /extensions/anon/share/extension /extensions/anon/share/extension
#########################################################################################
#
# Final layer

View File

@@ -30,3 +30,5 @@ url.workspace = true
compute_api.workspace = true
utils.workspace = true
workspace_hack.workspace = true
toml_edit.workspace = true
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }

View File

@@ -5,6 +5,8 @@
//! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
//! - Every start is a fresh start, so the data directory is removed and
//! initialized again on each run.
//! - If remote_extension_config is provided, it will be used to fetch extensions list
//! and download `shared_preload_libraries` from the remote storage.
//! - Next it will put configuration files into the `PGDATA` directory.
//! - Sync safekeepers and get commit LSN.
//! - Get `basebackup` from pageserver using the returned on the previous step LSN.
@@ -27,7 +29,8 @@
//! compute_ctl -D /var/db/postgres/compute \
//! -C 'postgresql://cloud_admin@localhost/postgres' \
//! -S /var/db/postgres/specs/current.json \
//! -b /usr/local/bin/postgres
//! -b /usr/local/bin/postgres \
//! -r {"bucket": "my-bucket", "region": "eu-central-1", "endpoint": "http:://localhost:9000"} \
//! ```
//!
use std::collections::HashMap;
@@ -35,7 +38,7 @@ use std::fs::File;
use std::panic;
use std::path::Path;
use std::process::exit;
use std::sync::{mpsc, Arc, Condvar, Mutex};
use std::sync::{mpsc, Arc, Condvar, Mutex, OnceLock};
use std::{thread, time::Duration};
use anyhow::{Context, Result};
@@ -48,6 +51,7 @@ use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::{get_pg_version, init_remote_storage};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -64,6 +68,14 @@ fn main() -> Result<()> {
info!("build_tag: {build_tag}");
let matches = cli().get_matches();
let pgbin_default = String::from("postgres");
let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
let remote_ext_config = matches.get_one::<String>("remote-ext-config");
let ext_remote_storage = remote_ext_config.map(|x| {
init_remote_storage(x, build_tag)
.expect("cannot initialize remote extension storage from config")
});
let http_port = *matches
.get_one::<u16>("http-port")
@@ -128,9 +140,6 @@ fn main() -> Result<()> {
let compute_id = matches.get_one::<String>("compute-id");
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
let spec;
let mut live_config_allowed = false;
match spec_json {
@@ -168,6 +177,7 @@ fn main() -> Result<()> {
let mut new_state = ComputeState::new();
let spec_set;
if let Some(spec) = spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
new_state.pspec = Some(pspec);
@@ -179,9 +189,13 @@ fn main() -> Result<()> {
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
pgversion: get_pg_version(pgbin),
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage,
available_libraries: OnceLock::new(),
available_extensions: OnceLock::new(),
};
let compute = Arc::new(compute_node);
@@ -190,6 +204,8 @@ fn main() -> Result<()> {
let _http_handle =
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
let extension_server_port: u16 = http_port;
if !spec_set {
// No spec provided, hang waiting for it.
info!("no compute spec provided, waiting");
@@ -230,7 +246,7 @@ fn main() -> Result<()> {
// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;
let pg = match compute.start_compute() {
let pg = match compute.start_compute(extension_server_port) {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:?}", err);
@@ -349,6 +365,12 @@ fn cli() -> clap::Command {
.long("control-plane-uri")
.value_name("CONTROL_PLANE_API_BASE_URI"),
)
.arg(
Arg::new("remote-ext-config")
.short('r')
.long("remote-ext-config")
.value_name("REMOTE_EXT_CONFIG"),
)
}
#[test]

View File

@@ -1,13 +1,15 @@
use std::collections::HashMap;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::{Condvar, Mutex};
use std::sync::{Condvar, Mutex, OnceLock};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use tokio;
use tokio_postgres;
use tracing::{info, instrument, warn};
use utils::id::{TenantId, TimelineId};
@@ -16,9 +18,12 @@ use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use crate::config;
use remote_storage::{GenericRemoteStorage, RemotePath};
use crate::extension_server::PathAndFlag;
use crate::pg_helpers::*;
use crate::spec::*;
use crate::{config, extension_server};
/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
@@ -26,6 +31,7 @@ pub struct ComputeNode {
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
pub pgversion: String,
/// We should only allow live re- / configuration of the compute node if
/// it uses 'pull model', i.e. it can go to control-plane and fetch
/// the latest configuration. Otherwise, there could be a case:
@@ -45,6 +51,11 @@ pub struct ComputeNode {
pub state: Mutex<ComputeState>,
/// `Condvar` to allow notifying waiters about state changes.
pub state_changed: Condvar,
/// the S3 bucket that we search for extensions in
pub ext_remote_storage: Option<GenericRemoteStorage>,
// cached lists of available extensions and libraries
pub available_libraries: OnceLock<HashMap<String, Vec<RemotePath>>>,
pub available_extensions: OnceLock<HashMap<String, Vec<PathAndFlag>>>,
}
#[derive(Clone, Debug)]
@@ -235,7 +246,7 @@ impl ComputeNode {
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip(self, compute_state))]
#[instrument(skip_all, fields(%lsn))]
fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let start_time = Utc::now();
@@ -277,7 +288,7 @@ impl ComputeNode {
// Run `postgres` in a special mode with `--sync-safekeepers` argument
// and return the reported LSN back to the caller.
#[instrument(skip(self, storage_auth_token))]
#[instrument(skip_all)]
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let start_time = Utc::now();
@@ -322,15 +333,23 @@ impl ComputeNode {
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
#[instrument(skip_all)]
pub fn prepare_pgdata(
&self,
compute_state: &ComputeState,
extension_server_port: u16,
) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.pgdata);
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
config::write_postgres_conf(
&pgdata_path.join("postgresql.conf"),
&pspec.spec,
Some(extension_server_port),
)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
// is already connected it will be kicked out, so a secondary (standby)
@@ -380,7 +399,7 @@ impl ComputeNode {
/// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn start_postgres(
&self,
storage_auth_token: Option<String>,
@@ -404,7 +423,7 @@ impl ComputeNode {
}
/// Do initial configuration of the already started Postgres.
#[instrument(skip(self, compute_state))]
#[instrument(skip_all)]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
// If connection fails,
// it may be the old node with `zenith_admin` superuser.
@@ -458,7 +477,7 @@ impl ComputeNode {
// We could've wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip(self, client))]
#[instrument(skip_all)]
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
@@ -466,13 +485,13 @@ impl ComputeNode {
/// Similar to `apply_config()`, but does a bit different sequence of operations,
/// as it's used to reconfigure a previously started and configured Postgres node.
#[instrument(skip(self))]
#[instrument(skip_all)]
pub fn reconfigure(&self) -> Result<()> {
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;
@@ -501,8 +520,8 @@ impl ComputeNode {
Ok(())
}
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
#[instrument(skip_all)]
pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
info!(
@@ -513,7 +532,12 @@ impl ComputeNode {
pspec.timeline_id,
);
self.prepare_pgdata(&compute_state)?;
// TODO FIXME: this should not be blocking here
// Maybe we can run it as a child process?
// Also, worth measuring how long this step is taking
self.prepare_external_extensions(&compute_state)?;
self.prepare_pgdata(&compute_state, extension_server_port)?;
let start_time = Utc::now();
@@ -649,4 +673,129 @@ LIMIT 100",
"{{\"pg_stat_statements\": []}}".to_string()
}
}
// If remote extension storage is configured,
// download extension control files
// and shared preload libraries.
#[tokio::main]
pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
// download preload shared libraries before postgres start (if any)
let spec = &pspec.spec;
// 1. parse custom extension paths from spec
let custom_ext_prefixes = match &spec.custom_extensions {
Some(custom_extensions) => custom_extensions.clone(),
None => Vec::new(),
};
info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
// 2. parse shared_preload_libraries from spec
let mut libs_vec = Vec::new();
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && !s.is_empty())
.map(str::to_string)
.collect();
}
info!(
"shared_preload_libraries parsed from spec.cluster.settings: {:?}",
libs_vec
);
// also parse shared_preload_libraries from provided postgresql.conf
// that is used in neon_local and python tests
if let Some(conf) = &spec.cluster.postgresql_conf {
let conf_lines = conf.split('\n').collect::<Vec<&str>>();
let mut shared_preload_libraries_line = "";
for line in conf_lines {
if line.starts_with("shared_preload_libraries") {
shared_preload_libraries_line = line;
}
}
let mut preload_libs_vec = Vec::new();
if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
preload_libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && !s.is_empty())
.map(str::to_string)
.collect();
}
info!(
"shared_preload_libraries parsed from spec.cluster.postgresql_conf: {:?}",
preload_libs_vec
);
libs_vec.extend(preload_libs_vec);
}
info!("Libraries to download: {:?}", &libs_vec);
// download extension control files & shared_preload_libraries
let get_extensions_task = extension_server::get_available_extensions(
ext_remote_storage,
&self.pgbin,
&self.pgversion,
&custom_ext_prefixes,
);
let get_libraries_task = extension_server::get_available_libraries(
ext_remote_storage,
&self.pgbin,
&self.pgversion,
&custom_ext_prefixes,
&libs_vec,
);
let (available_extensions, available_libraries) =
tokio::join!(get_extensions_task, get_libraries_task);
self.available_extensions
.set(available_extensions?)
.expect("available_extensions.set error");
self.available_libraries
.set(available_libraries?)
.expect("available_libraries.set error");
}
Ok(())
}
pub async fn download_extension_files(&self, filename: String) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_extension_files(
&filename,
remote_storage,
&self.pgbin,
self.available_extensions
.get()
.context("available_extensions broke")?,
)
.await
}
}
}
pub async fn download_library_file(&self, filename: String) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_library_file(
&filename,
remote_storage,
&self.pgbin,
self.available_libraries
.get()
.context("available_libraries broke")?,
)
.await
}
}
}
}

View File

@@ -33,7 +33,11 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
}
/// Create or completely rewrite configuration file specified by `path`
pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
pub fn write_postgres_conf(
path: &Path,
spec: &ComputeSpec,
extension_server_port: Option<u16>,
) -> Result<()> {
// File::create() destroys the file content if it exists.
let mut file = File::create(path)?;
@@ -95,5 +99,9 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
writeln!(file, "# Managed by compute_ctl: end")?;
}
if let Some(port) = extension_server_port {
writeln!(file, "neon.extension_server_port={}", port)?;
}
Ok(())
}

View File

@@ -8,7 +8,7 @@ use compute_api::responses::ComputeStatus;
use crate::compute::ComputeNode;
#[instrument(skip(compute))]
#[instrument(skip_all)]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {

View File

@@ -0,0 +1,428 @@
// Download extension files from the extension store
// and put them in the right place in the postgres directory
use anyhow::{self, bail, Context, Result};
use futures::future::join_all;
use remote_storage::*;
use serde_json::{self, Value};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::{Path, PathBuf};
use std::str;
use tokio::io::AsyncReadExt;
use tracing::info;
// remote!
const SHARE_EXT_PATH: &str = "share/extension";
fn pass_any_error(results: Vec<Result<()>>) -> Result<()> {
for result in results {
result?;
}
Ok(())
}
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
// where argument is a flag like `--version` or `--sharedir`
let pgconfig = pgbin.replace("postgres", "pg_config");
let config_output = std::process::Command::new(pgconfig)
.arg(argument)
.output()
.expect("pg_config error");
std::str::from_utf8(&config_output.stdout)
.expect("pg_config error")
.trim()
.to_string()
}
pub fn get_pg_version(pgbin: &str) -> String {
// pg_config --version returns a (platform specific) human readable string
// such as "PostgreSQL 15.4". We parse this to v14/v15
let human_version = get_pg_config("--version", pgbin);
if human_version.contains("15") {
return "v15".to_string();
} else if human_version.contains("14") {
return "v14".to_string();
}
panic!("Unsuported postgres version {human_version}");
}
async fn download_helper(
remote_storage: &GenericRemoteStorage,
remote_from_path: RemotePath,
sub_directory: Option<&str>,
download_location: &Path,
) -> anyhow::Result<()> {
// downloads file at remote_from_path to
// `download_location/[optional: subdirectory]/[remote_storage.object_name()]`
// Note: the subdirectory commmand is needed when there is an extension that
// depends on files in a subdirectory.
// For example, v14/share/extension/some_ext.control
// might depend on v14/share/extension/some_ext/some_ext--1.1.0.sql
// and v14/share/extension/some_ext/xxx.csv
// Note: it is the caller's responsibility to create the appropriate subdirectory
let local_path = match sub_directory {
Some(subdir) => download_location
.join(subdir)
.join(remote_from_path.object_name().expect("bad object")),
None => download_location.join(remote_from_path.object_name().expect("bad object")),
};
if local_path.exists() {
info!("File {:?} already exists. Skipping download", &local_path);
return Ok(());
}
info!(
"Downloading {:?} to location {:?}",
&remote_from_path, &local_path
);
let mut download = remote_storage.download(&remote_from_path).await?;
let mut write_data_buffer = Vec::new();
download
.download_stream
.read_to_end(&mut write_data_buffer)
.await?;
let mut output_file = BufWriter::new(File::create(local_path)?);
output_file.write_all(&write_data_buffer)?;
info!("Download {:?} completed successfully", &remote_from_path);
Ok(())
}
// download extension control files
//
// if custom_ext_prefixes is provided - search also in custom extension paths
//
pub async fn get_available_extensions(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
pg_version: &str,
custom_ext_prefixes: &Vec<String>,
) -> anyhow::Result<HashMap<String, Vec<PathAndFlag>>> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
// public path, plus any private paths to download extensions from
let mut paths: Vec<RemotePath> = Vec::new();
paths.push(RemotePath::new(
&Path::new(pg_version).join(SHARE_EXT_PATH),
)?);
for custom_prefix in custom_ext_prefixes {
paths.push(RemotePath::new(
&Path::new(pg_version)
.join(custom_prefix)
.join(SHARE_EXT_PATH),
)?);
}
let (extension_files, control_files) =
organized_extension_files(remote_storage, &paths).await?;
let mut control_file_download_tasks = Vec::new();
// download all control files
for control_file in control_files {
control_file_download_tasks.push(download_helper(
remote_storage,
control_file.clone(),
None,
&local_sharedir,
));
}
pass_any_error(join_all(control_file_download_tasks).await)?;
Ok(extension_files)
}
// Download requested shared_preload_libraries
//
// Note that tenant_id is not optional here, because we only download libraries
// after we know the tenant spec and the tenant_id.
//
// return list of all library files to use it in the future searches
pub async fn get_available_libraries(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
pg_version: &str,
custom_ext_prefixes: &Vec<String>,
preload_libraries: &Vec<String>,
) -> anyhow::Result<HashMap<String, Vec<RemotePath>>> {
// Construct a hashmap of all available libraries
// example (key, value) pair: test_lib0: [RemotePath(v14/lib/test_lib0.so), RemotePath(v14/lib/test_lib0.so.3)]
let mut paths: Vec<RemotePath> = Vec::new();
// public libraries
paths.push(
RemotePath::new(&Path::new(&pg_version).join("lib/"))
.expect("The hard coded path here is valid"),
);
// custom libraries
for custom_prefix in custom_ext_prefixes {
paths.push(
RemotePath::new(&Path::new(&pg_version).join(custom_prefix).join("lib"))
.expect("The hard coded path here is valid"),
);
}
let all_available_libraries = organized_library_files(remote_storage, &paths).await?;
info!("list of library files {:?}", &all_available_libraries);
// download all requested libraries
let mut download_tasks = Vec::new();
for lib_name in preload_libraries {
download_tasks.push(download_library_file(
lib_name,
remote_storage,
pgbin,
&all_available_libraries,
));
}
pass_any_error(join_all(download_tasks).await)?;
Ok(all_available_libraries)
}
// download all sqlfiles (and possibly data files) for a given extension name
//
pub async fn download_extension_files(
ext_name: &str,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
all_available_files: &HashMap<String, Vec<PathAndFlag>>,
) -> Result<()> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let mut downloaded_something = false;
let mut made_subdir = false;
info!("EXTENSION {:?}", ext_name);
info!("{:?}", all_available_files.get(ext_name));
info!("start download");
let mut download_tasks = Vec::new();
if let Some(files) = all_available_files.get(ext_name) {
info!("Downloading files for extension {:?}", &ext_name);
for path_and_flag in files {
let file = &path_and_flag.path;
let subdir_flag = path_and_flag.subdir_flag;
info!(
"--- Downloading {:?} (for {:?} as subdir? = {:?})",
&file, &ext_name, subdir_flag
);
let mut subdir = None;
if subdir_flag {
subdir = Some(ext_name);
if !made_subdir {
made_subdir = true;
std::fs::create_dir_all(local_sharedir.join(ext_name))?;
}
}
download_tasks.push(download_helper(
remote_storage,
file.clone(),
subdir,
&local_sharedir,
));
downloaded_something = true;
}
}
if !downloaded_something {
bail!("Files for extension {ext_name} are not found in the extension store");
}
pass_any_error(join_all(download_tasks).await)?;
info!("finish download");
Ok(())
}
// appends an .so suffix to libname if it does not already have one
fn enforce_so_end(libname: &str) -> String {
if !libname.contains(".so") {
format!("{}.so", libname)
} else {
libname.to_string()
}
}
// download shared library file
pub async fn download_library_file(
lib_name: &str,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
all_available_libraries: &HashMap<String, Vec<RemotePath>>,
) -> Result<()> {
let lib_name = get_library_name(lib_name);
let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
info!("looking for library {:?}", &lib_name);
match all_available_libraries.get(&*lib_name) {
Some(remote_paths) => {
let mut library_download_tasks = Vec::new();
for remote_path in remote_paths {
let file_path = local_libdir.join(remote_path.object_name().expect("bad object"));
if file_path.exists() {
info!("File {:?} already exists. Skipping download", &file_path);
} else {
library_download_tasks.push(download_helper(
remote_storage,
remote_path.clone(),
None,
&local_libdir,
));
}
}
pass_any_error(join_all(library_download_tasks).await)?;
}
None => {
// minor TODO: this logic seems to be somewhat faulty for .so.3 type files?
let lib_name_with_ext = enforce_so_end(&lib_name);
let file_path = local_libdir.join(lib_name_with_ext);
if file_path.exists() {
info!("File {:?} already exists. Skipping download", &file_path);
} else {
bail!("Library file {lib_name} not found")
}
}
}
Ok(())
}
// This function initializes the necessary structs to use remmote storage (should be fairly cheap)
pub fn init_remote_storage(
remote_ext_config: &str,
default_prefix: &str,
) -> anyhow::Result<GenericRemoteStorage> {
let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?;
let remote_ext_bucket = match &remote_ext_config["bucket"] {
Value::String(x) => x,
_ => bail!("remote_ext_config missing bucket"),
};
let remote_ext_region = match &remote_ext_config["region"] {
Value::String(x) => x,
_ => bail!("remote_ext_config missing region"),
};
let remote_ext_endpoint = match &remote_ext_config["endpoint"] {
Value::String(x) => Some(x.clone()),
_ => None,
};
let remote_ext_prefix = match &remote_ext_config["prefix"] {
Value::String(x) => Some(x.clone()),
// if prefix is not provided, use default, which is the build_tag
_ => Some(default_prefix.to_string()),
};
// load will not be large, so default parameters are fine
let config = S3Config {
bucket_name: remote_ext_bucket.to_string(),
bucket_region: remote_ext_region.to_string(),
prefix_in_bucket: remote_ext_prefix,
endpoint: remote_ext_endpoint,
concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
max_keys_per_list_response: None,
};
let config = RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
storage: RemoteStorageKind::AwsS3(config),
};
GenericRemoteStorage::from_config(&config)
}
fn get_library_name(path: &str) -> String {
let path_suffix: Vec<&str> = path.split('/').collect();
let path_suffix = path_suffix.last().expect("bad ext name").to_string();
if let Some(index) = path_suffix.find(".so") {
return path_suffix[..index].to_string();
}
path_suffix
}
// asyncrounously lists files in all necessary directories
// TODO: potential optimization: do a single list files on the entire bucket
// and then filter out the files we don't need
async fn list_all_files(
remote_storage: &GenericRemoteStorage,
paths: &Vec<RemotePath>,
) -> Result<Vec<RemotePath>> {
let mut list_tasks = Vec::new();
let mut all_files = Vec::new();
for path in paths {
list_tasks.push(remote_storage.list_files(Some(path)));
}
for list_result in join_all(list_tasks).await {
all_files.extend(list_result?);
}
Ok(all_files)
}
// helper to collect all libraries, grouped by library name
// Returns a hashmap of (library name: [paths]})
// example entry: {libpgtypes: [libpgtypes.so.3, libpgtypes.so]}
async fn organized_library_files(
remote_storage: &GenericRemoteStorage,
paths: &Vec<RemotePath>,
) -> Result<HashMap<String, Vec<RemotePath>>> {
let mut library_groups = HashMap::new();
for file in list_all_files(remote_storage, paths).await? {
let lib_name = get_library_name(file.get_path().to_str().context("invalid path")?);
let lib_list = library_groups.entry(lib_name).or_insert(Vec::new());
lib_list.push(file.to_owned());
}
Ok(library_groups)
}
// store a path, paired with a flag indicating whether the path is to a file in
// the root or subdirectory
#[derive(Debug)]
pub struct PathAndFlag {
path: RemotePath,
subdir_flag: bool,
}
// get_ext_name extracts the extension name, and returns a flag indicating
// whether this file is in a subdirectory or not.
//
// extension files can be in subdirectories of the extension store.
// examples of layout:
// v14//share//extension/extension_name--1.0.sql,
// v14//share//extension/extension_name/extension_name--1.0.sql,
// v14//share//extension/extension_name/extra_data.csv
// Note: we *assume* that the extension files is in one of these formats.
// If it is not, this code's behavior is *undefined*.
fn get_ext_name(path: &str) -> Result<(&str, bool)> {
let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();
let ext_name = path_suffix.last().expect("bad ext name");
if let Some(index) = ext_name.find('/') {
return Ok((&ext_name[..index], true));
} else if let Some(index) = ext_name.find("--") {
return Ok((&ext_name[..index], false));
}
Ok((ext_name, false))
}
// helper to collect files of given prefixes for extensions and group them by extension
// returns a hashmap of (extension_name, Vector of remote paths for all files needed for this extension)
// and a list of control files
// For example, an entry in the hashmap could be
// {"anon": [RemotePath("v14/anon/share/extension/anon/address.csv"),
// RemotePath("v14/anon/share/extension/anon/anon--1.1.0.sql")]},
// with corresponding list of control files entry being
// {"anon.control": RemotePath("v14/anon/share/extension/anon.control")}
async fn organized_extension_files(
remote_storage: &GenericRemoteStorage,
paths: &Vec<RemotePath>,
) -> Result<(HashMap<String, Vec<PathAndFlag>>, Vec<RemotePath>)> {
let mut grouped_dependencies = HashMap::new();
let mut control_files = Vec::new();
for file in list_all_files(remote_storage, paths).await? {
if file.extension().context("bad file name")? == "control" {
control_files.push(file.to_owned());
} else {
let (file_ext_name, subdir_flag) =
get_ext_name(file.get_path().to_str().context("invalid path")?)?;
let ext_file_list = grouped_dependencies
.entry(file_ext_name.to_string())
.or_insert(Vec::new());
ext_file_list.push(PathAndFlag {
path: file.to_owned(),
subdir_flag,
});
}
}
Ok((grouped_dependencies, control_files))
}

View File

@@ -121,6 +121,55 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
// download extension files from S3 on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
info!("req.uri {:?}", req.uri());
let mut is_library = false;
if let Some(params) = req.uri().query() {
info!("serving {:?} POST request with params: {}", route, params);
if params == "is_library=true" {
is_library = true;
} else {
let mut resp = Response::new(Body::from("Wrong request parameters"));
*resp.status_mut() = StatusCode::BAD_REQUEST;
return resp;
}
}
let filename = route.split('/').last().unwrap().to_string();
info!(
"serving /extension_server POST request, filename: {:?} is_library: {}",
filename, is_library
);
if is_library {
match compute.download_library_file(filename.to_string()).await {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("library download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
}
} else {
match compute.download_extension_files(filename.to_string()).await {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("extension download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
}
}
}
// Return the `404 Not Found` for any other routes.
_ => {
let mut not_found = Response::new(Body::from("404 Not Found"));

View File

@@ -139,6 +139,34 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/extension_server:
post:
tags:
- Extension
summary: Download extension from S3 to local folder.
description: ""
operationId: downloadExtension
responses:
200:
description: Extension downloaded
content:
text/plain:
schema:
type: string
description: Error text or 'OK' if download succeeded.
example: "OK"
400:
description: Request is invalid.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
500:
description: Extension download request failed.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
components:
securitySchemes:

View File

@@ -9,6 +9,7 @@ pub mod http;
#[macro_use]
pub mod logger;
pub mod compute;
pub mod extension_server;
pub mod monitor;
pub mod params;
pub mod pg_helpers;

View File

@@ -215,7 +215,7 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
/// Wait for Postgres to become ready to accept connections. It's ready to
/// accept connections when the state-field in `pgdata/postmaster.pid` says
/// 'ready'.
#[instrument(skip(pg))]
#[instrument(skip_all, fields(pgdata = %pgdata.display()))]
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
let pid_path = pgdata.join("postmaster.pid");

View File

@@ -124,7 +124,7 @@ pub fn get_spec_from_control_plane(
pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
// File `postgresql.conf` is no longer included into `basebackup`, so just
// always write all config into it creating new file.
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?;
update_pg_hba(pgdata_path)?;

View File

@@ -32,3 +32,4 @@ utils.workspace = true
compute_api.workspace = true
workspace_hack.workspace = true
tracing.workspace = true

View File

@@ -308,7 +308,8 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
let mut env =
LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?;
env.init(pg_version)
let force = init_match.get_flag("force");
env.init(pg_version, force)
.context("Failed to initialize neon repository")?;
// Initialize pageserver, create initial tenant and timeline.
@@ -657,6 +658,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
// If --safekeepers argument is given, use only the listed safekeeper nodes.
let safekeepers =
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
@@ -698,7 +701,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
_ => {}
}
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token, safekeepers)?;
endpoint.start(&auth_token, safekeepers, remote_ext_config)?;
} else {
let branch_name = sub_args
.get_one::<String>("branch-name")
@@ -742,7 +745,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
pg_version,
mode,
)?;
ep.start(&auth_token, safekeepers)?;
ep.start(&auth_token, safekeepers, remote_ext_config)?;
}
}
"stop" => {
@@ -1002,6 +1005,12 @@ fn cli() -> Command {
.help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
.required(false);
let remote_ext_config_args = Arg::new("remote-ext-config")
.long("remote-ext-config")
.num_args(1)
.help("Configure the S3 bucket that we search for extensions in.")
.required(false);
let lsn_arg = Arg::new("lsn")
.long("lsn")
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
@@ -1013,6 +1022,13 @@ fn cli() -> Command {
.help("If set, the node will be a hot replica on the specified timeline")
.required(false);
let force_arg = Arg::new("force")
.value_parser(value_parser!(bool))
.long("force")
.action(ArgAction::SetTrue)
.help("Force initialization even if the repository is not empty")
.required(false);
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
@@ -1028,6 +1044,7 @@ fn cli() -> Command {
.value_name("config"),
)
.arg(pg_version_arg.clone())
.arg(force_arg)
)
.subcommand(
Command::new("timeline")
@@ -1152,6 +1169,7 @@ fn cli() -> Command {
.arg(pg_version_arg)
.arg(hot_standby_arg)
.arg(safekeepers_arg)
.arg(remote_ext_config_args)
)
.subcommand(
Command::new("stop")

View File

@@ -311,7 +311,7 @@ impl Endpoint {
// TODO: use future host field from safekeeper spec
// Pass the list of safekeepers to the replica so that it can connect to any of them,
// whichever is availiable.
// whichever is available.
let sk_ports = self
.env
.safekeepers
@@ -408,7 +408,12 @@ impl Endpoint {
Ok(())
}
pub fn start(&self, auth_token: &Option<String>, safekeepers: Vec<NodeId>) -> Result<()> {
pub fn start(
&self,
auth_token: &Option<String>,
safekeepers: Vec<NodeId>,
remote_ext_config: Option<&String>,
) -> Result<()> {
if self.status() == "running" {
anyhow::bail!("The endpoint is already running");
}
@@ -476,6 +481,13 @@ impl Endpoint {
pageserver_connstring: Some(pageserver_connstring),
safekeeper_connstrings,
storage_auth_token: auth_token.clone(),
// TODO FIXME: This is a hack to test custom extensions locally.
// In test_download_extensions, we assume that the custom extension
// prefix is the tenant ID. So we set it here.
//
// The proper way to implement this is to pass the custom extension
// in spec, but we don't have a way to do that yet in the python tests.
custom_extensions: Some(vec![self.tenant_id.to_string()]),
};
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
@@ -507,6 +519,9 @@ impl Endpoint {
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);
if let Some(remote_ext_config) = remote_ext_config {
cmd.args(["--remote-ext-config", remote_ext_config]);
}
let _child = cmd.spawn()?;
// Wait for it to start

View File

@@ -364,7 +364,7 @@ impl LocalEnv {
//
// Initialize a new Neon repository
//
pub fn init(&mut self, pg_version: u32) -> anyhow::Result<()> {
pub fn init(&mut self, pg_version: u32, force: bool) -> anyhow::Result<()> {
// check if config already exists
let base_path = &self.base_data_dir;
ensure!(
@@ -372,11 +372,29 @@ impl LocalEnv {
"repository base path is missing"
);
ensure!(
!base_path.exists(),
"directory '{}' already exists. Perhaps already initialized?",
base_path.display()
);
if base_path.exists() {
if force {
println!("removing all contents of '{}'", base_path.display());
// instead of directly calling `remove_dir_all`, we keep the original dir but removing
// all contents inside. This helps if the developer symbol links another directory (i.e.,
// S3 local SSD) to the `.neon` base directory.
for entry in std::fs::read_dir(base_path)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
fs::remove_dir_all(&path)?;
} else {
fs::remove_file(&path)?;
}
}
} else {
bail!(
"directory '{}' already exists. Perhaps already initialized? (Hint: use --force to remove all contents)",
base_path.display()
);
}
}
if !self.pg_bin_dir(pg_version)?.join("postgres").exists() {
bail!(
"Can't find postgres binary at {}",
@@ -392,7 +410,9 @@ impl LocalEnv {
}
}
fs::create_dir(base_path)?;
if !base_path.exists() {
fs::create_dir(base_path)?;
}
// Generate keypair for JWT.
//

View File

@@ -0,0 +1,183 @@
# Supporting custom user Extensions (Dynamic Extension Loading)
Created 2023-05-03
## Motivation
There are many extensions in the PostgreSQL ecosystem, and not all extensions
are of a quality that we can confidently support them. Additionally, our
current extension inclusion mechanism has several problems because we build all
extensions into the primary Compute image: We build the extensions every time
we build the compute image regardless of whether we actually need to rebuild
the image, and the inclusion of these extensions in the image adds a hard
dependency on all supported extensions - thus increasing the image size, and
with it the time it takes to download that image - increasing first start
latency.
This RFC proposes a dynamic loading mechanism that solves most of these
problems.
## Summary
`compute_ctl` is made responsible for loading extensions on-demand into
the container's file system for dynamically loaded extensions, and will also
make sure that the extensions in `shared_preload_libraries` are downloaded
before the compute node starts.
## Components
compute_ctl, PostgreSQL, neon (extension), Compute Host Node, Extension Store
## Requirements
Compute nodes with no extra extensions should not be negatively impacted by
the existence of support for many extensions.
Installing an extension into PostgreSQL should be easy.
Non-preloaded extensions shouldn't impact startup latency.
Uninstalled extensions shouldn't impact query latency.
A small latency penalty for dynamically loaded extensions is acceptable in
the first seconds of compute startup, but not in steady-state operations.
## Proposed implementation
### On-demand, JIT-loading of extensions
Before postgres starts we download
- control files for all extensions available to that compute node;
- all `shared_preload_libraries`;
After postgres is running, `compute_ctl` listens for requests to load files.
When PostgreSQL requests a file, `compute_ctl` downloads it.
PostgreSQL requests files in the following cases:
- When loading a preload library set in `local_preload_libraries`
- When explicitly loading a library with `LOAD`
- Wnen creating extension with `CREATE EXTENSION` (download sql scripts, (optional) extension data files and (optional) library files)))
#### Summary
Pros:
- Startup is only as slow as it takes to load all (shared_)preload_libraries
- Supports BYO Extension
Cons:
- O(sizeof(extensions)) IO requirement for loading all extensions.
### Alternative solutions
1. Allow users to add their extensions to the base image
Pros:
- Easy to deploy
Cons:
- Doesn't scale - first start size is dependent on image size;
- All extensions are shared across all users: It doesn't allow users to
bring their own restrictive-licensed extensions
2. Bring Your Own compute image
Pros:
- Still easy to deploy
- User can bring own patched version of PostgreSQL
Cons:
- First start latency is O(sizeof(extensions image))
- Warm instance pool for skipping pod schedule latency is not feasible with
O(n) custom images
- Support channels are difficult to manage
3. Download all user extensions in bulk on compute start
Pros:
- Easy to deploy
- No startup latency issues for "clean" users.
- Warm instance pool for skipping pod schedule latency is possible
Cons:
- Downloading all extensions in advance takes a lot of time, thus startup
latency issues
4. Store user's extensions in persistent storage
Pros:
- Easy to deploy
- No startup latency issues
- Warm instance pool for skipping pod schedule latency is possible
Cons:
- EC2 instances have only limited number of attachments shared between EBS
volumes, direct-attached NVMe drives, and ENIs.
- Compute instance migration isn't trivially solved for EBS mounts (e.g.
the device is unavailable whilst moving the mount between instances).
- EBS can only mount on one instance at a time (except the expensive IO2
device type).
5. Store user's extensions in network drive
Pros:
- Easy to deploy
- Few startup latency issues
- Warm instance pool for skipping pod schedule latency is possible
Cons:
- We'd need networked drives, and a lot of them, which would store many
duplicate extensions.
- **UNCHECKED:** Compute instance migration may not work nicely with
networked IOs
### Idea extensions
The extension store does not have to be S3 directly, but could be a Node-local
caching service on top of S3. This would reduce the load on the network for
popular extensions.
## Extension Storage implementation
Extension Storage in our case is an S3 bucket with a "directory" per build and postgres version,
where extension files are stored as plain files in the bucket following the same directory structure as in the postgres.
i.e.
`s3://<the-bucket>/<build-version>/<postgres-version>/lib/postgis-3.1.so`
`s3://<the-bucket>/<build-version>/<postgres-version>/share/extension/postgis.control`
`s3://<the-bucket>/<build-version>/<postgres-version>/share/extension/postgis--3.1.sql`
To handle custom extensions, that available only to specific users, we use per-extension subdirectories:
i.e.
`s3://<the-bucket>/<build-version>/<postgres-version>/<custom-ext-prefix>/lib/ext-name.so`, etc.
`s3://<the-bucket>/<build-version>/<postgres-version>/<custom-ext-prefix>/share/extension/ext-name.control`, etc.
On compute start, `compute_ctl` accepts a list of custom_ext_prefixes.
To get the list of available extensions,`compute_ctl` downloads control files from all prefixes:
`s3://<the-bucket>/<build-version>/<postgres-version>/share/extension/`
`s3://<the-bucket>/<build-version>/<postgres-version>/<custom-ext-prefix1>/share/extension/`
`s3://<the-bucket>/<build-version>/<postgres-version>/<custom-ext-prefix2>/share/extension/`
### How to add new extension to the Extension Storage?
Simply upload build artifacts to the S3 bucket.
Implement a CI step for that. Splitting it from ompute-node-image build.
### How do we deal with extension versions and updates?
Currently, we rebuild extensions on every compute-node-image build and store them in the <build-version> prefix.
This is needed to ensure that `/share` and `/lib` files are in sync.
For extension updates, we rely on the PostgreSQL extension versioning mechanism (sql update scripts) and extension authors to not break backwards compatibility within one major version of PostgreSQL.
### Alternatives
For extensions written on trusted languages we can also adopt
`dbdev` PostgreSQL Package Manager based on `pg_tle` by Supabase.
This will increase the amount supported extensions and decrease the amount of work required to support them.

View File

@@ -60,6 +60,9 @@ pub struct ComputeSpec {
/// If set, 'storage_auth_token' is used as the password to authenticate to
/// the pageserver and safekeepers.
pub storage_auth_token: Option<String>,
// list of prefixes to search for custom extensions in remote extension storage
pub custom_extensions: Option<Vec<String>>,
}
#[serde_as]

View File

@@ -184,6 +184,20 @@ pub enum GenericRemoteStorage {
}
impl GenericRemoteStorage {
// A function for listing all the files in a "directory"
// Example:
// list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
match self {
Self::LocalFs(s) => s.list_files(folder).await,
Self::AwsS3(s) => s.list_files(folder).await,
Self::Unreliable(s) => s.list_files(folder).await,
}
}
// lists common *prefixes*, if any of files
// Example:
// list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
pub async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
@@ -195,14 +209,6 @@ impl GenericRemoteStorage {
}
}
pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
match self {
Self::LocalFs(s) => s.list_files(folder).await,
Self::AwsS3(s) => s.list_files(folder).await,
Self::Unreliable(s) => s.list_files(folder).await,
}
}
pub async fn upload(
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,

View File

@@ -349,10 +349,17 @@ impl RemoteStorage for S3Bucket {
/// See the doc for `RemoteStorage::list_files`
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let folder_name = folder
let mut folder_name = folder
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
// remove leading "/" if one exists
if let Some(folder_name_slash) = folder_name.clone() {
if folder_name_slash.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
folder_name = Some(folder_name_slash[1..].to_string());
}
}
// AWS may need to break the response into several parts
let mut continuation_token = None;
let mut all_files = vec![];

View File

@@ -904,7 +904,7 @@ where
self.check_permission(Some(tenant_id))?;
let lsn = if params.len() == 3 {
let lsn = if params.len() >= 3 {
Some(
Lsn::from_str(params[2])
.with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,

View File

@@ -506,17 +506,17 @@ pub async fn shutdown_tasks(
warn!(name = task.name, tenant_id = ?tenant_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
}
let completed = tokio::select! {
let join_handle = tokio::select! {
biased;
_ = &mut join_handle => { true },
_ = &mut join_handle => { None },
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
// allow some time to elapse before logging to cut down the number of log
// lines.
info!("waiting for {} to shut down", task.name);
false
Some(join_handle)
}
};
if !completed {
if let Some(join_handle) = join_handle {
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)

View File

@@ -4,6 +4,7 @@
MODULE_big = neon
OBJS = \
$(WIN32RES) \
extension_server.o \
file_cache.o \
libpagestore.o \
libpqwalproposer.o \

View File

@@ -0,0 +1,104 @@
/*-------------------------------------------------------------------------
*
* extension_server.c
* Request compute_ctl to download extension files.
*
* IDENTIFICATION
* contrib/neon/extension_server.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "access/xact.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "commands/defrem.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "fmgr.h"
#include "utils/guc.h"
#include "port.h"
#include "fmgr.h"
#include <curl/curl.h>
static int extension_server_port = 0;
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
// to download all SQL (and data) files for an extension:
// curl -X POST http://localhost:8080/extension_server/postgis
// it covers two possible extension files layouts:
// 1. extension_name--version--platform.sql
// 2. extension_name/extension_name--version.sql
// extension_name/extra_files.csv
//
// to download specific library file:
// curl -X POST http://localhost:8080/extension_server/postgis-3.so?is_library=true
static bool
neon_download_extension_file_http(const char *filename, bool is_library)
{
CURL *curl;
CURLcode res;
char *compute_ctl_url;
char *postdata;
bool ret = false;
if ((curl = curl_easy_init()) == NULL)
{
elog(ERROR, "Failed to initialize curl handle");
}
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
extension_server_port, filename, is_library ? "?is_library=true" : "");
elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url);
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url);
// NOTE: 15L may be insufficient time for large extensions like postgis
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 15L /* seconds */);
if (curl)
{
/* Perform the request, res will get the return code */
res = curl_easy_perform(curl);
/* Check for errors */
if (res == CURLE_OK)
{
ret = true;
}
else
{
// Don't error here because postgres will try to find the file
// and will fail with some proper error message if it's not found.
elog(WARNING, "neon_download_extension_file_http failed: %s\n", curl_easy_strerror(res));
}
/* always cleanup */
curl_easy_cleanup(curl);
}
return ret;
}
void pg_init_extension_server()
{
// Port to connect to compute_ctl on localhost
// to request extension files.
DefineCustomIntVariable("neon.extension_server_port",
"connection string to the compute_ctl",
NULL,
&extension_server_port,
0, 0, INT_MAX,
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
// set download_extension_file_hook
prev_download_extension_file_hook = download_extension_file_hook;
download_extension_file_hook = neon_download_extension_file_http;
}

View File

@@ -0,0 +1 @@

View File

@@ -35,8 +35,11 @@ _PG_init(void)
{
pg_init_libpagestore();
pg_init_walproposer();
InitControlPlaneConnector();
pg_init_extension_server();
// Important: This must happen after other parts of the extension
// are loaded, otherwise any settings to GUCs that were set before
// the extension was loaded will be removed.

View File

@@ -21,6 +21,8 @@ extern char *neon_tenant;
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
extern void pg_init_extension_server(void);
/*
* Returns true if we shouldn't do REDO on that block in record indicated by
* block_id; false otherwise.

View File

@@ -2675,7 +2675,6 @@ bool
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
{
XLogRecPtr end_recptr = record->EndRecPtr;
XLogRecPtr prev_end_recptr = record->ReadRecPtr - 1;
RelFileNode rnode;
ForkNumber forknum;
BlockNumber blkno;
@@ -2719,16 +2718,15 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
no_redo_needed = buffer < 0;
/* we don't have the buffer in memory, update lwLsn past this record */
/* In both cases st lwlsn past this WAL record */
SetLastWrittenLSNForBlock(end_recptr, rnode, forknum, blkno);
/* we don't have the buffer in memory, update lwLsn past this record,
* also evict page fro file cache
*/
if (no_redo_needed)
{
SetLastWrittenLSNForBlock(end_recptr, rnode, forknum, blkno);
lfc_evict(rnode, forknum, blkno);
}
else
{
SetLastWrittenLSNForBlock(prev_end_recptr, rnode, forknum, blkno);
}
LWLockRelease(partitionLock);
@@ -2736,7 +2734,10 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
if (get_cached_relsize(rnode, forknum, &relsize))
{
if (relsize < blkno + 1)
{
update_cached_relsize(rnode, forknum, blkno + 1);
SetLastWrittenLSNForRelation(end_recptr, rnode, forknum);
}
}
else
{
@@ -2768,6 +2769,7 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
Assert(nbresponse->n_blocks > blkno);
set_cached_relsize(rnode, forknum, nbresponse->n_blocks);
SetLastWrittenLSNForRelation(end_recptr, rnode, forknum);
elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
}

View File

@@ -514,6 +514,16 @@ def available_remote_storages() -> List[RemoteStorageKind]:
return remote_storages
def available_s3_storages() -> List[RemoteStorageKind]:
remote_storages = [RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
remote_storages.append(RemoteStorageKind.REAL_S3)
log.info("Enabling real s3 storage for tests")
else:
log.info("Using mock implementations to test remote storage")
return remote_storages
@dataclass
class LocalFsStorage:
root: Path
@@ -534,6 +544,16 @@ class S3Storage:
"AWS_SECRET_ACCESS_KEY": self.secret_key,
}
def to_string(self) -> str:
return json.dumps(
{
"bucket": self.bucket_name,
"region": self.bucket_region,
"endpoint": self.endpoint,
"prefix": self.prefix_in_bucket,
}
)
RemoteStorage = Union[LocalFsStorage, S3Storage]
@@ -600,10 +620,12 @@ class NeonEnvBuilder:
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
self.remote_storage = remote_storage
self.ext_remote_storage: Optional[S3Storage] = None
self.remote_storage_client: Optional[Any] = None
self.remote_storage_users = remote_storage_users
self.broker = broker
self.run_id = run_id
self.mock_s3_server = mock_s3_server
self.mock_s3_server: MockS3Server = mock_s3_server
self.pageserver_config_override = pageserver_config_override
self.num_safekeepers = num_safekeepers
self.safekeepers_id_start = safekeepers_id_start
@@ -651,15 +673,24 @@ class NeonEnvBuilder:
remote_storage_kind: RemoteStorageKind,
test_name: str,
force_enable: bool = True,
enable_remote_extensions: bool = False,
):
if remote_storage_kind == RemoteStorageKind.NOOP:
return
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
self.enable_local_fs_remote_storage(force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
self.enable_mock_s3_remote_storage(
bucket_name=test_name,
force_enable=force_enable,
enable_remote_extensions=enable_remote_extensions,
)
elif remote_storage_kind == RemoteStorageKind.REAL_S3:
self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable)
self.enable_real_s3_remote_storage(
test_name=test_name,
force_enable=force_enable,
enable_remote_extensions=enable_remote_extensions,
)
else:
raise RuntimeError(f"Unknown storage type: {remote_storage_kind}")
@@ -673,11 +704,15 @@ class NeonEnvBuilder:
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
self.remote_storage = LocalFsStorage(Path(self.repo_dir / "local_fs_remote_storage"))
def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable: bool = True):
def enable_mock_s3_remote_storage(
self, bucket_name: str, force_enable: bool = True, enable_remote_extensions: bool = False
):
"""
Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already.
Starts up the mock server, if that does not run yet.
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
Also creates the bucket for extensions, self.ext_remote_storage bucket
"""
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
mock_endpoint = self.mock_s3_server.endpoint()
@@ -698,9 +733,22 @@ class NeonEnvBuilder:
bucket_region=mock_region,
access_key=self.mock_s3_server.access_key(),
secret_key=self.mock_s3_server.secret_key(),
prefix_in_bucket="pageserver",
)
def enable_real_s3_remote_storage(self, test_name: str, force_enable: bool = True):
if enable_remote_extensions:
self.ext_remote_storage = S3Storage(
bucket_name=bucket_name,
endpoint=mock_endpoint,
bucket_region=mock_region,
access_key=self.mock_s3_server.access_key(),
secret_key=self.mock_s3_server.secret_key(),
prefix_in_bucket="ext",
)
def enable_real_s3_remote_storage(
self, test_name: str, force_enable: bool = True, enable_remote_extensions: bool = False
):
"""
Sets up configuration to use real s3 endpoint without mock server
"""
@@ -737,9 +785,18 @@ class NeonEnvBuilder:
bucket_region=region,
access_key=access_key,
secret_key=secret_key,
prefix_in_bucket=self.remote_storage_prefix,
prefix_in_bucket=f"{self.remote_storage_prefix}/pageserver",
)
if enable_remote_extensions:
self.ext_remote_storage = S3Storage(
bucket_name=bucket_name,
bucket_region=region,
access_key=access_key,
secret_key=secret_key,
prefix_in_bucket=f"{self.remote_storage_prefix}/ext",
)
def cleanup_local_storage(self):
if self.preserve_database_files:
return
@@ -773,6 +830,7 @@ class NeonEnvBuilder:
# `self.remote_storage_prefix` is coupled with `S3Storage` storage type,
# so this line effectively a no-op
assert isinstance(self.remote_storage, S3Storage)
assert self.remote_storage_client is not None
if self.keep_remote_storage_contents:
log.info("keep_remote_storage_contents skipping remote storage cleanup")
@@ -902,6 +960,8 @@ class NeonEnv:
self.neon_binpath = config.neon_binpath
self.pg_distrib_dir = config.pg_distrib_dir
self.endpoint_counter = 0
self.remote_storage_client = config.remote_storage_client
self.ext_remote_storage = config.ext_remote_storage
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -1488,6 +1548,7 @@ class NeonCli(AbstractNeonCli):
safekeepers: Optional[List[int]] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
remote_ext_config: Optional[str] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
@@ -1497,6 +1558,8 @@ class NeonCli(AbstractNeonCli):
"--pg-version",
self.env.pg_version,
]
if remote_ext_config is not None:
args.extend(["--remote-ext-config", remote_ext_config])
if lsn is not None:
args.append(f"--lsn={lsn}")
args.extend(["--pg-port", str(pg_port)])
@@ -2358,7 +2421,7 @@ class Endpoint(PgProtocol):
return self
def start(self) -> "Endpoint":
def start(self, remote_ext_config: Optional[str] = None) -> "Endpoint":
"""
Start the Postgres instance.
Returns self.
@@ -2374,6 +2437,7 @@ class Endpoint(PgProtocol):
http_port=self.http_port,
tenant_id=self.tenant_id,
safekeepers=self.active_safekeepers,
remote_ext_config=remote_ext_config,
)
self.running = True
@@ -2463,6 +2527,7 @@ class Endpoint(PgProtocol):
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
remote_ext_config: Optional[str] = None,
) -> "Endpoint":
"""
Create an endpoint, apply config, and start Postgres.
@@ -2477,7 +2542,7 @@ class Endpoint(PgProtocol):
config_lines=config_lines,
hot_standby=hot_standby,
lsn=lsn,
).start()
).start(remote_ext_config=remote_ext_config)
log.info(f"Postgres startup took {time.time() - started_at} seconds")
@@ -2511,6 +2576,7 @@ class EndpointFactory:
lsn: Optional[Lsn] = None,
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
remote_ext_config: Optional[str] = None,
) -> Endpoint:
ep = Endpoint(
self.env,
@@ -2527,6 +2593,7 @@ class EndpointFactory:
hot_standby=hot_standby,
config_lines=config_lines,
lsn=lsn,
remote_ext_config=remote_ext_config,
)
def create(

View File

@@ -89,6 +89,9 @@ class TenantId(Id):
def __repr__(self) -> str:
return f'`TenantId("{self.id.hex()}")'
def __str__(self) -> str:
return self.id.hex()
class TimelineId(Id):
def __repr__(self) -> str:

View File

@@ -0,0 +1,295 @@
import json
import os
from contextlib import closing
from io import BytesIO
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
RemoteStorageKind,
available_s3_storages,
)
from fixtures.pg_version import PgVersion
from fixtures.types import TenantId
NUM_EXT = 3
def control_file_content(owner, i):
output = f"""# mock {owner} extension{i}
comment = 'This is a mock extension'
default_version = '1.0'
module_pathname = '$libdir/test_ext{i}'
relocatable = true"""
return BytesIO(bytes(output, "utf-8"))
def sql_file_content():
output = """
CREATE FUNCTION test_ext_add(integer, integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
"""
return BytesIO(bytes(output, "utf-8"))
def fake_library_content():
return BytesIO(bytes("\n111\n", "utf-8"))
# Prepare some mock extension files and upload them to the bucket
# returns a list of files that should be cleaned up after the test
def prepare_mock_ext_storage(
pg_version: PgVersion,
tenant_id: TenantId,
pg_bin: PgBin,
ext_remote_storage,
remote_storage_client,
):
bucket_prefix = ext_remote_storage.prefix_in_bucket
custom_prefix = str(tenant_id)
PUB_EXT_ROOT = f"v{pg_version}/share/extension"
PRIVATE_EXT_ROOT = f"v{pg_version}/{custom_prefix}/share/extension"
LOCAL_EXT_ROOT = f"pg_install/{pg_version}/share/postgresql/extension"
PUB_LIB_ROOT = f"v{pg_version}/lib"
PRIVATE_LIB_ROOT = f"v{pg_version}/{custom_prefix}/lib"
LOCAL_LIB_ROOT = f"{pg_bin.pg_lib_dir}/postgresql"
log.info(
f"""
PUB_EXT_ROOT: {PUB_EXT_ROOT}
PRIVATE_EXT_ROOT: {PRIVATE_EXT_ROOT}
LOCAL_EXT_ROOT: {LOCAL_EXT_ROOT}
PUB_LIB_ROOT: {PUB_LIB_ROOT}
PRIVATE_LIB_ROOT: {PRIVATE_LIB_ROOT}
LOCAL_LIB_ROOT: {LOCAL_LIB_ROOT}
"""
)
cleanup_files = []
# Upload several test_ext{i}.control files to the bucket
for i in range(NUM_EXT):
public_remote_name = f"{bucket_prefix}/{PUB_EXT_ROOT}/test_ext{i}.control"
custom_remote_name = f"{bucket_prefix}/{PRIVATE_EXT_ROOT}/custom_ext{i}.control"
cleanup_files += [
f"{LOCAL_EXT_ROOT}/test_ext{i}.control",
f"{LOCAL_EXT_ROOT}/custom_ext{i}.control",
]
log.info(f"Uploading control file to {public_remote_name}")
remote_storage_client.upload_fileobj(
control_file_content("public", i), ext_remote_storage.bucket_name, public_remote_name
)
log.info(f"Uploading control file to {custom_remote_name}")
remote_storage_client.upload_fileobj(
control_file_content(str(tenant_id), i),
ext_remote_storage.bucket_name,
custom_remote_name,
)
# Upload SQL file for the extension we're going to create
sql_filename = "test_ext0--1.0.sql"
test_sql_public_remote_path = f"{bucket_prefix}/{PUB_EXT_ROOT}/{sql_filename}"
remote_storage_client.upload_fileobj(
sql_file_content(),
ext_remote_storage.bucket_name,
test_sql_public_remote_path,
)
cleanup_files += [f"{LOCAL_EXT_ROOT}/{sql_filename}"]
# upload some fake library files
for i in range(2):
public_remote_name = f"{bucket_prefix}/{PUB_LIB_ROOT}/test_lib{i}.so"
custom_remote_name = f"{bucket_prefix}/{PRIVATE_LIB_ROOT}/custom_lib{i}.so"
log.info(f"uploading fake library to {public_remote_name}")
remote_storage_client.upload_fileobj(
fake_library_content(),
ext_remote_storage.bucket_name,
public_remote_name,
)
log.info(f"uploading fake library to {custom_remote_name}")
remote_storage_client.upload_fileobj(
fake_library_content(),
ext_remote_storage.bucket_name,
custom_remote_name,
)
cleanup_files += [f"{LOCAL_LIB_ROOT}/test_lib{i}.so", f"{LOCAL_LIB_ROOT}/custom_lib{i}.so"]
return cleanup_files
# Generate mock extension files and upload them to the bucket.
#
# Then check that compute nodes can download them and use them
# to CREATE EXTENSION and LOAD 'library.so'
#
# NOTE: You must have appropriate AWS credentials to run REAL_S3 test.
# It may also be necessary to set the following environment variables:
# export AWS_ACCESS_KEY_ID='test'
# export AWS_SECRET_ACCESS_KEY='test'
# export AWS_SECURITY_TOKEN='test'
# export AWS_SESSION_TOKEN='test'
# export AWS_DEFAULT_REGION='us-east-1'
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
def test_remote_extensions(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_version: PgVersion,
pg_bin: PgBin,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_remote_extensions",
enable_remote_extensions=True,
)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
assert env.ext_remote_storage is not None
assert env.remote_storage_client is not None
# Prepare some mock extension files and upload them to the bucket
cleanup_files = prepare_mock_ext_storage(
pg_version,
tenant_id,
pg_bin,
env.ext_remote_storage,
env.remote_storage_client,
)
# Start a compute node and check that it can download the extensions
# and use them to CREATE EXTENSION and LOAD 'library.so'
#
# This block is wrapped in a try/finally so that the downloaded files
# are cleaned up even if the test fails
try:
endpoint = env.endpoints.create_start(
"test_remote_extensions",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
# config_lines=["log_min_messages=debug3"],
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# Test query: check that test_ext0 was successfully downloaded
cur.execute("SELECT * FROM pg_available_extensions")
all_extensions = [x[0] for x in cur.fetchall()]
log.info(all_extensions)
for i in range(NUM_EXT):
assert f"test_ext{i}" in all_extensions
assert f"custom_ext{i}" in all_extensions
cur.execute("CREATE EXTENSION test_ext0")
cur.execute("SELECT extname FROM pg_extension")
all_extensions = [x[0] for x in cur.fetchall()]
log.info(all_extensions)
assert "test_ext0" in all_extensions
# Try to load existing library file
try:
cur.execute("LOAD 'test_lib0.so'")
except Exception as e:
# expected to fail with
# could not load library ... test_ext.so: file too short
# because test_lib0.so is not real library file
log.info("LOAD test_lib0.so failed (expectedly): %s", e)
assert "file too short" in str(e)
# Try to load custom library file
try:
cur.execute("LOAD 'custom_lib0.so'")
except Exception as e:
# expected to fail with
# could not load library ... test_ext.so: file too short
# because test_lib0.so is not real library file
log.info("LOAD custom_lib0.so failed (expectedly): %s", e)
assert "file too short" in str(e)
# Try to load existing library file without .so extension
try:
cur.execute("LOAD 'test_lib1'")
except Exception as e:
# expected to fail with
# could not load library ... test_lib1.so: file too short
# because test_lib1.so is not real library file
log.info("LOAD test_lib1 failed (expectedly): %s", e)
assert "file too short" in str(e)
# Try to load non-existent library file
try:
cur.execute("LOAD 'test_lib_fail.so'")
except Exception as e:
# expected to fail because test_lib_fail.so is not found
log.info("LOAD test_lib_fail.so failed (expectedly): %s", e)
assert (
"""could not access file "test_lib_fail.so": No such file or directory"""
in str(e)
)
finally:
# this is important because if the files aren't cleaned up then the test can
# pass even without successfully downloading the files if a previous run (or
# run with different type of remote storage) of the test did download the
# files
for file in cleanup_files:
try:
os.remove(file)
log.info(f"Deleted {file}")
except FileNotFoundError:
log.info(f"{file} does not exist, so cannot be deleted")
"""
This tests against the actual infra for real S3
Note in particular that we don't need to set up a bucket (real or mock)
because we are testing the files already uploaded as part of CI/CD
"""
def test_remote_extensions_in_bucket(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.REAL_S3,
test_name="test_remote_extensions_in_bucket",
enable_remote_extensions=False, # we don't enable remote extensions here; instead we use the real bucket
)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_remote_extensions_in_bucket", tenant_id=tenant_id)
# Start a compute node and check that it can download the extensions
# and use them to CREATE EXTENSION and LOAD 'library.so'
remote_ext_config = {
"bucket": "neon-dev-extensions-eu-central-1",
"region": "eu-central-1",
"endpoint": None,
"prefix": "5412197734", # build tag
}
endpoint = env.endpoints.create_start(
"test_remote_extensions_in_bucket",
tenant_id=tenant_id,
remote_ext_config=json.dumps(remote_ext_config),
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM pg_available_extensions")
all_extensions = [x[0] for x in cur.fetchall()]
log.info("ALEK******\n\n" * 20)
log.info(all_extensions)
log.info("ALEK******\n\n" * 20)
# TODO: test download anon
# TODO: test load library / create extension

View File

@@ -419,8 +419,6 @@ def test_tenant_relocation(
new_pageserver_http.tenant_attach(tenant_id)
# wait for tenant to finish attaching
tenant_status = new_pageserver_http.tenant_status(tenant_id=tenant_id)
assert tenant_status["state"]["slug"] in ["Attaching", "Active"]
wait_until(
number_of_iterations=10,
interval=1,

View File

@@ -275,6 +275,7 @@ def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str]
assert isinstance(neon_env_builder.remote_storage, S3Storage)
# Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
assert neon_env_builder.remote_storage_client is not None
response = neon_env_builder.remote_storage_client.list_objects_v2(
Bucket=neon_env_builder.remote_storage.bucket_name,
Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
@@ -628,7 +629,7 @@ def test_timeline_delete_works_for_remote_smoke(
)
# for some reason the check above doesnt immediately take effect for the below.
# Assume it is mock server incosistency and check twice.
# Assume it is mock server inconsistency and check twice.
wait_until(
2,
0.5,